Spark structured streaming + offset management in kafka + kafka headers


Spark structured streaming + offset management in kafka + kafka headers

AliGouta
Hello,

I was reading the Spark docs about Spark Structured Streaming, since we are thinking about updating our code base, which today uses DStreams (i.e. Spark Streaming). One main reason for this change is that reading headers in Kafka messages is only supported in Structured Streaming and not in DStreams.

I was surprised not to see an obvious way to manage offsets manually by committing them to Kafka. In Spark Streaming we used to do it with something like these lines of code:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

And this works perfectly! In particular, it works very nicely in case of job failure/restart... I am wondering how this can be achieved in Spark Structured Streaming.

I read about checkpoints, and this reminds me of the old way of doing things in Spark 1.5/Kafka 0.8; it is not perfect, since we do not decide ourselves when to commit offsets.

Did I miss anything? What would be the best way of committing offsets to Kafka for the relevant consumer group with Spark Structured Streaming?

Best regards,
Ali Gouta.

Re: Spark structured streaming + offset management in kafka + kafka headers

Gabor Somogyi
There is no way to commit offsets to Kafka and restart from the stored offsets. Structured Streaming stores offsets in its checkpoint and restarts from there without any user code.

Offsets can be captured with a listener, but that can only be used for lag calculation.
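
For illustration, a minimal Scala sketch (broker, topic and checkpoint path are placeholders, not tied to this thread) of a Structured Streaming query that reads from Kafka, sets the checkpoint location that Structured Streaming restarts from, and registers a StreamingQueryListener that merely observes per-batch offsets, e.g. for lag monitoring:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

val spark = SparkSession.builder.appName("kafka-sss").getOrCreate()

// Kafka source; includeHeaders (Spark 3.0+) exposes Kafka message headers as a "headers" column.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
  .option("subscribe", "my_topic")                   // placeholder
  .option("includeHeaders", "true")
  .load()

// The listener only observes offsets of completed micro-batches;
// the restart position is governed by the checkpoint, not by the listener.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    event.progress.sources.foreach(s =>
      println(s"source=${s.description} endOffset=${s.endOffset}"))
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})

val query = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/my-query")  // placeholder path
  .start()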

BR,
G



Re: Spark structured streaming + offset management in kafka + kafka headers

Jacek Laskowski
Hi,

Just to add to Gabor's excellent answer: checkpointing and offsets are infrastructure concerns and should not really be in the hands of Spark developers, who should instead focus on the business purpose of the code rather than on offsets, which are very low-level and not really important here.

BTW, that's what happens in Kafka Streams too.


Re: Spark structured streaming + offset management in kafka + kafka headers

AliGouta
Thank you guys for your answers. I will dig deeper into this new way of doing things and consider leaving the old DStreams for Structured Streaming. I hope that Structured Streaming plus Spark on Kubernetes works well and that the combination is production ready.

Best regards,
Ali Gouta.


Re: Spark structured streaming + offset management in kafka + kafka headers

Mich Talebzadeh

Hi Ali,


On the practical side, I have used both the old DStreams and the newer Spark Structured Streaming (SSS).

SSS does a good job at the micro-batch level in the form of

foreachBatch(SendToSink)

"foreach" performs custom write logic on each row, while "foreachBatch" performs custom write logic on each micro-batch through the SendToSink function. The function passed to foreachBatch takes two parameters: the micro-batch as a DataFrame or Dataset, and a unique id for each batch. Using foreachBatch, we eventually write each micro-batch to the storage defined in our custom logic; in our case we store the output of the streaming application in Redis, a Google BigQuery table, or any other sink.

 

In the DStream world you would have done something like this:

    // Work on every micro-batch (RDD) in the stream
    dstream.foreachRDD { pricesRDD =>
      if (!pricesRDD.isEmpty) {  // data exists in the RDD
        // ... transform the RDD here
      }
    }

and after some work on that RDD you would have created a DataFrame (df).

With regard to SSS, it lets you work directly with the DataFrame passed to your function. However, if, as in my case, you are interested in the individual rows of the micro-batch (say, different collections of prices for different tickers/securities), you can go through the DataFrame's underlying RDD (PySpark):

for row in df.rdd.collect():  # collect() pulls the whole micro-batch to the driver
    ticker = row.ticker
    price = row.price
 

With regard to foreach(process_row), I have not really tried it as we don't have a use case for it, so your mileage may vary as usual.
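
For reference, foreach(process_row) is the PySpark form; in Scala the per-row path goes through a ForeachWriter. A minimal sketch, assuming kafkaDf is the streaming DataFrame from the sketch above and with println standing in for real per-row logic:

import org.apache.spark.sql.{ForeachWriter, Row}

val rowQuery = kafkaDf.writeStream
  .foreach(new ForeachWriter[Row] {
    override def open(partitionId: Long, epochId: Long): Boolean = true  // open connections here
    override def process(value: Row): Unit = println(value)              // per-row write logic
    override def close(errorOrException: Throwable): Unit = ()           // release resources here
  })
  .option("checkpointLocation", "/tmp/checkpoints/per-row")  // placeholder
  .start()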


HTH

  


Re: Spark structured streaming + offset management in kafka + kafka headers

AliGouta
Great, so SSS also provides an API that allows handling RDDs through DataFrames using foreachBatch. Still, I am not sure this is a good practice in general, right? Well, it depends on the use case anyway.

Thank you so much for the hints!

Best regards,
Ali Gouta.


Re: Spark structured streaming + offset management in kafka + kafka headers

Mich Talebzadeh
Hi Ali,

The old saying that one experiment is worth a hundred hypotheses still stands.

As per the test-driven approach, have a go at it and see what comes out. Forum members, including myself, have reported on SSS in the Spark user group, so you are at home on this.

HTH,
  




Re: Spark structured streaming + offset management in kafka + kafka headers

AliGouta
Thanks Mich!

Ali Gouta.


Re: Spark structured streaming + offset management in kafka + kafka headers

Gabor Somogyi
Just to be crystal clear: DStreams will be deprecated sooner or later and there will be no support for them, so it is highly advised to migrate...

G

