OOM when calling cache on RDD with big data


tdeng
Hi, Spark Users/Devs: 

I have a very simple job that just caches a hadoopRDD by calling cache/persist on it. I tried MEMORY_ONLY, MEMORY_AND_DISK, and DISK_ONLY as the caching strategy, and I always get an OOM on the executors. The job works fine if I do not call cache or persist on the RDD (a minimal sketch of the job follows the stack trace below): 

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: Java heap space) 

java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2350) 
java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2275) 
java.io.ObjectOutputStream.writeString(ObjectOutputStream.java:1301) 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1171) 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:27) 
org.apache.spark.serializer.SerializationStream$class.writeAll(Serializer.scala:80) 
org.apache.spark.serializer.JavaSerializationStream.writeAll(JavaSerializer.scala:25) 
org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:815) 
org.apache.spark.storage.DiskStore.putValues(DiskStore.scala:69) 
org.apache.spark.storage.BlockManager.liftedTree1$1(BlockManager.scala:562) 
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:546) 
org.apache.spark.storage.BlockManager.put(BlockManager.scala:477) 
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:76) 
org.apache.spark.rdd.RDD.iterator(RDD.scala:224) 
org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:32) 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237) 
org.apache.spark.rdd.RDD.iterator(RDD.scala:226) 
org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:29) 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237) 
org.apache.spark.rdd.RDD.iterator(RDD.scala:226) 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:159) 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:100) 
org.apache.spark.scheduler.Task.run(Task.scala:53) 
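
For reference, here is a minimal sketch of the job. The master URL, input path, class name, and the exact transformations are placeholders, not the real ones; the real job builds a hadoopRDD over our data and persists it before a shuffle stage.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // pair-RDD functions such as reduceByKey
import org.apache.spark.storage.StorageLevel

object CacheBigRdd {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("spark://master:7077", "CacheBigRdd")

    // textFile builds a HadoopRDD under the hood; the input path is a placeholder.
    val records = sc.textFile("hdfs:///data/big-input").flatMap(_.split(" "))

    // Persisting with any of these levels OOMs the executors; dropping the
    // persist call makes the same job run fine.
    records.persist(StorageLevel.DISK_ONLY) // also tried MEMORY_ONLY, MEMORY_AND_DISK

    // Downstream stage that forces the persisted RDD to be computed.
    val counts = records.map(w => (w, 1L)).reduceByKey(_ + _)
    println(counts.count())

    sc.stop()
  }
}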

I noticed https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/CacheManager.scala#L75
The "++=" there materializes all the records of the partition in memory. Is materializing all the records necessary, given that an iterator over the materialized records is then passed to the Serializer to serialize the result anyway? (A small illustration of the difference follows below.)
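
To make the question concrete, here is a self-contained sketch in plain Scala (not Spark's actual code path) contrasting the two approaches: buffering the whole partition before serializing versus streaming the iterator straight into the serializer.

import java.io.{File, FileOutputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer

object MaterializeVsStream {
  // Stand-in for a computed partition: records produced lazily by an iterator.
  def records: Iterator[String] = Iterator.tabulate(1000000)(i => "record-" + i)

  def serializeToTempFile(it: Iterator[AnyRef]): Unit = {
    val file = File.createTempFile("partition", ".bin")
    file.deleteOnExit()
    val out = new ObjectOutputStream(new FileOutputStream(file))
    try {
      it.foreach { rec =>
        out.writeObject(rec)
        out.reset() // drop back-references so the stream's handle table does not grow
      }
    } finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // (1) The shape of the cached path: copy every record into an in-memory
    // buffer first (what the "++=" does), then serialize from the buffer's
    // iterator. The whole partition sits on the heap at once.
    val buffered = new ArrayBuffer[AnyRef]
    buffered ++= records
    serializeToTempFile(buffered.iterator)

    // (2) The alternative being asked about: hand the original iterator
    // straight to the serializer, consuming it lazily instead of buffering
    // the whole partition up front.
    serializeToTempFile(records)
  }
}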

Or am I missing something? 
Any suggestions are highly appreciated. Thanks!