Pitfalls of partitioning by host?

Pitfalls of partitioning by host?

Patrick McCarthy-2
When debugging some behavior on my YARN cluster, I wrote the following PySpark UDF to figure out which host was operating on which row of data:

# assumes the usual aliases: from pyspark.sql import functions as F, types as T
@F.udf(T.StringType())
def add_hostname(x):
    import socket
    return str(socket.gethostname())  # the hostname of the executor; the input x is ignored


It occurred to me that I could use this to enforce node-locality for other operations:

df.withColumn('host', add_hostname(F.lit(1))).groupBy('host').apply(...)  # F.lit(1) is a dummy input, since the UDF ignores its argument

When working on a big job without obvious partition keys, this looks like a very straightforward way to avoid a shuffle, but it seems too easy.

What problems would I introduce by trying to partition on hostname like this?
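
A quick usage sketch of the diagnostic above (F.lit(1) is a dummy input, since the UDF ignores its argument; note the distinct() itself shuffles, though the host/partition pairs are computed beforehand): pairing the hostname with spark_partition_id() shows which partition was processed on which host.

(df.withColumn('host', add_hostname(F.lit(1)))
   .withColumn('pid', F.spark_partition_id())
   .select('pid', 'host')
   .distinct()
   .show())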
Re: Pitfalls of partitioning by host?

MidwestMike
Well, if we think of shuffling as a necessity for performing an operation, then the problem is that you're adding an aggregation stage to a job that is going to get shuffled anyway. For example, if you need to join two datasets, Spark will still shuffle the data whether or not it was grouped by hostname beforehand. My question is: is there anything else you'd expect to gain, beyond enforcing that an already-bucketed dataset is where it's supposed to be? What else would you avoid?


Re: Pitfalls of partitioning by host?

Patrick McCarthy-2
Mostly I'm guessing that it adds efficiency to a job where partitioning is required but shuffling is not.

For example, if I want to apply a UDF to 1 TB of records on disk, I might need repartition(50000) to get the task size down to something acceptable for my cluster. If I don't care that the result is perfectly balanced, then I'd hope to save a lot of overhead with

foo = (df.withColumn('randkey', F.floor(1000 * F.rand()))
         .repartition(5000, 'randkey', 'host')   # assumes 'host' was added with add_hostname() above
         .groupBy('randkey', 'host')
         .apply(udf))   # apply() needs a preceding groupBy and a grouped-map pandas UDF
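
A fuller, self-contained sketch of the salted-key pattern above, assuming Spark 2.3+ grouped-map pandas UDFs (summarize, the column names, and the sizes are all invented for illustration; note the repartition by the salt key is itself a shuffle):

import pandas as pd
from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000000).withColumn('value', F.rand())

schema = T.StructType([
    T.StructField('randkey', T.LongType()),
    T.StructField('n', T.LongType()),
])

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)  # requires pyarrow on the executors
def summarize(pdf):
    # runs once per randkey group, entirely inside a single task
    return pd.DataFrame({'randkey': [pdf['randkey'].iloc[0]], 'n': [len(pdf)]})

foo = (df.withColumn('randkey', F.floor(1000 * F.rand()))
         .repartition(50, 'randkey')   # hash-partitions by the salt key
         .groupBy('randkey')
         .apply(summarize))
foo.show(5)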



Re: Pitfalls of partitioning by host?

Sonal Goyal
Hi Patrick,

Sorry, is there something here that helps you beyond repartition(<number of partitions>) or calling your UDF inside foreachPartition? If your data is on disk, Spark is already partitioning it for you by rows. How does adding the host info help?

Thanks,
Sonal
Nube Technologies 
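
A minimal sketch of the foreachPartition/mapPartitions route Sonal mentions, assuming the per-row work doesn't actually need rows grouped by any key: mapPartitions applies a function to each existing partition where it sits, with no shuffle at all.

def tag_with_host(rows):
    import socket
    host = socket.gethostname()  # evaluated once per partition, on the executor
    for row in rows:
        yield (host,) + tuple(row)

tagged = df.rdd.mapPartitions(tag_with_host)
tagged.take(3)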






Re: [External Sender] Pitfalls of partitioning by host?

JayeshLalwani
In reply to this post by Patrick McCarthy-2
If you group by the host that you computed with the UDF, Spark is always going to shuffle your dataset, even if the end result is that all the new partitions look exactly like the old ones, just placed on different nodes. Remember that the hostname will probably hash differently than the partition key of the data.

Let's say what you're trying to do is read a file, apply a UDF, and write out to a file. Without your "performance improvement", Spark will read the partitions, apply the UDF to the rows in each partition, and write the rows out. With your upgrade, it will read the partitions, apply the hostname UDF, shuffle by hostname, apply the UDF to the shuffled rows, and write the data out.

If your intent is to increase efficiency, this will do the opposite of what you're trying to do.
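
One way to check this, sketched against Spark 2.x with add_hostname and df as defined in the original post (the exact plan text may vary): grouping by the computed host column inserts an Exchange, i.e. a shuffle, because rows are hash-partitioned by the hostname value rather than left in place.

df.withColumn('host', add_hostname(F.lit(1))) \
  .groupBy('host').count() \
  .explain()
# expect an Exchange node in the plan, e.g. +- Exchange hashpartitioning(host#12, 200)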


Re: Pitfalls of partitioning by host?

Patrick McCarthy-2
In reply to this post by Sonal Goyal
I'm not 100% sure, but a naive repartition() seems to cause a shuffle. If that's actually happening, it's just wasteful overhead. The ambition is to say "divide the data into partitions, but make sure you don't move it in doing so".
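
A sketch of two shuffle-free alternatives along those lines, offered as assumptions rather than a definitive answer (the path and the numbers are invented):

# repartition(n) shows up in the physical plan as a round-robin Exchange (a
# shuffle), while coalesce(n) merely merges existing partitions in place.
df.repartition(5000).explain()   # expect: Exchange RoundRobinPartitioning(5000)
df.coalesce(500).explain()       # expect: Coalesce 500, with no Exchange

# coalesce() can only reduce the partition count. To get more, smaller input
# partitions without a shuffle, cap the bytes per file-source partition up
# front (default 128 MB in Spark 2.x):
spark.conf.set('spark.sql.files.maxPartitionBytes', 32 * 1024 * 1024)
df2 = spark.read.parquet('/path/to/data')  # hypothetical path

Either way the UDF then runs on the partitions where they already sit, which is the "don't move it" behavior described above.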

