Spark Streaming Checkpointing

Spark Streaming Checkpointing

andraskolbert

Hi All,

I have a Spark Streaming application (Spark 2.4.4 with Kafka 0.8, so the Kafka direct stream approach) that is running just fine.

I create a context in the following way:

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 60)  # 60-second batch interval
opts = {"metadata.broker.list": kafka_hosts,
        "auto.offset.reset": "largest",
        "group.id": run_type}
kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
kvs.checkpoint(120)  # checkpoint the stream's RDDs every 120 seconds

lines = kvs.map(lambda row: row[1])  # keep only the message value
lines.foreachRDD(streaming_app)
ssc.checkpoint(checkpoint)  # checkpoint directory for the streaming context

The streaming app at a high level does this:

  • processes the incoming batch
  • unions it with the dataframe from the previous batch and aggregates the result

Currently, I checkpoint explicitly (df = df.checkpoint()) to truncate the lineage. However, this is quite an expensive exercise, and I was wondering if there is a better way to do it.
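
For illustration, here is a minimal sketch of what streaming_app might look like under this pattern; the global aggregated dataframe, the JSON payload, and the "key"/"value" aggregation are assumptions standing in for the real application logic:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
aggregated = None  # running aggregate carried over from previous batches

def streaming_app(rdd):
    global aggregated
    if rdd.isEmpty():
        return
    # assumption: each Kafka value is a JSON string with "key" and "value" fields
    batch_df = spark.read.json(rdd)
    if aggregated is not None:
        batch_df = aggregated.union(batch_df)
    # placeholder aggregation that keeps the schema stable across batches
    aggregated = batch_df.groupBy("key").agg(F.sum("value").alias("value"))
    # explicit checkpoint to truncate the lineage that otherwise grows each batch
    # (requires a checkpoint dir; ssc.checkpoint(...) above sets one)
    aggregated = aggregated.checkpoint()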

I tried disabling this explicit checkpointing, since I already have periodic checkpointing (kvs.checkpoint(120)), and I thought the lineage would be truncated at that checkpointed RDD. However, in reality that is not the case, and processing time keeps increasing over time.

Am I doing something inherently wrong? Is there a better way of doing this?

Thanks
Andras


Re: Spark Streaming Checkpointing

Gabor Somogyi
Hi Andras,

A general suggestion is to use Structured Streaming instead of DStreams, because it provides several things out of the box (stateful streaming, etc.).
Kafka 0.8 is super old and deprecated (no security, for one). Do you have a specific reason to use that?
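
In case it helps, here is a minimal Structured Streaming sketch of the same Kafka read; it assumes a 0.10+ broker and the spark-sql-kafka-0-10 package on the classpath, and it reuses kafka_hosts, topic_listen and checkpoint from your snippet (the count-per-value aggregation is just a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", kafka_hosts)
          .option("subscribe", topic_listen)
          .option("startingOffsets", "latest")
          .load())

# cast the binary Kafka value to a string
values = stream.selectExpr("CAST(value AS STRING) AS value")

# a stateful aggregation; offsets and state live in the checkpoint location
query = (values.groupBy("value").count()
         .writeStream
         .outputMode("update")
         .format("console")
         .option("checkpointLocation", checkpoint)
         .start())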

BR,
G




Re: Spark Streaming Checkpointing

andraskolbert
Hi Gábor,


Thanks for your reply on this!

Kafka 0.8 is what we use internally at the company I work at; it hasn't been upgraded, mainly because of compatibility with the currently deployed Java applications.

Hence I am attempting to make the most of this version :)

András


