possible bug


possible bug

Weiand, Markus, NMA-CFD

Hi all,

 

I'm using Spark on a c5a.16xlarge machine in the Amazon cloud (so having 64 cores and 128 GB RAM). I'm using Spark 3.0.1.

 

The following Python code leads to an exception; is this a bug, or is my understanding of the API incorrect?

 

    import pyspark

    conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")

    sc=pyspark.SparkContext.getOrCreate(conf)

    rows=70000

    data=list(range(rows))

    rdd=sc.parallelize(data,rows)

    assert rdd.getNumPartitions()==rows

    rdd0=rdd.filter(lambda x:False)

    assert rdd0.getNumPartitions()==rows

    rdd00=rdd0.coalesce(1)

    data=rdd00.collect()

    assert data==[]

 

output when starting from PyCharm:

 

/home/ubuntu/PycharmProjects/<projekt-dir>/venv/bin/python /opt/pycharm-2020.2.3/plugins/python/helpers/pydev/pydevconsole.py --mode=client --port=41185

import sys; print('Python %s on %s' % (sys.version, sys.platform))

sys.path.extend(['/home/ubuntu/PycharmProjects/<projekt-dir>'])

PyDev console: starting.

Python 3.8.5 (default, Jan 27 2021, 15:41:15)

[GCC 9.3.0] on linux

import os

os.environ['PYTHONHASHSEED'] = '0'

runfile('/home/ubuntu/PycharmProjects/<projekt-dir>/tests/test.py', wdir='/home/ubuntu/PycharmProjects/<projekt-dir>/tests')

WARNING: An illegal reflective access operation has occurred

WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)

WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform

WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations

WARNING: All illegal access operations will be denied in a future release

21/04/08 12:12:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

21/04/08 12:12:29 WARN TaskSetManager: Stage 0 contains a task of very large size (4732 KiB). The maximum recommended task size is 1000 KiB.

[Stage 0:>                                                          (0 + 1) / 1][423.190s][warning][os,thread] Attempt to protect stack guard pages failed (0x00007f43d23ff000-0x00007f43d2403000).

[423.190s][warning][os,thread] Attempt to deallocate stack guard pages failed.

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f43d300b000, 16384, 0) failed; error='Not enough space' (errno=12)

[423.231s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.

#

# There is insufficient memory for the Java Runtime Environment to continue.

# Native memory allocation (mmap) failed to map 16384 bytes for committing reserved memory.

# An error report file with more information is saved as:

# /home/ubuntu/PycharmProjects/<projekt-dir>/tests/hs_err_pid17755.log

[thread 17966 also had an error]

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f4b7bd81000, 262144, 0) failed; error='Not enough space' (errno=12)

ERROR:root:Exception while sending command.

Traceback (most recent call last):

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command

    raise Py4JNetworkError("Answer from Java side is empty")

py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command

    response = connection.send_command(command)

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1211, in send_command

    raise Py4JNetworkError(

py4j.protocol.Py4JNetworkError: Error while receiving

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42439)

Traceback (most recent call last):

  File "/opt/spark/python/pyspark/rdd.py", line 889, in collect

    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__

    return_value = get_return_value(

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 334, in get_return_value

    raise Py4JError(

py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection

    connection = self.deque.pop()

IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start

    self.socket.connect((self.address, self.port))

ConnectionRefusedError: [Errno 111] Connection refused

Traceback (most recent call last):

  File "/opt/spark/python/pyspark/rdd.py", line 889, in collect

    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__

    return_value = get_return_value(

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 334, in get_return_value

    raise Py4JError(

py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection

    connection = self.deque.pop()

IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start

    self.socket.connect((self.address, self.port))

ConnectionRefusedError: [Errno 111] Connection refused

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "<input>", line 3, in <module>

  File "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile

    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script

  File "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile

    exec(compile(contents+"\n", file, 'exec'), glob, loc)

  File "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py", line 992, in <module>

    test_70000()

  File "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py", line 974, in test_70000

    data=rdd00.collect()

  File "/opt/spark/python/pyspark/rdd.py", line 889, in collect

    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())

  File "/opt/spark/python/pyspark/traceback_utils.py", line 78, in __exit__

    self._context._jsc.setCallSite(None)

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1303, in __call__

    answer = self.gateway_client.send_command(command)

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1031, in send_command

    connection = self._get_connection()

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 979, in _get_connection

    connection = self._create_connection()

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 985, in _create_connection

    connection.start()

  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1127, in start

    raise Py4JNetworkError(msg, e)

py4j.protocol.Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:42439)

 

 

report of free -m:

              total        used        free      shared  buff/cache   available

Mem:         127462        5548       22680          92       99233      120605

Swap:             0           0           0

 

Thanks

Markus


Re: possible bug

srowen
That's a very low-level error from the JVM. Any chance you are misconfiguring the executor size, e.g. to 10 MB instead of 10 GB, that kind of thing? I'm trying to think of why the JVM would have very little memory to operate with.
An app running out of memory would not look like this.
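For reference, one quick way to check that (a sketch, assuming the sc created by the reproduction code above) is to print the memory-related settings the driver actually received:

    # Diagnostic sketch: list every memory-related setting the SparkContext got.
    for key, value in sc.getConf().getAll():
        if "memory" in key.lower():
            print(key, "=", value)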



Re: possible bug

Weiand, Markus, NMA-CFD

This is the reduction of an error in a complex program where I allocated 100 GB of driver memory (= worker = executor, as it runs in local mode). In the example I used the default size, as this puny example shouldn't need more anyway.

And without the coalesce, or with coalesce(1, True), everything works fine.

I'm trying to coalesce an empty RDD with 70000 partitions into an empty RDD with 1 partition; why is this a problem without shuffling?
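For comparison, a self-contained sketch of the two variants (with a much smaller partition count so it runs quickly; only the 70000-partition case from the report triggers the failure):

    import pyspark

    conf = pyspark.SparkConf().setMaster("local[4]").setAppName("CoalesceVariants")
    sc = pyspark.SparkContext.getOrCreate(conf)

    # Same shape as the reproduction code, but with 1000 partitions.
    rdd0 = sc.parallelize(range(1000), 1000).filter(lambda x: False)

    # Narrow coalesce: a single task evaluates all parent partitions itself.
    without_shuffle = rdd0.coalesce(1).collect()

    # Coalesce with a shuffle: the parent partitions are evaluated by many
    # upstream tasks first; this is the variant reported to work fine.
    with_shuffle = rdd0.coalesce(1, shuffle=True).collect()

    assert without_shuffle == [] and with_shuffle == []
    sc.stop()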

 



Re: possible bug

Russell Spitzer
It could be that the driver JVM cannot handle the metadata required to store the partition information of a 70k-partition RDD. I see you say you have a 100 GB driver, but I'm not sure where you configured that.

Did you set --driver-memory 100G?
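As a side note on where that setting has to go: when the script is started from a plain Python process (as PyCharm does here), the driver JVM is launched at SparkContext creation, so driver memory generally has to be specified before that point, e.g. via spark-submit --driver-memory 100g or, as a sketch, via the PYSPARK_SUBMIT_ARGS environment variable:

    import os
    import pyspark

    # Must be set before the SparkContext (and hence the driver JVM) is created.
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 100g pyspark-shell"

    conf = pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
    sc = pyspark.SparkContext.getOrCreate(conf)
    print(sc.getConf().get("spark.driver.memory"))  # expected to show 100g if it took effect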



Re: possible bug

Attila Zsolt Piros
Hi!

@Russell: Markus used the default settings.

And Markus' reproduction code can be executed locally.

I'm trying to coalesce an empty RDD with 70000 partitions into an empty RDD with 1 partition; why is this a problem without shuffling?

Without a shuffle, a new CoalescedRDD is created which refers to the parent RDD. When this new CoalescedRDD is evaluated, a task is executed for each of the partitions, and each of these tasks runs in its own thread (that those partitions are empty does not really matter, as they are not yet evaluated at that point because of lazy evaluation and the missing shuffle).
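A rough conceptual sketch of that evaluation path (an illustration only, not the actual Spark source):

    # Conceptual illustration: with shuffle=False, coalesce(1) produces a single
    # output partition whose evaluation walks every parent partition itself.
    def compute_coalesced_partition(parent_partitions, compute_parent):
        for p in parent_partitions:        # 70000 parent partitions in the report
            # In PySpark each parent-partition evaluation brings its own
            # per-partition threads, which lines up with the pthread_create
            # failures in the log above.
            yield from compute_parent(p)

    # Tiny usage example with three empty parent partitions:
    print(list(compute_coalesced_partition([[], [], []], iter)))  # -> []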


Best regards,
Attila



Re: possible bug

Weiand, Markus, NMA-CFD
In reply to this post by Russell Spitzer

I've changed the code to set the driver memory to 100g; the changed Python code:

    import pyspark

    conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1").set(key="spark.driver.memory", value="100g")

    sc=pyspark.SparkContext.getOrCreate(conf)

    rows=70000

    data=list(range(rows))

    rdd=sc.parallelize(data,rows)

    assert rdd.getNumPartitions()==rows

    rdd0=rdd.filter(lambda x:False)

    assert rdd0.getNumPartitions()==rows

    rdd00=rdd0.coalesce(1)

    data=rdd00.collect()

    assert data==[]
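As a hedged aside: in local/client mode, spark.driver.memory set on the SparkConf after the driver JVM has already started may not take effect; one way to check what the running JVM actually received (using the internal py4j gateway) is:

    # Compare the requested setting with the heap the running driver JVM reports.
    print(sc.getConf().get("spark.driver.memory", "not set"))
    print(sc._jvm.java.lang.Runtime.getRuntime().maxMemory() / (1024 ** 3), "GiB max heap")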

 

The same error still happens:

 

21/04/09 04:48:38 WARN TaskSetManager: Stage 0 contains a task of very large size (4732 KiB). The maximum recommended task size is 1000 KiB.

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f4643550000, 16384, 0) failed; error='Not enough space' (errno=12)

[423.701s][warning][os,thread] Attempt to protect stack guard pages failed (0x00007f4640d28000-0x00007f4640d2c000).

[423.701s][warning][os,thread] Attempt to deallocate stack guard pages failed.

[423.704s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.

#

# There is insufficient memory for the Java Runtime Environment to continue.

# Native memory allocation (mmap) failed to map 16384 bytes for committing reserved memory.

 

A function that needs 423 seconds to crash with excessive memory consumption when trying to coalesce 70000 empty partitions is not very practical. As I do not know the limits within which coalesce without shuffling can be used safely and with good performance, I will now always use coalesce with shuffling, even though in theory this comes with quite a performance penalty.
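For reference, coalesce with shuffling can also be written as repartition, which under the standard RDD API is the same as coalesce with shuffle=True (sketch reusing rdd0 from the reproduction code above):

    # repartition(1) == coalesce(1, shuffle=True): the parent partitions are
    # evaluated by separate upstream tasks instead of inside a single task.
    data = rdd0.repartition(1).collect()
    assert data == []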

 

Markus

 



Re: possible bug

Attila Zsolt Piros
Hi!

I looked into the code and found a way to improve it.

With the improvement, your test runs just fine:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
      /_/

Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
Spark context Web UI available at http://192.168.0.199:4040
Spark context available as 'sc' (master = local, app id = local-1617982367872).
SparkSession available as 'spark'.

In [1]:     import pyspark

In [2]:     conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")

In [3]:     sc=pyspark.SparkContext.getOrCreate(conf)

In [4]:     rows=70000

In [5]:     data=list(range(rows))

In [6]:     rdd=sc.parallelize(data,rows)

In [7]:     assert rdd.getNumPartitions()==rows

In [8]:     rdd0=rdd.filter(lambda x:False)

In [9]:     assert rdd0.getNumPartitions()==rows

In [10]:     rdd00=rdd0.coalesce(1)

In [11]:     data=rdd00.collect()
21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very large siz
e (4729 KiB). The maximum recommended task size is 1000 KiB.

In [12]:     assert data==[]

In [13]:


I will create a JIRA and need to add some unit tests before opening the PR.
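
The user-facing part of such a test will essentially be the script from this thread; a rough sketch only (assuming a pytest-style test and an existing local SparkContext fixture named sc, which is not how the actual Spark test suite is organised):

    def test_coalesce_many_empty_partitions(sc):
        rows = 70000
        # 70000 partitions, then filter everything away
        rdd = sc.parallelize(range(rows), rows).filter(lambda x: False)
        assert rdd.getNumPartitions() == rows
        # coalescing the empty partitions into one must not blow up the driver
        assert rdd.coalesce(1).collect() == []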

Best Regards,
Attila

On Fri, Apr 9, 2021 at 7:04 AM Weiand, Markus, NMA-CFD <[hidden email]> wrote:

I've changed the code to set driver memory to 100g; the changed Python code:

    import pyspark

    conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1").set(key="spark.driver.memory", value="100g")

    sc=pyspark.SparkContext.getOrCreate(conf)

    rows=70000

    data=list(range(rows))

    rdd=sc.parallelize(data,rows)

    assert rdd.getNumPartitions()==rows

    rdd0=rdd.filter(lambda x:False)

    assert rdd0.getNumPartitions()==rows

    rdd00=rdd0.coalesce(1)

    data=rdd00.collect()

    assert data==[]

 

Still the same error happens:

 

21/04/09 04:48:38 WARN TaskSetManager: Stage 0 contains a task of very large size (4732 KiB). The maximum recommended task size is 1000 KiB.

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f4643550000, 16384, 0) failed; error='Not enough space' (errno=12)

[423.701s][warning][os,thread] Attempt to protect stack guard pages failed (0x00007f4640d28000-0x00007f4640d2c000).

[423.701s][warning][os,thread] Attempt to deallocate stack guard pages failed.

[423.704s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.

#

# There is insufficient memory for the Java Runtime Environment to continue.

# Native memory allocation (mmap) failed to map 16384 bytes for committing reserved memory.

 

A function that needs 423 seconds to crash with excessive memory consumption when trying to coalesce 70000 empty partitions is not very practical. As I do not know the limits within which coalesce without shuffling can be used safely and with good performance, I will from now on always use coalesce with shuffling, even though in theory this comes with quite a performance penalty.
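
Applied to the snippet above, that workaround is a one-line change (as far as I understand, coalesce(1, True) is what repartition(1) does under the hood: the 70000 parent partitions are computed by their own map tasks and only their empty shuffle output is read by the single reduce task):

    rdd00 = rdd0.coalesce(1, True)   # or: rdd0.repartition(1)
    data = rdd00.collect()
    assert data == []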

 

Markus

 

From: Russell Spitzer <[hidden email]>
Sent: Thursday, 8 April 2021 15:24
To: Weiand, Markus, NMA-CFD <[hidden email]>
Cc: [hidden email]
Subject: Re: possible bug

 

Could be that the driver JVM cannot handle the metadata required to store the partition information of a 70k-partition RDD. I see you say you have a 100GB driver, but I'm not sure where you configured that.

Did you set --driver-memory 100G?
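
One way to make sure the value actually reaches the driver JVM in a local/PyCharm setup (a sketch only, and only if the gateway JVM has not already been launched in that Python process; PYSPARK_SUBMIT_ARGS is picked up when PySpark starts the JVM, and the trailing pyspark-shell token is required):

    import os
    # must run before the SparkContext (and therefore the JVM) is created
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 100g pyspark-shell"

    import pyspark
    conf = pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
    sc = pyspark.SparkContext.getOrCreate(conf)

Running the script through spark-submit with --driver-memory 100g has the same effect.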

 

On Thu, Apr 8, 2021 at 8:08 AM Weiand, Markus, NMA-CFD <[hidden email]> wrote:

This is a reduction of an error in a complex program where I allocated 100 GB of driver memory (= worker = executor, as this is local mode). In the example I used the default size, as the puny example shouldn't need more anyway.

And without the coalesce, or with coalesce(1, True), everything works fine.

I'm trying to coalesce an empty RDD with 70000 partitions into an empty RDD with 1 partition; why is this a problem without shuffling?

 

From: Sean Owen <[hidden email]>
Sent: Thursday, 8 April 2021 15:00
To: Weiand, Markus, NMA-CFD <[hidden email]>
Cc: [hidden email]
Subject: Re: possible bug

 

That's a very low-level error from the JVM. Any chance you are misconfiguring the executor size? Like 10MB instead of 10GB, that kind of thing. I'm trying to think of why the JVM would have so little memory to operate with.

An app running out of memory would not look like this.

 



Re: possible bug

srowen
Yeah I figured it's not something fundamental to the task or Spark. The error is very odd, never seen that. Do you have a theory on what's going on there? I don't!


Re: possible bug

Mich Talebzadeh
In reply to this post by Attila Zsolt Piros
Interesting, unittest not pytest :)

Is data in [11] being reused, compared to [5], the list()?

HTH



   view my Linkedin profile

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 





Re: possible bug

Attila Zsolt Piros
In reply to this post by srowen
Hi Sean!

So "coalesce" without shuffle creates a CoalescedRDD which, during its computation, delegates to the parent RDD partitions.
As the CoalescedRDD contains only 1 partition, we are talking about 1 task and 1 task context.

The next stop is PythonRunner.

Here the Python workers at least are reused (when "spark.python.worker.reuse" is true, and true is the default), but the MonitorThreads are not reused and, what is worse, all the MonitorThreads are created for the same worker and the same TaskContext.
This means the CoalescedRDD's single task must complete before the first monitor thread can stop; relevant code:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L570

So this leads to creating 70000 extra threads when 1 would be enough.
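
A quick way to watch this from the outside while the collect() is running (Linux only, nothing Spark-specific; the PID is assumed to be the driver JVM's process id, e.g. taken from jps):

    import re, sys

    pid = int(sys.argv[1])                     # PID of the driver JVM
    with open(f"/proc/{pid}/status") as f:     # Linux /proc bookkeeping
        status = f.read()
    threads = re.search(r"^Threads:\s+(\d+)", status, re.M).group(1)
    print(f"driver JVM currently has {threads} threads")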

The jira is: https://issues.apache.org/jira/browse/SPARK-35009
The PR will come next week, maybe (I am a bit uncertain, as I have many other things to do right now).

Best Regards,
Attila


Re: possible bug

srowen
OK, so it's a '70000 threads overwhelming off-heap memory in the JVM' kind of thing. Or running afoul of ulimits in the OS.
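
Rough back-of-envelope, using the stacksize: 1024k from the pthread_create warning in the log (an estimate only; thread stacks are native reservations outside -Xmx, and the actual failure may just as well be a per-user thread limit or map-count limit):

    threads = 70000
    stack_kib = 1024                             # "stacksize: 1024k" from the warning
    gib = threads * stack_kib / (1024 * 1024)
    print(f"~{gib:.1f} GiB of native stack reservations")   # ~68.4 GiB, regardless of spark.driver.memory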



Re: possible bug

Mich Talebzadeh
I ran this one on RHES 7.6 with 64GB of memory and it hit OOM:

>>> data=list(range(rows))
>>> rdd=sc.parallelize(data,rows)
>>> assert rdd.getNumPartitions()==rows
>>> rdd0=rdd.filter(lambda x:False)
>>> assert rdd0.getNumPartitions()==rows
>>> rdd00=rdd0.coalesce(1)
>>> data=rdd00.collect()
2021-04-09 17:19:01,452 WARN scheduler.TaskSetManager: Stage 1 contains a task of very large size (4729 KiB). The maximum recommended task size is 1000 KiB.
2021-04-09 17:25:14,249 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
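
"unable to create new native thread" usually points at an OS-level limit rather than the heap; a quick (hedged) check of the per-user process/thread limit from the same Python session, Linux only (this is the limit ulimit -u reports):

    import resource
    soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
    print(f"max user processes/threads: soft={soft}, hard={hard}")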
 




   view my Linkedin profile

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 





Re: possible bug

Mich Talebzadeh
Spark 3.1.1



   view my Linkedin profile

 

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


