High level explanation of dropDuplicates


High level explanation of dropDuplicates

Yeikel
Hi,

I am looking for a high-level explanation (overview) of how dropDuplicates[1] works.

[1]
https://github.com/apache/spark/blob/db24b04cad421ed508413d397c6beec01f723aee/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L2326

Could someone please explain?

Thank you





Re: High level explanation of dropDuplicates

Nicholas Hakobian
From doing some searching around in the Spark codebase, I found the following:


So it appears there is no dedicated physical operation for dropDuplicates (or its logical node, Deduplicate); instead, an optimizer rule converts the logical operation into a physical plan that is equivalent to grouping by all the columns you want to deduplicate across (or all columns, if you are doing something like distinct) and taking the first() value for the remaining columns. So (using a PySpark code example):

df = input_df.dropDuplicates(['col1', 'col2'])

is effectively shorthand for something like:

from pyspark.sql.functions import first, struct
df = input_df.groupBy('col1', 'col2').agg(first(struct(input_df.columns)).alias('data')).select('data.*')

Except I assume that it has some internal optimization so it doesn't need to pack/unpack the column data, and just returns the whole Row.
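
If it helps, here is a minimal, self-contained sketch (the data and column names are invented for illustration) that compares the physical plans of the two versions; both should show a HashAggregate grouping on the chosen columns and keeping first() values:

from pyspark.sql import SparkSession
from pyspark.sql.functions import first, struct

spark = SparkSession.builder.getOrCreate()

# Toy data: duplicate (col1, col2) keys with different col3 values.
input_df = spark.createDataFrame(
    [(1, "a", 10), (1, "a", 20), (2, "b", 30)],
    ["col1", "col2", "col3"],
)

# The built-in dedup.
input_df.dropDuplicates(["col1", "col2"]).explain()

# The groupBy + first(struct(...)) spelling of the same thing.
(input_df
    .groupBy("col1", "col2")
    .agg(first(struct(input_df.columns)).alias("data"))
    .select("data.*")
    .explain())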

Nicholas Szandor Hakobian, Ph.D.
Principal Data Scientist
Rally Health





Re: High level explanation of dropDuplicates

rishishah.star
Hi All,

Just wanted to check back regarding the best way to perform deduplication. Is using dropDuplicates the optimal way to get rid of duplicates? Would it be better if we ran operations on the RDD directly?

Also, what about if we want to keep the last value of the group while performing deduplication (based on some sorting criteria)?

Thanks,
Rishi




--
Regards,

Rishi Shah

Re: High level explanation of dropDuplicates

Vladimir Prus
Hi,

If your data frame is partitioned by column A, and you want deduplication by columns A, B and C, then a faster way might be to sort each partition by A, B and C and then do a linear scan - this is often faster than grouping by all the columns, which requires a shuffle. Sadly, there's no standard way to do it.

One way to do it is via mapPartitions, but that involves serialisation to/from Row. The best way is to write a custom physical exec operator, but it's not entirely trivial.
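
A rough sketch of the mapPartitions variant, just to illustrate the idea (the column names and data are invented, and this assumes the frame is already partitioned by A so that equal keys are co-located; it also pays the Row serialisation cost mentioned above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy frame, partitioned by column "a".
df = spark.createDataFrame(
    [(1, "x", 1), (1, "x", 2), (1, "y", 3), (2, "x", 4)],
    ["a", "b", "c"],
).repartition("a")

def drop_consecutive_dups(rows):
    # The partition is sorted by (a, b, c), so duplicates are adjacent:
    # keep the first row of each run of equal keys.
    prev_key = object()  # sentinel that never equals a real key
    for row in rows:
        key = (row["a"], row["b"], row["c"])
        if key != prev_key:
            prev_key = key
            yield row

deduped = spark.createDataFrame(
    df.sortWithinPartitions("a", "b", "c")  # in-partition sort, no extra shuffle
      .rdd
      .mapPartitions(drop_consecutive_dups),
    df.schema,
)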


Re: High level explanation of dropDuplicates

Yeikel
Nicholas, thank you for your explanation.

I am also interested in the example that Rishi is asking for. I am sure mapPartitions may work, but as Vladimir suggests, it may not be the best option in terms of performance.

@Vladimir Prus, are you aware of any example of writing a "custom physical exec operator"?

If anyone needs a further explanation of the follow-up question Rishi posted, please see the example below:


import org.apache.spark.sql.types._
import org.apache.spark.sql.Row


val someData = Seq(
  Row(1, 10),
  Row(1, 20),
  Row(1, 11)
)

val schema = List(
  StructField("id", IntegerType, true),
  StructField("score", IntegerType, true)
)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(someData),
  StructType(schema)
)

// Goal: drop duplicates using "id" as the primary key and keep the highest "score".

df.sort($"score".desc).dropDuplicates("id").show

== Physical Plan ==
*(2) HashAggregate(keys=[id#191], functions=[first(score#192, false)])
+- Exchange hashpartitioning(id#191, 200)
   +- *(1) HashAggregate(keys=[id#191], functions=[partial_first(score#192, false)])
      +- *(1) Sort [score#192 DESC NULLS LAST], true, 0
         +- Exchange rangepartitioning(score#192 DESC NULLS LAST, 200)
            +- Scan ExistingRDD[id#191,score#192]

This seems to work, but I don't know what the implications are if we use this approach with a bigger dataset, or what the alternatives are. From the explain output I can see the two Exchanges, so it may not be the best approach?
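
One alternative worth comparing (just a sketch, in PySpark since that is what Rishi asked about; the data matches the toy example above) is a window function, which makes it explicit which row survives instead of relying on a global sort feeding first():

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, row_number

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 10), (1, 20), (1, 11)], ["id", "score"])

# Rank rows within each id by descending score and keep only the top one.
w = Window.partitionBy("id").orderBy(col("score").desc())

df.withColumn("rn", row_number().over(w)) \
  .filter(col("rn") == 1) \
  .drop("rn") \
  .show()

In the plans I have seen this produces a single hash-partitioning Exchange plus an in-partition sort, so it avoids the range-partitioning Exchange that the global sort adds above - but it is worth confirming with explain on your own data.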









Re: High level explanation of dropDuplicates

rishishah.star
Thanks everyone for your contributions on this topic. I wanted to check in to see if anyone has discovered a different approach, or has an opinion on a better way to deduplicate data using PySpark. Would really appreciate any further insight on this.

Thanks,
-Rishi




--
Regards,

Rishi Shah

Re: High level explanation of dropDuplicates

Miguel Morales
I would just map to a pair keyed by the id, then do a reduceByKey where you compare the scores and keep the highest, then do .values and that should do it.
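
A minimal sketch of that approach, reusing the toy id/score data from Yeikel's example (assumption: we stay in the RDD API and do the max-by-score comparison in the reduce function):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1, 10), (1, 20), (1, 11)], ["id", "score"])

deduped_rdd = (
    df.rdd
      .map(lambda row: (row["id"], row))  # pair each row with its id
      .reduceByKey(lambda a, b: a if a["score"] >= b["score"] else b)  # keep the higher score
      .values()  # drop the keys, keep the surviving rows
)
deduped = spark.createDataFrame(deduped_rdd, df.schema)
deduped.show()

This does everything in a single shuffle, but it drops out of the DataFrame API, so you lose the Catalyst/Tungsten optimisations the other approaches keep.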

