How to make pyspark use custom python?


How to make pyspark use custom python?

mithril
For better formatting, please see
https://stackoverflow.com/questions/52178406/howto-make-pyspark-use-custom-python

----------------------


I am using Zeppelin to connect to a remote Spark cluster.

The remote Spark cluster uses the system Python 2.7.

I want to switch to miniconda3 and install a library, pyarrow. What I did:

1. Downloaded miniconda3, installed some libraries, and scp'd the miniconda3 folder to the Spark
master and slaves.
2. Added `PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"` to
`spark-env.sh` on the Spark master and slaves.
3. Restarted Spark and Zeppelin.
4. Ran the following code:

    %spark.pyspark

        import pandas as pd
        from pyspark.sql.functions import pandas_udf, PandasUDFType


        @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
        def process_order_items(pdf):

            pdf.loc[:, 'total_price'] = pdf['price'] * pdf['count']

            d = {'has_discount': 'count',
                 'clearance': 'count',
                 'count': ['count', 'sum'],
                 'price_guide': 'max',
                 'total_price': 'sum'
            }

            pdf1 = pdf.groupby('day').agg(d)
            pdf1.columns = pdf1.columns.map('_'.join)
            d1 = {'has_discount_count': 'discount_order_count',
                  'clearance_count': 'clearance_order_count',
                  'count_count': 'order_count',
                  'count_sum': 'sale_count',
                  'price_guide_max': 'price_guide',
                  'total_price_sum': 'total_price'
            }

            pdf2 = pdf1.rename(columns=d1)

            pdf2.loc[:, 'discount_sale_count'] = pdf.loc[pdf.has_discount > 0, 'count'].resample(freq).sum()
            pdf2.loc[:, 'clearance_sale_count'] = pdf.loc[pdf.clearance > 0, 'count'].resample(freq).sum()
            pdf2.loc[:, 'price'] = pdf2.total_price / pdf2.sale_count

            pdf2 = pdf2.drop(pdf2[pdf2.order_count == 0].index)

            return pdf2


        results = df.groupby("store_id", "product_id").apply(process_order_items)

        results.select(['store_id', 'price']).show(5)


Got this error:

    Py4JJavaError: An error occurred while calling o172.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 143, 10.104.33.18, executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 230, in main
        process()
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 225, in process
        serializer.dump_stream(func(split_index, iterator), outfile)
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 150, in <lambda>
        func = lambda _, it: map(mapper, it)
      File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 276, in load_stream
        import pyarrow as pa
    ImportError: No module named pyarrow


`10.104.33.18` is the Spark master, so I think `PYSPARK_PYTHON` is not set
correctly.

I logged in to the master and the slaves, ran the `pyspark` interpreter on each, and found
that `import pyarrow` does not throw an exception.


PS: `pyarrow` is also installed on the machine running Zeppelin.

--------------

More info:


1. The Spark cluster is installed on A, B, and C; Zeppelin is installed on D.
2. `PYSPARK_PYTHON` is set in `spark-env.sh` on each of A, B, and C.
3. `import pyarrow` works in `/usr/local/spark/bin/pyspark` on A, B, and C.
4. `import pyarrow` works in the custom Python (miniconda3) on A, B, and C.
5. `import pyarrow` works in D's default Python (miniconda3; its path differs
from A, B, and C, but that shouldn't matter).



So I completely cannot understand why it doesn't work.
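
For reference, a quick driver-side check (a sketch, assuming Spark 2.x behaviour, where the PySpark driver captures `PYSPARK_PYTHON` from its own environment and ships that path to the executors along with Python UDFs) is to print what the Zeppelin paragraph itself sees:

```
%spark.pyspark

# Driver-side sanity check (sketch): which interpreter runs this paragraph,
# and what PYSPARK_PYTHON value, if any, the driver would hand to executors.
import os, sys
print(sys.executable)
print(os.environ.get("PYSPARK_PYTHON"))
```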









Re: How to make pyspark use custom python?

Hyukjin Kwon
Are you sure it is an issue in Spark? I have used a custom Python several times by setting it in PYSPARK_PYTHON, and it was no problem.
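
For reference, a minimal sketch of the usual ways to point PySpark at a custom interpreter (the miniconda path is the one from this thread; `spark.pyspark.python` is the Spark 2.1+ configuration counterpart of the environment variable):

```
import os
from pyspark.sql import SparkSession

# Option 1: set the environment variable before the SparkContext is created
# (spark-env.sh, zeppelin-env.sh, or the shell that launches the driver);
# the driver picks it up and uses it to start Python workers.
os.environ["PYSPARK_PYTHON"] = "/usr/local/miniconda3/bin/python"

# Option 2 (Spark 2.1+): the equivalent configuration property, usually passed
# at submit time or set on Zeppelin's spark interpreter, e.g.
#   --conf spark.pyspark.python=/usr/local/miniconda3/bin/python

spark = SparkSession.builder.appName("custom-python-check").getOrCreate()
print(os.environ.get("PYSPARK_PYTHON"))  # quick sanity check of what the driver sees
```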



Re: How to make pyspark use custom python?

Patrick McCarthy-2
It looks like, for whatever reason, your cluster isn't using the Python you distributed, or that distribution doesn't contain what you think it does.
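
A quick way to test both possibilities (a sketch, assuming the usual `sc` SparkContext is available, e.g. in the `%spark.pyspark` interpreter or a pyspark shell) is to ask the executors which interpreter their Python workers run and whether pyarrow imports there:

```
# Probe the executors: report hostname, the worker's Python binary, and
# whether pyarrow is importable in that interpreter.
def probe(_):
    import importlib, socket, sys
    try:
        importlib.import_module("pyarrow")
        status = "pyarrow OK"
    except ImportError as exc:
        status = "pyarrow missing: %s" % exc
    return (socket.gethostname(), sys.executable, status)

print(sc.range(0, 8, numSlices=8).map(probe).distinct().collect())
```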

I've used the following with success to deploy a conda environment to my cluster at runtime: https://henning.kropponline.de/2016/09/24/running-pyspark-with-conda-env/




Re: How to make pyspark use custom python?

mithril

The whole content of `spark-env.sh` is:

```
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=10.104.85.78:2181,10.104.114.131:2181,10.135.2.132:2181
-Dspark.deploy.zookeeper.dir=/spark"
PYSPARK_PYTHON="/usr/local/miniconda3/bin/python"
```

I ran `/usr/local/spark/sbin/stop-all.sh` and
`/usr/local/spark/sbin/start-all.sh` to restart the Spark cluster.

Is anything wrong?





Re: [External Sender] Re: How to make pyspark use custom python?

Anthony, Olufemi
Are you sure that pyarrow is deployed on your slave hosts? If not, you will either have to install it there or ship it along when you call spark-submit by zipping it up and specifying the zip file to be shipped with the
`--py-files zipfile.zip` option.

A quick check would be to ssh to a slave host, run pyspark, and try to import pyarrow.
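
For example, inside `/usr/local/spark/bin/pyspark` on a slave host (a minimal sketch of that check, using the paths already mentioned in this thread):

```
# Run inside the pyspark shell on a slave host.
import sys
print(sys.executable)        # which interpreter this shell actually uses

import pyarrow               # fails here with ImportError if pyarrow is missing
print(pyarrow.__version__)
```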

Femi






Re: [External Sender] Re: How to make pyspark use custom python?

mithril
I am sure; everything is as written in my first post.
So this is very confusing to me.


