Please help: possible major memory problem in saveAsTextFile


Please help: possible major memory problem in saveAsTextFile

Hussam_Jarada

Using Spark 0.8.1 on Windows 7 against a single master node (no cluster), with these Spark settings:

 

2014-01-09 16:44:31,045 DEBUG Spark properties  -----

spark.executor.memory= 8g

spark.cores.max= 4

spark.default.parallelism= 12

spark.storage.memoryFraction= null

spark.shuffle.compress= null

spark.rdd.compress= null

spark.akka.frameSize= null

spark.akka.threads= null

 

spark.scheduler.mode=fair

spark.shuffle.consolidateFiles=true
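
(For reference, a sketch of how such settings are typically supplied in Spark 0.8.x, where configuration is done through plain Java system properties set before the SparkContext is created — SparkConf only arrives in 0.9. The master URL and app name below are placeholders, not my actual values:)

    import org.apache.spark.api.java.JavaSparkContext;

    // Values mirror the logged properties above; must be set before the context is constructed.
    System.setProperty("spark.executor.memory", "8g");
    System.setProperty("spark.cores.max", "4");
    System.setProperty("spark.default.parallelism", "12");
    System.setProperty("spark.scheduler.mode", "fair");
    System.setProperty("spark.shuffle.consolidateFiles", "true");
    JavaSparkContext sc = new JavaSparkContext("spark://master:7077", "DatasetService"); // placeholders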

 

I have a Java method that does essentially the following (a rough sketch follows the list):

1. Open a DB connection.

2. Based on the dataset size, loop x times, doing the following inside the loop:

a. Query the large dataset one large block at a time and save the results as a List<String>.

b. Build a JavaRDD<String> using SparkDriver.getSparkContext().parallelize(results, partitionNum).

c. Union the new JavaRDD with the previous iteration's result: unionDataSetRDD = unionDataSetRDD.union(dataSetRDD);

3. Call unionDataSetRDD.coalesce(partitionNum);

4. Call unionDataSetRDD.saveAsTextFile(tJavaRddFilePath); I am saving to the local Windows disk, i.e. no HDFS usage.
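
Roughly, the method body looks like this (a reconstruction from the steps above; openDbConnection(), fetchBlock(), and numBlocks are stand-ins, not my actual code):

    Connection conn = openDbConnection();                      // step 1 (stand-in)
    JavaRDD<String> unionDataSetRDD = null;
    for (int i = 0; i < numBlocks; i++) {                      // step 2
        List<String> results = fetchBlock(conn, i);            // 2a: one large block as List<String>
        JavaRDD<String> dataSetRDD =
            SparkDriver.getSparkContext().parallelize(results, partitionNum);  // 2b
        unionDataSetRDD = (unionDataSetRDD == null)
            ? dataSetRDD : unionDataSetRDD.union(dataSetRDD);  // 2c
    }
    // Note: coalesce() returns a new RDD rather than modifying the receiver,
    // so step 3 must reassign the result to have any effect.
    unionDataSetRDD = unionDataSetRDD.coalesce(partitionNum);  // step 3
    unionDataSetRDD.saveAsTextFile(tJavaRddFilePath);          // step 4: local disk, no HDFS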

 

unionDataSetRDD.toDebugString shows:

 

2014-01-09 16:45:15,832 DEBUG unionDataSetRDD: UnionRDD[6] at union at DatasetServiceImpl.java:256 (16 partitions)

  UnionRDD[4] at union at DatasetServiceImpl.java:256 (12 partitions)

    UnionRDD[2] at union at DatasetServiceImpl.java:256 (8 partitions)

      ParallelCollectionRDD[0] at parallelize at DatasetServiceImpl.java:250 (4 partitions)

      ParallelCollectionRDD[1] at parallelize at DatasetServiceImpl.java:253 (4 partitions)

    ParallelCollectionRDD[3] at parallelize at DatasetServiceImpl.java:253 (4 partitions)

  ParallelCollectionRDD[5] at parallelize at DatasetServiceImpl.java:253 (4 partitions)

 

 

The problem: when I check the Java heap of my process, I see that org.apache.spark.scheduler.DAGScheduler is keeping a reference to a large amount of memory in the old generation.

Forcing a full GC via JConsole did not free that memory, which I need for the next method invocation.

Looking at the heap dump, it appears that DAGScheduler holds a reference to this large block and prevents the RDD from being freed, even though my Java code no longer uses it or holds any reference to it after the method returns.

Can someone please explain why DAGScheduler references that memory even though I never invoked cache() or persist() on the RDD? I do no explicit caching or persisting via the Spark API, and my Java code holds no reference to unionDataSetRDD after the method returns (it's a local variable in my method).

 

 

Here's the heap dump:

Class Name                                                          Shallow Heap  Retained Heap  Percentage
org.apache.spark.scheduler.DAGScheduler @ 0x62cf83ae0                        128  1,627,202,208      98.89%
\ org.apache.spark.scheduler.Stage @ 0x667466838                              64  1,627,198,624      98.89%
.\ org.apache.spark.rdd.MappedRDD @ 0x667466878                               80  1,627,198,480      98.89%
..+ org.apache.spark.rdd.ParallelCollectionRDD @ 0x62d58bcd0                  88      4,862,256       0.30%
..+ org.apache.spark.rdd.ParallelCollectionRDD @ 0x635328908                  88      4,862,256       0.30%
..+ org.apache.spark.rdd.ParallelCollectionRDD @ 0x6601f7d20                  88      4,862,256       0.30%
..+ org.apache.spark.rdd.ParallelCollectionRDD @ 0x6674669c8                  88      2,161,176       0.13%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x6675df9f8            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x6675dfa30            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x6675dfa68            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x6675dfaa0            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x6675dfc80            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x6675dfcf8            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x668ece0c0            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x668ece138            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x668ece1b0            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x668ece228            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x668ece2a0            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x668ece318            32      1,000,072       0.06%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x667466a40            32        406,168       0.02%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x66753f200            32        406,160       0.02%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x66bc475c0            32        406,160       0.02%
..+ org.apache.spark.rdd.ParallelCollectionPartition @ 0x66bc47618            32        406,160       0.02%

 

 

Thanks,

Hussam


Re: Please help: possible major memory problem in saveAsTextFile

Josh Rosen
I think that parallelize() is the problem here; the driver retains a copy of whatever was passed to parallelize() in order to enable fault-tolerance.  You could try opening your DB connections from inside of a transformation like mapPartitions() so that it's executed on the workers rather than the driver.
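
For example, here is a hypothetical sketch against the 0.8-era Java API (the JDBC URL, query, and block-ID scheme are invented for illustration; SparkDriver.getSparkContext(), partitionNum, and tJavaRddFilePath follow the original post's naming, and numBlocks is a stand-in). Only the small list of block IDs passes through the driver; the rows themselves are fetched on the workers:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;

    List<Integer> blockIds = new ArrayList<Integer>();
    for (int i = 0; i < numBlocks; i++) blockIds.add(i);       // tiny driver-side list

    JavaRDD<String> rows =
        SparkDriver.getSparkContext().parallelize(blockIds, partitionNum).mapPartitions(
            new FlatMapFunction<Iterator<Integer>, String>() {
                public Iterable<String> call(Iterator<Integer> ids) throws Exception {
                    // One connection per partition, opened on the worker rather than the driver.
                    Connection conn = DriverManager.getConnection("jdbc:...");  // placeholder URL
                    List<String> out = new ArrayList<String>();
                    try {
                        Statement st = conn.createStatement();
                        while (ids.hasNext()) {
                            ResultSet rs = st.executeQuery(
                                "SELECT line FROM blocks WHERE block_id = " + ids.next());  // placeholder query
                            while (rs.next()) out.add(rs.getString(1));
                            rs.close();
                        }
                        st.close();
                    } finally {
                        conn.close();
                    }
                    return out;
                }
            });
    rows.saveAsTextFile(tJavaRddFilePath);

This keeps the driver from ever materializing (or retaining) the row data; the trade-off is that the JDBC driver must be available on the workers' classpath.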





RE: Please help: possible major memory problem in saveAsTextFile

Hussam_Jarada

Thank you, Josh, but based on your feedback, in my humble opinion we have a serious memory leak in Spark's parallelize() API.

I even forced a full GC from JConsole, and that large memory is still not freed, even though my Java code is not using it.

I am seeing critical issues here with the Spark API from my Java code:

1. It retains a copy of my large data even though I did not ask for that explicitly and do not care about it in my use case. I do not care about fault tolerance when running against a single master node, so why keep this data? If I did not invoke cache() or persist(), why does the Spark API keep this data in memory instead of cleaning it up when it should?

2. I cannot use that memory (it is not visible through the Spark context, since I did not invoke an explicit cache() or persist()), nor can I free it using System.gc() or a full GC from JConsole.

Moving my code to the Spark workers is not a workable model, because sometimes I need to convert Java/Scala objects into temporary Spark RDDs using parallelize() before generating my final RDD, which I may later persist or save to HDFS. I believe using parallelize() on the workers would leak memory just as it does on the driver, so it would not fix the issue.

Any plan to fix the parallelize() memory issue?

 

Thanks,

Hussam

 
