ForeachBatch Structured Streaming


ForeachBatch Structured Streaming

German Schiavon Matteo
Hi!

The documentation says:

  • By default, foreachBatch provides only at-least-once write guarantees. However, you can use the batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.

Taking the example snippet:


streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

Let's assume I'm reading from Kafka. Does that mean that, by default, batchDF may contain duplicates?

Thanks!






Re: ForeachBatch Structured Streaming

mmuru
To achieve exactly-once semantics with foreachBatch in Structured Streaming, you must have checkpointing enabled. If the job hits an exception or failure, the Spark Structured Streaming query restarts and the same batchId is reprocessed (this holds for any data source). To avoid duplicates, you need an external system that records the batchIds already written, so you can dedupe on them.
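
For illustration, here's a minimal sketch of that idea in Scala, combining the checkpoint with writes keyed by batchId so that a replayed batch overwrites its own output instead of appending duplicates. The paths (checkpointPath, outputPath1, outputPath2) are hypothetical placeholders, not part of Spark's API:

import org.apache.spark.sql.DataFrame

streamingDF.writeStream
  .option("checkpointLocation", checkpointPath) // required: a restart replays the same batchId
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    // Keying each output directory by batchId and overwriting makes the
    // write idempotent: a replayed batch rewrites the same directories
    // rather than appending duplicate rows.
    batchDF.write.mode("overwrite")
      .format("parquet").save(s"$outputPath1/batchId=$batchId") // location 1
    batchDF.write.mode("overwrite")
      .format("parquet").save(s"$outputPath2/batchId=$batchId") // location 2
    batchDF.unpersist()
  }
  .start()

If the sink can't be overwritten in place, the same effect comes from an external table of committed batchIds that the function checks before writing, as described above.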
