Structured Streaming - HDFS State Store Performance Issues

William Briggs-2
Hi all, I've got a problem that really has me stumped. I'm running a Structured Streaming query that reads from Kafka, performs some transformations and stateful aggregations (using flatMapGroupsWithState), and outputs any updated aggregates to another Kafka topic.
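
For context, the query is shaped roughly like the sketch below. This is only an illustration of the structure, not the real job: the topic names, broker address, Event/AggState classes, parsing logic, and checkpoint path are all placeholders.

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(key: String, count: Long)
case class AggState(key: String, total: Long)

object StatefulAggSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("stateful-agg-sketch").getOrCreate()
    import spark.implicits._

    // Read raw records from the source Kafka topic (placeholder broker/topic).
    val events: Dataset[Event] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "input-topic")
      .load()
      .selectExpr("CAST(value AS STRING) AS raw")
      // Placeholder parse: treat the whole message value as the grouping key.
      .map(r => Event(r.getString(0), 1L))

    // Stateful aggregation per key; emits the updated aggregate each batch.
    // The per-key state lives in the HDFS-backed state store under the
    // checkpoint location.
    val updated: Dataset[AggState] = events
      .groupByKey(_.key)
      .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout) {
        (key: String, rows: Iterator[Event], state: GroupState[AggState]) =>
          val prev = state.getOption.getOrElse(AggState(key, 0L))
          val next = prev.copy(total = prev.total + rows.size)
          state.update(next)
          Iterator(next)
      }

    // Write updated aggregates to the output Kafka topic (placeholders again).
    updated
      .selectExpr("key AS key", "CAST(total AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("topic", "output-topic")
      .option("checkpointLocation", "hdfs:///checkpoints/stateful-agg")
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}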

I'm running this job using Spark 2.4.4 on Amazon EMR 5.28.1. Semi-regularly, all the tasks except one will complete, and the one remaining task will take 1-2 minutes to complete instead of 1-2 seconds. I've checked the number of input records (and overall size) for that task, and everything seems in line with all the other tasks - there's no visible skew.

The only thing I have to go on at the moment is that the thread dump on the executor that is hung shows a 'state-store-maintenance-task' thread, which is blocked on an "Executor task launch worker" thread - that second thread shows as TIMED_WAITING, with the following locks:

Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1569026152}), Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171}), Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316}), Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777})

And a stack of:

java.lang.Object.wait(Native Method)
org.apache.hadoop.hdfs.DataStreamer.waitForAckedSeqno(DataStreamer.java:877)
org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:736)
org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:846) => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:805) => holding Monitor(org.apache.hadoop.hdfs.DFSOutputStream@321319171})
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:145) => holding Monitor(org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream@235686316})
net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:193)
java.io.FilterOutputStream.close(FilterOutputStream.java:159)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.finalizeDeltaFile(HDFSBackedStateStoreProvider.scala:417)
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$commitUpdates(HDFSBackedStateStoreProvider.scala:287) => holding Monitor(org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider@1633346777})
org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:132)
org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$$anonfun$doExecute$1$$anonfun$apply$1.apply$mcV$sp(FlatMapGroupsWithStateExec.scala:135)

Based on this, I'm guessing that there's some kind of delay happening in the HDFS-backed state store commit, but my NameNode and DataNode metrics all look good (no large GCs, plenty of free memory, network bandwidth isn't saturated, no under-replicated blocks).
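
For reference, the background snapshotting and cleanup that the 'state-store-maintenance-task' thread performs are governed by a few settings. The values below are what I believe are the Spark 2.4 defaults (worth verifying against the exact version on the cluster), shown as they would be set before starting the query:

import org.apache.spark.sql.SparkSession

// Assumed Spark 2.4 defaults for the HDFS-backed state store; set these
// before starting the streaming query if tuning them.
val spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "60s") // how often the maintenance task runs snapshots/cleanup
spark.conf.set("spark.sql.streaming.stateStore.minDeltasForSnapshot", "10") // delta files written before consolidating a snapshot
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "100")             // batches of state/checkpoint files kept around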

Has anyone run into a problem like this before? Any help would be greatly appreciated!

Regards,
Will

Re: Structured Streaming - HDFS State Store Performance Issues

Gourav Sengupta
Hi Will,

Have you tried using S3 as the state store location, with the EMR option for faster file sync enabled? There is also now the option of using FSx for Lustre.
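
Something along these lines, i.e. pointing the streaming checkpoint (and with it the state store files) at S3 instead of HDFS. This is only a sketch: the broker, topics, bucket, and prefix are placeholders, and it assumes EMRFS handles the s3:// scheme on EMR.

import org.apache.spark.sql.SparkSession

object S3CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("s3-checkpoint-sketch").getOrCreate()

    // Pass-through pipeline just to show where the checkpoint would live on S3.
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "input-topic")
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("topic", "output-topic")
      .option("checkpointLocation", "s3://my-bucket/checkpoints/stateful-agg")
      .start()
      .awaitTermination()
  }
}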

Thanks and Regards,
Gourav Sengupta
