Best way to read batch from Kafka and Offsets

13 messages

Best way to read batch from Kafka and Offsets

Ruijing Li
Hi all,

My use case is to read from a single Kafka topic using a batch Spark SQL job (ideally not structured streaming). Each time the job starts, I want it to pick up the last offset it stopped at, read from there until it has caught up to the latest offset, store the result, and stop. Given that the dataframe has partition and offset columns, my first thought for offset management is to groupBy partition, aggregate the max offset, and store that in HDFS. The next time the job runs, it will read this value and start from that offset using startingOffsets.

However, I was wondering if this will work. If the Kafka producer fails to send a message and later decides to resend it, I will have skipped it, since I'm starting from the max offset seen. How does Spark structured streaming know where to continue - does it keep state of all offsets seen? If so, how can I replicate this for batch without missing data? Any help would be appreciated.
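The per-partition bookkeeping described above can be sketched in plain Scala; this is a hypothetical illustration (names are mine, not from the thread), where the (partition, offset) pairs would come from the Kafka dataframe's partition and offset columns:

```scala
// Hypothetical sketch of the offset bookkeeping described above:
// given the (partition, offset) pairs read in one batch, keep the
// max offset per partition as the state to store in HDFS.
object OffsetTracking {
  def maxOffsets(seen: Seq[(Int, Long)]): Map[Int, Long] =
    seen.groupBy(_._1).map { case (p, rows) => p -> rows.map(_._2).max }

  // The next run should start AFTER the last offset read, so add 1:
  // the Kafka source's startingOffsets is inclusive.
  def nextStartingOffsets(maxSeen: Map[Int, Long]): Map[Int, Long] =
    maxSeen.map { case (p, o) => p -> (o + 1) }
}
```

Note the off-by-one: startingOffsets is inclusive, so resuming from the stored max rather than max + 1 would reprocess the last record of each partition.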


--
Cheers,
Ruijing Li

Re: Best way to read batch from Kafka and Offsets

Chris Teoh
Kafka can keep track of the offsets you've seen (in an internal topic, per consumer group), but that is usually best effort, and you're probably better off also keeping track of your offsets yourself.

If the producer resends a message, you would have to dedupe it, as you've most likely already seen it; how you handle that depends on your data. I think the offset increments automatically - you will generally not see the same offset occur more than once in a Kafka topic partition, but feel free to correct me on this. So the most likely scenario you need to handle is the producer sending a duplicate message under two different offsets.

The alternative is to reprocess from an earlier offset, going back to where you think the message was last seen.

Kind regards
Chris


Re: Best way to read batch from Kafka and Offsets

Ruijing Li
Hi Chris,

Thanks for the answer. So if I understand correctly:

- There will be a need to dedupe, since I should expect at-least-once delivery.

- Storing the result of (group by partition, aggregate max offset) is enough, since Kafka messages are immutable: a resent message will get a new offset rather than reusing the old one.

So Spark, when reading from Kafka, is acting as an at-least-once consumer? Why does Spark not do checkpointing for batch reads of Kafka?

--
Cheers,
Ruijing Li

Re: Best way to read batch from Kafka and Offsets

Chris Teoh
The most common delivery semantic for a Kafka producer is at least once.

So your consumers have to handle dedupe.

Spark can do checkpointing, but you have to be explicit about it. It only makes sense if your dataframe lineage gets too long (i.e. you're doing a highly iterative algorithm) and you need to trim it to avoid recomputing from the start upon failure. It does not keep track of your Kafka offsets for you.

In the context of reading from Kafka, your consumers can explicitly commit an offset so Kafka knows you've read up to that point.



Re: Best way to read batch from Kafka and Offsets

Anil Kulkarni
Hi Ruijing,

We did the following to read Kafka in batch from Spark:

1) Maintain the start offset (in a DB, a file, etc.).
2) Get the end offset dynamically when the job executes.
3) Pass the start and end offsets to the read.
4) Overwrite the stored start offset with the end offset (this should be done after processing the data).

Currently, to make it work in batch mode, you need to maintain the offset state externally.
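For step 3, the batch Kafka source takes the offsets as JSON strings via the startingOffsets / endingOffsets options, in the form {"topic":{"partition":offset}}. A minimal helper to build that string from stored per-partition offsets might look like the following (the helper name is hypothetical; in this JSON format, -2 means "earliest" and -1 means "latest"):

```scala
// Sketch: build the JSON string Spark's Kafka source expects for
// startingOffsets / endingOffsets, e.g. {"mytopic":{"0":23,"1":40}}.
def offsetsJson(topic: String, offsets: Map[Int, Long]): String = {
  val perPartition = offsets.toSeq.sortBy(_._1)
    .map { case (partition, offset) => s""""$partition":$offset""" }
    .mkString(",")
  s"""{"$topic":{$perPartition}}"""
}
```

The result would then be passed as, for example, .option("startingOffsets", offsetsJson(topic, startOffsets)) on a spark.read.format("kafka") batch read.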


Thanks
Anil

-Sent from my mobile
http://anilkulkarni.com/


Re: Best way to read batch from Kafka and Offsets

Burak Yavuz-2
Hi Ruijing,

Why do you not want to use structured streaming here? This is exactly what structured streaming + Trigger.Once was built for, so that you don't have to build that solution yourself.
You also get exactly-once semantics if you use the built-in sinks.

Best,
Burak


Re: Best way to read batch from Kafka and Offsets

Ruijing Li
Thanks Anil, I think that’s the approach I will take.

Hi Burak,

That was a possibility to think about, but my team has custom dataframe writer functions we would like to use; unfortunately, they were written with static dataframes in mind. I do see there is a foreachBatch write mode, but my thinking was that at that point it was easier to read from Kafka in batch mode.

Thanks,
RJ

--
Cheers,
Ruijing Li

Re: Best way to read batch from Kafka and Offsets

Burak Yavuz-2
Do you really want to build all of that and open yourself to bugs when you can just use foreachBatch? Here are your options:

1. Build it yourself

// Read offsets from some store
prevOffsets = readOffsets()
latestOffsets = getOffsets()

df = spark.read.format("kafka")
  .option("subscribe", topic)
  .option("startingOffsets", prevOffsets)
  .option("endingOffsets", latestOffsets)
  .load()
batchLogic(df)

saveOffsets(latestOffsets)

2. Structured Streaming + Trigger.Once + foreachBatch

spark.readStream.format("kafka").option("subscribe", topic).load()
  .writeStream
  .foreachBatch((df, batchId) => batchLogic(df))
  .trigger(Trigger.Once)
  .start()

With option (1), you're going to have to (re)solve:
 a) Tracking and consistency of offsets
 b) Potential topic-partition mismatches
 c) Offsets that may have aged out due to retention
 d) Re-execution of jobs and data consistency: what if your job fails while committing the offsets at the end, but the data was already stored? Will your getOffsets method return the same offsets?

I'd rather not solve problems that other people have solved for me, but ultimately the decision is yours to make.

Best,
Burak





Re: Best way to read batch from Kafka and Offsets

Gourav Sengupta
Hi Burak,

I am not that used to streaming, but I was almost thinking along the same lines :) It makes a lot of sense to me now.

Regards,
Gourav


Re: Best way to read batch from Kafka and Offsets

Ruijing Li
Hi all,

I tried with foreachBatch but got an error. Is this expected?

The code is:

df.writeStream
  .trigger(Trigger.Once)
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.parquet(hdfsPath)
  }
  .option("checkpointLocation", anotherHdfsPath)
  .start()

The exception is: Queries with streaming sources must be executed with writeStream.start()

But I thought foreachBatch would treat batchDF as a static dataframe?

Thanks,
RJ

--
Cheers,
Ruijing Li

Re: Best way to read batch from Kafka and Offsets

Ruijing Li
Looks like I'm wrong, since I tried that exact snippet and it worked.

So to be clear, the batchDF.write.parquet part is not the exact code I'm using.

I'm using a custom write function that does something similar to write.parquet but has some added functionality. Somehow my custom write function isn't working correctly.

Is batchDF a static dataframe though?

Thanks

--
Cheers,
Ruijing Li

Re: Best way to read batch from Kafka and Offsets

Ruijing Li
Thought I'd update this thread. I figured out my issue with foreachBatch and structured streaming: I was calling count() before write(), so my streaming query branched into two. I am now using Trigger.Once and letting structured streaming handle checkpointing instead of doing it myself. Thanks all for your help!
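For anyone hitting the same thing, a hedged sketch of the fix described above (reusing df and hdfsPath from the earlier snippet; checkpointPath is a hypothetical name): do the count on the materialized batchDF inside foreachBatch, rather than calling count() on the streaming dataframe itself, so the streaming query keeps a single sink.

```scala
// Sketch, assuming df is a streaming dataframe from the Kafka source:
// counting the static batchDF inside foreachBatch avoids branching
// the streaming plan into two queries.
df.writeStream
  .trigger(Trigger.Once)
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()                // avoid recomputing for count + write
    val n = batchDF.count()          // batchDF is static, so this is fine
    batchDF.write.parquet(hdfsPath)
    batchDF.unpersist()
  }
  .option("checkpointLocation", checkpointPath)
  .start()
```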

On Wed, Feb 5, 2020 at 7:07 PM Ruijing Li <[hidden email]> wrote:
Looks like I was wrong, since I tried that exact snippet and it worked.

So to be clear: in the part where I do batchDF.write.parquet, that is not the exact code I’m using.

I’m using a custom write function that does something similar to write.parquet but has some added functionality. Somehow my custom write function isn’t working correctly.

Is batchDF a static DataFrame though?

Thanks

On Wed, Feb 5, 2020 at 6:13 PM Ruijing Li <[hidden email]> wrote:
Hi all,

I tried with foreachBatch but got an error. Is this expected?

Code is

df.writeStream
  .trigger(Trigger.Once)
  .foreachBatch { (batchDF, batchId) =>
    batchDF.write.parquet(hdfsPath)
  }
  .option("checkpointLocation", anotherHdfsPath)
  .start()

Exception is: Queries with streaming sources must be executed with writeStream.start()

But I thought foreachBatch would treat batchDF as a static DataFrame?

Thanks,
RJ

On Wed, Feb 5, 2020 at 12:48 AM Gourav Sengupta <[hidden email]> wrote:
Hi Burak,

I am not quite used to streaming, but I was thinking along the same lines :) It makes a lot of sense to me now.

Regards,
Gourav

On Wed, Feb 5, 2020 at 1:00 AM Burak Yavuz <[hidden email]> wrote:
Do you really want to build all of that and open yourself to bugs when you can just use foreachBatch? Here are your options:

1. Build it yourself

// Read offsets from some store
val prevOffsets = readOffsets()
val latestOffsets = getOffsets()

val df = spark.read.format("kafka")
  .option("startingOffsets", prevOffsets)
  .option("endingOffsets", latestOffsets)
  .load()
batchLogic(df)

saveOffsets(latestOffsets)

2. Structured Streaming + Trigger.Once + foreachBatch

spark.readStream.format("kafka").load()
  .writeStream
  .foreachBatch((df, batchId) => batchLogic(df))
  .trigger(Trigger.Once)
  .start()

With Option (1), you're going to have to (re)solve:
 a) Tracking and consistency of offsets 
 b) Potential topic partition mismatches
 c) Offsets that may have aged out due to retention
 d) Re-execution of jobs and data consistency. What if your job fails as you're committing the offsets in the end, but the data was already stored? Will your getOffsets method return the same offsets?
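To make point (a) concrete: per-partition offsets passed to startingOffsets/endingOffsets must be serialized as a JSON map of topic to partition to offset, so the offsets you track have to round-trip through that format. A sketch (the topic name and offset values are made up):

```scala
// startingOffsets / endingOffsets take JSON: {"topic": {"partition": offset}}.
// As special values, -2 means "earliest" and -1 means "latest" for a partition
// (-1 is only valid in endingOffsets).
val startingOffsets = """{"my-topic":{"0":42,"1":105}}""" // hypothetical saved offsets
val endingOffsets   = """{"my-topic":{"0":-1,"1":-1}}"""  // read up to latest

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)  // placeholder
  .option("subscribe", "my-topic")
  .option("startingOffsets", startingOffsets)
  .option("endingOffsets", endingOffsets)
  .load()
```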

I'd rather not solve problems that other people have solved for me, but ultimately the decision is yours to make.

Best,
Burak




On Tue, Feb 4, 2020 at 4:41 PM Ruijing Li <[hidden email]> wrote:
Thanks Anil, I think that’s the approach I will take.

Hi Burak,

That was a possibility to consider, but my team has custom DataFrame writer functions we would like to use, and unfortunately they were written with static DataFrames in mind. I do see there is a foreachBatch write mode, but my thinking was that at that point it would be easier to read from Kafka in batch mode.

Thanks,
RJ

On Tue, Feb 4, 2020 at 4:20 PM Burak Yavuz <[hidden email]> wrote:
Hi Ruijing,

Why do you not want to use structured streaming here? This is exactly what structured streaming + Trigger.Once was built for, so that you don't have to build that solution yourself.
You also get exactly-once semantics if you use the built-in sinks.

Best,
Burak

On Mon, Feb 3, 2020 at 3:15 PM Anil Kulkarni <[hidden email]> wrote:
Hi Ruijing,

We did the following to read Kafka in batch from Spark:

1) Maintain the start offset (in a db, a file, etc.)
2) Get the end offset dynamically when the job executes.
3) Pass the start and end offsets to the batch read.
4) Overwrite the start offset with the end offset (this should be done after processing the data).

Currently, to make this work in batch mode, you need to maintain the offset state externally.
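The four steps above can be sketched like this (brokers, the topic, and the offset-store helpers are placeholders; error handling omitted):

```scala
// 1) Read the saved start offsets (JSON, as startingOffsets expects);
//    fall back to "earliest" on the first run.
val startOffsets = readOffsetsFromStore().getOrElse("earliest")  // hypothetical helper

// 2) + 3) Resolve the end offset at execution time with "latest",
//    passing both bounds to a plain batch read.
val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)  // placeholder
  .option("subscribe", topic)                  // placeholder
  .option("startingOffsets", startOffsets)
  .option("endingOffsets", "latest")
  .load()

// Derive the highest offset read per partition. Note: save max offset + 1,
// since startingOffsets is inclusive and you don't want to re-read the last row.
val maxOffsets = df.groupBy("topic", "partition").max("offset")

// 4) Persist the new start offsets only after the output is committed,
//    so a failure re-reads data rather than skipping it.
writeOutput(df)                  // hypothetical
saveOffsetsToStore(maxOffsets)   // hypothetical
```

Committing the offsets strictly after the write gives at-least-once behavior; downstream dedup is then needed for exactly-once, which is what the checkpointed streaming approach gives you for free.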


Thanks
Anil

-Sent from my mobile
http://anilkulkarni.com/

Reply | Threaded
Open this post in threaded view
|

Re: Best way to read batch from Kafka and Offsets

Burak Yavuz-2
Yay! Glad you could figure it out!
