Collections passed from driver to executors

Collections passed from driver to executors

Dhrubajyoti Hati
Hi,

I have a question about passing a dictionary from the driver to the executors in Spark on YARN. The dictionary is needed inside a UDF. I am using PySpark.

As I understand it, this can be done in two ways:

1. Broadcast the variable and then use it in the UDFs.

2. Pass the dictionary into the UDF itself, something like this:

  from pyspark.sql.functions import udf

  def udf1(col1, mapping):   # renamed from "dict" to avoid shadowing the builtin
      ...

  def udf1_fn(mapping):
      return udf(lambda col_data: udf1(col_data, mapping))

  df.withColumn("column_new", udf1_fn(mapping)("old_column"))

I have tested both approaches and both work.

Now I am wondering what is fundamentally different between the two. I understand how broadcast works, but I am not sure how the data is passed across in the second way. Is the dictionary sent to each executor every time a new task runs on that executor, or is it sent only once? Also, how is the data passed to the Python processes? These are Python UDFs, so I think they are executed natively in Python (please correct me if I am wrong), which means the data will be serialized and passed to Python.
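[Regarding how the dictionary travels in option 2: it is captured in the function's closure, and Spark serializes that function (via cloudpickle) and ships it with the task, so the dictionary rides along inside the serialized function. The idea can be sketched in plain Python with `functools.partial`, which the standard `pickle` module can handle; cloudpickle additionally handles lambdas and closures:]

```python
import pickle
from functools import partial

def lookup(value, mapping):
    return mapping.get(value, "unknown")

mapping = {"a": 1, "b": 2}
task_fn = partial(lookup, mapping=mapping)  # dictionary bound into the function

# "Shipping the task": the bound dictionary travels inside the serialized bytes.
payload = pickle.dumps(task_fn)

# What an executor-side worker would do: deserialize and call.
restored = pickle.loads(payload)
print(restored("a"), restored("z"))  # 1 unknown
```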

In summary: which of the two is the better/more efficient way to write this, and why?

Thank you!

Regards,
Dhrub
Re: Collections passed from driver to executors

Dhrubajyoti Hati
I was wondering if anyone could help with this question.

Re: Collections passed from driver to executors

rxin
A while ago we changed it so the task gets broadcast too, so I think the two are fairly similar.




Re: Collections passed from driver to executors

Dhrubajyoti Hati
Thanks. Could you please let me know in which version of Spark this changed? We are still on 2.2.


Re: Collections passed from driver to executors

rxin
It was done in 2014 by yours truly: https://github.com/apache/spark/pull/1498

So any modern version would have it.

