Structured Streaming Checkpoint Error

Structured Streaming Checkpoint Error

German Schiavon Matteo
Hi!

I have a Structured Streaming application that reads from Kafka, performs some aggregations, and writes to S3 in Parquet format.

Everything seems to work fine, except that from time to time I get a checkpoint error. At first I thought it was a random failure, but it has happened more than three times in the last few days:

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp


Does this happen to anyone else? 

Thanks in advance.

This is the full error :

ERROR streaming.MicroBatchExecution: Query segmentValidation [id = 14edaddf-25bb-4259-b7a2-6107907f962f, runId = 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
	at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
	at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
	at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
	at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
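The bottom of the stack trace shows what the checkpoint writer is doing: Spark's CheckpointFileManager writes each offsets file to a hidden `.tmp` file and then renames it into place. That commit protocol is only safe on filesystems where rename is atomic and writes are immediately visible. The sketch below mimics the idea on a local filesystem; it is an illustration of the write-then-rename pattern, not Spark's actual code, and the file naming is only loosely modeled on the path in the error.

```python
import os
import tempfile
import uuid


def commit_batch(checkpoint_dir: str, batch_id: int, payload: str) -> str:
    """Write-then-rename commit, loosely modeled on Spark's
    FileContextBasedCheckpointFileManager.

    On HDFS or a local POSIX filesystem the rename is atomic and the result
    is immediately visible. On S3A, rename is implemented as copy+delete,
    and (at the time of this thread, 2020) S3 listings were only eventually
    consistent, so the .tmp file could appear missing mid-rename -- which
    surfaces as the FileNotFoundException above.
    """
    final_path = os.path.join(checkpoint_dir, str(batch_id))
    tmp_path = os.path.join(checkpoint_dir, f".{batch_id}.{uuid.uuid4()}.tmp")
    # Step 1: write everything to a hidden temp file and flush it to disk.
    with open(tmp_path, "w") as f:
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())
    # Step 2: atomically publish it under its final batch-id name.
    os.rename(tmp_path, final_path)
    return final_path


with tempfile.TemporaryDirectory() as d:
    path = commit_batch(d, 140, '{"batchWatermarkMs":0}')
    print(path)
```

After the rename, readers see either no file or the complete file for batch 140, never a partial one; that all-or-nothing guarantee is exactly what S3's rename emulation cannot provide.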
Re: Structured Streaming Checkpoint Error

Gabor Somogyi
Hi,

Structured Streaming simply does not work when the checkpoint location is on S3, due to S3's eventual consistency: it lacks the read-after-write and atomic-rename guarantees that checkpointing relies on.
Please choose an HDFS-compliant filesystem and it will work like a charm.

BR,
G
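One way to act on this advice is to fail fast before starting the query if the configured checkpoint location points at an object store. The helper below is a hypothetical pre-flight check, not a Spark API; you would pass it the same URI you give to `option("checkpointLocation", ...)` while the query's *output* path can stay on S3.

```python
from urllib.parse import urlparse

# Schemes without HDFS-like atomic-rename semantics (illustrative list).
OBJECT_STORE_SCHEMES = {"s3", "s3a", "s3n"}


def validate_checkpoint_location(uri: str) -> str:
    """Raise if the checkpoint URI targets an object store, else return it.

    Hypothetical guard following the advice in this thread: keep streaming
    checkpoints on an HDFS-compliant filesystem.
    """
    scheme = urlparse(uri).scheme
    if scheme in OBJECT_STORE_SCHEMES:
        raise ValueError(
            f"checkpoint location {uri!r} uses scheme {scheme!r}; "
            "use an HDFS-compliant filesystem instead"
        )
    return uri


# Usage sketch (paths are made up for illustration):
#   checkpoint = validate_checkpoint_location("hdfs://namenode/checkpoints/validation")
#   query = df.writeStream.option("checkpointLocation", checkpoint) \
#             .format("parquet").option("path", "s3a://bucket/output").start()
print(validate_checkpoint_location("hdfs://namenode/checkpoints/validation"))
```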


On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <[hidden email]> wrote:
Re: Structured Streaming Checkpoint Error

German Schiavon Matteo
Hi Gabor,

Makes sense, thanks a lot! 

On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi <[hidden email]> wrote: