Spark 0.8.1: I can't solve java.lang.OutOfMemoryError: GC overhead limit exceeded Exception


ngoclinh
Hi,
I run Spark on two PCs, each with 2 GB of RAM.
My data files are about 4 GB in total. I persist the RDDs with DISK_ONLY, but I still get the following exception:
14/01/09 08:35:50 ERROR executor.Executor: Exception in task ID 1861
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.lang.StringBuilder.toString(StringBuilder.java:405)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3068)
        at java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2864)
        at java.io.ObjectInputStream.readString(ObjectInputStream.java:1638)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1341)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:440)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:26)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:27)
        at scala.collection.Iterator$class.foreach(Iterator.scala:772)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:24)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:128)
        at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:118)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:38)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:118)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:32)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:226)
        at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:32)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:237)

My driver program code is as follows:

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._   // implicit pair RDD operations (leftOuterJoin, groupByKey)
  import org.apache.spark.storage.StorageLevel

  def main(args: Array[String]): Unit = {
      val start = System.currentTimeMillis

      // Spark 0.8.1-style configuration via system properties
      System.setProperty("spark.driver.host", System.getenv("SPARK_LOCAL_IP"))
      System.setProperty("spark.driver.port", "8498")
      System.setProperty("spark.default.parallelism", "64")
      System.setProperty("spark.storage.memoryFraction", "0.4")
      System.err.println("IP variable: " + System.getenv("SPARK_LOCAL_IP"))

      val fsubFields = new FSubFields()
      val hourlyFields = new HourlyFields()
      val isdn: Int = fsubFields.ISDN
      val callingNumber: Int = hourlyFields.CALLING_NUMBER

      val spark = new SparkContext(args(0), "JoinFile", "/home/hduser/spark/spark-0.8.1-incubating")
      try {
          val file1 = spark.textFile("hdfs://master:8024/subs")
          val file2 = spark.textFile("hdfs://master:8024/output/hourly")

          // Key both inputs for the join and keep the RDDs on disk only
          val set1 = file1.map(line => convertToKeyValue(line, 4))
          set1.persist(StorageLevel.DISK_ONLY)

          val set2 = file2.map(line => convertToKeyValue(line, 15))
          set2.persist(StorageLevel.DISK_ONLY)

          val join2set = set2.leftOuterJoin(set1)
          join2set.persist(StorageLevel.DISK_ONLY)

          val mapKeyValue = join2set.map(joined => createKeyValueToGroup(joined._2))
          mapKeyValue.persist(StorageLevel.DISK_ONLY)

          val result = mapKeyValue.groupByKey().map(item => calculateDailyData(item))
          result.persist(StorageLevel.DISK_ONLY)

          result.saveAsTextFile("hdfs://master:8024/output/daily")
      } catch {
          case e: Exception =>
              System.err.println("Linhntn: ")
              e.printStackTrace()
      }

      val finish = System.currentTimeMillis
      println("Time running : " + (finish - start) / 1000 + "s")
  }
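
For what it is worth, here is a minimal sketch of the same pipeline with an explicit partition count passed to the shuffle steps, which I believe leftOuterJoin and groupByKey accept in 0.8.1, so that each reduce task deserializes a smaller slice of data. The helpers (convertToKeyValue, createKeyValueToGroup, calculateDailyData) and the HDFS paths are the ones from the code above; the value 200 is only a placeholder, not a tested setting:

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._   // pair RDD operations (leftOuterJoin, groupByKey)
  import org.apache.spark.storage.StorageLevel

  // Sketch only: more partitions for the shuffle steps so that each task's
  // co-grouped block is smaller. 200 is a placeholder, not a tuned value.
  val numPartitions = 200

  val set1 = spark.textFile("hdfs://master:8024/subs")
                  .map(line => convertToKeyValue(line, 4))
  val set2 = spark.textFile("hdfs://master:8024/output/hourly")
                  .map(line => convertToKeyValue(line, 15))

  // leftOuterJoin(other, numPartitions) and groupByKey(numPartitions)
  // control how many reduce tasks the shuffle is split into.
  val joined = set2.leftOuterJoin(set1, numPartitions)
  joined.persist(StorageLevel.DISK_ONLY)

  val result = joined.map(pair => createKeyValueToGroup(pair._2))
                     .groupByKey(numPartitions)
                     .map(item => calculateDailyData(item))

  result.saveAsTextFile("hdfs://master:8024/output/daily")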

I attach the stderr log for reference: stderr.txt
Please take a look and help me.

Thanks all!

Re: Spark 0.8.1: I can't solve java.lang.OutOfMemoryError: GC overhead limit exceeded Exception

ngoclinh
I checked and found that it fails when I call leftOuterJoin on the 2 RDDs:
one RDD is about 1.5 GB,
the other is about 3.2 GB.
It throws the same exception as above.
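
Since the failure happens during the join, a minimal sketch of another thing that could be tried, assuming the executors are still on the 0.8.x default heap (spark.executor.memory defaults to 512m, if I remember correctly), is to raise the per-executor heap and shrink the storage fraction before the SparkContext is created. The 1500m value is only a guess for a machine with 2 GB of RAM:

  import org.apache.spark.SparkContext

  object JoinFileWithMoreHeap {
    def main(args: Array[String]): Unit = {
      // Guess: give each executor most of the 2 GB machine, leaving room for the OS.
      System.setProperty("spark.executor.memory", "1500m")
      // With DISK_ONLY persistence almost nothing is cached in memory, so a small
      // storage fraction leaves more heap for shuffle deserialization, which is
      // where the GC-overhead error above is thrown.
      System.setProperty("spark.storage.memoryFraction", "0.2")

      val spark = new SparkContext(args(0), "JoinFile", "/home/hduser/spark/spark-0.8.1-incubating")
      // ... same job as in the first message ...
    }
  }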