Using pyspark with Spark 2.4.3, a MultilayerPerceptron model gives inconsistent outputs if a large amount of data is fed into it and at least one of the model outputs is fed to a Python UDF.


Ben Smith
Hi,

I am having an issue that looks like a potentially serious bug in Spark
2.4.3, as it impacts data accuracy. I have searched the Spark Jira and
mailing lists as best I can and cannot find any reference to anyone else having
this issue. I am not sure whether this would be suitable for raising as a bug in
the Spark Jira, so I thought I should request help here.

The simplest summary of my suspected bug is:
Using pyspark with Spark 2.4.3, a MultilayerPerceptron model gives
inconsistent outputs if a large amount of data is fed into it and at least
one of the model outputs is fed to a Python UDF.

After doing some debugging I haven't been able to get to the bottom of this
or recreate it 100% reliably (though it does happen very frequently), but I have
narrowed the problem down somewhat and produced some stripped-down code that
demonstrates it. Some observations I have made while doing this are:
- I can recreate the problem with a very simple MultilayerPerceptron with no
hidden layers (just 14 features and 2 outputs); I also see it with a more
complex MultilayerPerceptron model.
- I cannot recreate the problem unless the model output is fed to a Python
UDF. Removing the UDF gives good outputs for the model, and having it means
that model outputs are inconsistent (note that it is not just the Python UDF
outputs that are inconsistent).
- I cannot recreate the problem on minuscule amounts of data or when my data
is partitioned heavily. 100,000 rows of input with 2 partitions sees the
issue happen most of the time (see the sketch after this list).
- Some of the bad outputs I get could be explained if certain features were
zero when they came into the model (which they are not in my actual feature
data).
- I can recreate the problem on several different environments (with the
same setup), so I don't think it's an issue with my hardware.
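
For illustration, the partitioning point can be probed along these lines (a
sketch only; data_array and spark are set up in the repro code further down,
and coalesce/repartition just stand in for however the partition count is
actually controlled):

# Same 100,000 identical rows, only the partition count differs
df_bad = spark.createDataFrame(data_array).coalesce(2)       # issue seen most runs
df_ok = spark.createDataFrame(data_array).repartition(100)   # issue not seen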

My environment is CentOS 7.6 with Python 3.6.3 and Spark 2.4.3. I do not
have the native libraries for mllib installed. I'm aware later releases of
Spark are available, so please let me know if that is the problem (I would have
difficulty getting a later release installed on my environment, otherwise I
would test with it myself).

The code sample below triggers the problem for me the vast majority of the
time when run from a pyspark shell. It generates a dataframe
containing 100,000 identical rows, transforms it with a MultilayerPerceptron
model, and feeds one of the model output columns to a simple Python UDF to
generate an additional column. The distinct rows of the resulting dataframe
are then selected; since all the inputs are identical I would expect to get 1
row back, but instead I get many unique rows, with the number returned varying
each time I run the code. To run the code you will need the model files locally.
I have attached the model as a zip archive to this message (I hope);
unzipping it to /tmp should be all you need to do.

Please let me know if I have done anything wrong in this report. I haven't
posted to a mailing list like this before, so I am unsure of the format and
expectations when raising a message.

model.zip
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t10909/model.zip>  


import pyspark

from pyspark.ml.classification import MultilayerPerceptronClassificationModel
from pyspark.ml.feature import VectorAssembler

from pyspark.sql import functions as func
from pyspark.sql.types import FloatType

#############
sc.stop()  # assumes a pyspark shell, where `sc` already exists

conf = pyspark.SparkConf().setMaster(
    'local'
).setAppName(
    'bug-Testing2'
).set(
    'spark.executor.memory', '1G'
).set(
    'spark.executor.cores', '1'
).set(
    'spark.executor.instances', '1'
).set(
    'spark.sql.shuffle.partitions', '1'
).set(
    'spark.default.parallelism', '2'
)

sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)
#############

# 100,000 identical rows of 14 features, all set to 1
data_array = [(1,) * 14 for _ in range(100000)]

df = spark.createDataFrame(data_array)


mlp_model = MultilayerPerceptronClassificationModel.load("file:///tmp/model")


# Assemble the 14 input columns (_1 .. _14) into a single feature vector
features_vector = VectorAssembler(
    inputCols=['_{}'.format(i) for i in range(1, 15)],
    outputCol='scaledFeatures'
).transform(df).select('scaledFeatures')

features_vector.cache()

# Trivial UDF that pulls one element out of a vector as a float
def __return(vec, position):
    return float(vec[position])

__return_udf = func.udf(__return, FloatType())


transform_result = mlp_model.transform(features_vector)

final_results = transform_result.withColumn(
    'python_score',
    __return_udf('probability', func.lit(1))
)

# Every input row is identical, so this should return exactly 1 distinct row;
# instead a varying number of rows comes back.
final_results.select('python_score', 'rawPrediction', 'probability').distinct().collect()




Re: Using pyspark with Spark 2.4.3, a MultilayerPerceptron model gives inconsistent outputs if a large amount of data is fed into it and at least one of the model outputs is fed to a Python UDF.

srowen
I can't reproduce it (on Databricks / Spark 2.4), but as you say, it sounds
really specific to some way of executing it. I can't off the top of my head
imagine why that would be; as you say, no matter the model, it should be the
same result. I don't recall a bug being fixed around there, but nevertheless
you might try 2.4.6 or 3.0.0 just to see.

As a workaround, maybe avoid the UDF and just select "probability[1]"
with Spark SQL - I think that might work.
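
Something like this sketch, assuming you can test on 3.0+, where
pyspark.ml.functions.vector_to_array exists (it converts the vector column to
a plain array that can be indexed on the JVM side; I haven't verified whether
plain "probability[1]" parses against the vector column on 2.4):

# Extract the element without a Python round trip;
# vector_to_array is available from Spark 3.0
from pyspark.ml.functions import vector_to_array

final_results = transform_result.withColumn(
    'python_score',
    vector_to_array('probability')[1]
)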


Re: Using pyspark with Spark 2.4.3, a MultilayerPerceptron model gives inconsistent outputs if a large amount of data is fed into it and at least one of the model outputs is fed to a Python UDF.

Ben Smith
Thanks for that. I have played with this a bit more after your feedback and
found:

I can only recreate the problem with Python 3.6+. Switching between Python
2.7, 3.6 and 3.7, the problem occurs with Python 3.6 and 3.7 but not with
Python 2.7.
- I have used minimal Python virtual environments with the same dependencies
between Python 2.7 and Python 3.x (basically nothing installed except
numpy), so I don't think it's a Python dependency version issue.
- I have compared the DAGs and execution plans generated by Spark and they
look the same between the working and broken cases, so I don't think the
Python version is impacting Spark's execution plan.

Note that in the Python 3.6+ case I still can't recreate the problem every
time, but it does seem to happen the majority of the times I try.

I also tested with Spark 2.4.6 and still get the problem. I cannot try with
3.0.0 as that hits a fatal exception due to defect SPARK-32232.

The workaround you suggest isn't going to work in my case, as the code sample
I provided is a simplified version of what I'm actually doing in Python.
However, I think I have a workaround where I force a cache/persist of the
data after the model has transformed the features, as I cannot recreate the
issue if the Python UDF is run on the cached data in a separate action
(sketched below).
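
Roughly, in terms of the repro code from my first message (the count() here
is just one way to force a separate action before the UDF runs):

# Persist the model outputs and materialise them with a separate action,
# then run the Python UDF against the cached data
transform_result = mlp_model.transform(features_vector)
transform_result.cache()
transform_result.count()

final_results = transform_result.withColumn(
    'python_score',
    __return_udf('probability', func.lit(1))
)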

I will add another message if I find any more info.




Re: Using pyspark with Spark 2.4.3, a MultilayerPerceptron model gives inconsistent outputs if a large amount of data is fed into it and at least one of the model outputs is fed to a Python UDF.

Ben Smith
I can also recreate this with the very latest master branch (3.1.0-SNAPSHOT)
if I compile it locally.


