script running in jupyter 6-7x faster than spark submit


Dhrubajyoti Hati
Hi,

I am facing a weird behaviour while running a python script. Here is what the code looks like mostly:

from pyspark.sql.functions import udf

def fn1(ip):
    # some code ...
    pass

def fn2(row):
    # ...
    # some operations
    # ...
    return row1


udf_fn1 = udf(fn1)
cdf = spark.read.table("xxxx")  # Hive table is > 500 GB with ~4500 partitions
ddf = cdf.withColumn("coly", udf_fn1(cdf.colz)) \
    .drop("colz") \
    .withColumnRenamed("colz", "coly")

edf = ddf \
    .filter(ddf.colp == 'some_value') \
    .rdd.map(lambda row: fn2(row)) \
    .toDF()

print(edf.count())  # simple way to test performance on both platforms

Now, when I run the same code in a brand new Jupyter notebook, it runs 6x faster than when I run this Python script using spark-submit. The configurations were printed and compared from both platforms and they are exactly the same. I even tried running the script in a single cell of a Jupyter notebook and still got the same performance. I need to understand whether I am missing something in spark-submit that is causing the issue. I have minimised the script above to reproduce the issue without much code.

Both are run in client mode on a YARN-based Spark cluster. Both are also submitted from the same machine and by the same user.

What I found is that the median task duration (from the Spark UI quantiles) for the run with Jupyter was 1.3 minutes, while for the run with spark-submit it was ~8.5 minutes. I am not able to figure out why this is happening.

Has anyone faced this kind of issue before, or does anyone know how to resolve it?

Regards,
Dhrub

Re: script running in jupyter 6-7x faster than spark submit

Patrick McCarthy-2
It's not obvious from what you pasted, but perhaps the Jupyter notebook is already connected to a running Spark context, while spark-submit needs to get a new spot in the (YARN?) queue.

I would check the cluster job IDs for both to ensure you're getting new cluster tasks for each.



--
Patrick McCarthy
Senior Data Scientist, Machine Learning Engineering
Dstillery
470 Park Ave South, 17th Floor, NYC 10016


Re: script running in jupyter 6-7x faster than spark submit

Dhrubajyoti Hati
No, I checked for that, hence I wrote "brand new" Jupyter notebook. Also, the total times taken are ~30 minutes and ~3 hours respectively, as I am reading ~500 GB of compressed, base64-encoded text data from a Hive table and decompressing and decoding it in one of the UDFs. Also, the time compared is from the Spark UI, not how long the job takes after submission; it is just the running time I am comparing.

As mentioned earlier, all the Spark conf params match between the two runs, and that's why I am puzzled about what is going on.



Re: script running in jupyter 6-7x faster than spark submit

Stephen Boesch
Sounds like you have done your homework to compare properly. I'm guessing the answer to the following is yes, but in any case: are they both running against the same Spark cluster with the same configuration parameters, especially executor memory and number of workers?



Re: script running in jupyter 6-7x faster than spark submit

Dhrubajyoti Hati
As mentioned in the very first mail:
* Both are submitted to the same cluster.
* Both are submitted from the same machine and by the same user.
* Each run has 128 executors with 2 cores and 8 GB of memory per executor, and both runs actually get those resources while running.

To clarify further, let me quote what I mentioned above; these figures are taken from the Spark UI when the jobs are almost finished in both cases:
"What I found is that the median task duration for the run with Jupyter was 1.3 minutes and for the run with spark-submit was ~8.5 minutes." That means the per-task time is much higher in the spark-submit run than in the Jupyter run. This is where I am really puzzled, because they run the exact same code; why does running it in two different ways vary the execution time so much?

Regards,

Dhrubajyoti Hati.
Mob No: 9886428028/9652029028





Re: script running in jupyter 6-7x faster than spark submit

AbdealiJK
In reply to this post by Stephen Boesch
Maybe you can try running it in a python shell or jupyter-console/ipython instead of spark-submit and check how much time that takes too.

Compare the env variables to check that no additional env configuration is present in either environment.
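
For instance, a quick sketch of that comparison: run something like the following from both setups and diff the two outputs.

import os

# Print the environment seen by this process; capture this output from the
# notebook and from the spark-submit script, then diff them.
for key in sorted(os.environ):
    print(key + "=" + os.environ[key])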

Also, is the Python environment the exact same for both? I ask because it looks like you're using a UDF, and if the Jupyter Python has (let's say) numpy compiled with BLAS it would be faster than a numpy without it, etc. I.e. some library you use may be pure Python in one environment and a faster C extension in the other...

What Python libraries are you using in the UDFs? If you don't use UDFs at all and use some very simple pure Spark functions, does the time difference still exist?

Also, are you using dynamic allocation or some similar Spark config which could vary performance between runs because the same resources were not utilized on Jupyter / spark-submit?




Re: script running in jupyter 6-7x faster than spark submit

Stephen Boesch
In reply to this post by Dhrubajyoti Hati
Ok. Can't think of why that would happen.



Re: script running in jupyter 6-7x faster than spark submit

Dhrubajyoti Hati
In reply to this post by AbdealiJK
I just checked from where the script is submitted, i.e. on the driver side, and the Python environments are different. The Jupyter one runs inside a virtual environment which is Python 2.7.5, while the spark-submit one uses 2.6.6. But the executors have the same Python version, right? I tried doing a spark-submit from the Jupyter shell; it fails because Python 2.7 is not there and throws an error.

Here is the UDF which might be taking the time:
import base64
import zlib

def decompress(data):
    bytecode = base64.b64decode(data)
    d = zlib.decompressobj(32 + zlib.MAX_WBITS)
    decompressed_data = d.decompress(bytecode)
    return decompressed_data.decode('utf-8')
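
For reference, a minimal sketch of how this is presumably wired up, following the udf() pattern from the first mail (the column names here are placeholders):

from pyspark.sql.functions import udf

# udf() without an explicit return type yields a StringType column,
# which matches decompress() returning decoded text.
decompress_udf = udf(decompress)

ddf = cdf.withColumn("decoded", decompress_udf(cdf.colz)).drop("colz")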

Could this be because of the Python environment mismatch on the driver side? But the processing
happens on the executor side, right?

Regards,

Dhrub



Re: script running in jupyter 6-7x faster than spark submit

Dhrubajyoti Hati
Hi, 

I just ran the same script in a shell from the Jupyter environment and found the performance to be similar. So I can confirm this is happening because the Python (and its libraries) used by the Jupyter notebook is different from the Python used by spark-submit.

But now I have a follow-up question: are the dependent libraries of a Python script also transferred to the worker machines when executing a Python script in Spark? Because even though the driver Python versions are different, the worker machines will use their own Python environment to run the code. If anyone can explain this part, it would be helpful.

Regards,

Dhrubajyoti Hati.
Mob No: 9886428028/9652029028





Re: script running in jupyter 6-7x faster than spark submit

Patrick McCarthy-2
Are you running in cluster mode? A large virtualenv zip for the driver sent into the cluster on a slow pipe could account for much of that eight minutes.



Re: script running in jupyter 6-7x faster than spark submit

Dhrubajyoti Hati
But would that be the case for multiple tasks running on the same workers? Also, both jobs run in client mode, so whatever is true there is true for both or for neither. As mentioned earlier, all the confs are the same; I have checked and compared each conf.

As Abdeali mentioned, it must be because of the way the libraries are set up in the two environments. I also verified this by running the same script from the Jupyter environment and got the same result as the normal script that I had been running with spark-submit.

Currently I am trying to find out how Python packages are transferred from the driver to the Spark cluster in client mode. Any info on that topic would be helpful.

Thanks!





Re: script running in jupyter 6-7x faster than spark submit

AbdealiJK
The driver python may not always be the same as the executor python.
You can set these using PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON

The dependent libraries are not transferred by Spark in any way unless you use --py-files or .addPyFile().
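
For illustration, a minimal sketch of both points (the paths and file name are placeholders, and this assumes the variables are set before the SparkSession is created, since they are read at startup):

import os

# Pin the interpreters explicitly: PYSPARK_PYTHON is what the executors
# (and, unless overridden, the driver) use; PYSPARK_DRIVER_PYTHON only
# affects the driver process.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python2.7"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/path/to/venv/bin/python"

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("env-check").getOrCreate()

# Ship an extra module to the executors explicitly; nothing else from the
# driver's site-packages is transferred automatically.
spark.sparkContext.addPyFile("/path/to/my_helpers.py")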

Could you try this:
import sys; print(sys.prefix)

on the driver, and also run this inside a UDF with:

def dummy(a):
    import sys; raise AssertionError(sys.prefix)

and get the traceback exception on the driver?
This would be the best way to get the exact sys.prefix (Python path) for both the executors and the driver.
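
A minimal sketch of wiring that check up end to end (spark is the existing session; the one-row range DataFrame is only there to force a task to run the UDF):

import sys
from pyspark.sql.functions import udf

print("driver sys.prefix: " + sys.prefix)  # interpreter used by the driver

def dummy(a):
    import sys
    raise AssertionError("executor sys.prefix: " + sys.prefix)

dummy_udf = udf(dummy)

# The AssertionError raised on an executor comes back to the driver inside
# the task failure traceback, revealing the executor's sys.prefix.
spark.range(1).select(dummy_udf("id")).collect()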

Also, could you elaborate on what environment this is?
Linux? CentOS/Ubuntu/etc.?
How was the Python 2.6.6 installed?
How was the Python 2.7.5 venv created, and how was the base Python 2.7.5 installed?

Also, how are you creating the Spark Session in Jupyter?




Re: script running in jupyter 6-7x faster than spark submit

Dhrubajyoti Hati
If libraries are not transferred by default, and in my case I haven't used any --py-files, then am I facing a 6x speed difference just because the driver Python is different? I am using client mode to submit the program, but the UDFs and everything else are executed on the executors, so why is the difference so large?

I tried the prints.
For the Jupyter one, the driver prints
../../jupyter-folder/venv
and the executors print /usr.

For spark-submit, both of them print /usr.

The cluster was created a few years back and is used organisation-wide, so how Python 2.6.6 was installed I honestly do not know. I copied the whole Jupyter setup from the org git repo as it was shared, so I do not know how the venv was created, or even how the Python for the venv was installed.

The OS is CentOS release 6.9 (Final).


Regards,

Dhrubajyoti Hati.
Mob No: 9886428028/9652029028




Reply | Threaded
Open this post in threaded view
|

Re: script running in jupyter 6-7x faster than spark submit

Dhrubajyoti Hati
Also, the performance remains identical when running the same script from the jupyter terminal instead of a normal terminal. In the script the spark session is created by

spark = SparkSession \
    .builder \
    ..
    ..
    .getOrCreate()
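
For completeness, a hedged sketch of what that client-mode session setup typically looks like is below; the app name is a placeholder and the real .config(...) calls are elided. One thing worth noting is that getOrCreate() reuses any session already running in the process (for example one a jupyter kernel may already hold) rather than building a new one:

from pyspark.sql import SparkSession

# "perf-comparison" is only a placeholder app name; in practice the master
# is usually passed on the spark-submit command line rather than in code.
# enableHiveSupport() lets spark.read.table("...") resolve the hive table,
# and getOrCreate() reuses an existing session if one is already active.
spark = SparkSession \
    .builder \
    .appName("perf-comparison") \
    .master("yarn") \
    .enableHiveSupport() \
    .getOrCreate()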



Reply | Threaded
Open this post in threaded view
|

Re: script running in jupyter 6-7x faster than spark submit

AbdealiJK
In a bash terminal, can you do:
export PYSPARK_DRIVER_PYTHON=/path/to/venv/bin/python
and then run the same spark-submit command?

This should mimic the behaviour of jupyter in spark-submit and should be fast (1-2 mins, similar to the jupyter notebook).
This would confirm the guess that the python 2.7 venv has some magic ^_^
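
As a quick sanity check, a few illustrative lines at the top of the submitted script can confirm which interpreter actually ended up driving the job (either env var may legitimately be unset):

import os
import sys

# confirm which interpreter runs the driver after the export above,
# and what, if anything, is configured for the executor side
print("driver executable: %s" % sys.executable)
print("driver prefix    : %s" % sys.prefix)
print("PYSPARK_DRIVER_PYTHON = %s" % os.environ.get("PYSPARK_DRIVER_PYTHON"))
print("PYSPARK_PYTHON        = %s" % os.environ.get("PYSPARK_PYTHON"))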


