Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)


elasticsearch-hadoop is not compatible with Spark 3.0 (Scala 2.12)

murat migdisoglu
Hi, 

I'm testing our codebase against the Spark 3.0.0 stack and I realized that the elasticsearch-hadoop libraries are built against Scala 2.11 and thus do not work with Spark 3.0.0 (and probably 2.4.2).

Is there anybody else facing this issue? How did you solve it? 
The PR on the ES library has been open since Nov 2019...

Thank you 


Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

Bartosz Konieczny
Hi Rachana,

I'm really sorry, but I'm hard to convince when I don't see the stack traces 🤓 Over the last few weeks, though, I took more time to check the OOM issue on the file sink and indeed, as you pointed out in the JIRAs, there is something to work on here. I wrote a short blog post about this topic, describing possible workarounds and the ongoing community effort to implement a real fix: https://www.waitingforcode.com/apache-spark-structured-streaming/file-sink-out-of-memory-risk/read

On Tue, Jun 23, 2020 at 6:38 AM Bartosz Konieczny <[hidden email]> wrote:
Thank you, but let's stop focusing on the metadata size :P For me, the error is just saying that there is an OOM, that's all. Maybe it's the root cause of your problem, maybe not. Do you have the trace of anything happening before the ERROR, maybe within the failing micro-batch?
Or what makes you think it's the root cause? It will work every time you remove the metadata because Spark will start as a new application. So if, for whatever reason, it always fails at the same record (say offset=1000 in Kafka topic "a"), it will run correctly for several days while processing all records prior to offset=1000 and fail just after. I'm just saying that it may not be related to the metadata file.

So, a few other questions that may guide you to identify the root cause:

1. About your step "2. Use from_json API from Spark to extract your data for further transformation in a dataset":
       Dataset<Row> dataDf = inputDf.select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
           ....withColumn("oem_id", col("metadata.oem_id"));
   Does your code fail every time at the same moment, i.e. in the offset metadata file you retrieve the same (or approximately the same) offset? If yes, it can be an indication that something is wrong with the data, like a really big JSON.

2. About your step "3. Construct a temp table of above dataset using SQLContext":
       SQLContext sqlContext = new SQLContext(sparkSession);
       dataDf.createOrReplaceTempView("event");
   Hmmm, I don't fully understand the purpose of this snippet.

3. About your step "4. Flatten events since Parquet does not support hierarchical data":
   What is the logic behind it? An expression, custom code? Any snippet? Maybe you're doing a memory-intensive operation or accumulating data in memory, which can also lead to OOM problems? (A minimal flattening sketch follows right after this list.)
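
For reference, here is a minimal sketch (in Scala, with a purely hypothetical nested schema, so not necessarily matching the actual pipeline) of what a cheap flattening step can look like: selecting nested struct fields as top-level columns before writing Parquet. A plain projection like this is unlikely to be memory-intensive by itself; a UDF or aggregation that buffers data in memory would be a different story.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Flatten a nested "event" struct into top-level columns; all field names below are hypothetical.
def flattenEvent(df: DataFrame): DataFrame =
  df.select(
    col("event.metadata.oem_id").as("oem_id"),
    col("event.payload.device_id").as("device_id"),
    col("event.payload.temperature").as("temperature")
  )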

Otherwise, if you are 100% sure that your error comes from the metadata files, here is an extra suggestion on top of the previous ones as a workaround:
1. Listen for the job's outcome - on AWS you can use Lambda functions (https://www.waitingforcode.com/data-aws/listening-emr-events-aws-lambda/read); no idea for the other runtimes.
2. Write a function that will keep the last ".compact" file in _spark_metadata AND the corresponding files in the offsets/ and commits/ checkpoint directories. By "keeping" I mean that you can move these files into a new checkpoint directory and use this new location in your application. Trigger the function when the job's outcome is an error.
3. Resubmit the job to the cluster from your function.
BTW, I'm just thinking now: if you do steps 2 and 3 manually (better to move the files to the new location for this manual test; do not remove them physically), you'll see whether the problem comes from the metadata file or not. Logically, you'll have only a few MB of metadata and your processing should restart (= the 13 GB argument doesn't stand anymore, unless your kept .compact file itself has 13 GB). The drawback of this approach is that you won't be able to reprocess previous micro-batches because you won't have the checkpoint files for them. (A minimal sketch of this manual trimming follows below.)
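
A minimal sketch of that manual trimming, assuming a local filesystem and hypothetical paths (for S3 or HDFS you would go through the Hadoop FileSystem API instead); it just locates the latest .compact file and copies the matching offsets/ and commits/ entries into a fresh checkpoint directory:

import java.io.File
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical locations of the sink metadata and the query checkpoint.
val sinkMetadataDir  = new File("/data/output/_spark_metadata")
val checkpointDir    = new File("/data/checkpoints/my-query")
val newCheckpointDir = new File("/data/checkpoints/my-query-trimmed")

// 1. Find the most recent ".compact" file and its batch id.
val lastCompact = sinkMetadataDir.listFiles()
  .filter(_.getName.endsWith(".compact"))
  .maxBy(_.getName.stripSuffix(".compact").toLong)
val batchId = lastCompact.getName.stripSuffix(".compact").toLong

// 2. Copy the matching offsets/ and commits/ files into the new checkpoint directory.
def copyInto(src: File, dstDir: File): Unit = {
  dstDir.mkdirs()
  Files.copy(src.toPath, new File(dstDir, src.getName).toPath, StandardCopyOption.REPLACE_EXISTING)
}
copyInto(new File(checkpointDir, s"offsets/$batchId"), new File(newCheckpointDir, "offsets"))
copyInto(new File(checkpointDir, s"commits/$batchId"), new File(newCheckpointDir, "commits"))

// 3. Keep only lastCompact inside _spark_metadata (move the other entries aside),
//    then restart the query with checkpointLocation pointing at newCheckpointDir.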

Apart from that, I have no other ideas.

Best,
Bartosz.
 

On Mon, Jun 22, 2020 at 8:28 PM Rachana Srivastava <[hidden email]> wrote:
The error that I got when the metadata size reached above 13 GB is the following.
Error message:
20/06/02 17:27:08 ERROR Client: Application diagnostics message: Application application_1590784455092_0001 failed 2 times in previous 3600000 milliseconds due to AM Container for appattempt_1590784455092_0001_000004 exited with  exitCode: -104
Failing this attempt.Diagnostics: Container [pid=25010,containerID=container_1590784455092_0001_04_000001] is running beyond physical memory limits. Current usage: 5.5 GB of 5.5 GB physical memory used; 7.5 GB of 27.5 GB virtual memory used. Killing container.

Thanks,

Rachana

On Friday, June 19, 2020, 02:14:34 PM PDT, Rachana Srivastava <[hidden email]> wrote:


Thanks so much Bartosz,

I have restarted the system with the metadata deletion settings.  I do not have logs at this moment, but when the issue happens again I will try to capture the logs and ask you if I need any help with the analysis.

Thanks again a lot!

Rachana


On Friday, June 19, 2020, 04:41:13 AM PDT, Bartosz Konieczny <[hidden email]> wrote:


Makes sense, Jungtaek 👍 But for the compaction we always take the batch files according to the compaction interval, right? So never all batches from the beginning:
/**
 * Returns all valid batches before the specified `compactionBatchId`. They contain all logs we
 * need to do a new compaction.
 *
 * E.g., if `compactInterval` is 3 and `compactionBatchId` is 5, this method should returns
 * `Seq(2, 3, 4)` (Note: it includes the previous compaction batch 2).
 */
def getValidBatchesBeforeCompactionBatch(
    compactionBatchId: Long,
    compactInterval: Int): Seq[Long] = {
  assert(isCompactionBatch(compactionBatchId, compactInterval),
    s"$compactionBatchId is not a compaction batch")
  (math.max(0, compactionBatchId - compactInterval)) until compactionBatchId
}

private def compact(batchId: Long, logs: Array[T]): Boolean = {
  val validBatches = getValidBatchesBeforeCompactionBatch(batchId, compactInterval)
  val allLogs = validBatches.map { id =>
    super.get(id).getOrElse {
      throw new IllegalStateException(
        s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId " +
          s"(compactInterval: $compactInterval)")
    }
  }.flatten ++ logs
  // Return false as there is another writer.
  super.add(batchId, compactLogs(allLogs).toArray)
}

I don't see how the compaction process itself could lead to memory issues 🤔, supposing we talk about the sink, keep the defaults, and don't write nearly 1 GB per metadata file. And regarding Rachana's screenshot, the compacted files have only 5 MB and we can see that the compaction happens every 10 files, which is the default compact interval. In theory I get the point, but for this specific use case it's hard to imagine. Or maybe I misunderstood the problem and the point is that at some moment in time there are 10 consecutive micro-batches that produce 13 GB of metadata, but that seems quite surprising too.

I suppose that the root cause may be located elsewhere but it's hard to confirm since we don't have the logs or other debugging information.



On Fri, Jun 19, 2020 at 1:09 PM Jungtaek Lim <[hidden email]> wrote:
For those who would like to understand the details of the file stream source and sink: you'll want to look more closely at CompactibleFileStreamLog, and imagine how compact, get, and allFiles will work with batch log files when a compact batch file is several GBs. Take memory as an example: it materializes all entries into memory to run compact and to produce the result of get or allFiles. The memory footprint would definitely be smaller than the file size, but it would still be huge in such a case.
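
To put a rough number on it, here is a toy back-of-the-envelope sketch (not Spark code; the per-batch figures are purely hypothetical). Because each compaction re-reads the previous compact batch plus the batches in between (see the getValidBatchesBeforeCompactionBatch snippet above), the Nth compact file carries roughly every entry accumulated so far if nothing ever expires them, which is the scenario discussed in this thread:

object CompactGrowthSketch {
  def main(args: Array[String]): Unit = {
    val compactInterval = 10    // default spark.sql.streaming.fileSink.log.compactInterval
    val entriesPerBatch = 200L  // hypothetical: files written per micro-batch
    val bytesPerEntry   = 300L  // hypothetical: serialized size of one sink log entry

    // Entries carried by the compact file produced at compaction number n.
    def entriesAtCompaction(n: Long): Long = n * compactInterval * entriesPerBatch

    Seq(10L, 100L, 1000L, 10000L).foreach { n =>
      val mb = entriesAtCompaction(n) * bytesPerEntry / (1024L * 1024L)
      println(s"compaction #$n -> ~${entriesAtCompaction(n)} entries, ~$mb MB to materialize")
    }
  }
}

Under these assumptions, both the compact file on disk and the array materialized in memory grow linearly for the whole lifetime of the query, which is the kind of unbounded growth described in this thread.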

On Fri, Jun 19, 2020 at 7:42 PM Bartosz Konieczny <[hidden email]> wrote:
Hi,

Since I took a closer look at the sink and its metadata a few weeks ago, I will try to help and complete Jacek's point. Correct me if I'm wrong, but this is what I observed for the sink:

- there is no place where Apache Spark keeps all metadata files, nor their content, in main memory - at most, it keeps the objects representing the files of the currently executed micro-batch
- the metadata files are compacted every spark.sql.streaming.fileSource.log.compactInterval log files
- if spark.sql.streaming.fileSink.log.deletion is set to true (the default), the metadata will be cleaned up, with an extra delay of spark.sql.streaming.fileSink.log.cleanupDelay milliseconds
- only spark.sql.streaming.minBatchesToRetain metadata files should be kept; this applies globally to the checkpoint metadata, the state store, and the file sink; defaults to 100 (a quick sketch of setting these options follows right after this list)
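
For illustration only, a minimal sketch of how these options could be set when building the session; the values below just mirror the defaults and illustrative figures mentioned in this thread, not a recommendation:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("file-sink-metadata-retention-sketch")
  .config("spark.sql.streaming.minBatchesToRetain", "100")           // default mentioned above
  .config("spark.sql.streaming.fileSink.log.deletion", "true")       // default mentioned above
  .config("spark.sql.streaming.fileSink.log.compactInterval", "10")  // default compact interval
  .config("spark.sql.streaming.fileSink.log.cleanupDelay", "600000") // illustrative: 10 minutes, in ms
  .getOrCreate()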

The one place where I see a potential OOM because of listing all metadata files is the FileStreamSink class, here:
// FileStreamSink.addBatch
override def addBatch(batchId: Long, data: DataFrame): Unit = {
  if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) {
    logInfo(s"Skipping already committed batch $batchId")
  }
  // ...

// HDFSMetadataLog.getLatest, called above through fileLog
override def getLatest(): Option[(Long, T)] = {
  val batchIds = fileManager.list(metadataPath, batchFilesFilter)
    .map(f => pathToBatchId(f.getPath))
    .sorted
    .reverse
  for (batchId <- batchIds) {
    val batch = get(batchId)
    if (batch.isDefined) {
      return Some((batchId, batch.get))
    }
  }
  None
}
But it's a list operation which doesn't read the file content, and when you talk about 13 GB, I understand that it's the content of the metadata files, right? And if the cleanup is enabled, it should never grow so much.


From that I have some questions:
- @Rachana, did you change the configuration?
- @Rachana, do you have an exact stack trace for your error, and what happened before it? If something has to be fixed, that would facilitate the work. Maybe the error is elsewhere, or something else produces the OOM?
- for the snippet with addBatch, I'm wondering why we couldn't look directly for the compacted file or for the batch file? The exactly-once semantics are about the micro-batch and not the whole history, no?
💡 Well, I probably got the answer just after writing the question :P If the cleanup is enabled, matching directly on the file number would break exactly-once because we would rewrite the cleaned metadata file. But IMO it would be really weird, because to reprocess a past batch (older than minBatchesToRetain) we would need to keep the checkpoint metadata (the one with the Kafka offsets), and since it follows the same cleanup policy as the sink metadata, this should never happen. Or if it happens, it's only because of manual user changes.

Best,
Bartosz.

On Fri, Jun 19, 2020 at 11:03 AM Jacek Laskowski <[hidden email]> wrote:
Hi,

While we're at it, my basic understanding of the metadata directory is that only the two most recent compacts and the non-compact files in between are really necessary. Is my understanding correct?

On Fri, Jun 19, 2020 at 2:16 AM Jungtaek Lim <[hidden email]> wrote:
Shall we document the known issue with the file stream sink and provide a workaround? There have been more than a couple of questions about this over the last couple of months, and there are 5 related issues. The workaround Burak provided looks nice for those who don't need end-to-end exactly-once semantics (and in many cases they are OK with that).

On Fri, Jun 19, 2020 at 8:05 AM Burak Yavuz <[hidden email]> wrote:
Hi Rachana,

If you don't need exactly once semantics, you can use foreachBatch to write your data.
df.writeStream.foreachBatch { case (df, batchId) =>
  df.write.mode("append").format(...).save(path)
}
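
To make that concrete, here is a hedged, more complete sketch of the same idea, with hypothetical Kafka and S3 settings (dropping the file sink means there is no _spark_metadata to grow, at the cost of possible duplicates when a micro-batch is retried):

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("kafka-to-s3-foreachbatch-sketch").getOrCreate()

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // hypothetical brokers
  .option("subscribe", "events")                    // hypothetical topic
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

// Plain batch write per micro-batch: no streaming file-sink metadata is produced.
val writeBatch: (DataFrame, Long) => Unit = (batchDf, batchId) =>
  batchDf.write.mode("append").parquet("s3://my-bucket/output/events") // hypothetical path

val query = input.writeStream
  .option("checkpointLocation", "s3://my-bucket/checkpoints/events")   // hypothetical
  .foreachBatch(writeBatch)
  .start()

query.awaitTermination()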

However, I would highly recommend upgrading to some ACID data store project like Delta Lake (which natively supports streaming), Iceberg or Hudi. 

Best,
Burak

On Thu, Jun 18, 2020 at 8:24 AM Rachana Srivastava <[hidden email]> wrote:
Thanks so much for your response. I agree that going back to Spark Streaming is not recommended. But I want a stable system; we cannot have a system that crashes every 5 days. As seen in the picture below, we have nearly 47 MB of data in the metadata folder. The issue is that when the size of the data increases to nearly 13 GB while the driver memory is 5 GB, we get an OOM. I am not sure how to add a TTL to the metadata; if I delete the metadata then I have to delete the checkpoint and hence lose the data.

[Inline image: screenshot of the _spark_metadata folder listing]


On Thursday, June 18, 2020, 03:23:32 AM PDT, Jacek Laskowski <[hidden email]> wrote:


Hi Rachana,

> Should I go backward and use DStream-based Spark Streaming?

No. Never. It's no longer supported (and should really be removed from the codebase once and for all - dreaming...).

Spark focuses on Spark SQL and Spark Structured Streaming as user-facing modules for batch and streaming queries, respectively.

Please note that I'm not a PMC member or even a committer so I'm speaking for myself only (not representing the project in an official way).

On Thu, Jun 18, 2020 at 12:03 AM Rachana Srivastava <[hidden email]> wrote:
Structured Streaming vs Spark Streaming (DStream)?

Which is recommended for system stability? Exactly-once is NOT the first priority. The first priority is a STABLE system.

I need to make a decision soon. I need help. Here is the question again: should I go backward and use DStream-based Spark Streaming, write our own checkpointing, and go from there? At least we never encountered these metadata issues there.

Thanks,

Rachana

On Wednesday, June 17, 2020, 02:02:20 PM PDT, Jungtaek Lim <[hidden email]> wrote:


Just in case anyone prefers ASF projects, there are alternative projects in the ASF as well; alphabetically, Apache Hudi [1] and Apache Iceberg [2]. Both recently graduated as top-level projects. (DISCLAIMER: I'm not involved in either.)

BTW, it would be nice if we made the metadata implementation on the file stream source/sink pluggable - from what I've seen, the plugin approach has been selected as the way to go whenever some part is going to be complicated and it becomes arguable whether it should be handled in the Spark project or outside it, e.g. the checkpoint manager, the state store provider, etc. It would open up chances for the ecosystem to tackle the challenge "without completely re-writing the file stream source and sink", focusing on metadata scalability for long-running queries. The alternative projects described above will still provide more higher-level features and look attractive, but sometimes it may just be "using a sledgehammer to crack a nut".



On Thu, Jun 18, 2020 at 2:34 AM Tathagata Das <[hidden email]> wrote:
Hello Rachana,

Getting exactly-once semantics on files and making it scale to a very large number of files are very hard problems to solve. While Structured Streaming + built-in file sink solves the exactly-once guarantee that DStreams could not, it is definitely limited in other ways (scaling in terms of files, combining batch and streaming writes in the same place, etc). And solving this problem requires a holistic solution that is arguably beyond the scope of the Spark project. 

There are other projects that are trying to solve this file management issue. For example, Delta Lake (full disclosure, I am involved in it) was built to exactly solve this problem - get exactly-once and ACID guarantees on files, but also scale to handling millions of files. Please consider it as part of your solution. 




On Wed, Jun 17, 2020 at 9:50 AM Rachana Srivastava <[hidden email]> wrote:
I have written a simple Spark Structured Streaming app to move data from Kafka to S3. I found that in order to support the exactly-once guarantee, Spark creates a _spark_metadata folder, which ends up growing too large since the streaming app is SUPPOSED TO run FOREVER. When the streaming app runs for a long time, the metadata folder grows so big that we start getting OOM errors. The only way to resolve the OOM is to delete the Checkpoint and Metadata folders and lose VALUABLE customer data.

Open Spark JIRAs: SPARK-24295, SPARK-29995, and SPARK-30462.

Spark Streaming was NOT broken like this. Is Spark Streaming a BETTER choice?


Re: Is Spark Structured Streaming TOTALLY BROKEN (Spark Metadata Issues)

Rachana Srivastava
Thanks so much Bartosz!!  Really nice blog post and video.  I have not had any issues for the last 3 weeks after I added the following settings:

--conf spark.sql.streaming.minBatchesToRetain=1 \
--conf spark.sql.streaming.fileSink.log.deletion=true \
--conf spark.sql.streaming.fileSink.log.compactInterval=3 \
--conf spark.sql.streaming.fileSink.log.cleanupDelay=0 \


Thanks,

Rachana
