What is the best way to take the top N entries from a hive table/data source?

What is the best way to take the top N entries from a hive table/data source?

Yeikel
When I use .limit(), the number of partitions for the returned dataframe is 1, which normally causes most of my jobs to fail.

val df = spark.sql("select * from table limit n")
df.write.parquet(....)
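
For reference, here is roughly how I am verifying the partition count (a sketch building on the snippet above; the table name is a placeholder):

// The terminal LIMIT collapses the result into a single partition,
// so the write above ends up running as one big task.
println(df.rdd.getNumPartitions)  // prints 1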


Thanks!




Re: What is the best way to take the top N entries from a hive table/data source?

Yeikel
Looking at the results of explain, I can see a CollectLimit step. Does that work the same way as a regular .collect() (where all records are sent to the driver)?


spark.sql("select * from db.table limit 1000000").explain(false)
== Physical Plan ==
CollectLimit 1000000
+- FileScan parquet ... 806 more fields] Batched: false, Format: Parquet,
Location: CatalogFileIndex[...], PartitionCount: 3, PartitionFilters: [],
PushedFilters: [], ReadSchema:.....

The number of partitions is 1 so that makes sense.

spark.sql("select * from db.table limit 1000000").rdd.partitions.size = 1

As a follow-up, I tried to repartition the resulting dataframe, and while I no longer see the CollectLimit step, it did not make any difference in the job. I still saw one big task at the end that ends up failing.

spark.sql("select * from db.table limit
1000000").repartition(1000).explain(false)

Exchange RoundRobinPartitioning(1000)
+- GlobalLimit 1000000
   +- Exchange SinglePartition
      +- LocalLimit 1000000  -> Is this a collect?





Re: What is the best way to take the top N entries from a hive table/data source?

ZHANG Wei
https://github.com/apache/spark/pull/7334 may answer the question, as quoted below:

>  This patch preserves this optimization by treating logical Limit operators specially when they appear as the terminal operator in a query plan: if a Limit is the final operator, then we will plan a special CollectLimit physical operator which implements the old take()-based logic.

For `spark.sql("select * from db.table limit 1000000").explain(false)`, `limit` is the final operator;
for `spark.sql("select * from db.table limit 1000000").repartition(1000).explain(false)`, `repartition` is the final operator. If you add a `.limit()` operation after `repartition`, such as `spark.sql("select * from db.table limit 1000000").repartition(1000).limit(1000).explain(false)`, the `CollectLimit` will show again.
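
A quick sketch to compare the three plans side by side (same `db.table` as in your log):

// LIMIT is the terminal operator -> planned as CollectLimit
spark.sql("select * from db.table limit 1000000").explain(false)

// repartition is now the terminal operator -> GlobalLimit/LocalLimit + Exchange instead
spark.sql("select * from db.table limit 1000000").repartition(1000).explain(false)

// A trailing .limit() makes a limit the terminal operator again -> CollectLimit reappears
spark.sql("select * from db.table limit 1000000").repartition(1000).limit(1000).explain(false)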

---
Cheers,
-z

Re: What is the best way to take the top N entries from a hive table/data source?

Yeikel
Hi Zhang. Thank you for your response.

While your answer clarifies my confusion with `CollectLimit`, it still does not clarify the recommended way to extract a large amount of data (but not all the records) from a source while maintaining a high level of parallelism.

For example, in some instances when trying to extract 1 million records from a table with over 100M records, I see my cluster using 1-2 cores out of the hundreds that I have available.



Re: What is the best way to take the top N entries from a hive table/data source?

ZHANG Wei
The performance issue might be caused by the parquet table's partition count, which is only 3. The reader uses that partition count to parallelize the extraction.

Refer to the log you provided:
> spark.sql("select * from db.table limit 1000000").explain(false)
> == Physical Plan ==
> CollectLimit 1000000
> +- FileScan parquet ... 806 more fields] Batched: false, Format: Parquet, Location: CatalogFileIndex[...], PartitionCount: 3, PartitionFilters: [], PushedFilters: [], ReadSchema:.....
...PartitionCount: 3,...

According to the first email:
> val df = spark.sql("select * from table limit n")
> df.write.parquet(....)

You can try recreating the parquet table with more partitions. I hope this page can help you: https://mungingdata.com/apache-spark/partitionby/
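
For example, something along these lines (only a rough sketch; the partition column, partition count, and output path are placeholders you would need to adapt):

// Rewrite the table with more partitions so that later reads can parallelize better.
// "some_col" and the output path are placeholders, not taken from your actual table.
spark.table("db.table")
  .repartition(200)                  // more files per partition value
  .write
  .mode("overwrite")
  .partitionBy("some_col")           // a column with a sensible number of distinct values
  .parquet("/path/to/recreated/table")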

---
Cheers,
-z