Spark Streaming on HDFS

nilmish
I am trying to run Spark Streaming, reading input from a directory in HDFS.

Here is my code:

JavaPairDStream<String, String> flumeStream = ssc.fileStream("hdfs://172.16.200.128:8020/flume");

flumeStream.count();

flumeStream.count().map(new Function<Long, String>() {
    @Override
    public String call(Long in) {
        return "Received " + in + " flume events.";
    }
}).print();

ssc.start();
ssc.awaitTermination();

I am getting the exception below and cannot figure out the reason for it:

14/03/06 13:33:54 ERROR JobScheduler: Error generating jobs for time 1394112820000 ms
java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
        at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:70)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
        at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:125)
        at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD$1.apply(FileInputDStream.scala:124)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:124)
        at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:83)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
        at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
        at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:40)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:40)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
        at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:292)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:160)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:160)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:160)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:104)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:69)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread "main" java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
        ... (same stack trace as above)

Please help.
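
My best guess so far: the Java fileStream call is generic in the key, value, and InputFormat types, and calling it with no concrete classes leaves the InputFormat parameter erased to Object, which is what fails the cast to org.apache.hadoop.mapreduce.InputFormat in NewHadoopRDD.getPartitions. Below is a minimal sketch of the workaround I am considering, using textFileStream so the types are fixed to plain text and no InputFormat cast is needed. The app name and the 10-second batch interval are assumptions on my part; the HDFS path is the same as above.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class FlumeDirCount {
    public static void main(String[] args) throws Exception {
        // App name is an assumption; master is expected to come from spark-submit.
        SparkConf conf = new SparkConf().setAppName("FlumeDirCount");
        // 10-second batch interval is an assumption; tune it for the job.
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));

        // textFileStream monitors the directory for new text files; the key,
        // value, and InputFormat types are fixed internally, so the erased-type
        // cast that fileStream performs never happens.
        JavaDStream<String> lines = ssc.textFileStream("hdfs://172.16.200.128:8020/flume");

        lines.count().map(new Function<Long, String>() {
            @Override
            public String call(Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}

If the files are not plain text, newer Spark releases also expose a fileStream overload that takes the key, value, and InputFormat classes explicitly instead of relying on erased type parameters. Can someone confirm whether this reading of the error is right?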