Union of 2 RDD's only returns the first one

13 messages

Union of 2 RDD's only returns the first one

Aureliano Buendia
Hi,

I'm trying to find a way to create a csv header when using saveAsTextFile, and I came up with this:

(sc.makeRDD(Array("col1,col2,col3"), 1) ++ myRdd.coalesce(1).map(_.mkString(",")))
      .saveAsTextFile("out.csv")


But it only saves the header part. Why doesn't the union method return both RDDs?
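One possible reason is sketched below. It assumes the output directory was inspected only at its first part file. saveAsTextFile writes one part-NNNNN file per partition. In the current implementation, union keeps the partitions of both inputs. A 1-partition header RDD plus a 1-partition data RDD therefore gives two partitions and two files. This is a plain-Scala model, not Spark code. `PartFilesModel`, `union` and `partFiles` are hypothetical names.

```scala
object PartFilesModel {
  // Toy stand-in for an RDD: an ordered list of partitions, each an ordered list of lines.
  type Partitions = Vector[Vector[String]]

  // Models the current (not guaranteed) union: partitions of `a` followed by those of `b`.
  def union(a: Partitions, b: Partitions): Partitions = a ++ b

  // Models saveAsTextFile: one output file per partition, named part-00000, part-00001, ...
  def partFiles(rdd: Partitions): Map[String, Vector[String]] =
    rdd.zipWithIndex.map { case (lines, i) => f"part-$i%05d" -> lines }.toMap
}
```

Under this model the data rows are not lost. They land in part-00001, next to the header in part-00000.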

Re: Union of 2 RDD's only returns the first one

Patrick Wendell
What is the ++ operator here? Is this something you defined?

Another issue is that RDDs are not ordered, so when you union two
together the result doesn't have a well-defined ordering.

If you do want to do this, you could coalesce into one partition, then
call mapPartitions and return an iterator that first yields your header
and then the rest of the rows, then call saveAsTextFile. Keep in mind
this will only work if you coalesce into a single partition.

myRdd.coalesce(1)
  .map(_.mkString(","))
  .mapPartitions(it => Iterator("col1,col2,col3") ++ it)
  .saveAsTextFile("out.csv")
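Here is why the single partition matters. The model below is plain Scala, not Spark, and shows what mapPartitions does: the function runs once per partition. A prepended header therefore appears once per partition. `HeaderModel`, `mapPartitions` and `withHeader` are hypothetical helpers. Prepending with `Iterator.single(header) ++ it` keeps the partition lazy instead of first building a Seq.

```scala
object HeaderModel {
  type Partitions = Vector[Vector[String]]

  // Models rdd.mapPartitions(f): f is applied independently to each partition's iterator.
  def mapPartitions(rdd: Partitions)(f: Iterator[String] => Iterator[String]): Partitions =
    rdd.map(p => f(p.iterator).toVector)

  // Builds a function that yields the header first, then the partition's own rows, lazily.
  def withHeader(header: String): Iterator[String] => Iterator[String] =
    it => Iterator.single(header) ++ it
}
```

With one partition the header appears once. With two partitions it appears twice, once at the top of each part file.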

- Patrick

On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
<[hidden email]> wrote:


Re: Union of 2 RDD's only returns the first one

Aureliano Buendia



On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell <[hidden email]> wrote:
> What is the ++ operator here? Is this something you defined?

No, it's an alias for union defined in RDD.scala:

def ++(other: RDD[T]): RDD[T] = this.union(other)
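Since ++ just delegates to union, the operator itself is not the cause. The toy model below shows that both spellings give the same two-partition result. `ToyRDD` is a hypothetical stand-in, not Spark's class. Its union models the current partition-concatenating behaviour.

```scala
// Hypothetical stand-in for RDD[T]; only mirrors the alias relationship quoted above.
final case class ToyRDD[T](partitions: Vector[Vector[T]]) {
  // Models the current behaviour: partitions of this, then partitions of other.
  def union(other: ToyRDD[T]): ToyRDD[T] = ToyRDD(partitions ++ other.partitions)

  // Same shape as the quoted definition: ++ simply delegates to union.
  def ++(other: ToyRDD[T]): ToyRDD[T] = this.union(other)

  def collect(): Vector[T] = partitions.flatten
  def numPartitions: Int = partitions.size
}
```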
 

> Another issue is that RDD's are not ordered, so when you union two
> together it doesn't have a well defined ordering.
>
> If you do want to do this you could coalesce into one partition, then
> call MapPartitions and return an iterator that first adds your header
> and then the rest of the file, then call saveAsTextFile. Keep in mind
> this will only work if you coalesce into a single partition.

Thanks! I'll give this a try.
 


Re: Union of 2 RDD's only returns the first one

Patrick Wendell
Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia <[hidden email]> wrote:


Re: Union of 2 RDD's only returns the first one

Mingyu Kim
Hi Patrick,

I’m a little confused by your comment that RDDs are not ordered. As far
as I know, an RDD keeps an ordered list of partitions. This is why I
can call RDD.take() and get the same first k rows every time, and why
RDD.map(…).take() returns the mapped versions of the same entries as
RDD.take(): map preserves the partition order. RDD order is also what
allows me to get the top k out of an RDD by doing RDD.sort().take().
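Mingyu's argument can be modelled with plain collections. take(k) reads partitions in index order, and a narrow map transforms each partition in place. So map(f).take(k) equals take(k).map(f). This models the current behaviour described above, not a documented contract. `TakeModel` is a hypothetical name.

```scala
object TakeModel {
  type Partitions[T] = Vector[Vector[T]]

  // Models take(k): scan partitions in index order, and rows within each partition in order.
  def take[T](rdd: Partitions[T], k: Int): Vector[T] =
    rdd.iterator.flatMap(_.iterator).take(k).toVector

  // Models a narrow map: each partition is transformed in place; order is untouched.
  def map[T, U](rdd: Partitions[T])(f: T => U): Partitions[U] =
    rdd.map(_.map(f))
}
```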

Am I misunderstanding it? Or is it just when an RDD is written to disk that
the order is not well preserved? Thanks in advance!

Mingyu




On 1/22/14, 4:46 PM, "Patrick Wendell" <[hidden email]> wrote:


Re: Union of 2 RDD's only returns the first one

Patrick Wendell
You are right, once you sort() the RDD, then yes it has a well defined ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.

On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim <[hidden email]> wrote:


Re: Union of 2 RDD's only returns the first one

Mingyu Kim
Thanks for the quick response!

To better understand it: the reason a sorted RDD has a well-defined ordering
is that sortedRDD.getPartitions() returns the partitions in the right
order and each partition is internally sorted. So, if you have

val rdd = sc.parallelize(Seq(2, 1, 3))
val sorted = rdd.map(x => (x, x)).sortByKey() // values: [1, 2, 3]
val mapped = sorted.mapValues(x => x + 1)     // values: [2, 3, 4]

Since mapValues changes neither the order of partitions nor the
order of rows within the partitions, I think “mapped” should have the
exact same order as “sorted”. Sure, if a transform involves shuffling, the
order will change. Am I mistaken? Is there an extra detail in sortedRDD
that guarantees a well-defined ordering?

If it’s true that the order of partitions returned by RDD.getPartitions()
and the row order within each partition determine the row order, I’m not
sure why union doesn’t respect that order, since the union operation simply
concatenates the two lists of partitions from the two RDDs.
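The "concatenates the two lists of partitions" behaviour can be sketched as a mapping from each output partition to the input partition it reads from. This is a plain-Scala model of the current implementation, generalised to any number of inputs. `UnionPartitionModel` and `partitionMap` are hypothetical names. The mapping is an assumption about current behaviour, not a documented guarantee.

```scala
object UnionPartitionModel {
  // Given the partition counts of the inputs, list the (inputIndex, inputPartition)
  // that each output partition of the union reads from, in output-partition order.
  def partitionMap(partitionCounts: Seq[Int]): Vector[(Int, Int)] =
    partitionCounts.zipWithIndex.flatMap { case (n, input) =>
      (0 until n).map(p => (input, p))
    }.toVector
}
```

For inputs with 1 and 2 partitions this gives (0, 0), (1, 0), (1, 1): every partition of the first input, then every partition of the second.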

Mingyu




On 4/29/14, 10:25 PM, "Patrick Wendell" <[hidden email]> wrote:


Re: Union of 2 RDD's only returns the first one

Patrick Wendell
If you call map() on an RDD it will retain the ordering it had before,
but that is not necessarily a correct sort order for the new RDD.

val rdd = sc.parallelize(Seq(2, 1, 3))
val sorted = rdd.map(x => (x, x)).sortByKey() // values: [1, 2, 3]
val mapped = sorted.mapValues(x => 3 - x)     // values: [2, 1, 0]

Note that mapped is no longer sorted.

When you union two RDDs together, it effectively concatenates the
two orderings, which is also not a valid sorted order for the new RDD:

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5]
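Both of these examples can be checked with local collections. sortBy stands in for sortByKey, and Vector concatenation stands in for union's current behaviour. `SortednessModel` and `isSorted` are hypothetical names; nothing here calls Spark.

```scala
object SortednessModel {
  // True when every element is <= the one after it.
  def isSorted(xs: Seq[Int]): Boolean =
    xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  // Local stand-in for sortByKey followed by mapValues(x => 3 - x).
  val mappedValues: Vector[Int] =
    Vector(2, 1, 3).map(x => (x, x)).sortBy(_._1).map { case (_, v) => 3 - v }

  // Local stand-in for the current union: the second input appended to the first.
  val unioned: Vector[Int] = Vector(1, 2, 3) ++ Vector(1, 4, 5)
}
```

Each input is sorted, but neither result is. Order is carried through; sortedness is not.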

On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim <[hidden email]> wrote:


Re: Union of 2 RDD's only returns the first one

Mingyu Kim
Yes, that’s what I meant. Sure, the numbers might not actually be sorted,
but the semantic order of rows is kept throughout non-shuffling
transforms. I’m on board with you on union as well.

Back to the original question, then: why is it important to coalesce to a
single partition? When you union two RDDs, for example rdd1 = [“a, b, c”]
and rdd2 = [“1, 2, 3”, “4, 5, 6”], rdd1.union(rdd2).saveAsTextFile(…)
should have resulted in a file with three lines, “a, b, c”, “1, 2, 3”,
and “4, 5, 6”, because the partitions from the two RDDs are concatenated.
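The partition count determines the output layout, because saveAsTextFile writes one part file per partition. The plain-Scala sketch below makes two assumptions. First, rdd2 in the example above has a single partition. Second, coalesce(1) concatenates partitions in index order; this describes current behaviour rather than a documented guarantee. `CoalesceModel` is a hypothetical name.

```scala
object CoalesceModel {
  type Partitions = Vector[Vector[String]]

  // saveAsTextFile writes one part file per partition, so file count == partition count.
  def outputFileCount(rdd: Partitions): Int = rdd.size

  // Hypothetical model of coalesce(1): all rows end up in one partition.
  // ASSUMPTION: parent partitions are concatenated in index order (not a documented guarantee).
  def coalesceToOne(rdd: Partitions): Partitions = Vector(rdd.flatten)
}
```

Under these assumptions the union alone yields two files. Merging into one partition yields the single three-line file the example expects.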

Mingyu




On 4/29/14, 10:55 PM, "Patrick Wendell" <[hidden email]> wrote:


Re: Union of 2 RDD's only returns the first one

Patrick Wendell
I don't think we guarantee anywhere that union(A, B) will behave by
concatenating the partitions; it just happens to be an artifact of the
current implementation.

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this; it wouldn't violate the contract of union

AFAIK the only guarantee is that the resulting RDD will contain all elements.
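The sketch below reads "contain all elements" as multiset equality. Union keeps duplicates, so the 1 from both inputs should appear twice. Under that reading, the contract is a check that ignores order. `UnionContract` and `satisfiesUnion` are hypothetical names; this is a model, not Spark code.

```scala
object UnionContract {
  // Counts occurrences of each element, so sequences are compared as multisets.
  private def counts[T](xs: Seq[T]): Map[T, Int] =
    xs.groupBy(identity).map { case (k, v) => k -> v.size }

  // Order-insensitive check: result holds exactly the elements of a and b, duplicates included.
  def satisfiesUnion[T](a: Seq[T], b: Seq[T], result: Seq[T]): Boolean =
    counts(a ++ b) == counts(result)
}
```

Both orderings in the example pass this check. A result that drops the duplicate 1 does not.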

- Patrick

On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim <[hidden email]> wrote:


Re: Union of 2 RDD's only returns the first one

Mingyu Kim
Okay, that makes sense. It’d be great if this can be better documented at
some point, because the only way to find out about the resulting RDD row
order is by looking at the code.

Thanks for the discussion!

Mingyu




On 4/29/14, 11:59 PM, "Patrick Wendell" <[hidden email]> wrote:

>I don't think we guarantee anywhere that union(A, B) will behave by
>concatenating the partitions, it just happens to be an artifact of the
>current implementation.
>
>rdd1 = [1,2,3]
>rdd2 = [1,4,5]
>
>rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
>rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
>wouldn't violate the contract of union
>
>AFIAK the only guarentee is the resulting RDD will contain all elements.
>
>- Patrick
>
>On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim <[hidden email]> wrote:
>> Yes, that’s what I meant. Sure, the numbers might not be actually
>>sorted,
>> but the order of rows semantically are kept throughout non-shuffling
>> transforms. I’m on board with you on union as well.
>>
>> Back to the original question, then, why is it important to coalesce to
>>a
>> single partition? When you union two RDDs, for example, rdd1 = [“a, b,
>> c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
>> rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
>>three
>> lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
>>the
>> two reds are concatenated.
>>
>> Mingyu
>>
>>
>>
>>
>> On 4/29/14, 10:55 PM, "Patrick Wendell" <[hidden email]> wrote:
>>
>>>If you call map() on an RDD it will retain the ordering it had before,
>>>but that is not necessarily a correct sort order for the new RDD.
>>>
>>>var rdd = sc.parallelize([2, 1, 3]);
>>>var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
>>>var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]
>>>
>>>Note that mapped is no longer sorted.
>>>
>>>When you union two RDD's together it will effectively concatenate the
>>>two orderings, which is also not a valid sorted order on the new RDD:
>>>
>>>rdd1 = [1,2,3]
>>>rdd2 = [1,4,5]
>>>
>>>rdd1.union(rdd2) = [1,2,3,1,4,5]
>>>
>>>On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim <[hidden email]> wrote:
>>>> Thanks for the quick response!
>>>>
>>>> To better understand it: the reason a sorted RDD has a well-defined
>>>> ordering is that sortedRDD.getPartitions() returns the partitions in the
>>>> right order and each partition is internally sorted. So, if you have
>>>>
>>>> val rdd = sc.parallelize(Seq(2, 1, 3))
>>>> val sorted = rdd.map(x => (x, x)).sortByKey() // keys: [1, 2, 3]
>>>> val mapped = sorted.mapValues(x => x + 1)      // values: [2, 3, 4]
>>>>
>>>> Since mapValues changes neither the order of partitions nor the order of
>>>> rows within the partitions, I think “mapped” should have exactly the same
>>>> order as “sorted”. Sure, if a transform involves shuffling, the order
>>>> will change. Am I mistaken? Is there an extra detail in sortedRDD that
>>>> guarantees a well-defined ordering?
>>>>
>>>> If it’s true that the order of partitions returned by RDD.getPartitions()
>>>> and the order of rows within each partition determine the row order, I’m
>>>> not sure why union doesn’t respect that order, because the union operation
>>>> simply concatenates the two lists of partitions from the two RDDs.
>>>>
>>>> Mingyu
>>>>
>>>>
>>>>
>>>>
>>>> On 4/29/14, 10:25 PM, "Patrick Wendell" <[hidden email]> wrote:
>>>>
>>>>>You are right, once you sort() the RDD, then yes it has a well defined
>>>>>ordering.
>>>>>
>>>>>But that ordering is lost as soon as you transform the RDD, including
>>>>>if you union it with another RDD.
>>>>>
>>>>>On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim <[hidden email]> wrote:
>>>>>> Hi Patrick,
>>>>>>
>>>>>> I’m a little confused about your comment that RDDs are not ordered. As
>>>>>> far as I know, RDDs keep a list of partitions that are ordered, and this
>>>>>> is why I can call RDD.take() and get the same first k rows every time I
>>>>>> call it, and why RDD.take() returns the same entries as
>>>>>> RDD.map(…).take(): map preserves the partition order. RDD order is also
>>>>>> what allows me to get the top k out of an RDD by doing RDD.sort().take().
>>>>>>
>>>>>> Am I misunderstanding it? Or is it just when an RDD is written to disk
>>>>>> that the order is not well preserved? Thanks in advance!
>>>>>>
>>>>>> Mingyu
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 1/22/14, 4:46 PM, "Patrick Wendell" <[hidden email]> wrote:
>>>>>>
>>>>>>>Ah somehow after all this time I've never seen that!
>>>>>>>
>>>>>>>On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> What is the ++ operator here? Is this something you defined?
>>>>>>>>
>>>>>>>>
>>>>>>>> No, it's an alias for union defined in RDD.scala:
>>>>>>>>
>>>>>>>> def ++(other: RDD[T]): RDD[T] = this.union(other)
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Another issue is that RDD's are not ordered, so when you union two
>>>>>>>>> together it doesn't have a well-defined ordering.
>>>>>>>>>
>>>>>>>>> If you do want to do this, you could coalesce into one partition, then
>>>>>>>>> call mapPartitions and return an iterator that first adds your header
>>>>>>>>> and then the rest of the file, then call saveAsTextFile. Keep in mind
>>>>>>>>> this will only work if you coalesce into a single partition.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks! I'll give this a try.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> myRdd.coalesce(1)
>>>>>>>>>   .map(_.mkString(","))
>>>>>>>>>   .mapPartitions(it => Iterator("col1,col2,col3") ++ it) // header first, then the rows (lazily)
>>>>>>>>>   .saveAsTextFile("out.csv")
>>>>>>>>>
>>>>>>>>> - Patrick
>>>>>>>>>
>>>>>>>>> On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia <[hidden email]> wrote:
>>>>>>>>> > Hi,
>>>>>>>>> >
>>>>>>>>> > I'm trying to find a way to create a csv header when using
>>>>>>>>> > saveAsTextFile, and I came up with this:
>>>>>>>>> >
>>>>>>>>> > (sc.makeRDD(Array("col1,col2,col3"), 1) ++
>>>>>>>>> >   myRdd.coalesce(1).map(_.mkString(",")))
>>>>>>>>> >       .saveAsTextFile("out.csv")
>>>>>>>>> >
>>>>>>>>> > But it only saves the header part. Why does the union method not
>>>>>>>>> > return both RDD's?
>>>>>>>>
>>>>>>>>
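The exchange above turns on how union and saveAsTextFile treat partitions. What follows is a toy, single-process model for illustration only: `ToyRDDDemo`, `ToyRDD`, `parallelize`, `coalesceToOne`, and `partFiles` are hypothetical names, not Spark's API, and `++` here copies the partition-concatenating behaviour the thread describes as Spark's current (unguaranteed) implementation. `partFiles` models saveAsTextFile writing one part file per partition. Under those assumptions, the original `++` attempt produces two part files with the header alone in `part-00000`, which would explain seeing only the header if only that file was opened; coalescing first and prepending the header inside mapPartitions produces a single file, while skipping coalesce(1) repeats the header once per partition.

```scala
// Toy, single-process model of RDD partitions (hypothetical; not Spark's API).
object ToyRDDDemo {
  // An "RDD" here is just an ordered list of partitions.
  final case class ToyRDD[T](partitions: Vector[Vector[T]]) {
    def map[U](f: T => U): ToyRDD[U] = ToyRDD(partitions.map(_.map(f)))

    // Apply f to each partition's iterator separately, as mapPartitions does.
    def mapPartitions[U](f: Iterator[T] => Iterator[U]): ToyRDD[U] =
      ToyRDD(partitions.map(p => f(p.iterator).toVector))

    // Mirrors the behaviour described in the thread: concatenate partition lists.
    def ++(other: ToyRDD[T]): ToyRDD[T] = ToyRDD(partitions ++ other.partitions)

    // Merge all partitions into one, keeping their current order.
    def coalesceToOne: ToyRDD[T] = ToyRDD(Vector(partitions.flatten))

    // One "part file" per partition, named like saveAsTextFile's output files.
    def partFiles: Map[String, Vector[String]] =
      partitions.zipWithIndex.map { case (p, i) => f"part-$i%05d" -> p.map(_.toString) }.toMap
  }

  // Split xs into (at most) numSlices contiguous partitions.
  def parallelize[T](xs: Seq[T], numSlices: Int): ToyRDD[T] = {
    val size = math.max(1, math.ceil(xs.size.toDouble / numSlices).toInt)
    ToyRDD(xs.grouped(size).map(_.toVector).toVector)
  }

  val header = "col1,col2,col3"
  val myRdd: ToyRDD[Seq[Int]] = parallelize(Seq(Seq(1, 2, 3), Seq(4, 5, 6)), 2)

  // The original attempt: a 1-partition header RDD ++ a 1-partition data RDD.
  val attempt: ToyRDD[String] =
    parallelize(Seq(header), 1) ++ myRdd.coalesceToOne.map(_.mkString(","))

  // Patrick's suggestion: one partition, header prepended inside mapPartitions.
  val fixed: ToyRDD[String] =
    myRdd.coalesceToOne.map(_.mkString(",")).mapPartitions(it => Iterator(header) ++ it)

  // The same without coalescing: every partition gets its own header.
  val noCoalesce: ToyRDD[String] =
    myRdd.map(_.mkString(",")).mapPartitions(it => Iterator(header) ++ it)
}
```

Prepending with `Iterator(header) ++ it` keeps each partition lazy rather than first copying it into a `Seq`, which matters once everything has been coalesced into one partition.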


Re: Union of 2 RDD's only returns the first one

Mark Hamstra
Which is what you shouldn't be doing as an API user, since that implementation code might change.  The documentation doesn't mention a row ordering guarantee, so none should be assumed.  

It is hard enough for us to correctly document all of the things that the API does do.  We really shouldn't be forced into the expectation that we will also fully document everything that the API doesn't do. 
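Mark's point suggests not depending on union's partition order at all. One way to do that is sketched below as a plain-Scala toy (partitions modelled as nested `Vector`s; `KeyedHeaderDemo`, `unionConcat`, `unionReversed`, `naive`, and `keyed` are hypothetical helpers, not Spark code): give every line an explicit sort key, 0 for the header and 1 plus a row index for data, and order by that key before writing. Both union variants satisfy the only contract Patrick names, that the result contains all elements; the naive output changes between them, while the keyed output does not. In Spark itself, `zipWithIndex` and `sortByKey` could play these roles, with the caveat that `zipWithIndex` numbers rows by partition index and then position, so it only helps where that order is already the one you want.

```scala
// Toy model: partitions as nested Vectors (hypothetical helpers, not Spark's API).
object KeyedHeaderDemo {
  type Partitions[T] = Vector[Vector[T]]
  type Keyed = ((Int, Long), String) // ((group, index), line)

  // Two union variants that both keep every element; only partition order differs.
  def unionConcat[T](a: Partitions[T], b: Partitions[T]): Partitions[T] = a ++ b
  def unionReversed[T](a: Partitions[T], b: Partitions[T]): Partitions[T] = b ++ a

  val header = "col1,col2,col3"
  val rows: Partitions[String] = Vector(Vector("1,2,3"), Vector("4,5,6"))

  // Fragile: the result depends on how union happens to order partitions.
  def naive(union: (Partitions[String], Partitions[String]) => Partitions[String]): Vector[String] =
    union(Vector(Vector(header)), rows).flatten

  // Header gets key (0, 0); data rows get (1, i), where i is the partition's
  // offset plus the row's position, assigned before the union happens.
  val keyedHeader: Partitions[Keyed] = Vector(Vector(((0, 0L), header)))
  val keyedRows: Partitions[Keyed] = {
    val offsets = rows.scanLeft(0L)(_ + _.size)
    rows.zip(offsets).map { case (p, off) =>
      p.zipWithIndex.map { case (line, j) => ((1, off + j), line) }
    }
  }

  // Robust: sort on the explicit key, so union's partition order is irrelevant.
  def keyed(union: (Partitions[Keyed], Partitions[Keyed]) => Partitions[Keyed]): Vector[String] =
    union(keyedHeader, keyedRows).flatten.sortBy(_._1).map(_._2)
}
```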


On Wed, Apr 30, 2014 at 11:13 AM, Mingyu Kim <[hidden email]> wrote:
Okay, that makes sense. It’d be great if this can be better documented at
some point, because the only way to find out about the resulting RDD row
order is by looking at the code.

Thanks for the discussion!

Mingyu




On 4/29/14, 11:59 PM, "Patrick Wendell" <[hidden email]> wrote:

>I don't think we guarantee anywhere that union(A, B) will behave by
>concatenating the partitions, it just happens to be an artifact of the
>current implementation.


Re: Union of 2 RDD's only returns the first one

Mingyu Kim
I agree with you in general that as an API user, I shouldn’t be relying on the code. However, without looking at the code, there is no way for me to find out even whether map() keeps the row order. Without that knowledge, I’d need to do a “sort” every time I need things in a certain order (and sort is really expensive). On the other hand, if I can assume, say, that “filter” or “map” doesn’t shuffle the rows around, I can do the sort once and assume that the order is retained throughout such operations, saving a lot of time otherwise spent on unnecessary sorts.

Mingyu
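To make the two positions concrete, here is a plain-Scala toy of per-partition transforms. The names `OrderDemo`, `mapP`, `filterP`, `take`, `isSorted`, and the hand-built `sorted` partitions are hypothetical and say nothing about what Spark promises. Because each partition is transformed in place, row order carries over exactly as Mingyu describes, and take(k) scans partitions in order; but, as Patrick's `3 - x` example shows, carrying the order over is not the same as staying sorted.

```scala
// Toy model of order-preserving, per-partition transforms (hypothetical; not Spark's API).
object OrderDemo {
  type Partitions[T] = Vector[Vector[T]]

  // Each partition is transformed on its own, so partition order and
  // within-partition row order are carried over unchanged.
  def mapP[T, U](ps: Partitions[T])(f: T => U): Partitions[U] = ps.map(_.map(f))
  def filterP[T](ps: Partitions[T])(keep: T => Boolean): Partitions[T] = ps.map(_.filter(keep))

  // take(k) in this model: scan partitions in order and stop after k rows.
  def take[T](ps: Partitions[T], k: Int): Vector[T] =
    ps.iterator.flatMap(_.iterator).take(k).toVector

  def isSorted(xs: Seq[Int]): Boolean = xs.zip(xs.drop(1)).forall { case (a, b) => a <= b }

  // Hand-built stand-in for sorting (2,2), (1,1), (3,3) by key into two partitions.
  val sorted: Partitions[(Int, Int)] = Vector(Vector((1, 1), (2, 2)), Vector((3, 3)))

  val plusOne: Partitions[(Int, Int)] = mapP(sorted) { case (k, v) => (k, v + 1) }    // Mingyu's x + 1
  val threeMinus: Partitions[(Int, Int)] = mapP(sorted) { case (k, v) => (k, 3 - v) } // Patrick's 3 - x
}
```

Whether real Spark transforms behave like `mapP` and `filterP` here is exactly the implementation detail Mark cautions against relying on.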

From: Mark Hamstra <[hidden email]>
Reply-To: "[hidden email]" <[hidden email]>
Date: Wednesday, April 30, 2014 at 11:36 AM
To: "[hidden email]" <[hidden email]>
Subject: Re: Union of 2 RDD's only returns the first one

Which is what you shouldn't be doing as an API user, since that implementation code might change.  The documentation doesn't mention a row ordering guarantee, so none should be assumed.  

It is hard enough for us to correctly document all of the things that the API does do.  We really shouldn't be forced into the expectation that we will also fully document everything that the API doesn't do. 



