PyArrow Exception in Pandas UDF GROUPEDAGG()

Gautham Acharya

Hi everyone,

I’m running a job that uses a Pandas UDF to GROUP BY a large matrix.

The GROUP BY function runs on a wide dataset. The first column contains the string labels used as the grouping key, and the remaining columns are numeric values that are aggregated in the Pandas UDF. The dataset is very wide, with 50,000 columns and 3 million rows.

| label_col | num_col_0 | num_col_1 | num_col_2 | ... | num_col_50000 |
|-----------|-----------|-----------|-----------|-----|---------------|
| label_a   | 2.0       | 5.6       | 7.123     | ... | ...           |
| label_b   | 11.0      | 1.4       | 2.345     | ... | ...           |
| label_a   | 3.1       | 6.2       | 5.444     | ... | ...           |

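Conceptually, a grouped-aggregate Pandas UDF reduces each numeric column to a single value per label. The stdlib sketch below reproduces that computation on the three sample rows from the table. The aggregate itself (a per-column mean) and the helper name `grouped_column_means` are assumptions, since the post does not say which aggregate the UDF computes.

```python
from collections import defaultdict

# The three sample rows from the table: (label, [num_col_0, num_col_1, num_col_2]).
ROWS = [
    ("label_a", [2.0, 5.6, 7.123]),
    ("label_b", [11.0, 1.4, 2.345]),
    ("label_a", [3.1, 6.2, 5.444]),
]


def grouped_column_means(rows):
    """Hypothetical helper: mean of each numeric column, computed separately per label."""
    groups = defaultdict(list)
    for label, values in rows:
        groups[label].append(values)  # every row of a label is collected together
    # zip(*vectors) turns a label's rows into columns, which are then averaged.
    return {
        label: [sum(column) / len(column) for column in zip(*vectors)]
        for label, vectors in groups.items()
    }


if __name__ == "__main__":
    for label, means in sorted(grouped_column_means(ROWS).items()):
        print(label, means)
```

Because the UDF sees all of a label's rows at once, the data handed to it grows with the number of rows per label times the number of columns.
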
My job runs fine on smaller datasets, with the same number of columns but fewer rows. However, when run on a dataset with 3 million rows, I see the following exception:

20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 2358)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py", line 303, in load_stream
    for batch in reader:
  File "pyarrow/ipc.pxi", line 266, in __iter__
  File "pyarrow/ipc.pxi", line 282, in pyarrow.lib._CRecordBatchReader.read_next_batch
  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
pyarrow.lib.ArrowIOError: read length must be positive or -1

Based on this issue (https://issues.apache.org/jira/browse/ARROW-4890), it looks like PyArrow has a 2 GB limit on each shard that is sent to the grouping function.

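The 2 GB figure matches the range of a signed 32-bit integer, whose maximum is 2**31 - 1. One plausible reading of `read length must be positive or -1`, offered as an assumption rather than a confirmed diagnosis, is that a byte count above that maximum ended up in a signed 32-bit field and wrapped around to a negative number. The sketch below shows that wraparound and, assuming dense float64 values (8 bytes each) and ignoring the label column and Arrow's own overhead, estimates how many rows one label can have across 50,000 columns before its data passes 2**31 bytes. `as_int32` and `max_rows_per_group` are hypothetical helpers.

```python
import struct

INT32_MAX = 2**31 - 1  # largest byte count a signed 32-bit field can hold
NUM_COLS = 50_000      # numeric columns, from the post
BYTES_PER_VALUE = 8    # assumption: dense float64 values


def as_int32(n: int) -> int:
    """Reinterpret the low 32 bits of n as a signed 32-bit integer."""
    return struct.unpack("<i", struct.pack("<I", n & 0xFFFFFFFF))[0]


def max_rows_per_group(num_cols: int = NUM_COLS,
                       bytes_per_value: int = BYTES_PER_VALUE) -> int:
    """Largest row count whose numeric payload still fits in INT32_MAX bytes."""
    return INT32_MAX // (num_cols * bytes_per_value)


if __name__ == "__main__":
    budget = max_rows_per_group()
    print("rows per label before passing 2**31 bytes:", budget)
    too_big = (budget + 1) * NUM_COLS * BYTES_PER_VALUE
    print(f"{too_big:,} bytes read as int32 -> {as_int32(too_big):,}")
```

Under those assumptions the budget is 5,368 rows per label, so a single label with a few thousand more rows than that would already be over the limit, which would fit the observation that smaller inputs succeed.
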
I’m currently running this job on 4 nodes, each with 16 cores and 64 GB of memory.

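For scale, and assuming dense 8-byte values, the full matrix is several times larger than the cluster's combined memory. Spark does not need to hold the whole matrix in memory at once, so this does not explain the failure by itself, but it gives a sense of how much data the shuffle and the UDF have to move. The constant names are illustrative.

```python
# Figures from the post; BYTES_PER_VALUE is an assumption (dense float64).
TOTAL_ROWS = 3_000_000
NUM_COLS = 50_000
BYTES_PER_VALUE = 8
NODES = 4
MEM_PER_NODE_GB = 64

matrix_bytes = TOTAL_ROWS * NUM_COLS * BYTES_PER_VALUE
cluster_bytes = NODES * MEM_PER_NODE_GB * 10**9  # decimal GB, good enough for a rough ratio

print(f"matrix:  {matrix_bytes / 10**12:.1f} TB")
print(f"cluster: {cluster_bytes / 10**9:.0f} GB of RAM")
print(f"ratio:   {matrix_bytes / cluster_bytes:.1f}x")
```

This prints roughly 1.2 TB against 256 GB, a ratio of about 4.7x.
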
I’ve attached the full error log as well. What workarounds can I try to get this job running? Unfortunately, we are close to a production release, and this has become a severe blocker.

Thanks,

Gautham

Re: PyArrow Exception in Pandas UDF GROUPEDAGG()

ZHANG Wei
AFAICT, there might be data skew: some partitions may have too many
rows, which would hit the memory limit. Trying .groupBy().count()
or .aggregateByKey().count() may help check the data size of each
partition. If there is no data skew, increasing the .groupBy()
parameter `numPartitions` is worth a try.

--
Cheers,
-z
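
In PySpark, the `.groupBy().count()` check suggested above would look like `df.groupBy("label_col").count()`, optionally followed by `.orderBy(F.desc("count"))` (with `F` being `pyspark.sql.functions`) to bring the largest labels to the top. The stdlib sketch below mirrors that logic on a plain list of labels and flags any label whose row count exceeds a budget. The budget of 5,368 rows assumes 50,000 dense float64 columns and a 2**31 - 1 byte limit per group, and `oversized_groups` is a hypothetical helper.

```python
from collections import Counter

# Assumption: 50,000 dense float64 columns and a 2**31 - 1 byte limit per group.
ROW_BUDGET = (2**31 - 1) // (50_000 * 8)  # 5,368 rows


def oversized_groups(labels, budget=ROW_BUDGET):
    """Count rows per label (like groupBy(...).count()) and return the labels
    over budget, largest first."""
    counts = Counter(labels)
    return [(label, n) for label, n in counts.most_common() if n > budget]


if __name__ == "__main__":
    # Toy input: one heavily skewed label and two small ones.
    labels = ["label_a"] * 6_000 + ["label_b"] * 1_200 + ["label_c"] * 40
    print(oversized_groups(labels))
```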

On Wed, 6 May 2020 00:07:58 +0000
Gautham Acharya <[hidden email]> wrote:

> Hi everyone,
>
> I'm running a job that runs a Pandas UDF to GROUP BY a large matrix.
>
> The GROUP BY function runs on a wide dataset. The first column of the dataset contains string labels that are GROUPed on. The remaining columns are numeric values that are aggregated in the Pandas UDF. The dataset is very wide, with 50,000 columns and 3 million rows.
>
> ----------
> | label_col | num_col_0 | num_col_1 | num_col_2 |  --- | num_col_50000|
> |   label_a  |         2.0        |         5.6       |      7.123      |
> |   label_b  |         11.0      |         1.4       |      2.345      |
> |   label_a  |         3.1        |         6.2       |      5.444      |
>
>
>
> My job runs fine on smaller datasets, with the same number of columns but fewer rows. However, when run on a dataset with 3 million rows, I see the following exception:
>
> 20/05/05 23:36:27 ERROR Executor: Exception in task 66.1 in stage 12.0 (TID 2358)
> org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py", line 377, in main
>     process()
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/worker.py", line 372, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
>     for series in iterator:
>   File "/mnt/yarn/usercache/hadoop/appcache/application_1588710822953_0001/container_1588710822953_0001_01_034974/pyspark.zip/pyspark/serializers.py", line 303, in load_stream
>     for batch in reader:
>   File "pyarrow/ipc.pxi", line 266, in __iter__
>   File "pyarrow/ipc.pxi", line 282, in pyarrow.lib._CRecordBatchReader.read_next_batch
>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> pyarrow.lib.ArrowIOError: read length must be positive or -1
>
> Based on this issue<https://issues.apache.org/jira/browse/ARROW-4890>, it looks like PyArrow has a 2 GB limit on each shard that is sent to the grouping function.
>
> I'm currently running this job on 4 nodes, each with 16 cores and 64 GB of memory.
>
> I've attached the full error log as well. What workarounds can I try to get this job running? Unfortunately, we are close to a production release, and this has become a severe blocker.
>
> Thanks,
> Gautham

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

RE: PyArrow Exception in Pandas UDF GROUPEDAGG()

Gautham Acharya
Thanks for the quick reply, Zhang.

I don't think we have much data skew, and even if we do, there isn't much of a way around it: we need to group by this specific column in order to run the aggregates.

I'm running this with PySpark, and it doesn't look like the groupBy() function takes a numPartitions parameter. What other options can I explore?
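
For DataFrames, the number of partitions after a `groupBy` shuffle comes from the `spark.sql.shuffle.partitions` setting (200 by default) rather than from a `groupBy()` argument, while the RDD methods `groupByKey` and `reduceByKey` do accept `numPartitions`. In both cases rows are hash-partitioned by key, so every row of a given label lands in the same partition: raising the partition count spreads labels more thinly but cannot split one large label. The stdlib sketch below illustrates this with `zlib.crc32` standing in for Spark's hash function (Spark uses its own hash, so real partition assignments differ); `partition_sizes` is a hypothetical helper.

```python
import zlib
from collections import Counter


def partition_sizes(labels, num_partitions):
    """Hypothetical helper: row count per partition when rows are hash-partitioned by label."""
    sizes = Counter()
    for label in labels:
        # zlib.crc32 is a deterministic stand-in for Spark's hash function.
        sizes[zlib.crc32(label.encode("utf-8")) % num_partitions] += 1
    return sizes


if __name__ == "__main__":
    # Toy input: one label with 6,000 rows plus 1,000 labels with one row each.
    labels = ["label_a"] * 6_000 + [f"label_{i}" for i in range(1_000)]
    for n in (8, 200, 2_000):
        largest = max(partition_sizes(labels, n).values())
        print(f"{n:>5} partitions -> largest partition holds {largest:,} rows")
```

However many partitions are used, the largest partition never drops below 6,000 rows, because all of `label_a`'s rows share one hash value.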

--gautham

-----Original Message-----
From: ZHANG Wei <[hidden email]>
Sent: Thursday, May 7, 2020 1:34 AM
To: Gautham Acharya <[hidden email]>
Cc: [hidden email]
Subject: Re: PyArrow Exception in Pandas UDF GROUPEDAGG()

AFAICT, there might be data skew: some partitions may have too many rows, which would hit the memory limit. Trying .groupBy().count() or .aggregateByKey().count() may help check the data size of each partition.
If there is no data skew, increasing the .groupBy() parameter `numPartitions` is worth a try.

--
Cheers,
-z
