Hi all,
I'm using Spark on a c5a.16xlarge machine in the Amazon cloud (so 64 cores and 128 GB RAM), running Spark 3.0.1.
The following Python code leads to an exception; is this a bug, or is my understanding of the API incorrect?
import pyspark

conf = pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
sc = pyspark.SparkContext.getOrCreate(conf)
rows = 70000
data = list(range(rows))
rdd = sc.parallelize(data, rows)      # one element per partition
assert rdd.getNumPartitions() == rows
rdd0 = rdd.filter(lambda x: False)    # drop everything, the 70000 partitions stay
assert rdd0.getNumPartitions() == rows
rdd00 = rdd0.coalesce(1)              # merge the empty partitions without a shuffle
data = rdd00.collect()
assert data == []
output when starting from PyCharm:
/home/ubuntu/PycharmProjects/<projekt-dir>/venv/bin/python /opt/pycharm-2020.2.3/plugins/python/helpers/pydev/pydevconsole.py --mode=client --port=41185
import sys; print('Python %s on %s' % (sys.version, sys.platform))
sys.path.extend(['/home/ubuntu/PycharmProjects/<projekt-dir>'])
PyDev console: starting.
Python 3.8.5 (default, Jan 27 2021, 15:41:15)
[GCC 9.3.0] on linux
import os
os.environ['PYTHONHASHSEED'] = '0'
runfile('/home/ubuntu/PycharmProjects/<projekt-dir>/tests/test.py', wdir='/home/ubuntu/PycharmProjects/<projekt-dir>/tests')
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
21/04/08 12:12:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/04/08 12:12:29 WARN TaskSetManager: Stage 0 contains a task of very large size (4732 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:> (0 + 1) / 1]
[423.190s][warning][os,thread] Attempt to protect stack guard pages failed (0x00007f43d23ff000-0x00007f43d2403000).
[423.190s][warning][os,thread] Attempt to deallocate stack guard pages failed.
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f43d300b000, 16384, 0) failed; error='Not enough space' (errno=12)
[423.231s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 16384 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/ubuntu/PycharmProjects/<projekt-dir>/tests/hs_err_pid17755.log
[thread 17966 also had an error]
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f4b7bd81000, 262144, 0) failed; error='Not enough space' (errno=12)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
response = connection.send_command(command)
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1211, in send_command
raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42439)
Traceback (most recent call last):
File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 334, in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
Traceback (most recent call last):
File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 334, in get_return_value
raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "<input>", line 3, in <module>
File "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
pydev_imports.execfile(filename, global_vars, local_vars) # execute the script
File "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py", line 992, in <module>
test_70000()
File "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py", line 974, in test_70000
data=rdd00.collect()
File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/opt/spark/python/pyspark/traceback_utils.py", line 78, in __exit__
self._context._jsc.setCallSite(None)
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1303, in __call__
answer = self.gateway_client.send_command(command)
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1031, in send_command
connection = self._get_connection()
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 979, in _get_connection
connection = self._create_connection()
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 985, in _create_connection
connection.start()
File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1127, in start
raise Py4JNetworkError(msg, e)
py4j.protocol.Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:42439)
report of free -m:
               total        used        free      shared  buff/cache   available
Mem:          127462        5548       22680          92       99233      120605
Swap:              0           0           0
Thanks
Markus
That's a very low-level error from the JVM. Any chance you are misconfiguring the executor size, like to 10MB instead of 10GB, that kind of thing? Trying to think of why the JVM would have very little memory to operate; an app running out of memory would not look like this.
This is the reduction of an error in a complex program where I allocated 100 GB of driver memory (= worker = executor, as it is local mode). In the example I used the default size, as the puny example shouldn't need more anyway.
Without the coalesce, or with coalesce(1, True), everything works fine.
I'm trying to coalesce an empty RDD with 70000 partitions into an empty RDD with 1 partition; why is this a problem without shuffling?
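For reference, a minimal sketch of the shuffling variant that works fine here, reusing rdd0 from the reproduction above:

# shuffle=True inserts a shuffle boundary: the map stage runs one short
# task per parent partition, and the single reduce task then only fetches
# the (empty) shuffle output instead of computing all 70000 parents itself.
rdd00_shuffled = rdd0.coalesce(1, shuffle=True)
assert rdd00_shuffled.collect() == []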
Could be that the driver JVM cannot handle the metadata required to store the partition information of a 70k-partition RDD. I see you say you have a 100 GB driver, but I'm not sure where you configured that. Did you set --driver-memory 100G?
Hi!

@Russell: Markus used the default settings, and Markus' reproduction code can be executed locally.

> I'm trying to coalesce an empty RDD with 70000 partitions into an empty RDD with 1 partition; why is this a problem without shuffling?

Without a shuffle, a new CoalescedRDD is created which refers to the parent RDD, and when this new CoalescedRDD is evaluated, a task is executed for all the parent partitions, and these each run in their own thread (that those partitions are empty does not really matter, as they are not yet evaluated because of lazy evaluation and no shuffle).
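A quick way to see this lineage from PySpark, reusing rdd00 from the reproduction (a sketch, not from the original test):

# toDebugString() returns bytes in PySpark; the output shows the
# CoalescedRDD whose single partition wraps all 70000 parent partitions,
# which are only computed once an action such as collect() runs.
print(rdd00.toDebugString().decode("utf-8"))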
Best regards,
Attila
I've changed the code to set the driver memory to 100g; the changed Python code:
import pyspark

conf = (pyspark.SparkConf()
        .setMaster("local[64]")
        .setAppName("Test1")
        .set(key="spark.driver.memory", value="100g"))
sc = pyspark.SparkContext.getOrCreate(conf)
rows = 70000
data = list(range(rows))
rdd = sc.parallelize(data, rows)
assert rdd.getNumPartitions() == rows
rdd0 = rdd.filter(lambda x: False)
assert rdd0.getNumPartitions() == rows
rdd00 = rdd0.coalesce(1)
data = rdd00.collect()
assert data == []
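(As a sanity check, and assuming the context above, one can verify that the setting actually reached the driver JVM right after creating the context; note that sc._jvm is an internal py4j handle:)

# The configured value as Spark sees it:
print(sc.getConf().get("spark.driver.memory"))
# The JVM's actual max heap in bytes, queried through the py4j gateway:
print(sc._jvm.java.lang.Runtime.getRuntime().maxMemory())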
The same error still happens:
21/04/09 04:48:38 WARN TaskSetManager: Stage 0 contains a task of very large size (4732 KiB). The maximum recommended task size is 1000 KiB.
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f4643550000, 16384, 0) failed; error='Not enough space' (errno=12)
[423.701s][warning][os,thread] Attempt to protect stack guard pages failed (0x00007f4640d28000-0x00007f4640d2c000).
[423.701s][warning][os,thread] Attempt to deallocate stack guard pages failed.
[423.704s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 16384 bytes for committing reserved memory.
A function which needs 423 seconds to crash with excessive memory consumption when trying to coalesce 70000 empty partitions is not very practical.
As I do not know the limits within which coalesce without shuffling can be used safely and with good performance, I will now always use coalesce with shuffling, even though in theory this comes with quite a performance penalty.
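As an aside, repartition(n) in the RDD API is shorthand for coalesce(n, shuffle=True), so the shuffling fallback can also be written as:

# repartition(n) is implemented as coalesce(n, shuffle=True)
rdd00 = rdd0.repartition(1)
assert rdd00.collect() == []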
Markus
Hi!

I looked into the code and found a way to improve it. With the improvement your test runs just fine:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
      /_/

Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
Spark context Web UI available at http://192.168.0.199:4040
Spark context available as 'sc' (master = local, app id = local-1617982367872).
SparkSession available as 'spark'.
In [1]: import pyspark
In [2]: conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
In [3]: sc=pyspark.SparkContext.getOrCreate(conf)
In [4]: rows=70000
In [5]: data=list(range(rows))
In [6]: rdd=sc.parallelize(data,rows)
In [7]: assert rdd.getNumPartitions()==rows
In [8]: rdd0=rdd.filter(lambda x:False)
In [9]: assert rdd0.getNumPartitions()==rows
In [10]: rdd00=rdd0.coalesce(1)
In [11]: data=rdd00.collect()
21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very large size (4729 KiB). The maximum recommended task size is 1000 KiB.
In [12]: assert data==[]
In [13]:
I will create a JIRA and need to add some unit tests before opening the PR.

Best regards,
Attila
I’ve changed the code to set driver memory to 100g, changed python code:
import pyspark
conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1").set(key="spark.driver.memory", value="100g")
sc=pyspark.SparkContext.getOrCreate(conf)
rows=70000
data=list(range(rows))
rdd=sc.parallelize(data,rows)
assert rdd.getNumPartitions()==rows
rdd0=rdd.filter(lambda x:False)
assert rdd0.getNumPartitions()==rows
rdd00=rdd0.coalesce(1)
data=rdd00.collect()
assert data==[]
Still the same error happens:
21/04/09 04:48:38 WARN TaskSetManager: Stage 0 contains a task of very large size (4732 KiB). The maximum recommended task size is 1000 KiB.
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f4643550000, 16384, 0) failed; error='Not enough space' (errno=12)
[423.701s][warning][os,thread] Attempt to protect stack guard pages failed (0x00007f4640d28000-0x00007f4640d2c000).
[423.701s][warning][os,thread] Attempt to deallocate stack guard pages failed.
[423.704s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 16384 bytes for committing reserved memory.
A function which needs 423 seconds to crash with excessive memory consumption when trying to coalesce 70000 empty partitions is not very practical.
As I do not know the limits in which coalesce without shuffling can be used safely and with performance, I will now always use coalesce with shuffling, even though in theory this will come with quite a performance decrease.
Markus
Could be that the driver JVM cannot handle the metadata required to store the partition information of a 70k partition RDD. I see you say you have a 100GB driver but i'm not sure where you configured that?
Did you set --driver-memory 100G ?
On Thu, Apr 8, 2021 at 8:08 AM Weiand, Markus, NMA-CFD <[hidden email]> wrote:
This is the reduction of an error in a complex program where allocated 100 GB driver (=worker=executor
as local mode) memory. In the example I used the default size, as the puny example shouldn’t need more anyway.
And without the coalesce or with coalesce(1,True) everything works fine.
I'm trying to coalesce an empty RDD with 70000 partitions into an empty RDD with 1 partition;
why is this a problem without shuffling?
That's a very low-level error from the JVM. Any chance you are misconfiguring the executor size, like 10MB instead of 10GB, that kind of thing? Trying to think of why the JVM would have very little memory to operate with.
An app running out of memory would not look like this.
|
|
Yeah, I figured it's not something fundamental to the task or Spark. The error is very odd; never seen that. Do you have a theory on what's going on there? I don't!
|
|
Interesting, unittest not pytest :)
Is data in In [11] reused, compared to the list() in In [5]?
HTH

|
|
Hi Sean!

So the "coalesce" without shuffle will create a CoalescedRDD which, during its computation, delegates to the parent RDD partitions. As the CoalescedRDD contains only 1 partition, we are talking about 1 task and 1 task context. The next stop is PythonRunner.
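A rough Python analogy of that mechanism (my simplification for illustration, not Spark's actual Scala code):

def compute_coalesced_partition(parent_partitions, compute_parent):
    # The single coalesced partition is computed by ONE task that walks
    # every parent partition in turn; any per-parent-partition resource
    # (such as a PythonRunner worker thread) is multiplied by 70000
    # inside that one task.
    for parent in parent_partitions:
        yield from compute_parent(parent)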
The PR will come next week, maybe (I am a bit uncertain, as I have many other things to do right now).
Best Regards,
Attila
|
|
OK, so it's a '70000 threads overwhelming off-heap memory in the JVM' kind of thing. Or running afoul of ulimits in the OS.
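One quick way to check the OS limit in question from Python (a sketch; RLIMIT_NPROC is Linux-specific and counts threads as well as processes):

import resource

soft, hard = resource.getrlimit(resource.RLIMIT_NPROC)
print("max user processes/threads: soft=%s, hard=%s" % (soft, hard))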
|
|
I ran this one on RHES 7.6 with 64 GB of memory and it hit an OOM:
>>> data=list(range(rows))
>>> rdd=sc.parallelize(data,rows)
>>> assert rdd.getNumPartitions()==rows
>>> rdd0=rdd.filter(lambda x:False)
>>> assert rdd0.getNumPartitions()==rows
>>> rdd00=rdd0.coalesce(1)
>>> data=rdd00.collect()
2021-04-09 17:19:01,452 WARN scheduler.TaskSetManager: Stage 1 contains a task of very large size (4729 KiB). The maximum recommended task size is 1000 KiB.
2021-04-09 17:25:14,249 ERROR executor.Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
|
|
Spark 3.1.1
|
|