[Spark Structured Streaming]: truncated Parquet after driver crash or kill

[Spark Structured Streaming]: truncated Parquet after driver crash or kill

dcam
Hello list


We have a Spark application that performs a set of ETLs: it reads messages from a Kafka topic, categorizes them, and writes the contents out as Parquet files on HDFS. After writing, we query the data from HDFS through Presto's Hive integration. We are having problems because the Parquet files are frequently truncated after the Spark driver is killed or crashes.

The meat of the (Scala) Spark jobs looks like this:
Spark
  .openSession()
  .initKafkaStream("our_topic")
  .filter(...)
  .map(...)
  .coalesce(1)
  .writeStream
  .trigger(ProcessingTime("1 hours"))
  .outputMode("append")
  .queryName("MyETL")
  .format("parquet")
  .option("path", path)
  .start()
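
(For reference: assuming openSession and initKafkaStream are thin helpers over the stock API, a roughly equivalent pipeline in plain Structured Streaming might look like the sketch below; the broker address, output path, and checkpoint location are placeholder values.)

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("MyETL").getOrCreate()
import spark.implicits._

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // placeholder brokers
  .option("subscribe", "our_topic")
  .load()
  .selectExpr("CAST(value AS STRING)")
  .as[String]
  // categorization (.filter / .map) elided, as in the snippet above
  .coalesce(1)
  .writeStream
  .trigger(Trigger.ProcessingTime("1 hour"))
  .outputMode("append")
  .queryName("MyETL")
  .format("parquet")
  .option("path", "/warehouse/myetl")                  // placeholder output path
  .option("checkpointLocation", "/checkpoints/myetl")  // file sink needs a checkpoint dir
  .start()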

Is it expected that Parquet files could be truncated during crashes?

Sometimes the files are only 4 bytes long; sometimes they are longer but still too short to be valid Parquet files. Presto detects the short files and refuses to query the entire table. I had hoped the file writes would be transactional, so that incomplete files would never be output.

We can fix crashes as they come up, but we will always need to kill the job periodically to deploy new versions of the code. We want to run the application as a long-lived process that continually reads from the Kafka topic and writes to HDFS for archival purposes.


Thanks,
Dave Cameron

Re: [Spark Structured Streaming]: truncated Parquet after driver crash or kill

cbowden
The default spark.sql.streaming.commitProtocolClass is ManifestFileCommitProtocol (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala), which may or may not be best suited for all needs.
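
If the default protocol doesn't fit, it can in principle be swapped via that (internal) setting. A minimal sketch, assuming a hypothetical custom FileCommitProtocol implementation called com.example.MyCommitProtocol:

import org.apache.spark.sql.SparkSession

// Sketch only: com.example.MyCommitProtocol stands in for your own subclass of
// org.apache.spark.internal.io.FileCommitProtocol; the config key below is an
// internal Spark SQL setting, so treat it as subject to change between releases.
val spark = SparkSession.builder()
  .appName("MyETL")
  .config("spark.sql.streaming.commitProtocolClass", "com.example.MyCommitProtocol")
  .getOrCreate()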

Code deploys could be improved by ensuring you shut down gracefully, e.g. by invoking StreamingQuery#stop. https://issues.apache.org/jira/browse/SPARK-21029 is probably of interest.
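
A minimal sketch of a graceful stop, assuming you keep the StreamingQuery handle returned by start() (the method name here is illustrative, not an established API):

import org.apache.spark.sql.streaming.StreamingQuery

// Stop the query from a JVM shutdown hook so that a normal kill (SIGTERM)
// goes through StreamingQuery#stop instead of tearing the driver down mid-write.
def runUntilShutdown(query: StreamingQuery): Unit = {
  sys.addShutdownHook {
    query.stop()
  }
  query.awaitTermination()
}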