[Spark Structured Streaming] Changing partitions of (flat)MapGroupsWithState

[Spark Structured Streaming] Changing partitions of (flat)MapGroupsWithState

theikkila
I have a Spark Structured Streaming job that crunches through a few terabytes of data.

I'm using the file stream reader, and it works flawlessly; I can adjust its partitioning with spark.default.parallelism.
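
For reference, the source side of the job looks roughly like this (the Event class, schema, and input path below are placeholders, not the real job):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

// Placeholder event type; the real schema isn't shown in this post.
case class Event(sessid: String, ts: java.sql.Timestamp)

val spark = SparkSession.builder()
  .appName("sessionization")
  .config("spark.default.parallelism", "2000") // parallelism for the source stage
  .getOrCreate()

import spark.implicits._

val eventSchema = new StructType()
  .add("sessid", StringType)
  .add("ts", TimestampType)

val streamingSource = spark.readStream
  .schema(eventSchema)
  .json("/data/events") // illustrative input path
  .as[Event]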
 
However, after loading the data I'm doing sessionization, and I'm currently locked to just 200 partitions for that stage. I've tried repartitioning before and after the stateful map, but that just adds a new stage, so it's not very useful.
 
Changing spark.sql.shuffle.partitions doesn't affect the partition count either.
 
import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}

val sessions = streamingSource // spark.default.parallelism partitions/tasks (e.g. 2000)
  .repartition(n) // n partitions/tasks
  .groupByKey(event => event.sessid) // stage opens / fixed 200 tasks
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.EventTimeTimeout())(SessionState.updateSessionEvents) // fixed 200 tasks / stage closes
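
For completeness, the update function referenced above has roughly this shape; this is a simplified sketch, not the real SessionState.updateSessionEvents:

import org.apache.spark.sql.streaming.GroupState

// Accumulated per-key session state; a placeholder type.
case class Session(sessid: String, events: List[Event])

object SessionState {
  def updateSessionEvents(
      sessid: String,
      events: Iterator[Event],
      state: GroupState[Session]): Iterator[Session] = {
    if (state.hasTimedOut) {
      // The event-time timeout fired: emit the finished session, drop the state.
      val finished = state.get
      state.remove()
      Iterator(finished)
    } else {
      // Fold the new events into the running session and keep waiting.
      val current = state.getOption.getOrElse(Session(sessid, Nil))
      state.update(current.copy(events = current.events ++ events))
      // Requires a watermark on the stream; the 30-minute gap is illustrative.
      state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 30 * 60 * 1000)
      Iterator.empty
    }
  }
}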


I tried grepping through the Spark source code but couldn't find the parameter that controls this anywhere.




Re: [Spark Structured Streaming] Changing partitions of (flat)MapGroupsWithState

Michael Armbrust
The relevant config is spark.sql.shuffle.partitions. Note that once you start a query, this number is fixed: the config only affects queries starting from an empty checkpoint.
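
Concretely, that means setting the config before creating the session and starting the query against a fresh, empty checkpoint location (the paths below are just examples; sessions is the Dataset from your snippet):

import org.apache.spark.sql.SparkSession

// spark.sql.shuffle.partitions is only honored when the query starts from
// an empty checkpoint; an existing checkpoint pins the old partition count.
val spark = SparkSession.builder()
  .appName("sessionization")
  .config("spark.sql.shuffle.partitions", "2000")
  .getOrCreate()

val query = sessions.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/output/sessions")                       // illustrative sink path
  .option("checkpointLocation", "/checkpoints/sessions-v2") // fresh, empty location
  .start()

query.awaitTermination()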
