OOM while processing read/write to S3 using Spark Structured Streaming


Rachana Srivastava
Issue: I am trying to periodically process 5,000+ gzipped JSON files from S3 using Structured Streaming.

Here are the key steps:
  1. Read the JSON schema and broadcast it to the executors.

  2. Read the stream:

    Dataset<Row> inputDS = sparkSession.readStream()
        .format("text")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("multiLine", true)
        .option("mode", "PERMISSIVE")
        .schema(jsonSchema)
        .json(inputPath + "/*");

  3. Process each file in a map:

    Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());

  4. Write the output to S3:

    StreamingQuery query = ds.coalesce(1)
        .writeStream()
        .outputMode("append")
        .format("csv")
        ...
        .start();

maxFilesPerTrigger is set to 500, so I was hoping the stream would pick up only that many files per trigger. Why are we getting OOM? If we have more than 3,500 files, the system crashes with OOM.
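For reference, maxFilesPerTrigger is passed as an option on the streaming reader; a minimal sketch of where it would sit in the code above (the value shown and the trimmed-down option list are illustrative, not the poster's exact settings):

    Dataset<Row> inputDS = sparkSession.readStream()
        .schema(jsonSchema)                    // schema read up front, as in step 1
        .option("maxFilesPerTrigger", 500)     // cap on new files considered per micro-batch
        .option("multiLine", true)
        .option("mode", "PERMISSIVE")
        .json(inputPath + "/*");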



Re: OOM while processing read/write to S3 using Spark Structured Streaming

Jungtaek Lim-2
Please provide logs and a heap dump for the OOM case; otherwise no one can say what the cause is.

Add these JVM options to the driver/executors: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="...dir..."
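A sketch of how these could be passed if the job is launched with spark-submit (the dump directory below is just a placeholder, not from the original post):

    # /tmp/heap-dumps is a placeholder path; point it at a directory with enough space
    spark-submit \
      --conf "spark.driver.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heap-dumps" \
      --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/heap-dumps" \
      ...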




Re: OOM while processing read/write to S3 using Spark Structured Streaming

Sanjeev Mishra
Can you reduce maxFilesPerTrigger further and see if the OOM still persists? If it does, the problem may be somewhere else.





Re: OOM while processing read/write to S3 using Spark Structured Streaming

Piyush Acharya
In reply to this post by Rachana Srivastava
Please try the maxBytesPerTrigger option; the files are probably big enough to crash the JVM.
Please also give some info on the executors and the files (sizes, etc.).

Regards,
..Piyush
