Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits


William Briggs
I am running a Structured Streaming job (Spark 2.2.0) using EMR 5.9. The job sources data from a Kafka topic, performs a variety of filters and transformations, and sinks data back into a different Kafka topic.

Once per day, we stop the query in order to merge the NameNode edit logs with the fsimage, because Structured Streaming creates and destroys a significant number of HDFS files, and EMR doesn't support a secondary or HA NameNode for fsimage compaction (AWS support directed us to do this, as the NameNode edit logs were filling the disk).

Occasionally, the Structured Streaming query will not restart because the most recent file in the "commits" or "offsets" checkpoint subdirectory is empty. This seems like undesirable behavior, as it requires manual intervention to remove the empty files in order to force the job to fall back to the last good values. Has anyone run into this behavior? The only similar issue I can find is SPARK-21760, which appears to have no fix or workaround.
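For reference, the manual intervention described above can be sketched as a small helper. This is an assumption about the checkpoint layout (the "offsets" and "commits" subdirectories holding one file per batch), not Spark code, and the function name is made up: it scans those subdirectories for zero-length files and deletes them so that a restarted query falls back to the last complete batch.

```scala
import java.io.File

// Hedged sketch of the manual cleanup step (not part of Spark): delete
// zero-length batch-metadata files under the checkpoint's "offsets" and
// "commits" subdirectories, returning the paths that were removed.
def removeEmptyMetadataFiles(checkpointDir: String): Seq[String] =
  for {
    sub <- Seq("offsets", "commits")
    dir = new File(checkpointDir, sub)
    if dir.isDirectory
    f <- dir.listFiles.toSeq
    if f.isFile && f.length == 0L && f.delete()
  } yield f.getPath
```

Run this only while the query is stopped, since Spark may legitimately be mid-write to the newest batch file while the query is active.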

Any assistance would be greatly appreciated!

Regards,
Will

Re: Structured Streaming + Kafka - Corrupted Checkpoint Offsets / Commits

Shixiong(Ryan) Zhu
The root cause is probably that HDFSMetadataLog ignores exceptions thrown by "output.close". I think this should be fixed by this line in Spark 2.2.1 and 3.0.0: https://github.com/apache/spark/commit/6edfff055caea81dc3a98a6b4081313a0c0b0729#diff-aaeb546880508bb771df502318c40a99L126
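To illustrate the failure mode: if an exception from closing the output stream is swallowed, a partially flushed (often empty) metadata file can be left in place and later mistaken for a complete batch. Below is a minimal sketch in plain java.io, not Spark's actual HDFSMetadataLog code, of the pattern the fix enforces: write to a temp file, let a failure from write or close propagate and delete the partial file, and only rename into place once the file is known complete.

```scala
import java.io.{File, FileOutputStream, IOException}

// Sketch only (simplified, not Spark's implementation): an interrupted or
// failed write can never leave a truncated/empty batch file at the target
// path, because the rename happens only after a successful close.
def atomicWrite(target: File, bytes: Array[Byte]): Unit = {
  val tmp = new File(target.getParentFile, s".${target.getName}.tmp")
  var ok = false
  val out = new FileOutputStream(tmp)
  try {
    out.write(bytes)
    out.close() // close inside try: a failed flush/close aborts the write
    ok = true
  } finally {
    if (!ok) {
      try out.close() catch { case _: IOException => () }
      tmp.delete() // discard the partial file instead of exposing it
    }
  }
  if (!tmp.renameTo(target))
    throw new IOException(s"Failed to rename $tmp to $target")
}
```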

Could you try 2.2.1?

On Thu, Jan 4, 2018 at 9:08 AM, William Briggs <[hidden email]> wrote: