Arrow type issue with Pandas UDF

Patrick McCarthy-2
PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.

I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions in the last stage of the job regardless of my output type.


The problem I'm trying to solve:
I have a column of scalar values, and each row also carries a sorted vector of quantiles. I'm trying to replace each scalar value with the index of the closest entry in that row's vector. The grouping itself is arbitrary; I'm doing the lookup row-wise in Python because, even though the same vector appears on many rows, it isn't clear how else I would get the lookup to scale.

My input data, the product of a join of Hive tables, has the following schema:

root
 |-- scalar_value: float (nullable = true)
 |-- quantilelist: array (nullable = true)
 |    |-- element: double (containsNull = true)

My UDF is at the bottom. I'm using a GROUPED_MAP UDF because I want to perform an operation over two columns at once, and because I want to take advantage of Arrow to avoid the per-row serialization overhead of ordinary Python UDFs.
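
For reference, the snippets below assume the usual import aliases; 'group_key' here is only a placeholder for whatever arbitrary key I group on:

import pandas as pd
from pyspark.sql import functions as F, types as T

# the grouped map UDF is applied per group via the Spark 2.3 API, e.g.:
# result = df.groupBy('group_key').apply(groupedPercentile)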

The schema my UDF returns is this:

pos_schema = T.StructType([
    T.StructField('feature_value',T.FloatType(),True),
    T.StructField('error',T.StringType())
])

...however when I try to apply my UDF, either with saveAsTable or show(), I get the following exception:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
        at org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
        at org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
        at org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
        at org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
        at org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
        at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
        at org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
        at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
        at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
        at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
        at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)


I assumed it was the result of some bad typing on my part, until I tested with a degenerate UDF that just returns a constant column of 1s:

@F.pandas_udf(T.StructType([T.StructField('feature_value', T.IntegerType(), True)]),
              F.PandasUDFType.GROUPED_MAP)
def groupedPercentileInt(df):
    return pd.DataFrame({'feature_value': [int(1)] * df.shape[0]}).reset_index(drop=True)



This clearly returns only a single integer column, yet I get the same exception (the stack trace is identical to the one above):

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer

What seems very strange is that it's still falling over in the Arrow DoubleWriter, even though none of my UDF's output columns are doubles. I tried to look into the underlying code, but I don't know Scala well enough to suss out the issue.


Is this a bug?

My UDF:

@F.pandas_udf(pos_schema, F.PandasUDFType.GROUPED_MAP)
def groupedPercentile(df):
    """
    Pandas UDF to apply binary search for a group of records.
    """

    def getPercentile(x):
        """
        Given a scalar v and a 1000-length vector of quantiles q
        produce the percentile of the distribution most closely
        corresponding to v's position in q
        """

        v = x['scalar_value']
        q = x['quantilelist']

        # the vector is length 1000 so for the sake of simplicity
        # we're going to pretend it's actually 1024  

        q1024 = []
        q1024.extend(q.tolist())
        q1024.extend([q[-1]]*24)

        start = 0
        end = 1024

        # standard lower-bound binary search: find the first index whose
        # quantile is >= v (i.e. the number of quantiles strictly below v);
        # stepping the bounds this way also avoids a stuck loop when v is
        # at or below the first quantile
        while start < end:

            mid = start + (end - start) // 2

            if v > q1024[mid]:
                start = mid + 1
            else:
                end = mid

        if start > 1000:
            start = 1000

        return start
   
    try:
        df.loc[:,'feature_value'] = df.apply(getPercentile,axis=1)
        df.loc[:,'error'] = [None]*df.shape[0]
   
    except Exception as e:
        df.loc[:,'feature_value'] = [None]*df.shape[0]
        df.loc[:,'error'] = [str(e)]*df.shape[0]
       
    finally:
        return df[['feature_value','error']]
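
Side note on the lookup itself: since the loop above is a lower-bound binary search, I believe the same per-row work could be done with numpy's searchsorted. A rough, untested sketch:

import numpy as np

def getPercentileNp(x):
    # index of the first quantile >= the scalar, i.e. the count of
    # quantiles strictly below it; clamped to 1000 like the loop above
    idx = int(np.searchsorted(np.asarray(x['quantilelist']),
                              x['scalar_value'], side='left'))
    return min(idx, 1000)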

Re: Arrow type issue with Pandas UDF

Bryan Cutler
Hi Patrick,

It looks like it's failing in Scala before it even gets to Python to execute your UDF, which is why it doesn't seem to matter what's in the UDF. Since you are doing a grouped map UDF, maybe your group sizes are too big or skewed? Could you try to reduce the size of your groups by adding more keys, or by sampling a fraction of the data? If the problem persists, could you file a JIRA? At the very least a better exception would be nice.
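
For example, something along these lines to split each group with an extra salt column (just a sketch; 'orig_key' stands in for whatever you currently group on, and the number of buckets is arbitrary):

# add a random salt so each original group splits into ~16 smaller groups,
# which keeps each Arrow batch sent to the Python worker smaller
salted = df.withColumn('salt', (F.rand() * 16).cast('int'))
result = salted.groupBy('orig_key', 'salt').apply(groupedPercentile)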

Bryan

Re: Arrow type issue with Pandas UDF

Patrick McCarthy-2
Thanks Bryan. I think it was ultimately groupings that were too large; after setting spark.sql.shuffle.partitions to a much higher number I was able to get the UDF to execute.
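
For anyone who finds this later, the change was just one config setting before running the job; the value below is only an example (what works will depend on the data):

# raise the shuffle partition count so the grouped data is spread across
# many more, smaller partitions; 2000 is an arbitrary example value
spark.conf.set("spark.sql.shuffle.partitions", "2000")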

Re: Arrow type issue with Pandas UDF

Gourav Sengupta
super duper :) 
