Structured Streaming Checkpoint Error

Structured Streaming Checkpoint Error

German Schiavon Matteo
Hi!

I have a Structured Streaming application that reads from Kafka, performs some aggregations, and writes to S3 in Parquet format.
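
For reference, here is a minimal sketch of the shape of the query (the broker, topic, bucket, and checkpoint paths are placeholders, not the actual application):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("segmentValidation").getOrCreate()
import spark.implicits._

// Read from Kafka
val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

// Windowed aggregation; the watermark allows append-mode output to Parquet
val counts = events
  .selectExpr("CAST(value AS STRING) AS value", "timestamp")
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window($"timestamp", "5 minutes"))
  .count()

// Write Parquet to S3, with the checkpoint also on S3
counts.writeStream
  .format("parquet")
  .option("path", "s3a://bucket/output")
  .option("checkpointLocation", "s3a://bucket/validation-checkpoint")
  .outputMode("append")
  .start()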

Everything seems to work well, except that from time to time I get a checkpoint error. At first I thought it was a random failure, but it has happened more than three times in the last few days:

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx/validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp


Does this happen to anyone else? 

Thanks in advance.

This is the full error:

ERROR streaming.MicroBatchExecution: Query segmentValidation [id = 14edaddf-25bb-4259-b7a2-6107907f962f, runId = 0a757476-94ec-4a53-960a-91f54ce47110] terminated with error
java.io.FileNotFoundException: No such file or directory: s3a://xxx/xxx//validation-checkpoint/offsets/.140.11adef9a-7636-4752-9e6c-48d605a9cca5.tmp
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
    at org.apache.hadoop.fs.FileSystem.getFileLinkStatus(FileSystem.java:2664)
    at org.apache.hadoop.fs.DelegateToFileSystem.getFileLinkStatus(DelegateToFileSystem.java:131)
    at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:726)
    at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:699)
    at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1032)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:329)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:147)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.writeBatchToFile(HDFSMetadataLog.scala:134)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$add$3(HDFSMetadataLog.scala:120)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at scala.Option.getOrElse(Option.scala:189)

Re: Structured Streaming Checkpoint Error

Gabor Somogyi
Hi,

Structured Streaming simply does not work when the checkpoint location is on S3, due to its lack of read-after-write consistency.
Please choose an HDFS-compliant filesystem and it will work like a charm.
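
For example, you can keep the data on S3 and move only the checkpoint to an HDFS-compliant store (a sketch; the HDFS URI and paths are placeholders, and counts stands for the aggregated DataFrame):

counts.writeStream
  .format("parquet")
  .option("path", "s3a://bucket/output")
  .option("checkpointLocation", "hdfs://namenode:8020/checkpoints/segmentValidation")
  .outputMode("append")
  .start()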

BR,
G


On Wed, Sep 16, 2020 at 4:12 PM German Schiavon <[hidden email]> wrote:

Re: Structured Streaming Checkpoint Error

German Schiavon Matteo
Hi Gabor,

Makes sense, thanks a lot! 

On Thu, 17 Sep 2020 at 11:51, Gabor Somogyi <[hidden email]> wrote:

Re: Structured Streaming Checkpoint Error

German Schiavon Matteo
Hello! 

[hidden email] I wonder whether, now that S3 is strongly consistent, this would work fine.


Regards!

On Thu, 17 Sep 2020 at 11:55, German Schiavon <[hidden email]> wrote:

Re: Structured Streaming Checkpoint Error

Jungtaek Lim-2
In theory it would work, but checkpointing becomes very inefficient. If I understand correctly, Spark writes the content to a temp file on S3 and then renames it to the final path; on S3, a rename actually reads the temp file back and writes its content to the final path. Compared to checkpointing on HDFS, that is one unnecessary write and one unnecessary read. It probably warrants a custom checkpoint manager implementation for S3.
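
To illustrate, this is roughly the write-then-rename commit that the stack trace above goes through (a simplified sketch, not the actual Spark source; see FileContextBasedCheckpointFileManager in CheckpointFileManager.scala):

import java.nio.charset.StandardCharsets
import java.util.UUID
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{CreateFlag, FileContext, Options, Path}

def writeBatchAtomically(checkpointDir: Path, batchId: Long, content: String): Unit = {
  val fc = FileContext.getFileContext(checkpointDir.toUri, new Configuration())
  // 1. Write the batch metadata to a hidden temp file, e.g. ".140.<uuid>.tmp"
  val tmp = new Path(checkpointDir, s".$batchId.${UUID.randomUUID()}.tmp")
  val out = fc.create(tmp, java.util.EnumSet.of(CreateFlag.CREATE))
  try out.write(content.getBytes(StandardCharsets.UTF_8)) finally out.close()
  // 2. Rename the temp file to the final batch file. On HDFS this is a cheap
  //    atomic metadata operation; on S3 it is implemented as a copy (one read
  //    plus one write) followed by a delete, and it is not atomic.
  fc.rename(tmp, new Path(checkpointDir, batchId.toString), Options.Rename.NONE)
}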

Also, atomic rename still does not work on S3, and S3 does not support writing with overwrite=false. That means there is no barrier if concurrent streaming queries access the same checkpoint: they can mess it up. With a checkpoint on HDFS the rename is atomic, so even in parallel only one rename succeeds, and the query that loses the race to write the checkpoint file simply fails. That's a caveat you may want to keep in mind.
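
Concretely, the barrier on HDFS comes from the rename refusing to overwrite an existing destination (a sketch under the same assumptions as the one above; on S3A this check is not atomic, so two writers can both get past it):

// Only one of two concurrent queries committing batch 140 should win.
// With Options.Rename.NONE, HDFS fails the rename if the destination
// already exists, so the losing query gets an exception and stops.
try {
  fc.rename(tmp, new Path(checkpointDir, "140"), Options.Rename.NONE)
} catch {
  case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
    // Another query already committed this batch; fail this writer.
    throw new IllegalStateException("Batch 140 already committed", e)
}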

On Wed, Dec 2, 2020 at 11:35 PM German Schiavon <[hidden email]> wrote:

Re: Structured Streaming Checkpoint Error

German Schiavon Matteo
Thanks Jungtaek!

It makes sense. We are currently switching to an HDFS-compatible filesystem. I was wondering how that change would affect the checkpoint, but after what you said it is much clearer now.



On Thu, 3 Dec 2020 at 00:23, Jungtaek Lim <[hidden email]> wrote: