Excessive disk IO with Spark structured streaming

Sergey Oboguev
I am trying to run a Spark structured streaming program that simulates a basic scenario: ingesting events and calculating aggregates over a window with a watermark. I am observing an inordinate amount of disk IO performed by Spark.

The basic structure of the program is like this:

sparkSession = SparkSession.builder()
                           .appName(....)
                           .master("local[*]")
                           .config("spark.executor.memory", "8g")
                           .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                           .config("spark.kryoserializer.buffer", "8m")
                           .config("spark.local.dir", ...local directory...)
                           .getOrCreate();

sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...);


dataset = sparkSession.readStream()
                      .option("checkpointLocation", ... checkpoint dir for source ...)
                      .format(MockStreamingSource.class.getName())
                      .load();

Dataset<Row> ds = dataset
                      .withWatermark("timestamp", "10 minutes")
                      .groupBy(
                              functions.window(functions.col("timestamp"), "2 minutes"),
                              functions.col("source"))
                      .agg(
                              functions.avg("D0").as("AVG_D0"),
                              functions.avg("I0").as("AVG_I0"));

DataStreamWriter<Row> dsw = ds.writeStream()
                              // .trigger(Trigger.ProcessingTime("1 minute"))
                              .option("checkpointLocation", .. checkpoint dir for writer ... );

dsw.outputMode(OutputMode.Append())
   .format("console")
   .option("truncate", "false")
   .option("numRows", Integer.MAX_VALUE)
   .start()
   .awaitTermination();

MockStreamingSource is just that -- a source intended to provide a simulated input. It generates microbatches of mock events and sends them to the app. In the testing scenario, the source simulates 20,000 devices each sending an event every 15 seconds for 11.5 minutes of logical time (just under 12 minutes of window size + watermark), for a total number of 920,000 events.

I initially started with microbatches of 500 events, and processing performance was dismal because of disk IO. I then increased the microbatch size and performance got better, but it was still very poor. The microbatch size is now 13,334 events, which corresponds to an ingestion interval of 10 seconds. Smaller batches resulted in worse performance.
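
The workload figures above can be cross-checked with a few lines of arithmetic. The following is a minimal sketch, not part of the original program; the class and method names are hypothetical. The batch count it yields agrees with the 69 microbatches reported later in this thread.

```java
final class WorkloadArithmetic {
    // Figures quoted in the post.
    static final int DEVICES = 20_000;
    static final int EVENT_PERIOD_SEC = 15;
    static final double LOGICAL_MINUTES = 11.5;
    static final int BATCH_INTERVAL_SEC = 10;

    /** Events each device emits over the simulated time span. */
    static long eventsPerDevice() {
        return Math.round(LOGICAL_MINUTES * 60 / EVENT_PERIOD_SEC);
    }

    /** Total events across all devices. */
    static long totalEvents() {
        return DEVICES * eventsPerDevice();
    }

    /** Events arriving during one ingestion interval, rounded up. */
    static long eventsPerBatch() {
        return (long) Math.ceil((double) DEVICES * BATCH_INTERVAL_SEC / EVENT_PERIOD_SEC);
    }

    /** Number of microbatches needed to carry all events (last one may be partial). */
    static long batchCount() {
        long perBatch = eventsPerBatch();
        return (totalEvents() + perBatch - 1) / perBatch;
    }

    public static void main(String[] args) {
        System.out.println("events per device = " + eventsPerDevice());
        System.out.println("total events      = " + totalEvents());
        System.out.println("events per batch  = " + eventsPerBatch());
        System.out.println("microbatches      = " + batchCount());
    }
}
```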

But even with microbatches of 13,334 events, performance is poor because of the excessive disk IO generated by Spark.
Just ingesting data generated inside the app takes wall-clock time equal to 40% of the window size plus watermark.

Using strace, I measured that checkpoint directory for the stream writer receives the following number of Linux system calls:
create/open file = 60,500 calls
mkdir = 57,000
readlink = 59,000
unlink = 41,900
rename = 14,700
execve readlink = 353,000 (incl. repeated searches for the readlink executable in 6 different locations)
execve chmod = 340,620 (incl. repeated searches for the chmod executable in 6 different locations)
In addition, Spark local directory received:
create/open file = 55,000 calls
unlink = 13,800
stat = 42,000
That's for a mere 920,000 small events (each event Row is 600 bytes when in Java heap).

I also tried trigger(...) to see whether it can improve anything, but it just made things worse.

Spark version 2.4.6.

Is this an expected amount of disk IO for Spark, or am I doing something wrong and there is a way to avoid Spark generating such an amount of disk IO?

Re: Excessive disk IO with Spark structured streaming

Jungtaek Lim-2
First of all, you'd want to divide these numbers by the number of micro-batches, since a similar number of files is created in the checkpoint directory for every micro-batch.
Second, you'd want to dive inside the checkpoint directory and collect separate numbers for each top-level subdirectory.

After that we can see whether the value would make sense or not.

Regarding file I/O issues in Structured Streaming (SS), two issues I know about are:
1) If you use streaming aggregation, it unnecessarily creates a temporary file for both read and write on the state store, while the file is only needed for writing. That doubles the number of file creations. A patch is proposed under SPARK-30294 [1].

2) Spark leverages the HDFS API, which is configured by default to create a crc file per file. (So you'll have twice as many files as expected.) There's a bug in the HDFS API (HADOOP-16255 [2]) which fails to handle crc files during rename (in short, the way checkpointing works in SS is that a temp file is atomically renamed to become the final file). As a workaround (SPARK-28025 [3]), Spark tries to delete the crc file, so two additional operations (exists -> delete) may occur per crc file.
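
To illustrate the write-then-rename pattern mentioned in item 2, here is a minimal sketch using plain java.nio. It is not Spark's or Hadoop's implementation; AtomicCommitSketch and commit are hypothetical names. It only shows why each committed file costs at least a create and a rename, before any checksum side file is counted.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class AtomicCommitSketch {
    /**
     * Writes data to a temporary file in the target directory, then renames
     * it into place so readers never observe a partially written file.
     */
    static void commit(Path finalFile, byte[] data) throws IOException {
        Path dir = finalFile.toAbsolutePath().getParent();
        Path temp = Files.createTempFile(dir, ".tmp-", ".part"); // file creation no. 1
        try {
            Files.write(temp, data);
            // The rename is the commit point.
            Files.move(temp, finalFile, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // No-op after a successful move; cleans up if the write or move failed.
            Files.deleteIfExists(temp);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("checkpoint-sketch");
        Path target = dir.resolve("1.delta");
        commit(target, "state changes".getBytes(StandardCharsets.UTF_8));
        System.out.println("committed " + target + ", size = " + Files.size(target));
    }
}
```

With a checksum file per data file, and a separate delete of a leftover checksum file, the number of metadata operations per committed file grows accordingly.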

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)




Re: Excessive disk IO with Spark structured streaming

Sergey Oboguev
Hi Jungtaek,

Thanks for your response.

> you'd want to dive inside the checkpoint directory and have separate numbers per top-subdirectory
All the checkpoint store numbers are solely for the subdirectory set by
option("checkpointLocation", .. checkpoint dir for writer ... )

Other subdirectories are empty or nearly-empty.
> First of all, you'd want to divide these numbers by the number of micro-batches, as file creations in checkpoint directory would occur similarly per micro-batch
There were 69 microbatches, each containing 13,334 events.

Event's Row object size in Java heap is 620 bytes, thus the total amount of data in a microbatch (in terms of aggregate Java-heap objects sizes) is 8.3 MB.

Average number of system calls per microbatch was:

For query (writer) checkpoint directory:
create/open file = 877
mkdir = 826
readlink = 855
unlink = 607
rename = 213
execve readlink = 5116
execve chmod = 4937
For Spark local directory:
create/open file = 797
unlink = 200
mmap = 197
stat = 2391

(The number for local.stat in the previous message was incorrect).
Physical processing time per microbatch was 3.4 seconds.

That's to store a mere 8.3 MB of uncompressed (Java-heap) data!
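
The per-microbatch averages follow from dividing the totals in the first message by 69. A minimal sketch of that arithmetic is below; the class and method names are hypothetical, and the totals are the ones quoted earlier in the thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PerBatchAverages {
    static final int MICROBATCHES = 69;

    /** Average number of calls per microbatch, rounded to the nearest integer. */
    static long perBatch(long total) {
        return Math.round((double) total / MICROBATCHES);
    }

    /** Java-heap size of one microbatch in MB (1 MB = 1,000,000 bytes). */
    static double batchMegabytes(int events, int bytesPerRow) {
        return events * (double) bytesPerRow / 1_000_000;
    }

    public static void main(String[] args) {
        // Totals for the writer checkpoint directory, from the first message.
        Map<String, Long> totals = new LinkedHashMap<>();
        totals.put("create/open file", 60_500L);
        totals.put("mkdir", 57_000L);
        totals.put("readlink", 59_000L);
        totals.put("unlink", 41_900L);
        totals.put("rename", 14_700L);
        totals.put("execve readlink", 353_000L);
        totals.put("execve chmod", 340_620L);

        totals.forEach((name, total) ->
                System.out.println(name + " = " + perBatch(total) + " per microbatch"));
        System.out.printf("microbatch size = %.1f MB%n", batchMegabytes(13_334, 620));
    }
}
```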

Most created "delta" files are on the order of 1 KB or less in size.
"Snapshot" files are several KB in size.
One would think that the tiny size of the created files is one key factor in the dismal performance. It causes a very high number of system calls and also hugely fragments the actual data IO.

As a result, the typical disk write rate observed with iostat was only ~100 KB/s.
(Read rate was near-zero, presumably because all data was in the Linux block cache.)

Average CPU usage when ingesting data was on the order of 600% (i.e. 6 cores busy), I presume chiefly for serialization/deserialization, even though Kryo was enabled. But the machine has 16 cores (vCPUs), so the most immediate limiting factor must have been not CPU saturation but IO latency (unless there is some obscure setting limiting the number of reading/writing threads). That latency arises, fundamentally, from the very large number of tiny files.

Is there a way to control the size of checkpoint "delta" and "snapshot" files Spark creates, to make them larger?
And the same also for the files in Spark local directory?

* * *

The numbers for the checkpoint directory were, of course, captured when it was set to a local drive (or Lustre/NFS).

For HDFS there are obviously no local file system calls for the checkpoint store, as HDFS does not present itself as an OS-level file system. Nevertheless the name of checkpoint directory was transmitted over HDFS connection socket 1,675 times per microbatch, so the number of high-level HDFS file operations must have been at least that high.

* * *

On a related note, for 920,000 events Spark made about 700,000 attempts to execute the chmod or readlink program, i.e. to launch an external subprocess with an executable in order to perform a file operation. Those 700,000 attempts actually represent roughly 116,000 cycles, and in each cycle Spark tried to launch the program from 6 different locations (/usr/local/sbin -> /usr/local/bin -> /usr/sbin -> /usr/bin -> /sbin -> /bin) until it finally found it in the last one. But on the next cycle Spark/Hadoop does not reuse the previously found utility location and repeats the search from the very start, causing useless file system search operations over and over again.
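
The idea of remembering a previously found utility location can be sketched as follows. This is a hypothetical illustration, not Hadoop's or Spark's code: CachedExecutableLocator and its methods are invented names, and the search directories are the six listed above.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

final class CachedExecutableLocator {
    private final List<Path> searchDirs;
    private final Map<String, Optional<Path>> cache = new ConcurrentHashMap<>();
    private final AtomicLong probes = new AtomicLong();

    CachedExecutableLocator(List<Path> searchDirs) {
        this.searchDirs = List.copyOf(searchDirs);
    }

    /** Returns the cached location, searching the directories only on first use. */
    Optional<Path> locate(String name) {
        return cache.computeIfAbsent(name, this::search);
    }

    /** Probes each directory in order, as a PATH lookup would. */
    private Optional<Path> search(String name) {
        for (Path dir : searchDirs) {
            probes.incrementAndGet();
            Path candidate = dir.resolve(name);
            if (Files.isRegularFile(candidate) && Files.isExecutable(candidate)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /** Number of file system probes performed so far. */
    long probeCount() {
        return probes.get();
    }

    public static void main(String[] args) {
        CachedExecutableLocator locator = new CachedExecutableLocator(List.of(
                Path.of("/usr/local/sbin"), Path.of("/usr/local/bin"),
                Path.of("/usr/sbin"), Path.of("/usr/bin"),
                Path.of("/sbin"), Path.of("/bin")));
        for (int i = 0; i < 1000; i++) {
            locator.locate("chmod");
        }
        System.out.println("chmod -> " + locator.locate("chmod")
                + ", probes = " + locator.probeCount());
    }
}
```

With the cache in place, repeated lookups of the same utility cost at most one pass over the directory list instead of one pass per invocation.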

This may or may not matter when HDFS is used for checkpoint store (depending on how HDFS server implements the calls), but it does matter when a file system like Lustre or NFS is used for checkpoint storage.
(Not to mention spawning readlink and chmod does not seem like a bright idea in the first place, although perhaps there might be a reason why Hadoop layer does it this way).

Thanks,
Sergey


Re: Excessive disk IO with Spark structured streaming

Jungtaek Lim-2
Replied inline.

On Tue, Oct 6, 2020 at 6:07 AM Sergey Oboguev <[hidden email]> wrote:
> Hi Jungtaek,
>
> Thanks for your response.
>
> > you'd want to dive inside the checkpoint directory and have separate numbers per top-subdirectory
> All the checkpoint store numbers are solely for the subdirectory set by
> option("checkpointLocation", .. checkpoint dir for writer ... )
>
> Other subdirectories are empty or nearly-empty.

I meant the subdirectories inside the directory you're providing as "checkpointLocation": there are several directories in that directory, and they exist for different purposes. It would be nice to determine whether the issue spans all of these directories or is specific to one of them.
 

> Most created "delta" files have file size in the order of 1 KB or less.
> "Snapshot" files are several KB in size.
> One would think that the tiny size of created files is one key factor in dismal performance. It causes a very high number of system calls and also hugely fragments actual data IO.

The size of a delta file depends heavily on your stateful operation and on the data in each micro-batch. A delta file captures only the "changes" of state in a specific micro-batch, so there are cases where you'll have very tiny delta files, e.g. the cardinality of the grouping key is small (hence the cardinality of KVs is also small), a small amount of input is provided per micro-batch, the overall size of the aggregated row is small, or there's skew in the grouping key (hence some partitions get little or no input).

The snapshot files will keep getting bigger, as each contains all of the state KVs as of a specific micro-batch, so you may not need to worry about those being small.
 

> Is there a way to control the size of checkpoint "delta" and "snapshot" files Spark creates, to make them larger?

Unfortunately no - the files are used for the fault-tolerance guarantee (stateful exactly-once) per micro-batch. Every stateful operation has to write a delta file per shuffle partition (spark.sql.shuffle.partitions) per micro-batch.

The default number of shuffle partitions is 200, hence in each micro-batch the query will create 200 files for each state store by default. (You can reduce this value when the streaming query is first started, so that's something you can tweak.) In reality, you still need to multiply it by 4, as there's also a crc file per file if the HDFS API picks a checksum file system, and Spark creates two files (read/write) for streaming aggregation. (I hope SPARK-30294 will address the latter - after that we'd no longer need to multiply by 2 for read/write.)
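
The multiplication described above can be written out as a small estimate. This is a rough sketch under the stated assumptions (one state store, a checksum file system, and the read/write doubling); the class and method names are hypothetical.

```java
final class StateFileEstimate {
    /**
     * Estimated files created per micro-batch for stateful operators.
     *
     * @param shufflePartitions      value of spark.sql.shuffle.partitions
     * @param stateStores            number of state stores in the query
     * @param checksumFiles          true if a crc file accompanies every file
     * @param separateReadWriteFiles true if both a read and a write temp file are created
     */
    static long filesPerMicrobatch(int shufflePartitions, int stateStores,
                                   boolean checksumFiles, boolean separateReadWriteFiles) {
        long files = (long) shufflePartitions * stateStores;
        if (checksumFiles) {
            files *= 2;
        }
        if (separateReadWriteFiles) {
            files *= 2;
        }
        return files;
    }

    public static void main(String[] args) {
        for (int partitions : new int[] {200, 10, 5}) {
            System.out.println(partitions + " partitions -> "
                    + filesPerMicrobatch(partitions, 1, true, true) + " files per micro-batch");
        }
    }
}
```

For the default of 200 partitions the estimate is 800 files per micro-batch, the same order as the 877 create/open calls per micro-batch measured with strace.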

For now, the only way to have fewer small files is to increase the micro-batch interval, which may raise other concerns: batch size (and output size) and output latency. That is a downside compared to what streaming frameworks provide - in streaming frameworks, a longer checkpoint interval only affects the amount of data to restore after a failure. If you expect end-to-end exactly-once then output latency is also affected, but that's a trade-off end users can choose to tolerate.

Micro-batch mode could probably also decouple the micro-batch interval from the checkpoint interval to provide flexibility. Say I can tolerate reprocessing up to 10 minutes of data when a failure occurs, but because of output latency I need a micro-batch interval of 30 seconds. (In other words, do a checkpoint roughly every 20 micro-batches.) That is actually a bit tricky to implement, and I haven't seen any request for it, so it is just a sketched idea.



Re: Excessive disk IO with Spark structured streaming

Sergey Oboguev
Hi Jungtaek,

> I meant the subdirectory inside the directory you're providing as "checkpointLocation", as there're several directories in that directory...
There are two:

my-spark-checkpoint-dir/MainApp
created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir for the app>)
contains only an empty subdir with a GUID name

my-spark-checkpoint-dir/writer
created by ds.writeStream().option("checkpointLocation", <checkpoint dir for writer>)
contains all the files

Within the latter ("writer") there are four subdirectories: commits, metadata, offsets, state.

Breakdown of file creations within them, per 69 microbatches (when shuffle partition count = 200) is:
commits = 136
metadata = 0
offsets = 138
state = 56232  
(Creation is identified by strace record for "openat" system call with O_CREAT flag and file path in the corresponding directory.)

When shuffle partition count is 10, breakdown of file creations within them, per 69 microbatches, is:
commits = 136
metadata = 0
offsets = 138
state = 2760
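
Dividing the state-directory counts by the number of microbatches and shuffle partitions gives the per-partition rate. A minimal sketch of that arithmetic (hypothetical class and method names; inputs are the figures listed above):

```java
final class StateFileRate {
    /** File creations in the state directory per microbatch per shuffle partition. */
    static double perBatchPerPartition(long stateFileCreations, int microbatches, int shufflePartitions) {
        return (double) stateFileCreations / microbatches / shufflePartitions;
    }

    /** Fraction of all checkpoint file creations that land in the state directory. */
    static double stateShare(long commits, long metadata, long offsets, long state) {
        return (double) state / (commits + metadata + offsets + state);
    }

    public static void main(String[] args) {
        System.out.printf("200 partitions: %.2f files per microbatch per partition%n",
                perBatchPerPartition(56_232, 69, 200));
        System.out.printf(" 10 partitions: %.2f files per microbatch per partition%n",
                perBatchPerPartition(2_760, 69, 10));
        System.out.printf("state share at 200 partitions: %.1f%%%n",
                100 * stateShare(136, 0, 138, 56_232));
    }
}
```

Both configurations come out at about 4 file creations per microbatch per partition, so the count scales with the partition count while commits and offsets stay constant.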

> The size of the delta file heavily depends on your stateful operation and data in each micro-batch. delta file only captures the "changes" of state in specific micro-batch, so there're cases you'll have very tiny delta files, e.g. cardinality of grouped key is small (hence cardinality of KVs is also small), small amount of inputs are provided per micro-batch, the overall size of aggregated row is small, there's skew on grouped key (hence some partitions get no input or small inputs), etc.

In my case there is no key in the Row object (unless the bucketized "timestamp" for the 2-minute window buckets becomes a key), and the microbatch is large enough: the whole problem is that Spark does not want to save the microbatch as a single file. Even after I reduce the number of shuffle partitions (see below), the number of files per microbatch remains significantly larger than the number of shuffle partitions.

..........

When the number of shuffle partitions is 200, Spark creates 816 files (per microbatch) in checkpoint store and 202 files in Spark local-dir.

Of checkpoint files: 24 per microbatch are snapshot files, and 788 are delta files.
The same per microbatch per partition: 0.12 snapshot files, 4 delta files.
Of local-dir files: 200 temp_shuffle files per microbatch (as expected) and 2 other files (shuffle.data+shuffle.index).

If I reduce the number of shuffle partitions, two things happen:
- Throughput of a single pipeline improves.
- CPU usage by the pipeline is reduced (allowing a single node to co-run a larger number of pipelines).
Most of the improvements are gained by the time the number of partitions is reduced to 5-10.
Going below that, further improvements are marginal.

When the number of shuffle partitions is reduced from 200 to 10, the physical latency of data ingestion into the checkpoint drops by a factor of 1.9, and CPU usage drops by a factor of 2.6.

When the number of shuffle partitions is reduced from 200 to 5, the physical latency of data ingestion into the checkpoint drops by a factor of 2.1, and CPU usage drops by a factor of 4.5.

Still, latency remains high, because the number of created files per microbatch remains high.

..........

With 5 shuffle partitions, Spark creates 23.9 files (per microbatch) in checkpoint store and 6.9 files in Spark local-dir.
Of checkpoint files: 0.15 per microbatch are snapshot files, and 19.7 are delta files.
Of local-dir files: 4.93 temp_shuffle files per microbatch (as expected) and 2 other files (shuffle.data+shuffle.index).

Why would Spark need to create 20 delta files per microbatch, or to put it another way: 4 delta files per microbatch per shuffle partition?

One might guess this is due to the changing "timestamp", but that does not bear out. In my produced stream (69 microbatches) there are only 46 distinct timestamp values, increasing consecutively from the first timestamp to the last. Thus many microbatches will have just one timestamp value; on average, a microbatch will have 1.5 distinct timestamp values. It would, of course, be terribly wrong for Spark to use the raw timestamp value as a key: in the real world almost every event would have a unique timestamp, so the number of files required for saving by timestamp as a key would be insane. Hopefully Spark does not attempt to do that. But perhaps it uses the index of the 2-minute window bucket as a key. If so, there are only 6 distinct values in the whole event set (I have the window size set to 2 minutes, the watermark to 10 minutes, and the event set spans 11.5 minutes). Thus, 90% of microbatches will fall wholly within one window bucket, and 10% in two buckets. So why 4 delta files per microbatch per shuffle partition?
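
The bucket count in the preceding paragraph can be checked with a short sketch. It assumes that a tumbling window assigns an event to bucket floor(t / window) and that the first event falls exactly on a window boundary; both are assumptions made for illustration, and the class and method names are hypothetical.

```java
// Sketch under stated assumptions: tumbling windows aligned at t = 0,
// bucket index = floor(t / window). Not taken from Spark's source.
class WindowBuckets {
    /** Index of the tumbling-window bucket containing the given time. */
    static long bucketIndex(long timeSeconds, long windowSeconds) {
        return Math.floorDiv(timeSeconds, windowSeconds);
    }

    /** Number of distinct buckets touched by events in [startSeconds, endSeconds]. */
    static long distinctBuckets(long startSeconds, long endSeconds, long windowSeconds) {
        return bucketIndex(endSeconds, windowSeconds)
                - bucketIndex(startSeconds, windowSeconds) + 1;
    }

    public static void main(String[] args) {
        long window = 2 * 60;  // 2-minute window
        long span = 690;       // 11.5 minutes of events
        System.out.println("distinct buckets: " + distinctBuckets(0, span, window));
    }
}
```

If the first event does not sit on a window boundary, the same span can touch one more bucket.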

..........

For completeness, if I run the test with the shuffle partition count set to 1, then:
Spark creates 8 files (per microbatch) in the checkpoint store and 3 files in Spark local-dir.
Of checkpoint files: 0.03 per microbatch are snapshot files (only 2 snapshot files in the whole run), and 4 are delta files.
Of local-dir files: 1 temp_shuffle file per microbatch (as expected) and 2 other files (shuffle.data+shuffle.index).

..........

Thus Spark SS seems to keep 4 delta files per microbatch per shuffle partition, regardless of the number of shuffle partitions.
Why would it have to do this?

I am also unsure why Spark has to create local-dir files for every microbatch, rather than keeping them open across microbatches and re-using them from one microbatch to the next (overwriting the data, but without having to go through file creation)...

> Spark leverages HDFS API which is configured to create crc file per file by default.

This is unfortunate. It would be much better to create checkpoint files with the HDFS CRC "shadow file" disabled and to store the CRC (if desired) inside the main file itself rather than in a separate file.

* * *

While we are at it, I wanted to ask a related question. Suppose I create multiple parallel streaming pipelines in the application, where by pipeline I mean the whole data stream from the initial Dataset/DatasetReader to the output of the Spark query. Suppose there is a large number of them, say dozens or hundreds.

How would Spark process them in terms of threading model? Will there be a separate thread per active stream/query or does Spark use a bounded thread pool? Do many streams/queries result in many threads, or a limited number of threads?

Thanks,
Sergey

Re: Excessive disk IO with Spark structured streaming

Jungtaek Lim-2
I can't spend too much time on explaining one by one. I strongly encourage you to do a deep-dive instead of just looking around as you want to know about "details" - that's how open source works.

I'll go through a general explanation instead of replying inline; probably I'd write a blog doc if there's no existing doc (I guess there should be one) instead of putting too much time here.

In short, the reason Spark has to create these files "per micro-batch" is to ensure fault-tolerance. For example, if the query fails at batch 5 and you rerun the query, it should rerun batch 5. How?

Spark needs to know the offsets the query had read through batch 4, and preferably also the offsets it read for batch 5. That is what offsets/commits are for.
State is for storing accumulated values of stateful operations. Same here - Spark should be able to read the state for batch 4 so that it can calculate the new accumulated values for batch 5. In addition, a partition is the unit of max parallelism (partitions aren't aware of each other and they shouldn't be), hence the state for each partition should be stored individually.
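
A simplified model of the restart decision described above, as a sketch only: it is not Spark's actual implementation, and the class and method names are hypothetical. It assumes an offsets entry is logged before a batch starts and a commits entry is logged after the batch finishes.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of picking the batch to run after a restart.
// Assumption: "offsets" holds batch ids whose input range was logged before
// the batch ran; "commits" holds batch ids that completed successfully.
class BatchRecovery {
    static long nextBatchToRun(Set<Long> offsets, Set<Long> commits) {
        long lastPlanned = offsets.isEmpty() ? -1 : new TreeSet<>(offsets).last();
        long lastCommitted = commits.isEmpty() ? -1 : new TreeSet<>(commits).last();
        // A planned but uncommitted batch is re-run with the same logged offsets;
        // otherwise a fresh batch follows the last committed one.
        return lastPlanned > lastCommitted ? lastPlanned : lastCommitted + 1;
    }

    public static void main(String[] args) {
        Set<Long> offsets = new TreeSet<>(Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L));
        Set<Long> commits = new TreeSet<>(Arrays.asList(0L, 1L, 2L, 3L, 4L));
        // Batch 5 was planned but never committed, so it is the one to rerun,
        // starting from the state stored for batch 4.
        System.out.println("rerun batch " + nextBatchToRun(offsets, commits));
    }
}
```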

Storing 4 files per partition per micro-batch (in the end we'll only have "2" files, but here I count temp files and crc files, as we are talking about the performance aspect) is the thing I already explained. I agree it's not ideal; e.g. I submitted the PR for SPARK-30294 [1], which reduces the number of files by half. We could probably propose that Hadoop skip creating CRC files (I'm not sure it can be simply done as of now), but Spark is conservative about upgrading the versions of its dependencies, so it might not be available soon even if we address it right away.

As you've found here, it's super important to find the right value for shuffle partitions. State is partitioned by a hash function, so it strongly depends on the group key. If the cardinality of the group key is low, the right value for shuffle partitions is probably fairly small. Unfortunately, once the query has run you can't change the value of shuffle partitions, as Spark doesn't have a state migration feature for when the number of partitions changes. Either you need to predict the overall cardinality at a specific time and set the right value, or try to use a 3rd party state tool. [2] (DISCLAIMER: I'm the author.)
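
To illustrate why a low-cardinality group key calls for few shuffle partitions, here is a rough sketch that hashes a handful of (window, source) keys into N partitions and counts how many partitions receive any key. It uses String.hashCode() as a stand-in (Spark's actual partitioner is not reproduced here), and the key format and the source count of 3 are made up for the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: String.hashCode() stands in for the real hash function,
// and the keys below are hypothetical.
class PartitionOccupancy {
    /** Counts the partitions that receive at least one key. */
    static int occupiedPartitions(Iterable<String> keys, int numPartitions) {
        Set<Integer> used = new HashSet<>();
        for (String key : keys) {
            used.add(Math.floorMod(key.hashCode(), numPartitions));
        }
        return used.size();
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int window = 0; window < 6; window++) {        // 6 window buckets
            for (int source = 0; source < 3; source++) {    // 3 sources (assumed)
                keys.add("window-" + window + "/source-" + source);
            }
        }
        for (int n : new int[] {200, 10, 5}) {
            System.out.println(n + " partitions, occupied: " + occupiedPartitions(keys, n));
        }
    }
}
```

With 18 keys, at most 18 of 200 partitions can hold any state, yet the counts reported earlier in the thread suggest files are still written for every partition in every micro-batch.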



On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev <[hidden email]> wrote:
> [...]

Re: Excessive disk IO with Spark structured streaming

Jungtaek Lim-2
FYI, SPARK-30294 is merged and will be available in Spark 3.1.0. This reduces the number of temp files for the state store to half when you use streaming aggregation.


On Thu, Oct 8, 2020 at 11:55 AM Jungtaek Lim <[hidden email]> wrote:
> [...]