Tune Hive query launched through Spark-on-YARN job.

Tune Hive query launched through Spark-on-YARN job.

Himali Patel

Hello all,

 

We have a use case where we aggregate billions of rows, and it produces a huge shuffle.

Example:

As per the 'Jobs' tab in the YARN UI, when the input size is roughly 350 GB, the shuffle size exceeds 3 TB. This pushes non-DFS usage beyond the warning limit and thus affects the entire cluster.

It seems we need to tune our query / resources. Any suggestions?

 

1.

Our data has high cardinality:

# input rows: ~15 billion

# output rows: ~13 billion

 

2.

Spark version is 1.6 and Hive is 1.1, on CDH.

We query using a HiveContext in the Spark job (YARN is the resource manager).

The HiveContext is configured with:

.setConf("hive.exec.dynamic.partition.mode","nonstrict")

.setConf("hive.exec.dynamic.partition","true")

.setConf("hive.exec.stagingdir","/tmp/hive/")

 

3.

Our aggregation is done using a single query, as below:

SELECT
  <list of 16 dimension columns>,
  SUM(m1) AS m1, SUM(m2) AS m2, SUM(m3) AS m3, SUM(m4) AS m4, SUM(m5) AS m5,
  <custom-aggregate-operation>(c1, 'HEX', 'UNION') AS c1,
  <custom-aggregate-operation>(c2, 'HEX', 'UNION') AS c2,
  <custom-aggregate-operation>(c3, 'HEX', 'UNION') AS c3,
  <custom-aggregate-operation>(c4, 'HEX', 'UNION') AS c4,
  <custom-aggregate-operation>(c5, 'HEX', 'UNION') AS c5,
  <Epochtime1> AS <partition-column>, <Epochtime1> AS <column2>
FROM <table-name>
WHERE <partition-column> IN (<Epochtime1>, <Epochtime2>, <Epochtime3>, <Epochtime4>, <Epochtime5>, <Epochtime6>, <Epochtime7>, <Epochtime8>)
GROUP BY <list of 16 dimension columns>
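With high key cardinality, a GROUP BY like the one above reduces almost nothing: ~15 billion input rows collapse into ~13 billion groups, so nearly every row must cross the shuffle keyed by its dimensions. A toy sketch of the same aggregation semantics in plain Python (two hypothetical dimensions and two measures stand in for the real 16 and 5):

```python
from collections import defaultdict

# Toy rows: (dimension tuple) -> (m1, m2). In the real query the key is
# 16 dimension columns; here two columns illustrate the mechanics.
rows = [
    (("us", "web"), (1, 10)),
    (("us", "web"), (2, 20)),
    (("eu", "app"), (5, 50)),
]

# GROUP BY dims, SUM each measure -- this grouping by key is exactly what
# the shuffle has to co-locate, row by row, across executors.
agg = defaultdict(lambda: [0, 0])
for dims, (m1, m2) in rows:
    agg[dims][0] += m1
    agg[dims][1] += m2

print(dict(agg))
```

When almost every dimension tuple is unique, map-side partial aggregation barely shrinks the data, which is consistent with a >3 TB shuffle from a ~350 GB input.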

 

4.

Configs are:

spark.master=yarn-client
spark.yarn.queue=default
spark.executor.instances=52
spark.executor.cores=4
spark.executor.memory=30g
spark.driver.memory=25g
spark.memory.fraction=0.8
spark.memory.storageFraction=0.1
spark.yarn.executor.memoryOverhead=9500
spark.yarn.driver.memoryOverhead=5120
spark.core.connection.ack.wait.timeout=1000
spark.eventLog.enabled=true
spark.eventLog.dir=<>
spark.eventLog.overwrite=true
spark.sql.shuffle.partitions=1000
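A rough back-of-envelope on these figures (plain Python; the numbers come from the post above, and the per-task estimate is only an illustration, not a recommendation of specific values):

```python
# Figures taken from the post.
shuffle_bytes = 3 * 1024**4          # shuffle size, >3 TB
shuffle_partitions = 1000            # spark.sql.shuffle.partitions
executors, cores = 52, 4             # spark.executor.instances / .cores

per_task_gb = shuffle_bytes / shuffle_partitions / 1024**3
concurrent_tasks = executors * cores

print(round(per_task_gb, 1))   # ~3 GB of shuffle data per reduce task
print(concurrent_tasks)        # 208 tasks running at once
```

At ~3 GB of shuffle input per reduce task, spill to local disk (non-DFS usage) is likely; raising spark.sql.shuffle.partitions shrinks the per-task share, at the cost of more, smaller tasks.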
 
 
How to tune this job?


Re: Tune Hive query launched through Spark-on-YARN job.

Sathi Chowdhury
What I can immediately think of is this:
since you use IN in the WHERE clause for a series of timestamps, consider breaking them up and running one pass per epoch timestamp.
You can load the results into an intermediate staging table and then do a final aggregate from that table, keeping the GROUP BY the same. Since the measures are SUMs, the aggregation can be done in two steps.
HTH
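The two-step idea above can be sketched in plain Python (hypothetical data; it works because SUM is associative, so per-epoch partial sums followed by a final sum give the same answer as one pass):

```python
from collections import defaultdict

# Hypothetical rows: (epoch, dimension_key, measure).
rows = [(1, "a", 10), (1, "a", 5), (2, "a", 7), (2, "b", 3)]
epochs = {1, 2}

# Step 1: one pass per epoch writes partial sums to a "staging" table.
staging = defaultdict(int)            # keyed by (epoch, dimension_key)
for e in epochs:                      # "for each epoch timestamp"
    for epoch, key, m in rows:
        if epoch == e:
            staging[(e, key)] += m

# Step 2: final aggregate over the staging table, GROUP BY kept the same.
final = defaultdict(int)
for (_, key), m in staging.items():
    final[key] += m

# Sanity check: matches the single-pass GROUP BY result.
single = defaultdict(int)
for _, key, m in rows:
    single[key] += m
assert final == single
```

Each per-epoch pass shuffles only that epoch's slice, so peak shuffle and local-disk usage per job drop, though the custom HEX/UNION aggregate would also need to support this two-step decomposition.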



On Thursday, September 5, 2019, 5:10 AM, Himali Patel <[hidden email]> wrote:



 


Re: Tune Hive query launched through Spark-on-YARN job.

Himali Patel

Hi Sathi,

 

Thanks for the quick reply. The list of epoch times in the IN clause is already part of a 30-day aggregation. Given our input-to-output row ratio, the cardinality is very high, so what we need is query tuning of some kind: we cannot assign additional resources to this job.

 

 

From: Sathi Chowdhury <[hidden email]>
Date: Thursday, 5 September 2019 at 8:10 PM
To: Himali Patel <[hidden email]>, "[hidden email]" <[hidden email]>
Subject: Re: Tune hive query launched thru spark-yarn job.

 
