DStream.saveAsTextFiles() saves nothing


DStream.saveAsTextFiles() saves nothing

robin_up
Hi,

I have a Streaming app which reads from inputs, does some text transformation, and tries to output to an HDFS text file using saveAsTextFiles on the DStream object.

But the method saves nothing (not even an empty file), even though the jobs run successfully with no errors or warnings. When I replace the save-to-file part with print(), the contents print to the screen. I also tried saveAsTextFile on a plain RDD via the SparkContext, and that works.

I'm not sure why. Has anyone gotten saveAsTextFiles working with a DStream?

Here is the line of code I use for output:
actions.saveAsTextFiles("hdfs://nn1:8020/user/ds/actions/test", "test")
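For reference, my understanding (a rough sketch based on the docs, not the actual implementation) is that saveAsTextFiles(prefix, suffix) should behave like the following per-batch save, writing one directory per interval named <prefix>-<batchTimeMs>.<suffix>:

actions.foreachRDD { (rdd, time) =>
  // roughly what saveAsTextFiles(prefix, suffix) is supposed to do per batch
  rdd.saveAsTextFile(
    "hdfs://nn1:8020/user/ds/actions/test-" + time.milliseconds + ".test")
}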

I'm using Spark 0.9.0 and Hadoop 2.0.0-cdh4.5.0.

thanks
Robin
-- Robin Li

RE: DStream.saveAsTextFiles() saves nothing

Suraj Satishkumar Sheth
Hi,
I am facing a similar issue.

I am running a Spark Streaming job with a text file stream on HDFS, using Spark 0.9.0 from Cloudera.
I am saving each RDD (the streaming interval is 100 seconds) to a different directory in HDFS. Every 100 seconds it creates a new directory containing only a _SUCCESS marker (stream<Random>/_SUCCESS), but writes no data into it. I have verified that I am adding new files to the correct HDFS input directory; still, at every interval it creates another folder holding only _SUCCESS.
So the major issue is that the stream is not recognizing new files created in HDFS.

Code used:

import scala.util.Random
import org.apache.spark.streaming.{Duration, StreamingContext}

val ssc = new StreamingContext(ClusterConfig.sparkMaster, "Hybrid",
  Duration(100000), ClusterConfig.sparkHome, ClusterConfig.jars)
val data = ssc.textFileStream(ClusterConfig.hdfsNN + "correct/path/to/data")
data.foreachRDD(rdd =>
  rdd.saveAsObjectFile(ClusterConfig.hdfsNN + "/user/<path/to/file/stream>" + Random.nextInt))
ssc.start()
ssc.awaitTermination() // keep the driver alive so batches keep running
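To make the empty batches obvious, here is a minimal diagnostic sketch (assuming the same data stream as above; the output path is hypothetical) that logs the per-batch record count before saving:

data.foreachRDD { rdd =>
  // a count of 0 confirms the stream detected no new files for that batch
  val n = rdd.count()
  println("Batch record count: " + n)
  if (n > 0) {
    rdd.saveAsObjectFile(ClusterConfig.hdfsNN + "/user/stream" + Random.nextInt) // hypothetical path
  }
}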


It is creating these directories containing only _SUCCESS:
stream562343230
stream1228731977
stream318151149
stream603511115


This is the log that I get:
14/02/17 14:08:20 INFO FileInputDStream: Finding new files took 549 ms
14/02/17 14:08:20 INFO FileInputDStream: New files at time 1392626300000 ms:

14/02/17 14:08:20 INFO JobScheduler: Added jobs for time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Starting job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:08:20 WARN Configuration: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/02/17 14:08:20 WARN Configuration: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/02/17 14:08:20 WARN Configuration: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/02/17 14:08:20 WARN Configuration: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/02/17 14:08:20 WARN Configuration: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/02/17 14:08:20 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:08:20 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 0.001934866 s
14/02/17 14:08:20 INFO JobScheduler: Finished job streaming job 1392626300000 ms.0 from job set of time 1392626300000 ms
14/02/17 14:08:20 INFO JobScheduler: Total delay: 0.741 s for time 1392626300000 ms (execution: 0.167 s)
14/02/17 14:08:20 INFO FileInputDStream: Cleared 0 old files that were older than 1392626200000 ms:
14/02/17 14:10:00 INFO FileInputDStream: Finding new files took 6 ms
14/02/17 14:10:00 INFO FileInputDStream: New files at time 1392626400000 ms:

14/02/17 14:10:00 INFO JobScheduler: Added jobs for time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Starting job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:10:00 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:10:00 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.9016E-5 s
14/02/17 14:10:00 INFO JobScheduler: Finished job streaming job 1392626400000 ms.0 from job set of time 1392626400000 ms
14/02/17 14:10:00 INFO JobScheduler: Total delay: 0.085 s for time 1392626400000 ms (execution: 0.077 s)
14/02/17 14:10:00 INFO FileInputDStream: Cleared 0 old files that were older than 1392626300000 ms:
14/02/17 14:11:40 INFO FileInputDStream: Finding new files took 5 ms
14/02/17 14:11:40 INFO FileInputDStream: New files at time 1392626500000 ms:

14/02/17 14:11:40 INFO JobScheduler: Added jobs for time 1392626500000 ms
14/02/17 14:11:40 INFO JobScheduler: Starting job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO SequenceFileRDDFunctions: Saving as sequence file of type (NullWritable,BytesWritable)
14/02/17 14:11:40 INFO SparkContext: Starting job: saveAsObjectFile at TestStreaming.scala:29
14/02/17 14:11:40 INFO SparkContext: Job finished: saveAsObjectFile at TestStreaming.scala:29, took 1.8111E-5 s
14/02/17 14:11:40 INFO JobScheduler: Finished job streaming job 1392626500000 ms.0 from job set of time 1392626500000 ms
14/02/17 14:11:40 INFO FileInputDStream: Cleared 1 old files that were older than 1392626400000 ms: 1392626300000 ms
14/02/17 14:11:40 INFO JobScheduler: Total delay: 0.110 s for time 1392626500000 ms (execution: 0.102 s)


Thanks and Regards,
Suraj Sheth


Re: DStream.saveAsTextFiles() saves nothing

Amit Behera
Hi Robin,
Please make sure that new files are actually being picked up as input. If that looks fine, then check what exact error is shown on the cluster web UI.

Thanks & Regards
Amit Ku. Behera


Re: DStream.saveAsTextFiles() saves nothing

gogowater
In reply to this post by robin_up
I have the exact same problem when prototyping based on the Spark Streaming example, saving a JavaTableRDD to HDFS.

http://apache-spark-user-list.1001560.n3.nabble.com/SaveAsHadoopFiles-not-functioning-in-DStream-example-td1540.html

RE: DStream.saveAsTextFiles() saves nothing

robin_up
In reply to this post by Suraj Satishkumar Sheth
Suraj

I think your issue is that the code is not detecting new files in the directory you set, which is a bit different from what I'm facing. (Note that textFileStream only picks up files that are newly created in the monitored directory after the job starts, e.g. by atomically moving or renaming them into it.) If it does detect a new file, you should see something like this in the logs:

14/02/18 19:17:30 INFO JobScheduler: Added jobs for time 1392751050000 ms
14/02/18 19:17:30 INFO JobScheduler: Starting job streaming job 1392751050000 ms.0 from job set of time 1392751050000 ms
14/02/18 19:17:30 WARN Configuration: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
14/02/18 19:17:30 WARN Configuration: mapred.output.compression.codec is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.codec
14/02/18 19:17:30 WARN Configuration: mapred.output.compression.type is deprecated. Instead, use mapreduce.output.fileoutputformat.compress.type
14/02/18 19:17:30 INFO SparkContext: Starting job: saveAsTextFile at web_reqeusts.scala:81
14/02/18 19:17:30 INFO SparkContext: Job finished: saveAsTextFile at web_reqeusts.scala:81, took 2.4977E-5 s
14/02/18 19:17:30 INFO JobScheduler: Finished job streaming job 1392751050000 ms.0 from job set of time 1392751050000 ms
14/02/18 19:17:30 INFO JobScheduler: Total delay: 0.036 s for time 1392751050000 ms (execution: 0.023 s)
14/02/18 19:17:30 INFO FileInputDStream: Cleared 1 old files that were older than 1392751020000 ms: 1392750990000 ms
14/02/18 19:18:00 INFO FileInputDStream: Finding new files took 8 ms
14/02/18 19:18:00 INFO FileInputDStream: New files at time 1392751080000 ms:
hdfs://nn1:8020/user/etl/rtp_sink/staging/0133-8f0e55d43a6c43bba48a97d4c448762a.sdb.gz
hdfs://nn1:8020/user/etl/rtp_sink/staging/0133-ad3925c1ce04450abde4208b731aae1d.sdb.gz
hdfs://nn1:8020/user/etl/rtp_sink/staging/0133-b9c49509293b459cae0715d5905e6805.sdb.gz

14/02/18 19:18:00 INFO MemoryStore: ensureFreeSpace(170493) called with curMem=6308257, maxMem=9003781324
14/02/18 19:18:00 INFO MemoryStore: Block broadcast_37 stored as values to memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:18:00 INFO MemoryStore: ensureFreeSpace(170493) called with curMem=6478750, maxMem=9003781324
14/02/18 19:18:00 INFO MemoryStore: Block broadcast_38 stored as values to memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:18:00 INFO MemoryStore: ensureFreeSpace(170493) called with curMem=6649243, maxMem=9003781324
14/02/18 19:18:00 INFO MemoryStore: Block broadcast_39 stored as values to memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:18:00 INFO FileInputFormat: Total input paths to process : 1
14/02/18 19:18:00 INFO FileInputFormat: Total input paths to process : 1
14/02/18 19:18:00 INFO FileInputFormat: Total input paths to process : 1
-- Robin Li

Re: DStream.saveAsTextFiles() saves nothing

robin_up
In reply to this post by Amit Behera
Amit

I'm certain the input files were picked up without any issue -- I can see the input file names printed in the logs, as shown in my last reply to Suraj.

I experimented a little further by using RDD.saveAsTextFile() inside foreachRDD instead of DStream.saveAsTextFiles(). It works on the same dataset, so I'm pretty sure the problem is with DStream.

Can anyone confirm whether this is a bug, or whether I'm using/understanding it the wrong way?

// This line does not work
actions.saveAsTextFiles("hdfs://nn1:8020/user/etl/rtp_sink/actions/test", "testtest")
// This line works
actions.foreachRDD(rdd => rdd.saveAsTextFile("hdfs://nn1:8020/user/etl/rtp_sink/actions/test", classOf[org.apache.hadoop.io.compress.GzipCodec]))
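One caveat on the working line above: saving every batch to the same fixed directory will fail from the second batch onwards, because the Hadoop output directory already exists. In practice I append the batch time to the path (this time-suffixed variant is my own adaptation, not code from the app above):

actions.foreachRDD { (rdd, time) =>
  // one output directory per batch interval, mirroring what
  // saveAsTextFiles is supposed to produce
  rdd.saveAsTextFile(
    "hdfs://nn1:8020/user/etl/rtp_sink/actions/test-" + time.milliseconds,
    classOf[org.apache.hadoop.io.compress.GzipCodec])
}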

Logs from using DStream.saveAsTextFiles() -- new files detected OK, jobs finished without any error or warning:

14/02/18 19:35:00 INFO FileInputDStream: New files at time 1392752100000 ms:
hdfs://nn1:8020/user/etl/rtp_sink/staging/0134-ae2fc0ed9f824ab1a91f5fd81ad45af3.sdb.gz
hdfs://nn1:8020/user/etl/rtp_sink/staging/0134-f03bd4319bd24f13aa2f7c6b6a0d7631.sdb.gz
14/02/18 19:35:00 INFO MemoryStore: ensureFreeSpace(170493) called with curMem=6990229, maxMem=9003781324
14/02/18 19:35:00 INFO MemoryStore: Block broadcast_41 stored as values to memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:35:00 INFO MemoryStore: ensureFreeSpace(170493) called with curMem=7160722, maxMem=9003781324
14/02/18 19:35:00 INFO MemoryStore: Block broadcast_42 stored as values to memory (estimated size 166.5 KB, free 8.4 GB)
14/02/18 19:35:00 INFO FileInputFormat: Total input paths to process : 1
14/02/18 19:35:00 INFO FileInputFormat: Total input paths to process : 1
14/02/18 19:35:00 INFO JobScheduler: Added jobs for time 1392752100000 ms
14/02/18 19:35:00 INFO JobScheduler: Starting job streaming job 1392752100000 ms.0 from job set of time 1392752100000 ms
14/02/18 19:35:00 INFO SparkContext: Starting job: saveAsTextFile at DStream.scala:762
14/02/18 19:35:00 INFO DAGScheduler: Got job 7 (saveAsTextFile at DStream.scala:762) with 2 output partitions (allowLocal=false)
14/02/18 19:35:00 INFO DAGScheduler: Final stage: Stage 2 (saveAsTextFile at DStream.scala:762)
14/02/18 19:35:00 INFO DAGScheduler: Parents of final stage: List()
14/02/18 19:35:00 INFO DAGScheduler: Missing parents: List()
14/02/18 19:35:00 INFO DAGScheduler: Submitting Stage 2 (MappedRDD[98] at saveAsTextFile at DStream.scala:762), which has no missing parents
14/02/18 19:35:00 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (MappedRDD[98] at saveAsTextFile at DStream.scala:762)
14/02/18 19:35:00 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
14/02/18 19:35:00 INFO TaskSetManager: Starting task 2.0:1 as TID 49 on executor 2: hadoop-dal01-dev-dn8.tapjoy.com (NODE_LOCAL)
14/02/18 19:35:00 INFO TaskSetManager: Serialized task 2.0:1 as 12419 bytes in 0 ms
14/02/18 19:35:00 INFO TaskSetManager: Starting task 2.0:0 as TID 50 on executor 5: hadoop-dal01-dev-dn7.tapjoy.com (NODE_LOCAL)
14/02/18 19:35:00 INFO TaskSetManager: Serialized task 2.0:0 as 12419 bytes in 0 ms
14/02/18 19:35:26 INFO TaskSetManager: Finished TID 50 in 26508 ms on hadoop-dal01-dev-dn7.tapjoy.com (progress: 0/2)
14/02/18 19:35:26 INFO DAGScheduler: Completed ResultTask(2, 0)
14/02/18 19:35:30 INFO FileInputDStream: Finding new files took 6 ms
-- Robin Li