Recommended way to serialize Hadoop Writables in Spark

Is there any recommended way of serializing Hadoop Writables in Spark?
Here is my problem.

I have a pair RDD, RDD[(LongWritable, BytesWritable)], which is created by
reading a SEQ[LongWritable, BytesWritable].
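
For reference, the RDD is created roughly like this (a sketch; the input path
and exact call here are placeholders, not the real ones from my job):

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.rdd.RDD

// "sc" is the SparkContext; the input path below is hypothetical.
val rdd: RDD[(LongWritable, BytesWritable)] =
  sc.sequenceFile("/data/input.seq", classOf[LongWritable], classOf[BytesWritable])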

I have these two settings set in my Spark conf.

Inside MyCustomRegistrator, I registered both the LongWritable and
BytesWritable classes.
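
Roughly, the relevant configuration looks like this (a sketch assuming the
standard Kryo serializer and registrator properties; the app name and package
are placeholders):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("writable-shuffle")  // placeholder app name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyCustomRegistrator")  // assumed package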

The total size of the SEQ[LongWritable, BytesWritable] that I read to create
the RDD[(LongWritable, BytesWritable)] is *800MB*. I have 10 executors in my
job, with 10GB of memory each. I am performing a reduceByKey operation on this
RDD, and I see huge shuffle writes of about 10GB per executor, which doesn't
make any sense. The reduceByKey stage is also very slow, and the executors
sometimes throw OOM exceptions.
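
The operation itself is roughly the following (a sketch; the real combine
function is not shown here, so the one below is only a placeholder):

// "rdd" is the RDD[(LongWritable, BytesWritable)] from above.
val reduced = rdd.reduceByKey((a, b) => a)  // placeholder combine function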

Can someone explain this shuffle behavior in Spark? Why does Spark show
100GB of shuffle writes for 800MB of input data?
Also, when I convert the RDD[(LongWritable, BytesWritable)] to an
RDD[(Long, CustomObject)], the reduceByKey operation is very fast and takes
only 30 seconds to finish.
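
The conversion is along these lines (a sketch; CustomObject here is only a
stand-in that wraps the payload bytes, not the real class from my job):

import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.rdd.RDD

// Stand-in for the real CustomObject, which is not shown here.
case class CustomObject(bytes: Array[Byte])

val converted: RDD[(Long, CustomObject)] =
  rdd.map { case (k, v) => (k.get, CustomObject(v.copyBytes())) }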

Now, for the same job, I wrote custom Kryo serializers for LongWritable and
BytesWritable. Here is the code.

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.io.{BytesWritable, LongWritable}

/** Kryo custom serializer for LongWritable. */
class LongWritableSerializer extends Serializer[LongWritable] {
  override def write(kryo: Kryo, output: Output, obj: LongWritable): Unit =
    output.writeLong(obj.get())

  override def read(kryo: Kryo,
                    input: Input,
                    clazz: Class[LongWritable]): LongWritable = {
    val longVal = input.readLong()
    new LongWritable(longVal)
  }
}

/** Kryo custom serializer for BytesWritable. */
class BytesWritableSerializer extends Serializer[BytesWritable] {
  override def write(kryo: Kryo, output: Output, obj: BytesWritable): Unit = {
    // Write only the valid portion of the backing array (getLength bytes).
    val bytes = obj.getBytes
    output.writeInt(obj.getLength)
    output.writeBytes(bytes, 0, obj.getLength)
  }

  override def read(kryo: Kryo,
                    input: Input,
                    clazz: Class[BytesWritable]): BytesWritable = {
    val length = input.readInt()
    val bytes = input.readBytes(length)
    new BytesWritable(bytes)
  }
}

And then I registered these with Kryo inside MyCustomRegistrator.
kryo.register(classOf[LongWritable], new LongWritableSerializer())
kryo.register(classOf[BytesWritable], new BytesWritableSerializer())
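
For completeness, the registrator has roughly this shape (a sketch assuming
Spark's KryoRegistrator interface):

import com.esotericsoftware.kryo.Kryo
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.spark.serializer.KryoRegistrator

class MyCustomRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[LongWritable], new LongWritableSerializer())
    kryo.register(classOf[BytesWritable], new BytesWritableSerializer())
  }
}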

I still see the same behavior. Could someone please take a look at this as well?

