Spark 0.9.0-incubation + Apache Hadoop 2.2.0 + YARN encounter Compression codec com.hadoop.compression.lzo.LzoCodec not found

Spark 0.9.0-incubation + Apache Hadoop 2.2.0 + YARN encounter Compression codec com.hadoop.compression.lzo.LzoCodec not found

Andrew Lee
Hi All,

I have been puzzling over this problem and can't figure out what is missing from the configuration. I traced the scripts to see what ends up on the CLASSPATH, but I couldn't find any place that honors or inherits HADOOP_CLASSPATH (or pulls in any MapReduce JARs). The only things I saw being brought in were the HADOOP_CONF_DIR and YARN_CONF_DIR folders and various other JARs, but not the MapReduce compression JARs.

spark-shell => spark-class => compute-classpath.sh => spark-env.sh (empty)
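
For reference, here is a quick way to dump what actually ends up on the classpath (a sketch, assuming compute-classpath.sh still echoes the assembled classpath to stdout, as the 0.9.0 script does) and to confirm that nothing in the scripts references HADOOP_CLASSPATH:

./bin/compute-classpath.sh | tr ':' '\n'   # one classpath entry per line
grep -rn "HADOOP_CLASSPATH" bin/ sbin/     # no hits in my tree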

This is the command I execute to run the spark-shell:

CLASSPATH=/opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-0.4.18-201403101806.jar:${CLASSPATH} SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating.jar MASTER=yarn-client ./bin/spark-shell
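
If I read compute-classpath.sh correctly, it seeds its classpath from SPARK_CLASSPATH rather than the caller's CLASSPATH (which it overwrites), so one hypothesis worth testing is passing the JAR that way instead. This is just a sketch; only the first line differs from the command above:

SPARK_CLASSPATH=/opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo-0.4.18-201403101806.jar \
SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \
SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating.jar \
MASTER=yarn-client ./bin/spark-shell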

I then try to run the LinearRegression example from http://docs.sigmoidanalytics.com/index.php/MLlib; however, every time I try to generate the model, it runs into the LzoCodec-not-found exception. Does anyone have any clue why this is happening? I can see reflection being applied in the stack trace, but I'm wondering why it isn't picking up the correct LZO libs. My observations and questions are:

1. It looks like when Spark launches the YARN application, it overrides the original Hadoop classpath on the NodeManager machines? I'm not sure what happens behind the scenes.
2. Other MR apps and programs run fine, including the SparkPi and KMeans examples.
3. What commands/tools/places can I look into to figure out why the lib is missing from the remote classpath? (I sketch what I have tried so far after the HDFS listing below.)
4. In the YARN logs, I see only 2 JARs deployed to HDFS in the .sparkStaging folder:

hdfs@alexie-dt ~/spark-0.9.0-incubating $ hdfs dfs -ls /user/hdfs/.sparkStaging/application_1395091699241_0019/

Found 2 items

-rw-r--r--   3 hdfs hdfs   99744537 2014-03-17 23:52 /user/hdfs/.sparkStaging/application_1395091699241_0019/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar

-rw-r--r--   3 hdfs hdfs  135645837 2014-03-17 23:52 /user/hdfs/.sparkStaging/application_1395091699241_0019/spark-examples-assembly-0.9.0-incubating.jar
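
Regarding item 3, this is roughly how I have been poking at the remote side. A sketch, assuming log aggregation is enabled; the local-dirs path below is a placeholder that depends on yarn.nodemanager.local-dirs:

# Aggregated container logs often show the classpath the AM/executors launched with:
yarn logs -applicationId application_1395091699241_0019 | grep -i classpath

# While a container is still running, its generated launch script on the NodeManager
# shows the exact CLASSPATH that YARN exported (path below is hypothetical):
grep CLASSPATH /path/to/yarn-local-dirs/usercache/hdfs/appcache/application_1395091699241_0019/container_*/launch_container.sh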

Any insights and feedback are welcome and appreciated. I have probably overlooked something in the docs.


I have copied/pasted the console output here for reference.

==============================================

14/03/17 23:30:30 INFO spark.HttpServer: Starting HTTP Server

14/03/17 23:30:30 INFO server.Server: jetty-7.x.y-SNAPSHOT

14/03/17 23:30:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:43720

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0

      /_/


Using Scala version 2.10.3 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_45)

Type in expressions to have them evaluated.

Type :help for more information.

14/03/17 23:30:34 INFO slf4j.Slf4jLogger: Slf4jLogger started

14/03/17 23:30:34 INFO Remoting: Starting remoting

14/03/17 23:30:34 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@...:47469]

14/03/17 23:30:34 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@...:47469]

14/03/17 23:30:34 INFO spark.SparkEnv: Registering BlockManagerMaster

14/03/17 23:30:34 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140317233034-81b8

14/03/17 23:30:34 INFO storage.MemoryStore: MemoryStore started with capacity 294.9 MB.

14/03/17 23:30:34 INFO network.ConnectionManager: Bound socket to port 43255 with id = ConnectionManagerId(alexie-dt.local.altiscale.com,43255)

14/03/17 23:30:34 INFO storage.BlockManagerMaster: Trying to register BlockManager

14/03/17 23:30:34 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager alexie-dt.local.altiscale.com:43255 with 294.9 MB RAM

14/03/17 23:30:34 INFO storage.BlockManagerMaster: Registered BlockManager

14/03/17 23:30:34 INFO spark.HttpServer: Starting HTTP Server

14/03/17 23:30:34 INFO server.Server: jetty-7.x.y-SNAPSHOT

14/03/17 23:30:34 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:33611

14/03/17 23:30:34 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.10.10.4:33611

14/03/17 23:30:34 INFO spark.SparkEnv: Registering MapOutputTracker

14/03/17 23:30:34 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-9abcbe38-ef79-418d-94af-20979b1083fc

14/03/17 23:30:34 INFO spark.HttpServer: Starting HTTP Server

14/03/17 23:30:34 INFO server.Server: jetty-7.x.y-SNAPSHOT

14/03/17 23:30:34 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:53963

14/03/17 23:30:35 INFO server.Server: jetty-7.x.y-SNAPSHOT

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}

14/03/17 23:30:35 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}

14/03/17 23:30:35 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040

14/03/17 23:30:35 INFO ui.SparkUI: Started Spark Web UI at http://alexie-dt.local.altiscale.com:4040

14/03/17 23:30:35 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

14/03/17 23:30:35 INFO client.RMProxy: Connecting to ResourceManager at alexie-rm.local.altiscale.com/10.10.10.6:8032

14/03/17 23:30:36 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 3

14/03/17 23:30:36 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,

      queueApplicationCount = 10, queueChildQueueCount = 0

14/03/17 23:30:36 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 8192

14/03/17 23:30:36 INFO yarn.Client: Preparing Local resources

14/03/17 23:30:36 INFO yarn.Client: Uploading file:/home/hdfs/spark-0.9.0-incubating/examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating.jar to hdfs://alexie-nn.local.altiscale.com:8020/user/hdfs/.sparkStaging/application_1395091699241_0018/spark-examples-assembly-0.9.0-incubating.jar

14/03/17 23:30:37 INFO yarn.Client: Uploading file:/home/hdfs/spark-0.9.0-incubating/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar to hdfs://alexie-nn.local.altiscale.com:8020/user/hdfs/.sparkStaging/application_1395091699241_0018/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar

14/03/17 23:30:38 INFO yarn.Client: Setting up the launch environment

14/03/17 23:30:39 INFO yarn.Client: Setting up container launch context

14/03/17 23:30:39 INFO yarn.Client: Command for starting the Spark ApplicationMaster: $JAVA_HOME/bin/java -server -Xmx512m -Djava.io.tmpdir=$PWD/tmp org.apache.spark.deploy.yarn.WorkerLauncher --class notused --jar examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating.jar --args  'alexie-dt.local.altiscale.com:47469'  --worker-memory 1024 --worker-cores 1 --num-workers 2 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr

14/03/17 23:30:39 INFO yarn.Client: Submitting application to ASM

14/03/17 23:30:39 INFO impl.YarnClientImpl: Submitted application application_1395091699241_0018 to ResourceManager at alexie-rm.local.altiscale.com/10.10.10.6:8032

14/03/17 23:30:39 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: 

appMasterRpcPort: 0

appStartTime: 1395113439042

yarnAppState: ACCEPTED


14/03/17 23:30:40 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: 

appMasterRpcPort: 0

appStartTime: 1395113439042

yarnAppState: ACCEPTED


14/03/17 23:30:41 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: 

appMasterRpcPort: 0

appStartTime: 1395113439042

yarnAppState: ACCEPTED


14/03/17 23:30:42 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: 

appMasterRpcPort: 0

appStartTime: 1395113439042

yarnAppState: ACCEPTED


14/03/17 23:30:43 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: 

appMasterRpcPort: 0

appStartTime: 1395113439042

yarnAppState: ACCEPTED


14/03/17 23:30:44 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: 

appMasterRpcPort: 0

appStartTime: 1395113439042

yarnAppState: RUNNING


14/03/17 23:30:46 INFO cluster.YarnClientClusterScheduler: YarnClientClusterScheduler.postStartHook done

Created spark context..

Spark context available as sc.


scala> 14/03/17 23:30:47 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@...:36752/user/Executor#-560732716] with ID 1

14/03/17 23:30:48 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager alexie-s3.local.altiscale.com:43611 with 589.2 MB RAM

14/03/17 23:30:49 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@...:36292/user/Executor#395229219] with ID 2

14/03/17 23:30:50 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager alexie-s1.local.altiscale.com:50270 with 589.2 MB RAM



scala> 


scala> import org.apache.spark.mllib.regression.LinearRegressionWithSGD

import org.apache.spark.mllib.regression.LinearRegressionWithSGD


scala> import org.apache.spark.mllib.regression.LabeledPoint

import org.apache.spark.mllib.regression.LabeledPoint


scala> 


scala> // Load and parse the data


scala> val data = sc.textFile("mllib/data/ridge-data/lpsa.data")

14/03/17 23:31:05 INFO storage.MemoryStore: ensureFreeSpace(181512) called with curMem=0, maxMem=309225062

14/03/17 23:31:05 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 177.3 KB, free 294.7 MB)

data: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:14


scala> val parsedData = data.map { line =>

     |   val parts = line.split(',')

     |   LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)

     | }

parsedData: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MappedRDD[2] at map at <console>:16


scala> 


scala> // Building the model


scala> val numIterations = 20

numIterations: Int = 20

scala> val model = LinearRegressionWithSGD.train(parsedData, numIterations)

java.lang.RuntimeException: Error in configuring object

at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)

at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)

at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)

at org.apache.spark.rdd.HadoopRDD.getInputFormat(HadoopRDD.scala:123)

at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:136)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)

at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)

at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)

at org.apache.spark.rdd.RDD.take(RDD.scala:824)

at org.apache.spark.rdd.RDD.first(RDD.scala:856)

at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)

at org.apache.spark.mllib.regression.LinearRegressionWithSGD$.train(LinearRegression.scala:118)

at org.apache.spark.mllib.regression.LinearRegressionWithSGD$.train(LinearRegression.scala:154)

at $iwC$$iwC$$iwC$$iwC.<init>(<console>:20)

at $iwC$$iwC$$iwC.<init>(<console>:25)

at $iwC$$iwC.<init>(<console>:27)

at $iwC.<init>(<console>:29)

at <init>(<console>:31)

at .<init>(<console>:35)

at .<clinit>(<console>)

at .<init>(<console>:7)

at .<clinit>(<console>)

at $print(<console>)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)

at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)

at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)

at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)

at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:788)

at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:833)

at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:745)

at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:593)

at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:600)

at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:603)

at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:926)

at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)

at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)

at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:876)

at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:968)

at org.apache.spark.repl.Main$.main(Main.scala:31)

at org.apache.spark.repl.Main.main(Main.scala)

Caused by: java.lang.reflect.InvocationTargetException

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)

... 56 more

Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found.

at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:135)

at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:175)

at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45)

... 61 more

Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found

at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1626)

at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:128)

... 63 more




Re: Spark 0.9.0-incubation + Apache Hadoop 2.2.0 + YARN encounter Compression codec com.hadoop.compression.lzo.LzoCodec not found

alee526
You can try adding the following to your shell scripts:


In bin/compute-classpath.sh, append the hadoop-lzo JAR from MapReduce and export the native library paths:

# Put the hadoop-lzo JAR on Spark's classpath so com.hadoop.compression.lzo.LzoCodec can be loaded:
CLASSPATH=$CLASSPATH:$HADOOP_HOME/share/hadoop/mapreduce/lib/hadoop-lzo.jar
# Make the native Hadoop/LZO libraries visible to the JVM:
export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native/
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native/


In bin/spark-class, before JAVA_OPTS is assembled, add the following:

# Ensure the native libraries end up on the JVM's library path:
SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native/

This fixed my problem.
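
As a quick sanity check after the edits (a sketch; paths assume a standard HADOOP_HOME layout, and the libgplcompression name is what the hadoop-lzo native build typically produces):

./bin/compute-classpath.sh | tr ':' '\n' | grep -i lzo   # the hadoop-lzo JAR should now appear
ls $HADOOP_HOME/lib/native/ | grep -i gplcompression     # the native LZO library should be present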