Optimising multiple hive table join and query in spark

Manjunath Shetty H
Hi All,

We have 10 tables in our data warehouse (HDFS/Hive), written in ORC format. We currently serve a use case on top of them by joining 4-5 tables in Hive, but it is not as fast as we want it to be, so we are thinking of using Spark for this use case.

Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance with Spark?

Any pointers would be helpful.

Notes
  • Data is partitioned by date (yyyyMMdd), stored as an integer.
  • The query fetches the last 7 days of data from some tables and joins it with other tables.
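
A minimal sketch of how the 7-day window could be turned into integer partition keys, assuming the yyyyMMdd integer layout described in the notes above. The names `DateKeys`, `toKey` and `lastDays` are hypothetical and not from the original post.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object DateKeys {
  // BASIC_ISO_DATE renders a LocalDate as yyyyMMdd without separators.
  private val fmt = DateTimeFormatter.BASIC_ISO_DATE

  /** Integer date key for a single day, e.g. 2020-03-15 becomes 20200315. */
  def toKey(day: LocalDate): Int = day.format(fmt).toInt

  /** Keys for the `days` most recent days ending at `endInclusive`, oldest first. */
  def lastDays(endInclusive: LocalDate, days: Int = 7): Seq[Int] =
    (days - 1 to 0 by -1).map(i => toKey(endInclusive.minusDays(i.toLong)))
}
```

Since yyyyMMdd integers sort in calendar order, the first and last key can also serve as the bounds of a range filter on the date partition column.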

The approach we have thought of so far:
  • Create a DataFrame for each table and partition all of them by the same column (let's say Country as the partition column)
  • Register all tables as temporary tables
  • Run the SQL query with joins
But the problem we are seeing with this approach is that, even though we have already partitioned by Country, Spark still does hashpartitioning + shuffle during the join. All the table joins use the `Country` column plus some extra columns that depend on the table.

Is there any way to avoid these shuffles and improve performance?


Thanks and regards 
Manjunath

Re: Optimising multiple hive table join and query in spark

Dennis Suhari
Hi,

I am also using Spark with the Hive Metastore. The performance is much better, especially for larger datasets. I have the feeling that performance is better if I load the data into DataFrames and join them there instead of doing the join directly in Spark SQL, but I can't explain why yet.

Does anybody have experience with that?

Br,

Dennis

In reply to the post by Manjunath Shetty H of 15.03.2020, 06:04.

Re: Optimising multiple hive table join and query in spark

Molotch
In reply to this post by Manjunath Shetty H
It's been a while, but I remember reading on Stack Overflow that you can use a UDF as a join condition to trick Catalyst into not reshuffling the partitions, i.e. use regular equality on the column you partitioned or bucketed by and your custom comparer for the other columns. I never got around to trying it out, though. I would really like a native way to tell Catalyst not to reshuffle just because you use more columns in the join condition.
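
The idea can be modelled with plain Scala collections rather than Spark: match rows by equality on the partition column only, then apply a custom comparer to the remaining columns. This is only an illustration of the join semantics under assumed names (`Rec`, `joinOnCountry`, `extraMatch`). It says nothing about whether Catalyst actually skips the shuffle, which is untested here as well.

```scala
object ResidualJoinSketch {
  // Hypothetical record shape: the partition column plus one extra join column.
  final case class Rec(country: String, code: String, payload: Int)

  /** Equi-join on `country`; `extraMatch` plays the role of the custom comparer. */
  def joinOnCountry(left: Seq[Rec], right: Seq[Rec])(
      extraMatch: (Rec, Rec) => Boolean): Seq[(Rec, Rec)] = {
    // Only the partition column is used as the lookup key.
    val rightByCountry: Map[String, Seq[Rec]] = right.groupBy(_.country)
    for {
      l <- left
      r <- rightByCountry.getOrElse(l.country, Seq.empty)
      if extraMatch(l, r) // remaining columns are compared after the key match
    } yield (l, r)
  }
}
```

In this model every left row is compared against all right rows of the same country, so the cost of the extra comparison grows with the size of each country group.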

Re: Optimising multiple hive table join and query in spark

ayan guha
Hi

I would first and foremost try to identify where most of the time is spent during the query. One possibility is that it simply takes time for executors to ramp up and become available; if that's the case, a dedicated YARN queue or the Spark Thrift Server may help.

In reply to the post by Magnus Nilsson of Sun, Mar 15, 2020, 11:02 PM.


--
Best Regards,
Ayan Guha

Re: Optimising multiple hive table join and query in spark

Manjunath Shetty H
The main concern is the reshuffle. Even though all the DataFrames are partitioned by the same column, the join still reshuffles them, and that is the bottleneck in our POC implementation right now.

Is there any way to tell Spark to keep all partitions with the same partition key in the same place, so that it won't shuffle again during the join?
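
Why co-location depends on both sides being partitioned the same way can be shown with a toy model. The sketch below is not Spark's partitioner (Spark uses its own hash function); `partitionOf` and `coLocated` are hypothetical helpers that assign a key to a partition by hash modulo the partition count.

```scala
object CoPartitionSketch {
  /** Toy partitioner: non-negative hash of the key modulo the partition count. */
  def partitionOf(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  /** True when every key present on both sides maps to the same partition index on both sides. */
  def coLocated(leftKeys: Seq[String], leftParts: Int,
                rightKeys: Seq[String], rightParts: Int): Boolean =
    leftKeys.intersect(rightKeys).forall { k =>
      partitionOf(k, leftParts) == partitionOf(k, rightParts)
    }
}
```

In this model, matching keys line up partition by partition only when both sides use the same partition count: with 4 versus 8 partitions, the key "IN" lands in partition 1 on one side and partition 5 on the other, so one side would have to be moved before joining.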


-
Manjunath

In reply to the post by ayan guha of Sunday, March 15, 2020, 5:46 PM.

Re: Optimising multiple hive table join and query in spark

geoHeil
Did you only partition, or also bucket by the join column? Are ORC indices active, i.e. are the JOIN keys sorted when writing the files?

Best,
Georg

In reply to the post by Manjunath Shetty H of Sunday, 15 March 2020, 15:52.

Re: Optimising multiple hive table join and query in spark

Manjunath Shetty H
Only partitioned, and the join keys are not sorted because the data is written incrementally by batch jobs.

In reply to the post by Georg Heiler of Sunday, March 15, 2020, 8:30:53 PM.

Re: Optimising multiple hive table join and query in spark

Manjunath Shetty H
Thanks Georg,

The batch import job runs at a different frequency than the read job: the import job runs every 15 minutes to 1 hour, and the read/transform job runs once a day.

In this case I think writing with sortWithinPartitions doesn't make any difference, as the combined data stored in HDFS will not be sorted at the end of the day.

Does partitioning/sorting while reading help? I tried this out, but it still results in a shuffle when joining multiple tables and generates a very complex DAG.

-
Manjunath

From: Georg Heiler <[hidden email]>
Sent: Monday, March 16, 2020 12:06 PM
To: Manjunath Shetty H <[hidden email]>
Subject: Re: Optimising multiple hive table join and query in spark
 
Hi,

If you only have 1.6, forget bucketing: https://databricks.com/session/bucketing-in-spark-sql-2-3 (that only works well with Hive from 2.3 onwards).
The other thing is in your (daily?) batch job:

val myData = spark.read.<<file>>("/path/to/file").transform(<<apply_my_transform>>)

Now when writing the data (repartition and sortWithinPartitions are DataFrame methods, so they go before .write):
myData.repartition(xxx).write.<<file>>(<<output_path>>)
where xxx is the number of files you want to have for each period (day?). When writing ORC / Parquet, make sure the files are of HDFS block size or more, i.e. usually 128 MB up to a maximum of 1 GB.

Apply a secondary sort to get ORC indices:
myData.repartition(xxx).sortWithinPartitions(join_col, other_join_col).write.<<file>>(<<output_path>>)

If the cardinality of the join columns is high enough:
myData.repartition(xxx, col(join_col), col(other_join_col)).sortWithinPartitions(join_col, other_join_col).write.<<file>>(<<output_path>>)
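
The partition count xxx can be estimated from the data volume. A rough sketch, assuming the total output size per period is known in bytes and that rows are spread evenly across files; `FileSizing` and `numFiles` are hypothetical names, and the 128 MiB default mirrors the block size mentioned above.

```scala
object FileSizing {
  val MiB: Long = 1024L * 1024L

  /**
   * Number of output files such that each file is at least `targetFileBytes`
   * (integer division rounds down), with a minimum of one file.
   */
  def numFiles(totalBytes: Long, targetFileBytes: Long = 128L * MiB): Int = {
    require(totalBytes >= 0, "totalBytes must be non-negative")
    require(targetFileBytes > 0, "targetFileBytes must be positive")
    math.max(1L, totalBytes / targetFileBytes).toInt
  }
}
```

For example, 10 GiB of data for one day gives 80 files of 128 MiB each, while 300 MiB gives 2 files of about 150 MiB.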

Best,
Georg

On Mon, 16 March 2020 at 04:27, Manjunath Shetty H <[hidden email]> wrote:
Hi Georg,

Thanks for the suggestion. Can you please explain a bit more about what exactly you meant?

By the way, I am on Spark 1.6.


-
Manjunath

From: Georg Heiler <[hidden email]>
Sent: Monday, March 16, 2020 12:35 AM
To: Manjunath Shetty H <[hidden email]>
Subject: Re: Optimising multiple hive table join and query in spark
 
To speed things up:
- sortWithinPartitions (i.e. for each day) and potentially repartition
- pre-shuffle the data with bucketing

On Sun, 15 March 2020 at 17:07, Manjunath Shetty H <[hidden email]> wrote:
Only partitioned, and the join keys are not sorted because the data is written incrementally by batch jobs.
