Spark Memory Requirement

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Spark Memory Requirement

msbreuer
Many threads talk about memory requirements and most often answers are,
to add more memory to spark. My understanding of spark is a scaleable
anyltics engine, which is able to utilize assigned resources and to
calculate the correct answer. So assigning core and memory may speedup
an task.

I am using spark in local mode for calculations. My boundary is the
local workstation and its resources. Large disk, 16gb of ram and 4 cores
are the hardware limit. I decided to use spark with 2 cores, 6gb of
memory and several dozens of disk space. This setup with spark default
(1gb driver / 1gb executor) should satisfy recommended requirements.
Question is, what are the limits to job processing in respect to this
requirements?

// base to generate huge data
List<Integer> list = new ArrayList<>();
for (int val = 1; val < 10000; val++) {
    int valueOf = Integer.valueOf(val);
    list.add(valueOf);
}
// create simple rdd of int
JavaRDD<Integer> rdd = sc.parallelize(list,200);

An array of 10.000 Integers should fit to memory (10k x 4byte + some
overhead is 40-50kb in heap)

JavaRDD<Row> rowRDD =
        rdd.map(value -> RowFactory.create(String.valueOf(value),
createLongText(UUID.randomUUID().toString(), 2 * 1024 * 1024)))
        ;

Becomes more interesting. Map operation creates much larger object. But
spark will process in in partitions. The sample uses 200 partitions with
50 rows. A row object is about 2mb large and data covers 100mb of
memory. Probably some more overhead because of rdd and mapping overhead.
Let's assume 200mb per partition.
Two core will run 2 tasks in parallel and consume 2x200mb of memory.
Probably some queued tasks take some more memory. Spark will spill data
to disk, even if no explicit storage level was given.

What would persist(StorageLevel.MEMORY_AND_DISK) change here?

StructType type = new StructType();
type = type
        .add("c1", DataTypes.StringType)
        .add( "c2", DataTypes.StringType );

Dataset<Row> df = spark.createDataFrame(rowRDD, type);
df = df.sort( col("c2").asc() );

Next a sort operation is defined over data. Data is processed per
partition. I assume spark will sort data per partition and merge the
results. I worked with hadoop map files and used its merging
capabilities, so merging a set of many but sorted map files is easy and
not memory intensive. I expect spark work in same way. Okay, because of
distributed concept of spark part result are exchanged between workers,
this causes some protocol overhead in memory. Combining contents of
several partitions is named shuffling, right?

I calculated this example several times and expected the code to work.
But application always runs out of memory. Is my formula wrong? Are
there aspect I forgot? Is there something I did wrong? Which parameters
should be corrected to avoid out-of-memory errors? Are assumptions correct?

Regards,
Markus

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]