EMR Spark 2.4.3 executor hang


EMR Spark 2.4.3 executor hang

Daniel Zhang
Hi, All:
We are testing EMR and comparing it with our on-premise HDP solution, using one application as the test:
EMR (5.21.1) with Hadoop 2.8.5 + Spark 2.4.3 vs HDP (2.6.3) with Hadoop 2.7.3 + Spark 2.2.0
The application is very simple: it reads raw Parquet files, then does a DS.repartition(id_col).flatMap().write.partitionBy(col).save() operation.
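For reference, a minimal sketch of the job, assuming placeholder column names (id_col, dt), placeholder paths, and a stand-in for our real flatMap logic; the real schema is wider than this:

import org.apache.spark.sql.{Row, SparkSession}

// Placeholder output record; the real schema is wider.
case class OutRecord(id: Long, dt: String, payload: String)

object RepartitionFlatMapTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("RepartitionFlatMapTest").getOrCreate()
    import spark.implicits._

    // Stage 1: read the raw Parquet input (S3 in both tests).
    val raw = spark.read.parquet("s3://my-bucket/raw/")          // placeholder path

    // Stage 2 (the default 200 shuffle partitions): repartition by the id column,
    // expand each row with flatMap, then write out partitioned by dt.
    raw.repartition($"id_col")
      .flatMap { row: Row =>
        // stand-in for the real expansion logic
        Seq(OutRecord(row.getAs[Long]("id_col"), row.getAs[String]("dt"), "..."))
      }
      .write
      .partitionBy("dt")
      .parquet("hdfs:///user/hadoop/output")                     // S3 in the first test, HDFS in the second
  }
}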

With our test data, the whole application finishes in around 3 hours on HDP with 6 slave nodes (32G each), which is fine with us.
The job runs as a single Spark application with 2 stages; the 2nd stage runs with the default 200 tasks.
On EMR, we observed that 2 of the 200 tasks hang for more than 10 hours while the rest are done, and we had to give up.

The first test read the raw Parquet files from S3 and wrote the output directly back to S3, so I thought it could be an issue with the S3 output committer. We then changed the test to read the Parquet files from S3 and write to HDFS on the EMR cluster instead.
To my surprise, we observed the same behavior with HDFS: 2 of the 200 tasks hang forever, on different executors. These 2 executors process other tasks normally but hang on these 2 tasks, while all the rest finish.

This looks like data skew, but we know it is not: the same application with the same data works fine on HDP, and the data is well balanced across all 200 tasks.

I then checked the executor logs on EMR more carefully for the HDFS test case. I know S3 is not the issue here, as all the raw Parquet data is read in the first stage of the job WITHOUT any delay.

Sample log from a finished executor on EMR:
19/08/29 20:18:49 INFO Executor: Finished task 157.0 in stage 2.0 (TID 170). 3854 bytes result sent to driver
19/08/29 20:18:49 INFO CoarseGrainedExecutorBackend: Got assigned task 179
19/08/29 20:18:49 INFO Executor: Running task 166.0 in stage 2.0 (TID 179)
19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks including 1 local blocks and 11 remote blocks
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-141.ec2.internal/10.51.51.141:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-141.ec2.internal/10.51.51.141:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-155.ec2.internal/10.51.51.155:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-155.ec2.internal/10.51.51.155:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-142.ec2.internal/10.51.51.142:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-142.ec2.internal/10.51.51.142:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-140.ec2.internal/10.51.51.140:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-140.ec2.internal/10.51.51.140:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-157.ec2.internal/10.51.51.157:7337, creating a new one.
19/08/29 20:18:49 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-157.ec2.internal/10.51.51.157:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches in 61 ms
19/08/29 20:28:55 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
.................

The last log lines from a hanging executor on EMR:
19/08/29 19:40:40 INFO Executor: Finished task 78.0 in stage 2.0 (TID 91). 3854 bytes result sent to driver
19/08/29 19:40:40 INFO CoarseGrainedExecutorBackend: Got assigned task 101
19/08/29 19:40:40 INFO Executor: Running task 88.0 in stage 2.0 (TID 101)
19/08/29 19:40:40 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty blocks including 1 local blocks and 11 remote blocks
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-157.ec2.internal/10.51.51.157:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-157.ec2.internal/10.51.51.157:7337 after 1 ms (0 ms spent in bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-142.ec2.internal/10.51.51.142:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-142.ec2.internal/10.51.51.142:7337 after 1 ms (0 ms spent in bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-141.ec2.internal/10.51.51.141:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-141.ec2.internal/10.51.51.141:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-155.ec2.internal/10.51.51.155:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-155.ec2.internal/10.51.51.155:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 19:40:40 INFO TransportClientFactory: Found inactive connection to ip-10-51-51-140.ec2.internal/10.51.51.140:7337, creating a new one.
19/08/29 19:40:40 INFO TransportClientFactory: Successfully created connection to ip-10-51-51-140.ec2.internal/10.51.51.140:7337 after 0 ms (0 ms spent in bootstraps)
19/08/29 19:40:40 INFO ShuffleBlockFetcherIterator: Started 11 remote fetches in 73 ms

This shows that the hanging executor started fetching shuffle data for task 101 but never reached "FileOutputCommitter" for that particular task. Other tasks (e.g. task 91) had finished on this executor without any issue before.
I checked the HDFS output location:
[hadoop@ip-10-51-51-151 ~]$ hadoop fs -ls -R /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101
drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-01
-rw-r--r--   2 hadoop hadoop  170976376 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-01/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-02
-rw-r--r--   2 hadoop hadoop  102985213 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-02/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-03
-rw-r--r--   2 hadoop hadoop   58306503 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=2019-08-03/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet
drwxr-xr-x   - hadoop hadoop          0 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=UNKNOWN
-rw-r--r--   2 hadoop hadoop  258330267 2019-08-29 19:51 /user/hadoop/permalink/working/working/20190829190515/data/_temporary/0/task_20190829190552_0002_m_000101/dt=UNKNOWN/part-00101-cc72b7f3-cb01-4bdf-992d-1567cfc9ebd6.c000.snappy.parquet

In fact, as far as I can tell, all the intermediate data for task 101 SHOULD ALREADY BE DONE on HDFS at 19:51. The output Parquet file sizes are close to those of the other tasks that already finished.

So my questions are:

1) What COULD stop these 2 executors from reaching "FileOutputCommitter" in Spark 2.4.3 in this case? I really don't believe they were still fetching data from remote at that point.
2) This Spark 2.4.3 is running on EMR, and AWS gave us the following configurations, which may be related to the above issue (a sketch of how they might be overridden follows the list):

spark.hadoop.yarn.timeline-service.enabled false
spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS $(hostname -f)
spark.files.fetchFailure.unRegisterOutputOnHost true
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem 2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem true
spark.sql.parquet.output.committer.class com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
spark.sql.parquet.fs.optimized.committer.optimization-enabled true
spark.sql.emr.internal.extensions com.amazonaws.emr.spark.EmrSparkSessionExtensions
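For what it's worth, a minimal sketch of how I assume these could be overridden for an A/B test run, falling back to Spark's stock Parquet output committer (we have not actually verified this on EMR; the properties could also be passed as --conf on spark-submit):

import org.apache.spark.sql.SparkSession

// Sketch only, assuming the EMR properties above can be overridden at session-build time.
val spark = SparkSession.builder()
  .appName("CommitterABTest")
  // disable the EMR-optimized committer path for Parquet...
  .config("spark.sql.parquet.fs.optimized.committer.optimization-enabled", "false")
  // ...and fall back to Spark's default Parquet output committer
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.sql.execution.datasources.parquet.ParquetOutputCommitter")
  .getOrCreate()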

Can anyone give me some idea of what could cause this issue?

Thanks

Yong

Re: EMR Spark 2.4.3 executor hang

Vadim Semenov-3
Try "spark.shuffle.io.numConnectionsPerPeer=10"


