Performance and serialization: use case


Performance and serialization: use case

Pierre B

Hi everyone!

We are just starting a new project with Spark and have a few (newbie) questions.
Any help would be greatly appreciated!

* First a bit of context:

We are working with CSV files of about 400M lines, where each line represents a record (a transaction) from customer_A to customer_B, with around 20 fields (transaction duration, value, type, ...).
We would like to use Spark to extract several hundred simple variables for each customer.

We have created a Record class to represent the records and to ease their manipulation (filtering on fields, manipulating dates, ...):
        val f = sc.textFile("hdfs://data.csv")
        val data: RDD[Record] = f.map(Record(_))      
where Record.apply(String) splits the CSV string and fills the fields accordingly.
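
For illustration, here is a rough sketch of what we mean by this Record class (the field names and types below are just placeholders; our real class has around 20 fields):

        // Hypothetical sketch: field names and types are illustrative only
        case class Record(customerA: String, customerB: String, durationMs: Long, amount: Double, txType: String) {
          // true if the given field name holds the given value
          def check(field: String, value: String): Boolean = field match {
            case "customerA" => customerA == value
            case "customerB" => customerB == value
            case "txType"    => txType == value
            case _           => false
          }
        }

        object Record {
          // Build a Record from one CSV line
          def apply(line: String): Record = {
            val cols = line.split(",", -1)
            new Record(cols(0), cols(1), cols(2).toLong, cols(3).toDouble, cols(4))
          }
        }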

Now, given a set of (field, value) pairs, we want to compute a variable for each customer_A, over the records that match those fields and values:

        // Filter the initial RDD on the given fields and values
        val filteredData = data.filter(_.check(field1, value1)).filter(_.check(field2, value2)).filter(...)
        // Since we will want to filter later on other combinations of fields and values,
        // we would like to persist these rdd in memory. For instance:
        val filteredData2 = data.filter(_.check(field1, value1)).filter(_.check(field3, value3))
        // should benefit from the RDD already filtered on field1, value1
        // where should we put the cache? in between each filter / at the beginning / at the end ?


        // Compute the variable of interest, grouped by customer
        // The formulation is generic, using functions f, g and h
        // The output is always a rdd of pairs (customerA, variable)
        val variable = filteredData.map( x => ( x.customerA, f( x ) ) ).reduceByKey( g( _, _ ) ).map( h( _ ) )
       
        // For example, if we want to compute the number of records for each customer_A with field1=value1, we define:
        def f(x) = 1
        def g(x,y) = x+y
        def h(x,y) = (x,y)

        // Another example, if we want to compute the number of distinct customer_B involved in records with each customer_A, we define:
        def f(x) = Set(x.customerB)   // wrapped in a Set so that g can take the union
        def g(x,y) = x ++ y
        def h(x,y) = ( x, y.size )
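
        // Written out in full (rather than with the generic f/g/h shorthand), we picture these two as
        // something like the following sketch, assuming customerA and customerB are plain Strings
        // (the val names here are just for illustration):

        // Example 1: number of records per customer_A
        val recordCounts = filteredData.map(x => (x.customerA, 1)).reduceByKey(_ + _)

        // Example 2: number of distinct customer_B per customer_A
        val distinctPartners = filteredData
          .map(x => (x.customerA, Set(x.customerB)))
          .reduceByKey(_ ++ _)
          .mapValues(_.size)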


* Now, my questions ;)

- SERIALIZATION:
        - Is it OK to manipulate RDDs of Record objects, or should we stick with simple RDDs of strings and do all the splitting and computation in each transformation?
        How can we make that efficient in terms of memory and speed?
        I've read the Tuning docs (including Kryo serialization) but I'd like to have more info on that...

- PERFORMANCE:
        - Is it a good idea to perform all the filters first, and then the groupBy customer, or should we do the reverse?
        In the second situation, how can we filter on the values? I didn't see a filterValues method in the PairRDD API?


Thanks for any help you can bring us!


Pierre


Re: Performance and serialization: use case

Evan R. Sparks
See inline


On Fri, Feb 14, 2014 at 5:57 AM, Pierre Borckmans <[hidden email]> wrote:


Now, given a set of (field, value) pairs, we want to compute a variable for each customer_A, over the records that match those fields and values:

        // Filter the initial RDD on the given fields and values
        val filteredData = data.filter(_.check(field1, value1)).filter(_.check(field2, value2)).filter(...)
        // Since we will want to filter later on other combinations of fields and values,
        // we would like to persist these rdd in memory. For instance:
        val filteredData2 = data.filter(_.check(field1, value1)).filter(_.check(field3, value3))
        // should benefit from the RDD already filtered on field1, value1
        // where should we put the cache? in between each filter / at the beginning / at the end ?

You should put your .cache() after the creation of the RDD you plan to re-use. Cache the data you're going to use over and over again. 
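
For instance, something like this sketch, assuming the (field1, value1) filter is the part shared between the two queries (the names are illustrative):

        // Cache the intermediate RDD that both queries share, then build on it
        val byField1 = data.filter(_.check(field1, value1)).cache()

        val filteredData  = byField1.filter(_.check(field2, value2))
        val filteredData2 = byField1.filter(_.check(field3, value3))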



* Now, my questions ;)

- SERIALIZATION:
        - Is it OK to manipulate RDDs of Record objects, or should we stick with simple RDDs of strings and do all the splitting and computation in each transformation?
        How can we make that efficient in terms of memory and speed?
        I've read the Tuning docs (including Kryo serialization) but I'd like to have more info on that...
RDD of Record is OK as long as Record is serializable. Kryo may help you here but I'm not sure how you're representing Record so I'm not sure how much. Either way, it's usually best to get it working first and then worry about serialization/performance later. 
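
If you do try Kryo, the pattern from the tuning guide looks roughly like this (the registrator class and package name are placeholders):

        import com.esotericsoftware.kryo.Kryo
        import org.apache.spark.SparkConf
        import org.apache.spark.serializer.KryoRegistrator

        // Register the classes you store in RDDs so Kryo doesn't have to write full class names
        class MyRegistrator extends KryoRegistrator {
          override def registerClasses(kryo: Kryo) {
            kryo.register(classOf[Record])
          }
        }

        val conf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryo.registrator", "mypackage.MyRegistrator")
        // then create your SparkContext from this conf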

- PERFORMANCE:
        - Is it a good idea to perform all the filters first, and then the groupBy customer, or should we do the reverse?
Yes, performing filters first is a good idea. Spark doesn't do heavy (SQL style) query optimization where filters are pushed down early in the DAG, etc. If your queries look a lot like database queries, you might think about using Shark for this - it'll save you some pain at setup/serialization and may be much faster if your queries only operate on a few columns at a time. 
        In the second situation, how can we filter on the values? I didn't see a filterValues method in the PairRDD API?
You can do a .filter() on the value of a pairRDD like this

prdd.filter(_._2 == "something") 
//or
prdd.filter(r => f(r._2))





Re: Performance and serialization: use case

Mark Hamstra
or prdd.filter { case (_, value) => f(value) }




Re: Performance and serialization: use case

Pierre B
Thanks Evan and Mark for your answers.

As you mention, it’s always best to get things working before thinking about optimisation, and that is where we are: our solution works, but it is unacceptably slow.

I have identified two “issues” in our solution, for which I would need some help (there are probably more, but let’s start with those 2 ;) :

1) [MEMORY] I’m surprised by the amount of memory used when caching my data set. When I load a single CSV file of 1 GB, caching it takes about 2.5 GB of RAM.
And that’s with the simple RDD[String] obtained by sc.textFile.
When I use my Record class to represent each record, it’s even worse.
What can I do about this? I guess I should use minimal structures to represent the records (maybe Strings are too heavy and I should use lighter types?)
As I said in my previous mail, I’d like to have more info/examples about how to create a “good” serializable class suitable for RDD (with or without Kryo).

2) [HDFS] Saving my results to HDFS using RDD.saveAsTextFile is REALLY slow. The RDDs I’m saving are just pairs of (Customer_ID, value), i.e. (String, String). Should I use a Hadoop SequenceFile format? How can we make the storage efficient?

Thanks for your help!

Pierre




Re: Performance and serialization: use case

Pierre B
A little bump after the weekend.
I don’t know if it’s OK to revive a topic once it has slipped down the list; just let me know if it’s not ;)

Thanks guys



Re: Performance and serialization: use case

Guillaume Pitel
In reply to this post by Pierre B
Hi Pierre,

I don't know your exact configuration or your exact code/caching, so I'll assume you have a basic configuration and use cache().

As stated in the Tuning section of the Spark documentation https://spark.incubator.apache.org/docs/0.9.0/tuning.html , when faced with memory issues, you should try persist(StorageLevel.MEMORY_ONLY_SER): persisting serialized versions of objects is far more space-efficient than keeping them in memory as deserialized Java objects. The CPU overhead introduced by serialization is, most of the time, much less of a problem than OOM errors or low-memory issues.
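
i.e. something along the lines of this sketch (using the RDD names from your first mail):

        import org.apache.spark.storage.StorageLevel

        // Keep the cached partitions as serialized byte arrays rather than deserialized objects
        val data = f.map(Record(_)).persist(StorageLevel.MEMORY_ONLY_SER)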

Also make sure you're not devoting too much memory to caching (the spark.storage.memoryFraction default of 0.66 is, in my opinion, generally too high), because it can have an adverse effect on GC pressure. So remember to always keep an eye on GC activity "by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to your SPARK_JAVA_OPTS environment variable" on each of your workers (or by explicitly passing SPARK_JAVA_OPTS in your SparkContext environment).
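
For example (the 0.5 value is just an illustration, tune it for your workload):

        // Give a smaller share of executor memory to the RDD cache
        val conf = new org.apache.spark.SparkConf().set("spark.storage.memoryFraction", "0.5")
        // and in conf/spark-env.sh on each worker:
        //   SPARK_JAVA_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"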

Persisting after having converted to a Record class is not necessarily the best idea if you are using native serialization. I would persist the text RDD instead (at least if using MEMORY_ONLY_SER; persisting it on disk is almost useless, unless you read it from a low-throughput place).

About the saveAsTextFile slowness: if you're looking at the saveAsTextFile stage time, you're actually looking at the whole processing (Spark only shows the operations that actually perform an action, so the saveAsTextFile stage contains all your preceding operations).
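
One way to see where the time really goes is to force the upstream work with a cheap action first, and then time the save on its own (a sketch, assuming the result fits in the cache; the output path is a placeholder):

        variable.cache()
        variable.count()                       // forces all the upstream transformations
        variable.saveAsTextFile("hdfs://out")  // this stage now mostly measures the write itself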

Also, it depends on how your HDFS is configured, where the nodes are, and so on. How many nodes you have is also an important factor.

Another thing to check is whether your input file is split into enough pieces (possibly gzipped; that's probably better for the kind of text you're talking about).
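
You can also ask for more splits directly when reading the file, e.g. (the second argument is a minimum number of partitions; 128 is just an example):

        // Request at least 128 partitions so the parsing work is spread over more tasks
        val f = sc.textFile("hdfs://data.csv", 128)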

Regards
Guillaume


--
eXenSa
Guillaume PITEL, Président
+33(0)6 25 48 86 80

eXenSa S.A.S.
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05