Controlling the number of Spark partitions in DataFrames

Controlling the number of Spark partitions in DataFrames

Noorul Islam K M
Hi all,

I have the following spark configuration

spark.app.name=Test
spark.cassandra.connection.host=127.0.0.1
spark.cassandra.connection.keep_alive_ms=5000
spark.cassandra.connection.port=10000
spark.cassandra.connection.timeout_ms=30000
spark.cleaner.ttl=3600
spark.default.parallelism=4
spark.master=local[2]
spark.ui.enabled=false
spark.ui.showConsoleProgress=false

Because I am setting spark.default.parallelism to 4, I was expecting
only 4 Spark partitions, but it looks like that is not the case.

When I do the following:

    df.foreachPartition { partition =>
      val groupedPartition = partition.toList.grouped(3).toList
      println("Grouped partition " + groupedPartition)
    }

There are a lot of print statements with an empty list at the top; only
the relevant partitions are at the bottom. Is there a way to control
the number of partitions?

Regards,
Noorul

Re: Controlling the number of Spark partitions in DataFrames

lucas.gary@gmail.com
I think we'd need to see the code that loads the df.  

Parallelism and partition count are related, but they're not the same. I've found the documentation fuzzy on this, but it looks like spark.default.parallelism is what Spark uses for partitioning when it has no other guidance. I'm also under the impression (and I could be wrong here) that the data loading step has some impact on partitioning.
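
For example, a quick way to see how many partitions Spark actually gave you (a minimal sketch, assuming a SparkSession named spark and your DataFrame df):

    // Number of partitions behind the DataFrame
    println(df.rdd.getNumPartitions)

    // Compare with what spark.default.parallelism gives a plain RDD
    println(spark.sparkContext.parallelize(1 to 100).getNumPartitions)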

In any case, it would be more helpful to see the DataFrame loading code.

Good luck!

Gary Lucas

Re: Controlling the number of Spark partitions in DataFrames

Deepak Sharma
In reply to this post by Noorul Islam K M
I guess the issue is that spark.default.parallelism is ignored when you are working with DataFrames. It is supposed to apply only to raw RDDs.
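
A minimal sketch of the difference (assuming a local SparkSession named spark and Spark 2.x without adaptive execution; the counts are illustrative):

    // RDD APIs fall back to spark.default.parallelism (4 with the configuration above)
    val rdd = spark.sparkContext.parallelize(1 to 100)
    println(rdd.getNumPartitions)            // 4

    // DataFrame shuffles use spark.sql.shuffle.partitions instead (200 by default)
    val counts = spark.range(100).groupBy("id").count()
    println(counts.rdd.getNumPartitions)     // 200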

Thanks
Deepak

Re: Controlling the number of Spark partitions in DataFrames

Daniel Siegmann
When working with Datasets and DataFrames, Spark uses spark.sql.shuffle.partitions for shuffles. It defaults to 200. Between that and the default parallelism you can control the number of partitions (except for the initial read).

More info here: http://spark.apache.org/docs/latest/sql-programming-guide.html#other-configuration-options

I have no idea why it defaults to a fixed 200 (while default parallelism defaults to a number scaled to your number of cores), or why there are two separate configuration properties.
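
So for the configuration in the first message, something along these lines should bring the shuffle partition count down to 4 (a sketch; it needs to be set before the shuffle runs):

    // Either add it to the properties file next to spark.default.parallelism:
    //   spark.sql.shuffle.partitions=4
    // or set it at runtime on the session:
    spark.conf.set("spark.sql.shuffle.partitions", "4")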


--
Daniel Siegmann
Senior Software Engineer
SecurityScorecard Inc.
214 W 29th Street, 5th Floor
New York, NY 10001


Re: Controlling the number of Spark partitions in DataFrames

lucas.gary@gmail.com
Thanks Daniel!

I've been wondering that for ages!

i.e. why my JDBC-sourced datasets are coming up with 200 partitions when written to S3.

What do you mean by "(except for the initial read)"?

Can you explain that a bit further?

Gary Lucas

Re: Controlling the number of Spark partitions in DataFrames

Daniel Siegmann
Those settings apply when a shuffle happens, but they don't affect the way the data is partitioned when it is initially read, for example by spark.read.parquet("path/to/input"). For HDFS / S3 I think it depends on how the data is split into chunks, but if there are lots of small chunks Spark will automatically pack them together into fewer partitions. There are going to be various settings depending on what you're reading from.

    val df = spark.read.parquet("path/to/input")  // partitioning will depend on the data
    val df2 = df.groupBy("thing").count()         // a shuffle happened, so the shuffle partitioning configuration applies


Tip: gzip files can't be split, so if you read a gzip file everything will be in one partition. That's a good reason to avoid large gzip files. :-)

If you don't have a shuffle but you want to change how many partitions there are, you will need to coalesce or repartition.
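
A minimal sketch of both (the path is just a placeholder):

    val df = spark.read.parquet("path/to/input")  // partition count driven by the input splits
    val fewer = df.coalesce(4)     // merges to at most 4 partitions without a full shuffle
    val exact = df.repartition(4)  // full shuffle into exactly 4 partitions
    println(fewer.rdd.getNumPartitions)
    println(exact.rdd.getNumPartitions)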


--
Daniel Siegmann
Senior Software Engineer
SecurityScorecard Inc.
214 W 29th Street, 5th Floor
New York, NY 10001


Re: Controlling the number of Spark partitions in DataFrames

lucas.gary@gmail.com
OK, so for JDBC I presume it defaults to a single partition if you don't provide partitioning metadata?
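
If I understand correctly, the way to control that would be the JDBC partitioning options on the read, something like this (connection details and column names are placeholders):

    val jdbcDf = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://dbhost:5432/mydb")
      .option("dbtable", "public.events")
      .option("partitionColumn", "id")  // numeric column to split on
      .option("lowerBound", "1")
      .option("upperBound", "1000000")
      .option("numPartitions", "8")     // number of parallel reads / initial partitions
      .load()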

Thanks!

Gary
