Fwd: Spark Streaming Checkpoint and Exactly Once Guarantee on Kafka Direct Stream

anbucheeralan
I am using Spark Streaming with checkpointing enabled and a Kafka direct stream.
The batch duration is 30 seconds, and a batch normally completes in 15-20 seconds.

If the Spark application fails after a batch has completed successfully (1496684280000 ms in the log below) and is then restarted, it runs that last batch again.

Is this the expected behavior? I was expecting the restarted application to begin with a new batch window.


Here are some logs:

Last successful run:
17/06/05 13:38:00 INFO JobScheduler: Total delay: 0.040 s for time 1496684280000 ms (execution: 0.029 s)
17/06/05 13:38:00 INFO KafkaRDD: Removing RDD 0 from persistence list
17/06/05 13:38:00 INFO BlockManager: Removing RDD 0
17/06/05 13:38:00 INFO JobGenerator: Checkpointing graph for time 1496684280000 ms
17/06/05 13:38:00 INFO DStreamGraph: Updating checkpoint data for time 1496684280000 ms
17/06/05 13:38:00 INFO DStreamGraph: Updated checkpoint data for time 1496684280000 ms
17/06/05 13:38:00 INFO CheckpointWriter: Submitted checkpoint of time 1496684280000 ms to writer queue
17/06/05 13:38:00 INFO CheckpointWriter: Saving checkpoint for time 1496684280000 ms to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-1496684280000'
17/06/05 13:38:00 INFO CheckpointWriter: Checkpoint for time 1496684280000 ms saved to file 'file:/Users/anbucheeralan/IdeaProjects/Spark2Example/ckpt/checkpoint-1496684280000', took 4032 bytes and 9 ms
17/06/05 13:38:00 INFO DStreamGraph: Clearing checkpoint data for time 1496684280000 ms
17/06/05 13:38:00 INFO DStreamGraph: Cleared checkpoint data for time 1496684280000 ms

After the restart:

17/06/05 13:42:31 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring KafkaRDD for time 1496684280000 ms [(my_test,0,2000,2000)]
17/06/05 13:42:31 INFO DirectKafkaInputDStream: Restored checkpoint data
17/06/05 13:42:31 INFO JobGenerator: Batches during down time (10 batches): 1496684280000 ms, 1496684310000 ms, 1496684340000 ms, 1496684370000 ms, 1496684400000 ms, 1496684430000 ms, 1496684460000 ms, 1496684490000 ms, 1496684520000 ms, 1496684550000 ms
17/06/05 13:42:31 INFO JobGenerator: Batches pending processing (0 batches): 
17/06/05 13:42:31 INFO JobGenerator: Batches to reschedule (10 batches): 1496684280000 ms, 1496684310000 ms, 1496684340000 ms, 1496684370000 ms, 1496684400000 ms, 1496684430000 ms, 1496684460000 ms, 1496684490000 ms, 1496684520000 ms, 1496684550000 ms
17/06/05 13:42:31 INFO JobScheduler: Added jobs for time 1496684280000 ms
17/06/05 13:42:31 INFO JobScheduler: Starting job streaming job 1496684280000 ms.0 from job set of time 1496684280000 ms
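
To make the restart log easier to read, here is a small sketch that reproduces the "Batches during down time" list with plain arithmetic. It is only an illustration of what the log shows, not Spark's actual code. The batch interval and the checkpoint time come from the post and the log. The restart time is an assumption read off the log timestamps (13:38:00 and 13:42:31). Reading the tuple on the "Restoring KafkaRDD" line as (topic, partition, fromOffset, untilOffset) is also an assumption, and the function names are hypothetical.

```python
# Values copied from the post and the log above; everything else is an
# illustrative assumption, not Spark's implementation.
BATCH_INTERVAL_MS = 30_000            # 30 s batch duration stated in the post
CHECKPOINT_TIME_MS = 1496684280000    # last checkpointed batch (13:38:00 in the log)

# Assumption: the restart happened 4 min 31 s after the checkpoint,
# read off the log timestamps 13:38:00 and 13:42:31.
RESTART_TIME_MS = CHECKPOINT_TIME_MS + (4 * 60 + 31) * 1000


def batches_during_downtime(checkpoint_ms, restart_ms, interval_ms):
    """Batch times from the checkpointed batch (inclusive) up to the restart (exclusive)."""
    return list(range(checkpoint_ms, restart_ms, interval_ms))


def is_empty_range(offset_range):
    """True if a (topic, partition, from_offset, until_offset) tuple covers no records."""
    _topic, _partition, from_offset, until_offset = offset_range
    return from_offset == until_offset


batches = batches_during_downtime(CHECKPOINT_TIME_MS, RESTART_TIME_MS, BATCH_INTERVAL_MS)
restored = ("my_test", 0, 2000, 2000)  # tuple from the "Restoring KafkaRDD" line

print(len(batches), batches[0], batches[-1])
print(is_empty_range(restored))
```

With these inputs the list has 10 entries, starting at 1496684280000 and ending at 1496684550000, which matches the "Batches to reschedule" line. The restored offset range starts and ends at 2000, so under the assumed reading of the tuple the re-run batch would cover no Kafka records.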

