Structured Streaming Dataframe Size


Structured Streaming Dataframe Size

Nick Dawes
I have a quick newbie question. 

Spark Structured Streaming creates an unbounded DataFrame to which new rows keep getting appended. 

So what's the max size of data it can hold? What if the size grows bigger than the JVM heap? Will it spill to disk? I'm using S3 as storage, so will it write temp data to S3 or to the local file system of the cluster?

Nick

Re: Structured Streaming Dataframe Size

Tathagata Das
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#basic-concepts

Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).
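
To make that concrete, here is the word-count sketch from that guide (socket source and names used purely for illustration); the only state Spark keeps across micro-batches is the running count per word, not the input rows themselves:

    // Illustrative sketch based on the programming guide's word-count example.
    // Spark keeps the running counts as state; input lines are discarded once
    // each micro-batch has been processed.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.appName("StreamingWordCount").getOrCreate()
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()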



Re: Structured Streaming Dataframe Size

Nick Dawes
Thank you, TD. A couple of follow-up questions, please. 

1) "It only keeps around the minimal intermediate state data"

How do you define "minimal" here? Is there a configuration property to control the time or size of the streaming DataFrame? 

2) I'm not writing anything out to any database or S3. My requirement is to find out a count (real-time) in a 1-hour window. I would like to get this count from a BI tool. So can I register it as a temp view and access it from the BI tool? 

I tried something like this in my streaming application...

AggStreamingDF.createOrReplaceGlobalTempView("streaming_table")

Then, in the BI tool, I queried like this...

select * from streaming_table

Error:  Queries with streaming sources must be executed with writeStream.start()

Any suggestions to make this work?

Thank you very much for your help!



Re: Structured Streaming Dataframe Size

Tathagata Das
Responses inline.

On Wed, Aug 28, 2019 at 8:42 AM Nick Dawes <[hidden email]> wrote:
Thank you, TD. A couple of follow-up questions, please. 

1) "It only keeps around the minimal intermediate state data"

How do you define "minimal" here? Is there a configuration property to control the time or size of the streaming DataFrame? 
That's what watermarks are for. You can tune how much late data to consider, and accordingly how much of the past information needs to be buffered as state. More lateness tolerance = more state in memory to manage.
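
For example (the DataFrame and column names here are just placeholders, not from your code), a watermark on the event-time column bounds how long window state has to be kept:

    import org.apache.spark.sql.functions.{col, window}

    // Sketch: tolerate events arriving up to 10 minutes late. State for a
    // 1-hour window can be dropped once the watermark passes the end of that
    // window, so a larger lateness tolerance means more state to keep.
    val windowedCounts = eventsDF
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window(col("eventTime"), "1 hour"))
      .count()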
 

2) I'm not writing anything out to any database or S3. My requirement is to find out a count (real-time) in a 1-hour window. I would like to get this count from a BI tool. So can I register it as a temp view and access it from the BI tool? 

I tried something like this in my streaming application...

AggStreamingDF.createOrReplaceGlobalTempView("streaming_table")

Then, in the BI tool, I queried like this...

select * from streaming_table

Error:  Queries with streaming sources must be executed with writeStream.start()

Any suggestions to make this work?


There are two ways of doing this:

1. Write the aggregates to an in-memory table (driver's memory) and query that (a fuller sketch of this option follows after option 2).
    AggStreamingDF.writeStream.format("memory").outputMode("complete").queryName("myAggTable").start()
Then
    select * from myAggTable

2. Write the aggregates to files using the Delta Lake project (docs). 
   AggStreamingDF.writeStream.format("delta").outputMode("complete").start("path/to/delta/table")
Then you can query the delta table using Spark.
  spark.read.format("delta").load("path/to/delta/table").createOrReplaceGlobalTempView("myAggTable") 
Then
    select * from global_temp.myAggTable
This will give awesome ACID transactional guarantees between reads and writes. Read more on the linked website (full disclosure, I work on that project as well).   
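
Putting option 1 together with the 1-hour window count you described, a rough end-to-end sketch (the DataFrame and column names below are placeholders for whatever you actually have) could look like:

    import org.apache.spark.sql.functions.{col, window}

    // Assumed: eventsDF is your streaming DataFrame with an event-time column "eventTime".
    val AggStreamingDF = eventsDF
      .groupBy(window(col("eventTime"), "1 hour"))
      .count()

    // In-memory sink: results become queryable as a SQL table named "myAggTable" from the
    // SparkSession that started the query. Complete mode keeps all window counts in driver
    // memory, so this is best suited to small aggregate results.
    AggStreamingDF.writeStream
      .format("memory")
      .outputMode("complete")
      .queryName("myAggTable")
      .start()

    // Then, from the same SparkSession:
    spark.sql("select * from myAggTable").show()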
 
 