Structured Streaming Microbatch Semantics

Structured Streaming Microbatch Semantics

Rico B.
Hi all!

I'm using Spark Structured Streaming for a data ingestion pipeline. Basically the pipeline reads events (notifications that new data is available) from a Kafka topic and then queries a REST endpoint to get the real data (within a flatMap).

For a single event the pipeline creates a few thousand records (rows) that have to be stored. To write the data I use foreachBatch().

My question now is: is it guaranteed by Spark that all output records of one event are always contained in a single batch, or can the records also be split into multiple batches?


Best,

Rico.


Re: Structured Streaming Microbatch Semantics

Roland Johann
Hi Rico,

there is no way to defer records from one micro-batch to the next one. So it's guaranteed that the data and the trigger event will be processed within the same batch.

I assume that one trigger event leads to an unknown number of actual records pulled via HTTP. This bypasses the throughput controls of Spark Structured Streaming. Depending on the volume of the resulting HTTP records, you might consider splitting the pipeline into two parts (see the sketch below):
- process the trigger event, pull the data via HTTP, write it to Kafka
- perform the structured streaming ingestion from that topic
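
One way to sketch the first stage with Spark itself, assuming plain HTTP via the requests library; every URL, broker address, topic name and checkpoint path below is a placeholder:

    # Stage 1 (sketch): consume trigger events, pull the real data via REST,
    # and republish it to an intermediate Kafka topic for the actual ingestion job.
    import requests
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("trigger-to-kafka").getOrCreate()

    def republish(batch_df, batch_id):
        rows = []
        for event in batch_df.select("value").collect():
            # hypothetical REST call; URL and parameter layout are assumptions
            resp = requests.get("https://example.org/data", params={"event": event["value"]})
            rows.extend(resp.json())              # may be a few thousand records per event
        if rows:
            (spark.createDataFrame(rows)
                  .selectExpr("to_json(struct(*)) AS value")
                  .write
                  .format("kafka")
                  .option("kafka.bootstrap.servers", "broker:9092")
                  .option("topic", "ingestion-data")
                  .save())

    (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "trigger-events")
          .load()
          .selectExpr("CAST(value AS STRING) AS value")
          .writeStream
          .foreachBatch(republish)
          .option("checkpointLocation", "/tmp/chk/stage1")
          .start())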

Kind regards


--
Roland Johann
Data Architect/Data Engineer

phenetic GmbH
Lütticher Straße 10, 50674 Köln, Germany

Mobil: +49 172 365 26 46
Mail: [hidden email]
Web: phenetic.io

Handelsregister: Amtsgericht Köln (HRB 92595)
Geschäftsführer: Roland Johann, Uwe Reimann

Re: Structured Streaming Microbatch Semantics

Mich Talebzadeh
In reply to this post by Rico B.
Hi Rico,

Just to clarify: your batch interval may contain a variable number of rows sent to the Kafka topic for each event?

In your writeStream code

                   writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     foreachBatch(SendToBigQuery). \
                     trigger(processingTime='2 seconds'). \
                     start()


Have you defined trigger(processingTime)? That is equivalent to your sliding interval.

In general, processingTime == batch interval (the event).

In the Spark GUI, under Structured Streaming, you have Input Rate, Process Rate and Batch Duration. Your process rate has to stay below the batch duration. foreachBatch will process all the data that came in before moving on to the next batch. It is up to the designer to ensure that the processing time stays below the batch interval so Spark can keep up.

HTH


Re: Structured Streaming Microbatch Semantics

Rico B.

Hi!

In abstract code, what I do in my streaming program is:

readStream()                             // from Kafka
  .flatMap(readIngestionDatasetViaREST)  // can return thousands of records for a single event
  .writeStream
  .outputMode("append")
  .foreachBatch(upsertIntoDeltaTable)
  .start()
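
A minimal PySpark-style sketch of what such a foreachBatch upsert could look like, assuming an active SparkSession named spark, a Delta table at a placeholder path, and a placeholder key column id:

    from delta.tables import DeltaTable

    def upsertIntoDeltaTable(batch_df, batch_id):
        # merge the micro-batch into the target Delta table (path and key column are placeholders)
        target = DeltaTable.forPath(spark, "/data/target")
        (target.alias("t")
               .merge(batch_df.alias("s"), "t.id = s.id")
               .whenMatchedUpdateAll()
               .whenNotMatchedInsertAll()
               .execute())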


I don't use triggers but I limit the number of events per trigger in the Kafka reader.
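
For reference, with the Kafka source that limit is typically the maxOffsetsPerTrigger option; the broker and topic names below are placeholders:

    events = (spark.readStream
                   .format("kafka")
                   .option("kafka.bootstrap.servers", "broker:9092")
                   .option("subscribe", "notifications")
                   .option("maxOffsetsPerTrigger", 10)   # at most 10 trigger events per micro-batch
                   .load())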


What do you mean by the process rate being below the batch duration? The process rate is in records per second (in my current deployment it's approx. 1), while the batch duration is in seconds (around 60 sec.), so the two aren't directly comparable.


Best,

Rico

Re: Structured Streaming Microbatch Semantics

Rico B.
In reply to this post by Roland Johann

Hi!

Thanks for your reply!

For several reasons we don't want to "pipe" the real data through Kafka.

What problems might arise from this approach?

Best,

Rico.


Re: Structured Streaming Microbatch Semantics

Mich Talebzadeh
In reply to this post by Rico B.
Hi Rico,

Would it be possible for you to provide a snapshot of the Structured Streaming tab (from the Spark GUI)?

Thanks


Mich


Re: Structured Streaming Microbatch Semantics

Rico B.

Hi Mich!

Here's a screenshot of the processing rates.


Best,

Rico.


[Attachment: SparkProcessingRates.jpg (134K)]

Re: Structured Streaming Microbatch Semantics

Mich Talebzadeh
Ok thanks for the diagram.

So you have ~30 seconds duration for each batch (as in foreachBatch) and about 60 rows per batch.

Back to your question:

"My question is now: Is it guaranteed by Spark that all output records of one event are always contained in a single batch or can the records also be split into multiple batches?"


foreachBatch processes all records delivered within that batch (the number can vary, but it includes everything not yet processed) until they are complete, before the next batch starts. Batches are processed sequentially. There is no way for foreachBatch to decide to process only half the rows, etc.


When you say an event, do you mean one batch? If so, then yes, a single event will be processed by one batch until it finishes.


I performed a test on it


In my job, the method that processes each batch is passed in like:


                    foreachBatch(SendToBigQuery). \


That method SendToBigQuery takes two parameters: df and batchId.

def SendToBigQuery(df, batchId):
    """
    Uses the standard Spark-BigQuery API to write to the target table.
    Additional transformation logic will be performed here.
    """
    print(batchId)
    if len(df.take(1)) > 0:   # skip empty micro-batches
        # df.printSchema()
        df.persist()
        print(df.count())
        # Write data to config['MDVariables']['targetTable']
        s.writeTableToBQ(df, "append", config['MDVariables']['targetDataset'], config['MDVariables']['targetTable'])
        df.unpersist()        # release the cache before the next batch

Note that I had not run this streaming job for a couple of days, so the first batch picked up everything from where it had left off (the backlog):


{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
[]
None

batchId (print(batchId))    rows (print(df.count()))
0                           1602277
1                           450
2                           100
3                           150
4                           130
5                           160
6                           100
7                           170
8                           100
9                           100
10                          130


That first batch (1,602,277 rows) took a long time to process (it writes to Google BigQuery in the cloud), which meant the next batch had 450 rows to deal with, and so forth. Remember, nothing is lost, but the processing takes as long as it takes.


I have attached the Structured Streaming page. Note that the first batch, dealing with 1,602,277 rows, took 160 seconds to finish!


HTH


[Attachment: streaming.PNG (60K)]

Re: Structured Streaming Microbatch Semantics

Mich Talebzadeh
BTW, what you pick up when you start the job depends on the startingOffsets setting in readStream:

               .option("startingOffsets", "latest") \
 
In my previous example I had it set to "earliest".

Setting it to "latest" will result in starting from the latest topic arrivals, as shown below:


None
batchId 0 -> DataFrame is empty   <-- expected
batchId 1 -> 10 rows              <-- the actual number of rows per batch that I send to Kafka every 2 seconds
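
A stripped-down readStream showing where that option goes; the broker and topic names are placeholders. Note that startingOffsets only applies when a query starts without an existing checkpoint; a resumed query always picks up from the committed offsets:

    df = (spark.readStream
               .format("kafka")
               .option("kafka.bootstrap.servers", "broker:9092")   # placeholder
               .option("subscribe", "md")                          # placeholder topic
               .option("startingOffsets", "latest")                # or "earliest" to replay the whole topic
               .load())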

HTH

 


