Spark SequenceFile Java API Repeat Key Values

Spark SequenceFile Java API Repeat Key Values

Michael Quinlan
I've spent some time trying to import data into an RDD using the Spark Java API, but I am not able to properly load data stored in a Hadoop v1.1.1 sequence file whose key and value types are both LongWritable. I've attached a copy of the sequence file to this posting; it contains 3000 key/value pairs. I'm attempting to read it using the following code snippet:

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
System.setProperty("spark.kryo.registrator","spark.synthesis.service.Init.MyRegistrator");

JavaSparkContext ctx = new JavaSparkContext("local[2]",
            "AppName",
            "/Users/mquinlan/spark-0.8.0-incubating","jar.name");
       
//Load DataCube via Spark sequenceFile
JavaPairRDD<LongWritable,LongWritable> DataCube = ctx.sequenceFile("/local_filesystem/output.seq",
            LongWritable.class, LongWritable.class);

The code above produces a DataCube filled with duplicate entries that relate in some way to the number of splits. For example, the last 1500 or so entries all have the same key and value: (2999,22483). The previous 1500 entries appear to hold the last key/value pair from the first split of the file. I've confirmed that changing the number of threads (local[3]) does change the RDD contents, but the same last-key/value pattern remains.

Using the Hadoop API alone, I am able to read the file correctly, even from within the same jar:

Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);        
SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("/local_filesystem/output.seq"), conf);
LongWritable key = new LongWritable();
LongWritable value = new LongWritable();
while(reader.next(key, value)) {
     System.out.println(key + ":" + value);
}

I've also confirmed that an RDD populated by the ctx.parallelize() method:

int n=100;
List<LongWritable> tl = new ArrayList<LongWritable>(n);
for (int i = 0; i < n; i++) tl.add(new LongWritable(i));
JavaRDD<LongWritable> preCube = ctx.parallelize(tl, 1);
DataCube = preCube.map(
                new PairFunction<LongWritable,LongWritable,LongWritable> () {
                    @Override
                    public Tuple2<LongWritable,LongWritable> 
                    call(LongWritable in) throws Exception {
                        return (new Tuple2(in, in));
                    }
                });

can be written to a sequence file using the RDD method:

DataCube.saveAsHadoopFile("testfile.seq", LongWritable.class, LongWritable.class, SequenceFileOutputFormat.class);

and read correctly using the Hadoop-only API code copied above.

It seems there is only a problem when I attempt to read the sequence file directly into the RDD; all other operations perform as expected.

I'd greatly appreciate any advice someone could provide.

Regards,

Michael

output.seq 

Re: Spark SequenceFile Java API Repeat Key Values

Matei Zaharia
Administrator
Yeah, unfortunately sequenceFile() reuses the same Writable objects across records. If you plan to use each record repeatedly (e.g. cache it), you should clone the objects using a map function. It was originally designed assuming you only look at each record once, but this is poorly documented.
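For example, with the DataCube from your snippet, something like this untested sketch (constructing new LongWritables from get() is just one way to make the copy):

JavaPairRDD<LongWritable, LongWritable> cloned = DataCube.map(
    new PairFunction<Tuple2<LongWritable, LongWritable>, LongWritable, LongWritable>() {
        @Override
        public Tuple2<LongWritable, LongWritable> call(Tuple2<LongWritable, LongWritable> rec) {
            // Copy the reused key/value objects into fresh LongWritables so each
            // cached record keeps its own values rather than the last pair read.
            return new Tuple2<LongWritable, LongWritable>(
                new LongWritable(rec._1().get()),
                new LongWritable(rec._2().get()));
        }
    });
cloned.cache();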

Matei

Re: Spark SequenceFile Java API Repeat Key Values

Andrew Ash
Matei, do you mean something like A rather than B below?

A) rdd.map(_.clone).cache
B) rdd.cache

I'd be happy to add documentation if there's a good place for it, but I'm not sure there's an obvious place for it.


Re: Spark SequenceFile Java API Repeat Key Values

Matei Zaharia
Administrator
Yup, A) would make it work.

I’d actually prefer that we change it so it clones the objects by default, and add a boolean flag (default false) for people who want to reuse objects. We’d have to do the same in hadoopRDD and the various versions of that as well.

Matei

Re: Spark SequenceFile Java API Repeat Key Values

Andrew Ash
Agreed on the clone-by-default approach -- this reused-object gotcha has hit several people I know when using Avro.

We should be careful not to ignore the performance impact that led Hadoop to reuse objects in the first place, though I'm not sure what that means in practice: you either clone the objects in Spark or you don't.


Re: Spark SequenceFile Java API Repeat Key Values

Michael Quinlan
Matei and Andrew,

Thank you both for your prompt responses. Matei is correct in that I am attempting to cache a large RDD for repeated queries.

I was able to implement your suggestion in a Scala version of the code, which I've copied below. I should point out two minor details: LongWritable doesn't expose a public clone() method, and both the key and the value need to be "cloned" for the data to be cached correctly.

My attempt at a Java version wasn't as successful. If you don't mind, could you please suggest a better way, if one exists? This is mostly educational, since I already have a working version in Scala; I'm new to both languages. My Java attempt, the compiler error, and an untested guess at a fix are below.

Regards,

Mike

Java:

public class App {
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: SynthesisService <master> <input file> <jar file>");
            System.exit(1);
        }

        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator", "spark.synthesis.service.Init.MyRegistrator");

        JavaSparkContext ctx = new JavaSparkContext(args[0],
            "SynthesisService",
            "~/spark-0.8.0-incubating", args[2]);

        // Load DataCube via Spark sequenceFile
        JavaPairRDD<LongWritable,LongWritable> temp_DataCube = ctx.sequenceFile(args[1],
            LongWritable.class, LongWritable.class);

        JavaRDD<Tuple2<LongWritable,LongWritable>> DataCube;
        DataCube = temp_DataCube.map(
                new Function2<LongWritable,LongWritable,Tuple2<LongWritable,LongWritable>>() {
                    @Override
                    public Tuple2<LongWritable,LongWritable>
                    call(LongWritable key, LongWritable value) {
                        return new Tuple2(new LongWritable(key.get()), value);
                    }
                });

-----
COMPILATION ERROR :
-------------------------------------------------------------
spark/synthesis/service/Init/App.java:[51,32] error: no suitable method found for map(<anonymous Function2<LongWritable,LongWritable,Tuple2<LongWritable,LongWritable>>>)
1 error
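
My guess is that map here wants a single PairFunction over the whole Tuple2 rather than a two-argument Function2, along the lines of the untested sketch below, but I'm not sure that's the intended approach:

// Declared as a JavaPairRDD, since map with a PairFunction appears to return one.
JavaPairRDD<LongWritable,LongWritable> DataCube = temp_DataCube.map(
        new PairFunction<Tuple2<LongWritable,LongWritable>, LongWritable, LongWritable>() {
            @Override
            public Tuple2<LongWritable,LongWritable>
            call(Tuple2<LongWritable,LongWritable> in) {
                // Copy both Writables so the cached records don't all share the reused objects.
                return new Tuple2<LongWritable,LongWritable>(
                        new LongWritable(in._1().get()), new LongWritable(in._2().get()));
            }
        });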

Scala:

package testspark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.KryoRegistrator

import org.apache.hadoop.io.LongWritable

import com.esotericsoftware.kryo.Kryo

class MyRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo) {
    kryo.register(classOf[LongWritable])
    kryo.register(classOf[Tuple2[LongWritable, LongWritable]])
  }
}

object ScalaSynthesisServer {

  def pseudoClone(x: LongWritable, y: LongWritable): (LongWritable, LongWritable) =
    (new LongWritable(x.get()), new LongWritable(y.get()))

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: ScalaSynthesisServer <master> <input file> <jar file>")
      System.exit(1)
    }

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "testspark.MyRegistrator")

    val sc = new SparkContext(args(0), "ScalaSynthesisServer", "~/spark-0.8.0-incubating", List(args(2)))

    val DataCube = sc.sequenceFile(args(1), classOf[LongWritable], classOf[LongWritable])
      .map(a => pseudoClone(a._1, a._2))

    DataCube.cache()

    val list = DataCube.collect()

    for (x <- list) {
      println("Key= " + x._1 + " Value= " + x._2)
    }
  }
}