Structured Streaming to file sink results in illegal state exception


Magnus Nilsson
I'm evaluating Structured Streaming to understand how resilient the pipeline is. I ran a small test on Azure Databricks that streams data from an Azure Event Hub and saves it in parquet format on the Databricks filesystem, dbfs:/.

I did an unclean shutdown by cancelling the query, then tried to restart the query without changing any parameters. This led to an illegal state exception:

"Caused by: java.lang.IllegalStateException: dbfs:/test02/_spark_metadata/2 doesn't exist when compacting batch 9 (compactInterval: 10)"

Now I'm trying to reconcile how checkpointing works with the commit logs in the _spark_metadata/ directory in the data output folder.

There isn't any _spark_metadata/2 file, that is correct. How does Spark know there ought to be one? The streaming query created 9 offset log files and 8 commit log files in the checkpoint directory. The data folder's _spark_metadata/ folder contains two log files, each listing two data files; the data directory itself contains 10 parquet files.
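For illustration, the arithmetic behind the error message can be sketched as below. This is only a sketch under an assumption: that the file sink's metadata log compacts on every batch where (batch id + 1) is a multiple of compactInterval, and that a compaction needs every log file written since the previous compaction. The function names are hypothetical and are not Spark APIs.

```python
def is_compaction_batch(batch_id: int, compact_interval: int) -> bool:
    # Assumption: batches 9, 19, 29, ... compact when the interval is 10.
    return (batch_id + 1) % compact_interval == 0


def batches_needed_for_compaction(batch_id: int, compact_interval: int) -> list:
    # Assumption: a compaction reads every log file back to the previous
    # compaction batch (or back to batch 0 for the first compaction).
    start = max(0, batch_id - compact_interval)
    return list(range(start, batch_id))


def first_missing_batch(existing: set, batch_id: int, compact_interval: int):
    # Return the first log file the compaction would fail on, or None.
    for needed in batches_needed_for_compaction(batch_id, compact_interval):
        if needed not in existing:
            return needed
    return None


# The situation in this post: only log files 0 and 1 exist, batch 9 compacts.
print(is_compaction_batch(9, 10))          # True
print(first_missing_batch({0, 1}, 9, 10))  # 2
```

Under that assumption, the missing file named in the exception would simply be the first gap found while reading batches 0 to 8, which would explain why file 2 is the one reported.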

If I understand correctly, on the input side this means that nine trigger batches have been started and eight have finished. On the output side, 10 files have been started and four have been finished (committed). Six of them are "uncommitted", i.e. dirty or in-progress writes as far as Spark is concerned.
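The input-side comparison above can be sketched as a small script. It assumes the checkpoint directory is reachable as a local path (for example through a filesystem mount) and that it contains `offsets/` and `commits/` subdirectories with one file per batch, named by the batch number. The helper names are hypothetical.

```python
import os


def batch_ids(directory: str) -> set:
    # Collect the batch numbers used as file names; hidden files such as
    # checksum files are skipped because their names are not purely digits.
    if not os.path.isdir(directory):
        return set()
    return {int(name) for name in os.listdir(directory) if name.isdigit()}


def uncommitted_batches(checkpoint_dir: str) -> list:
    # A batch with an offsets file but no commits file was started
    # but never recorded as finished.
    started = batch_ids(os.path.join(checkpoint_dir, "offsets"))
    finished = batch_ids(os.path.join(checkpoint_dir, "commits"))
    return sorted(started - finished)
```

Calling `uncommitted_batches` on a checkpoint with nine offset files (0 to 8) and eight commit files (0 to 7) would return `[8]`, the single batch that was in flight when the query was cancelled.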

I have yet to find where the translation from batch to output files is logged. If the pipeline is capable of exactly-once delivery semantics to a file store, shouldn't the translation from batch per partition to the resulting committed file in the data folder be logged somewhere?

I.e. in this scenario, shouldn't Spark look up which batches are saved in the committed output files, clean up the dirty writes, then replay the stream from the last known good position?
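As a rough sketch of what identifying the dirty writes could look like from the outside, the script below lists parquet files in the output folder that no `_spark_metadata/` log file mentions. It rests on several assumptions that should be checked against the actual files: that each log file starts with a version line followed by one JSON object per line, that each object has a `path` field and an optional `action` field, and that the output folder is reachable as a local path. The function names are hypothetical, and the script only reports files; it does not delete anything.

```python
import json
import os


def committed_file_names(metadata_dir: str) -> set:
    names = set()
    for entry in os.listdir(metadata_dir):
        # Accept plain batch files ("0", "1", ...) and compacted ones ("9.compact").
        stem = entry[:-len(".compact")] if entry.endswith(".compact") else entry
        if not stem.isdigit():
            continue
        with open(os.path.join(metadata_dir, entry)) as handle:
            lines = handle.read().splitlines()
        for line in lines[1:]:  # assumption: the first line is a version header
            if not line.strip():
                continue
            record = json.loads(line)
            if record.get("action", "add") == "add":
                # Keep only the file name so URI prefixes such as dbfs:/ do not matter.
                names.add(record["path"].rsplit("/", 1)[-1])
    return names


def uncommitted_files(output_dir: str) -> list:
    committed = committed_file_names(os.path.join(output_dir, "_spark_metadata"))
    return sorted(
        name for name in os.listdir(output_dir)
        if name.endswith(".parquet") and name not in committed
    )
```

For the folder described in this post (10 parquet files, two log files listing two files each), `uncommitted_files` would be expected to return the six file names that appear in no log entry.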

I want to get back to the last known good state and resume the stream from there. Any input from the list is greatly appreciated.

Is there any good blog post or other documentation on the metadata handling in Spark? I've looked but only found synoptic documentation.