Process large JSON file without causing OOM


Process large JSON file without causing OOM

Alec Swan
Hello,

I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB format. Effectively, my Java service starts up an embedded Spark cluster (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I keep getting OOM errors with large (~1GB) files.

I've tried different ways to reduce memory usage, e.g. by partitioning data with dataSet.partitionBy("customer").save(filePath), or by capping memory usage by setting spark.executor.memory=1G, but to no avail.

I am wondering if there is a way to avoid OOM besides splitting the source JSON file into multiple smaller ones and processing them individually. Does Spark SQL have to read the JSON/Snappy (row-based) file in its entirety before converting it to ORC (columnar)? If so, would it make sense to create a custom receiver that reads the Snappy file and to use Spark Streaming for the ORC conversion?

Thanks,

Alec

Re: Process large JSON file without causing OOM

Joel D
Have you tried increasing driver and executor memory (and the GC overhead limit too, if required)?

Your code snippet and stack trace would be helpful.






Re: Process large JSON file without causing OOM

Alec Swan
Hi Joel,

Here are the relevant snippets of my code and the OOM error thrown in frameWriter.save(..). Surprisingly, the heap dump is pretty small (~60MB), even though I am running with -Xmx10G and with 4G of executor and driver memory, as shown below.

        SparkConf sparkConf = new SparkConf()
                .setAppName("My Service")
                .setMaster("local[*]")
                .set("spark.ui.enabled", "true")
                .set("spark.executor.memory", "4G")
                .set("spark.driver.memory", "4G");

        sparkSessionBuilder = SparkSession.builder().config(sparkConf).enableHiveSupport();
        SparkSession sparkSession = sparkSessionBuilder.getOrCreate();

        Dataset<Row> events = sparkSession.read()
                .format("json")
                .schema(inputConfig.getSchema())
                .load(inputFile.getPath());

        DataFrameWriter<Row> frameWriter = events.selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns())) // select "data.customer AS `customer`", ...
                .write()
                .options(outputConfig.getProperties()) // compression=zlib
                .format("orc")
                .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions())) // partition by "customer"
                .save(outputUri.getPath());


Here is the error log I get at runtime:

17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid3790.hprof ...
Heap dump file created [62653841 bytes in 2.212 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing "kill -9 3790"...


And here is the thread from the thread dump that caused OOM:
  
"Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
   Local Variable: byte[]#3957
   Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
   Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
   Local Variable: org.apache.hadoop.io.Text#5
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
   Local Variable: org.apache.hadoop.mapreduce.lib.input.LineRecordReader#1
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
   Local Variable: org.apache.spark.sql.execution.datasources.RecordReaderIterator#1
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
   Local Variable: org.apache.spark.sql.execution.datasources.HadoopFileLinesReader#1
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
   Local Variable: scala.collection.Iterator$$anon$12#1
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
   Local Variable: org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1#1
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
   Local Variable: org.apache.spark.sql.execution.UnsafeExternalRowSorter#1
   Local Variable: org.apache.spark.executor.TaskMetrics#2
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
   Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   Local Variable: scala.collection.Iterator$$anon$11#2
   Local Variable: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25#2
   Local Variable: java.lang.Integer#1
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   Local Variable: org.apache.spark.sql.execution.datasources.FilePartition#2
   Local Variable: org.apache.spark.storage.StorageLevel#1
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   Local Variable: org.apache.spark.rdd.MapPartitionsRDD#4
   Local Variable: org.apache.spark.serializer.JavaSerializerInstance#4
   Local Variable: scala.Tuple2#1572
   Local Variable: org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1#2
   Local Variable: scala.Tuple2#1571
   Local Variable: org.apache.spark.TaskContextImpl#1
at org.apache.spark.scheduler.Task.run(Task.scala:108)
   Local Variable: org.apache.spark.scheduler.ResultTask#2
   Local Variable: org.apache.spark.metrics.MetricsSystem#1
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
   Local Variable: org.apache.spark.serializer.JavaSerializerInstance#5
   Local Variable: org.apache.spark.memory.TaskMemoryManager#1
   Local Variable: sun.management.ThreadImpl#1
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   Local Variable: java.util.concurrent.ThreadPoolExecutor#6
   Local Variable: org.apache.spark.executor.Executor$TaskRunner#1
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#26
at java.lang.Thread.run(Thread.java:745)



Thanks,

Alec







Re: Process large JSON file without causing OOM

vaquar khan
In reply to this post by Alec Swan




--
Regards,
Vaquar Khan
+1 -224-436-0783
Greater Chicago

Re: Process large JSON file without causing OOM

Sonal Goyal
In reply to this post by Alec Swan
If you are running Spark with local[*] as master, there will be a single process whose memory will be controlled by the --driver-memory command-line option to spark-submit. Check the spark.driver.memory entry in the configuration docs:


spark.driver.memory (default: 1g): Amount of memory to use for the driver process, i.e. where SparkContext is initialized (e.g. 1g, 2g).
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
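
To illustrate the point, here is a minimal sketch (class name and values invented) showing that the heap an embedded local[*] driver actually gets is whatever -Xmx the host JVM was started with, and that setting spark.driver.memory in SparkConf after startup cannot change it:

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    public class HeapCheck {
        public static void main(String[] args) {
            // The heap available to an embedded local[*] driver is simply the heap
            // of this JVM (-Xmx); print it before Spark is even started.
            long maxHeapMb = Runtime.getRuntime().maxMemory() / (1024 * 1024);
            System.out.println("Max heap available to this JVM: " + maxHeapMb + " MB");

            // Setting spark.driver.memory here cannot resize an already-running JVM;
            // it only changes the value Spark reports back (e.g. in the UI).
            SparkConf conf = new SparkConf()
                    .setAppName("heap-check")
                    .setMaster("local[*]")
                    .set("spark.driver.memory", "4g");

            SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
            System.out.println("spark.driver.memory as seen by Spark: "
                    + spark.conf().get("spark.driver.memory"));
            spark.stop();
        }
    }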

Thanks,
Sonal
Nube Technologies 












Re: Process large JSON file without causing OOM

Alec Swan
Thanks all. I am not submitting a Spark job explicitly. Instead, I am using the Spark library functionality embedded in my web service, as shown in the code I included in the previous email. So, effectively, Spark SQL runs in the web service's JVM. Therefore, the --driver-memory option would not (and did not) work for me.

I did try setting the SPARK_DRIVER_MEMORY=5g and SPARK_EXECUTOR_MEMORY=5g environment variables, but they didn't have any effect. Passing the "-Dspark.executor.memory=6g -Dspark.driver.memory=6g" JVM parameters had the same effect as setting them in SparkConf in the code, i.e. they showed up in the Spark UI but I still got OOM.

My use case is somewhat unusual in that I just wanted to use the Spark SQL library for its multi-format (ORC, Parquet, JSON) support, but I don't really need the rest of Spark's functionality. Should I be considering submitting my Spark code as a job (to be run locally) from the web service code, for example along the lines of the sketch below?
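
(To make the question concrete, here is only a rough sketch of what I have in mind, using Spark's SparkLauncher API; the jar path and main class are placeholders, not something I have actually built, and it assumes a Spark distribution/SPARK_HOME is available to the service:)

    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;

    // Rough sketch only: launch the conversion as a separate, locally-run Spark job
    // from the web service, so the job gets its own JVM and its own driver memory.
    // The jar path and main class below are placeholders.
    SparkAppHandle handle = new SparkLauncher()
            .setAppResource("/path/to/json-to-orc-job.jar")
            .setMainClass("com.example.JsonToOrcJob")
            .setMaster("local[*]")
            .setConf(SparkLauncher.DRIVER_MEMORY, "6g") // takes effect because a new JVM is spawned
            .startApplication();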

So far in this thread we've been focusing on configuring larger memory pools. But I wonder if there is a way to stream/batch the content of the JSON file in order to convert it to ORC piecemeal and avoid reading the whole JSON file into memory in the first place?




Thanks,

Alec









Re: Process large JSON file without causing OOM

Steve Loughran


On 14 Nov 2017, at 15:32, Alec Swan <[hidden email]> wrote:

 But I wonder if there is a way to stream/batch the content of the JSON file in order to convert it to ORC piecemeal and avoid reading the whole JSON file into memory in the first place?




That is what you'll need to do; you'd hit similar problems with the same files, the same allocated JVM space, and the same number of threads trying to read in the files.

Jackson has a streaming API: http://www.baeldung.com/jackson-streaming-api
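
A bare-bones sketch of that streaming style (assuming the input has already been decompressed, and using an invented "customer" field just to show the token loop):

    import java.io.File;
    import java.io.IOException;
    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.core.JsonToken;

    // Reads the JSON token by token, so only one record (or one small batch of
    // records) needs to be held in memory at a time before it is handed off
    // to whatever writes the ORC output.
    static void streamJson(File jsonFile) throws IOException {
        JsonFactory factory = new JsonFactory();
        try (JsonParser parser = factory.createParser(jsonFile)) {
            while (parser.nextToken() != null) {
                if (parser.getCurrentToken() == JsonToken.FIELD_NAME
                        && "customer".equals(parser.getCurrentName())) {
                    parser.nextToken();
                    String customer = parser.getValueAsString();
                    // ... accumulate a small batch of records and convert/write it here
                }
            }
        }
    }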

Re: Process large JSON file without causing OOM

Vadim Semenov
In reply to this post by Alec Swan
There's a lot of off-heap memory involved in decompressing Snappy and compressing ZLIB.

Since you're running with `local[*]`, you process multiple tasks simultaneously, so they can all consume memory at the same time.

I don't think that increasing the heap will help, since it looks like you're hitting system memory limits.

I'd suggest trying to run with `local[2]` and checking the memory usage of the JVM process, for example with the SparkConf change sketched below.
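
Concretely, the only change needed in the SparkConf posted earlier in the thread would be the master string, e.g.:

    // Same configuration as earlier in the thread, but with parallelism capped at two
    // concurrent tasks, so fewer Snappy/ZLib buffers are allocated at the same time.
    SparkConf sparkConf = new SparkConf()
            .setAppName("My Service")
            .setMaster("local[2]")          // was local[*]
            .set("spark.ui.enabled", "true");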



Re: Process large JSON file without causing OOM

Alec Swan
Thanks Steve and Vadim for the feedback.

@Steve, are you suggesting creating a custom receiver and somehow piping it through Spark Streaming/Spark SQL? Or are you suggesting creating smaller datasets from the stream and using my original code to process those smaller datasets? It'd be very helpful for a novice like myself if you could provide code samples or links to docs/articles.

@Vadim, I ran my test with local[1] and got OOM in the same place. What puzzles me is that when I inspect the heap dump with VisualVM (see below), it says that the heap is pretty small (~35MB). I am running my test with the "-Xmx10G -Dspark.executor.memory=6g -Dspark.driver.memory=6g" JVM opts and I can see them reflected in the Spark UI. Am I missing some memory settings?

    Date taken: Wed Nov 15 10:46:06 MST 2017
    File: /tmp/java_pid69786.hprof
    File size: 59.5 MB

    Total bytes: 39,728,337
    Total classes: 15,749
    Total instances: 437,979
    Classloaders: 123
    GC roots: 2,831
    Number of objects pending for finalization: 5,198


Thanks,

Alec




Re: Process large JSON file without causing OOM

Alec Swan
Pinging back to see if anybody could provide me with some pointers on how to stream/batch the JSON-to-ORC conversion in Spark SQL, or explain why I get an OOM with such a small memory footprint in the heap dump.

Thanks,

Alec
