Structured Streaming restart results in illegal state exception


Magnus Nilsson-2
Hello,

I'm evaluating Structured Streaming to understand how resilient the pipeline is to failures. I ran a small test on Azure Databricks that streams data from an Azure Event Hub and saves it as Parquet files on the Databricks file system (dbfs:/).

I forced an unclean shutdown by cancelling the query, then tried to restart the query without changing any parameters. This led to an aborted job due to an illegal state exception.

"Caused by: java.lang.IllegalStateException: dbfs:/test02/_spark_metadata/2 doesn't exist when compacting batch 9 (compactInterval: 10)"
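For reference, here is a minimal sketch of the arithmetic that appears to be behind this message. It assumes the file sink log treats batch N as a compaction batch when (N + 1) is a multiple of compactInterval, and that compacting batch N reads every batch file since the previous compaction. That is my reading of the behaviour, not something I have confirmed in documentation, and the function names are my own, not Spark's API.

```python
# Sketch only: models the assumed compaction rule of the file sink metadata log.
# Function names are hypothetical and do not correspond to a public Spark API.

def is_compaction_batch(batch_id: int, compact_interval: int) -> bool:
    """Assumed rule: batch N is compacted when (N + 1) % interval == 0."""
    return (batch_id + 1) % compact_interval == 0


def batches_needed_for_compaction(batch_id: int, compact_interval: int) -> list[int]:
    """Batch files that must exist in the log before batch_id can be compacted."""
    if not is_compaction_batch(batch_id, compact_interval):
        raise ValueError(f"batch {batch_id} is not a compaction batch")
    # Go back to the previous compaction batch, or to batch 0 if there is none.
    start = max(0, batch_id - compact_interval)
    return list(range(start, batch_id))


if __name__ == "__main__":
    # Values taken from the exception: batch 9, compactInterval 10.
    print(batches_needed_for_compaction(9, 10))
```

Under these assumptions, compacting batch 9 needs the log files for batches 0 through 8, which would explain why a missing _spark_metadata/2 aborts the job.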

Now I'm trying to reconcile how stream checkpointing works with the commit logs in the _spark_metadata/ directory in the data output folder.

How does Spark know there ought to be a _spark_metadata/2 file? And why is the missing file considered an illegal state? How is the commit metadata in the dbfs:/ file system integrated with Structured Streaming checkpointing? I haven't found any metadata that links a particular committed file (i.e. one with a corresponding log file in the _spark_metadata/ folder) to the batch that created it. As far as I know, checkpointing and commit logs are separate from the file store's commit metadata. Somewhere, Spark needs to track which files were created by which batch in order to guarantee exactly-once processing for file sinks.
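To see which batch entries are absent from the sink log, a small check like the following could be run against a listing of the _spark_metadata/ directory. This is a sketch under assumptions: it expects one file per batch named after the batch id, with compacted batches carrying a ".compact" suffix, and the listing in the example is hypothetical, not the actual contents of my folder.

```python
# Sketch only: finds gaps in a listing of _spark_metadata/ file names.
# Assumes files are named "<batchId>" or "<batchId>.compact"; other names are ignored.

COMPACT_SUFFIX = ".compact"


def parse_batch_id(name: str):
    """Return the batch id encoded in a log file name, or None if it is not a batch file."""
    stem = name[: -len(COMPACT_SUFFIX)] if name.endswith(COMPACT_SUFFIX) else name
    return int(stem) if stem.isdecimal() else None


def missing_batches(file_names, up_to_batch: int) -> list[int]:
    """Batch ids in [0, up_to_batch) that have no corresponding log file."""
    present = {b for b in map(parse_batch_id, file_names) if b is not None}
    return [b for b in range(up_to_batch) if b not in present]


if __name__ == "__main__":
    # Hypothetical listing in which the entry for batch 2 was never written.
    listing = ["0", "1", "3", "4", "5", "6", "7", "8"]
    print(missing_batches(listing, up_to_batch=9))
```

The file names could come from any directory listing of the sink's _spark_metadata/ folder; the check itself does not depend on Spark.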

If it does, one would think Spark could clean up the dirty writes in the sink folder and restart the stream from the last known good offset. This is what I had hoped would happen. No such luck though.

I want to start over from the last known good state and resume the stream from there. Any input from the list on this issue is greatly appreciated.

Is there any good blog post or other documentation on the file sink metadata handling in Spark? I've looked but found only high-level documentation, nothing that explains the handling in detail.

Thanks,

Magnus