Replacing groupByKey() with reduceByKey()

Replacing groupByKey() with reduceByKey()

Bathi CCDB

I am trying to replace groupByKey() with reduceByKey(). I am a PySpark and Python newbie, and I am having a hard time figuring out the lambda function for the reduceByKey() operation.

Here is the code

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).groupByKey(25).take(2)

Here is the return value

>>> dd
[(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]

and here are the iterable contents of dd[0][1]:

Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', value=u'e7dc1f2a')
Row(key=u'KEY_1', hash_fn=u'f8891048a9ef8331227b4af080ecd28a', value=u'fb0bc953')
....
...
Row(key=u'KEY_1', hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', value=u'd39714d3')
Row(key=u'KEY_1', hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')

My question is: how do I replace groupByKey() with reduceByKey() and get the same output as above?


Santhosh
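
A minimal sketch of the shape difference between the two operations, on a toy RDD (sc is assumed to be an existing SparkContext; the data is made up): groupByKey yields one (key, iterable) pair per key, while reduceByKey needs a binary function that merges two values at a time.

pairs = sc.parallelize([("KEY_1", 1), ("KEY_1", 2), ("KEY_2", 3)])

# groupByKey: one (key, ResultIterable) pair per key
grouped = pairs.groupByKey()

# reduceByKey: values are merged two at a time by the given function
summed = pairs.reduceByKey(lambda a, b: a + b)

print(sorted((k, sorted(v)) for k, v in grouped.collect()))  # [('KEY_1', [1, 2]), ('KEY_2', [3])]
print(sorted(summed.collect()))                              # [('KEY_1', 3), ('KEY_2', 3)]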

Re: Replacing groupByKey() with reduceByKey()

Biplob Biswas
Hi Santhosh,

If you are not performing any aggregation, then I don't think you can replace your groupByKey with a reduceByKey. As far as I can see, you are only grouping and taking 2 values of the result, so I believe you can't just replace your groupByKey like that.

Thanks & Regards
Biplob Biswas
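
If the goal were only to reproduce the grouped output, the closest reduceByKey equivalent is per-key list concatenation. A sketch, where rdd stands for the RDD of Rows from the original snippet; note it still shuffles every value, so it gains nothing over groupByKey:

wrapped = rdd.map(lambda x: (x[0], [x]))                # wrap each Row in a one-element list
mimicked = wrapped.reduceByKey(lambda a, b: a + b, 25)  # per-key list concatenation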


Re: Replacing groupByKey() with reduceByKey()

Bathi CCDB
Hey Bipin,
Thanks for the reply. I am actually aggregating after the groupByKey() operation; I posted the wrong code snippet in my first email. Here is what I am doing:

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).groupByKey(25).map(build_edges)
Can we replace groupByKey() with reduceByKey() in this context?

Santhosh
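
A sketch of the shape build_edges presumably has in the pipeline above (the real function was not posted, so this signature is a guess): after groupByKey, map hands it one (key, iterable-of-rows) pair per key.

def build_edges(kv):
    # hypothetical signature: kv is (key, ResultIterable-of-Rows) from groupByKey
    key, rows = kv
    rows = list(rows)   # materialize the iterable once
    # ... build edges from all Rows sharing this key ...
    return (key, rows)  # placeholder return value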


Re: Replacing groupByKey() with reduceByKey()

Biplob Biswas
Hi Santhosh,

My name is not Bipin, it's Biplob, as is clear from my signature.

Regarding your question, I have no clue what your map operation is doing on the grouped data, so I can only suggest you do:
dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: (x[0],x)).reduceByKey(build_edges, 25)
Although, based on the return type, you would have to modify your build_edges function.

Thanks & Regards
Biplob Biswas
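
One way that modification could look, assuming build_edges currently consumes a whole per-key iterable: reduceByKey's function must merge two values of the same type (and be associative), so the list-building moves into the merge and build_edges runs afterwards in a map. merge_partials is an illustrative name, not from the thread:

def merge_partials(a, b):
    # a and b are partial per-key results of the same type; the merge must be associative
    return a + b  # e.g. concatenating partial lists of Rows

dd = (hive_context.read.orc(orcfile_dir).rdd
      .map(lambda x: (x[0], [x]))        # seed each key with a one-element list
      .reduceByKey(merge_partials, 25)   # pairwise merge instead of grouping
      .map(build_edges))                 # build_edges now sees (key, merged_list)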

