Shouldn't the UNION of SchemaRDDs produce a SchemaRDD?

Manoj Samel
Hi,

I am trying Spark SQL, based on the example in the docs ...

....

val people = sc.textFile("/data/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))


val olderThanTeans = people.where('age > 19)
val youngerThanTeans = people.where('age < 13)
val nonTeans = youngerThanTeans.union(olderThanTeans)

I can call orderBy('age) on the first two (which are SchemaRDDs) but not on the third: nonTeans is a UnionRDD, which does not support orderBy. This seems different from SQL, where the result of a UNION of two queries is itself a query with the same functionality.

It is not clear to me why the union of two SchemaRDDs does not produce a SchemaRDD.


Thanks,


Re: Shouldn't the UNION of SchemaRDDs produce a SchemaRDD?

Aaron Davidson
Looks like there is a "unionAll" function on SchemaRDD which will do what you want. The contract of RDD#union is unfortunately too general to allow it to return a SchemaRDD without downcasting.
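A toy sketch of the contract problem in plain Scala (not Spark code). `ToyRDD`, `ToySchemaRDD`, `Person`, and the sample rows are hypothetical stand-ins, assuming only that RDD#union accepts any RDD with the same element type and returns the base type. An override may narrow its return type, but its parameter must still accept any `ToyRDD[Person]`, and a plain one carries no schema; keeping the schema therefore needs a runtime downcast of the argument, and callers still see the base type. A unionAll-style method that takes the schema-aware type avoids both problems.

```scala
// Toy model, NOT Spark's real classes. ToyRDD mirrors the shape of
// RDD#union: it accepts any ToyRDD[T] and promises only a ToyRDD[T] back.
class ToyRDD[T](val items: Vector[T]) {
  def union(other: ToyRDD[T]): ToyRDD[T] = new ToyRDD(items ++ other.items)
}

final case class Person(name: String, age: Int)

// Hypothetical schema-aware subclass, standing in for SchemaRDD.
class ToySchemaRDD(rows: Vector[Person], val schema: List[String])
    extends ToyRDD[Person](rows) {

  def orderBy[K: Ordering](key: Person => K): ToySchemaRDD =
    new ToySchemaRDD(items.sortBy(key), schema)

  // The parameter type cannot be narrowed, so keeping the schema requires a
  // runtime downcast of `other`; if it fails we can only fall back to the
  // base type, which is why the declared result stays ToyRDD[Person].
  override def union(other: ToyRDD[Person]): ToyRDD[Person] = other match {
    case s: ToySchemaRDD => new ToySchemaRDD(items ++ s.items, schema)
    case _               => super.union(other)
  }

  // unionAll-style method: the argument is already schema-aware, so the
  // result can be declared as ToySchemaRDD without any cast.
  def unionAll(other: ToySchemaRDD): ToySchemaRDD =
    new ToySchemaRDD(items ++ other.items, schema)
}

object ToyUnionDemo {
  val schema = List("name", "age")
  // Hypothetical sample rows, not the contents of people.txt.
  val younger = new ToySchemaRDD(Vector(Person("Ann", 10)), schema)
  val older = new ToySchemaRDD(Vector(Person("Cal", 45), Person("Bea", 30)), schema)

  // Declared type is ToyRDD[Person]; `viaUnion.orderBy(_.age)` would not compile.
  val viaUnion: ToyRDD[Person] = younger.union(older)

  // Declared type is ToySchemaRDD, so orderBy is still available.
  val viaUnionAll: ToySchemaRDD = younger.unionAll(older).orderBy(_.age)

  def main(args: Array[String]): Unit =
    println(viaUnionAll.items.map(_.age)) // Vector(10, 30, 45)
}
```

Note that `viaUnion` is a `ToySchemaRDD` at run time here, but the compiler only knows it as a `ToyRDD[Person]`, so calling orderBy on it would still require a cast.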


On Sun, Mar 30, 2014 at 7:56 AM, Manoj Samel wrote:
[quoted message snipped]

Re: Shouldn't the UNION of SchemaRDDs produce a SchemaRDD?

Manoj Samel
Hi Aaron,

unionAll is a workaround, but:

* unionAll preserves duplicates, whereas union does not.
* In SQL, UNION and UNION ALL produce the same kind of result (another query), whereas here they produce different RDD types.
* I understand the issue with the existing union contract. Perhaps this calls for a discussion of the class hierarchy of SchemaRDD, UnionRDD, etc.?
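One hypothetical answer to the class-hierarchy question, sketched in plain Scala: give the base type a second type parameter `Repr` naming the concrete subclass (an F-bounded type), so `union` can return the caller's own type. Spark's RDD is not declared this way; `Coll`, `PlainColl`, `SchemaColl`, `rebuild`, and the sample rows are all illustrative names, not Spark API.

```scala
// Hypothetical hierarchy, NOT Spark's RDD API: Repr names the concrete
// subclass, so union can promise to return the caller's own type.
trait Coll[T, Repr <: Coll[T, Repr]] {
  def items: Vector[T]
  protected def rebuild(newItems: Vector[T]): Repr
  // The argument must be the same concrete type, so no downcast is needed.
  def union(other: Repr): Repr = rebuild(items ++ other.items)
}

final case class PlainColl[T](items: Vector[T]) extends Coll[T, PlainColl[T]] {
  protected def rebuild(newItems: Vector[T]): PlainColl[T] = PlainColl(newItems)
}

final case class Person(name: String, age: Int)

final case class SchemaColl(items: Vector[Person], schema: List[String])
    extends Coll[Person, SchemaColl] {
  protected def rebuild(newItems: Vector[Person]): SchemaColl = copy(items = newItems)
  def orderBy[K: Ordering](key: Person => K): SchemaColl = copy(items = items.sortBy(key))
}

object ReprDemo {
  val schema = List("name", "age")
  // Hypothetical sample rows.
  val younger = SchemaColl(Vector(Person("Ann", 10)), schema)
  val older = SchemaColl(Vector(Person("Cal", 45), Person("Bea", 30)), schema)

  // union is declared to return SchemaColl here, so orderBy is still available.
  val sorted: SchemaColl = younger.union(older).orderBy(_.age)

  // The generic subclass keeps working with its own type too.
  val plain: PlainColl[Int] = PlainColl(Vector(1, 2)).union(PlainColl(Vector(2, 3)))

  def main(args: Array[String]): Unit = {
    println(sorted.items.map(_.age)) // Vector(10, 30, 45)
    println(plain.items)             // Vector(1, 2, 2, 3)
  }
}
```

The trade-off is that `Repr` has to appear in every signature, and a `SchemaColl` can no longer be unioned with a `PlainColl[Person]` at all; that rigidity is the other side of a general union contract.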

Thanks,

On Sun, Mar 30, 2014 at 11:08 AM, Aaron Davidson wrote:
[quoted text snipped]

Re: Shouldn't the UNION of SchemaRDDs produce a SchemaRDD?

Michael Armbrust
* unionAll preserves duplicates, whereas union does not.

This is true. If you want to eliminate duplicate items, you should follow the union with a distinct().
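A plain-Scala model of the two SQL semantics, where a `Vector` stands in for an RDD and `Person` and the rows are hypothetical: UNION ALL keeps every row, and UNION is the same concatenation followed by a distinct. Spark's own RDD#union scaladoc says identical elements will appear multiple times and suggests `.distinct()` to remove them, so RDD#union behaves like UNION ALL here.

```scala
object UnionSemantics {
  final case class Person(name: String, age: Int)

  // UNION ALL: concatenate and keep every row, duplicates included.
  def unionAll[T](a: Vector[T], b: Vector[T]): Vector[T] = a ++ b

  // SQL UNION: the same concatenation followed by duplicate removal,
  // i.e. "follow the union with a distinct()".
  def unionDistinct[T](a: Vector[T], b: Vector[T]): Vector[T] = unionAll(a, b).distinct

  // Hypothetical sample data; Cal appears in both inputs.
  val left = Vector(Person("Ann", 10), Person("Cal", 45))
  val right = Vector(Person("Cal", 45), Person("Bea", 30))

  def main(args: Array[String]): Unit = {
    println(unionAll(left, right).size)      // 4
    println(unionDistinct(left, right).size) // 3
  }
}
```

One difference from this sketch: `Vector.distinct` keeps first occurrences in order, whereas distinct() on a real RDD goes through a shuffle and does not preserve the original row order.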
 
* In SQL, UNION and UNION ALL produce the same kind of result (another query), whereas here they produce different RDD types.
* I understand the issue with the existing union contract. Perhaps this calls for a discussion of the class hierarchy of SchemaRDD, UnionRDD, etc.?

This is, unfortunately, a limitation of the query DSL, since SchemaRDD extends standard RDDs. We cannot return specialized types from functions that are already defined on RDD (such as union), because the base RDD class has a very opaque notion of schema, and at this point the RDD API is essentially fixed. If you use SQL, however, you will always get back SchemaRDDs.