[Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

ehbhaskar
I have a PySpark job that inserts data into a Hive partitioned table using an
`INSERT OVERWRITE` statement.

The Spark job writes the data quickly (in about 15 mins) to a temp directory
(~/.hive-***) in S3, but it's very slow at moving the data from the temp
directory to the target path: that step takes more than 40 mins.

I set mapreduce.fileoutputcommitter.algorithm.version=2 (the default is 1),
but I still see no change.

*Are there any ways to improve the performance of hive INSERT OVERWRITE
query from spark?*

Also, I noticed that this behavior is even worse (i.e. the job takes even
longer) for a Hive table with many existing partitions; data loads relatively
fast into tables that have fewer existing partitions.

*Some additional details:*
* Table is a dynamic partitioned table.
* Spark version - 2.3.0
* Hive version - 2.3.2-amzn-2
* Hadoop version - 2.8.3-amzn-0

PS: Other config options I have tried, none of which had much effect on the
job performance:
* "hive.load.dynamic.partitions.thread" - "10"
* "hive.mv.files.thread" - "30"
* "fs.trash.interval" - "0"
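A hedged note on these settings (an interpretation, not a confirmed diagnosis): Spark's documented way to pass a Hadoop property is to prefix it with `spark.hadoop.`, which Spark strips before copying the entry into the Hadoop configuration. Also, as far as I can tell from how Spark 2.3 writes to Hive tables, the committer algorithm version only governs how task output is committed into the temp directory, not Hive's later move of the files into the partition directories, which would fit the lack of any visible change. The helpers `with_hadoop_prefix` and `build_session` are hypothetical, and whether `hive.*` keys passed this way reach the Hive client Spark uses is an assumption to verify.

```python
def with_hadoop_prefix(props):
    """Return a copy of props where Hadoop/Hive keys get the "spark.hadoop." prefix.

    Keys that already start with "spark." are Spark settings and are kept as-is.
    """
    return {
        (key if key.startswith("spark.") else "spark.hadoop." + key): value
        for key, value in props.items()
    }


def build_session(app_name, props):
    """Hypothetical session factory that applies the prefixed settings."""
    # Imported here so with_hadoop_prefix() also works without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name).enableHiveSupport()
    for key, value in with_hadoop_prefix(props).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

For example, `with_hadoop_prefix({"mapreduce.fileoutputcommitter.algorithm.version": "2"})` yields `{"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": "2"}`, while Spark keys such as `spark.dynamicAllocation.enabled` pass through unchanged.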



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Re: [Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

Jörn Franke
Can you share some relevant source code?


> On 05.11.2018 at 07:58, ehbhaskar <[hidden email]> wrote:
>
> I have a pyspark job that inserts data into hive partitioned table using
> `Insert Overwrite` statement.

Re: [Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

ehbhaskar
Here's some sample code. 

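from pyspark.sql import SparkSession

self.session = SparkSession \
    .builder \
    .appName(self.app_name) \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("hive.load.dynamic.partitions.thread", "10") \
    .config("hive.mv.files.thread", "30") \
    .config("fs.trash.interval", "0") \
    .enableHiveSupport() \
    .getOrCreate()  # without getOrCreate() this is still a Builder, not a session

# NULL placeholders for columns without a source value; partition columns last.
columns_with_default = "col1, NULL as col2, col2, col4, NULL as col5, partition_col1, partition_col2"

# columns_with_default is a single string, so the query needs exactly one %s.
source_data_df_to_write = self.session.sql(
    "SELECT %s FROM TEMP_VIEW" % columns_with_default)

# Use a new view name so the prepared rows don't replace the source TEMP_VIEW.
source_data_df_to_write \
    .coalesce(50) \
    .createOrReplaceTempView("TEMP_VIEW_TO_WRITE")

table_name_abs = "%s.%s" % (self.database, self.target_table)
# The defaults are already applied, so insert the prepared rows as they are;
# re-selecting columns_with_default would hit the now-duplicated col2 name.
self.session.sql(
    "INSERT OVERWRITE TABLE %s "
    "PARTITION (%s) "
    "SELECT * FROM TEMP_VIEW_TO_WRITE" % (
        table_name_abs, "partition_col1, partition_col2"))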
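The INSERT above relies on Hive's rule for dynamic partition inserts: the partition columns must be the last columns of the SELECT, in the same order as in the PARTITION clause, because they are matched by position rather than by name. A hypothetical helper, `build_insert_overwrite`, could assemble the statement and fail early when that order is broken. It only compares plain column names at the end of the list, so an aliased expression such as `x AS partition_col1` would be rejected (a deliberate simplification); the table name `my_db.my_table` is a placeholder.

```python
def build_insert_overwrite(table, partition_cols, select_exprs, source_view):
    """Assemble a dynamic-partition INSERT OVERWRITE statement.

    Hive maps the trailing SELECT columns to the PARTITION columns by
    position, so select_exprs must end with partition_cols in the same order.
    """
    partition_cols = list(partition_cols)
    select_exprs = [expr.strip() for expr in select_exprs]
    if not partition_cols:
        raise ValueError("at least one partition column is required")
    tail = select_exprs[-len(partition_cols):]
    # Require at least one data column and the partition columns at the end.
    if len(select_exprs) <= len(partition_cols) or tail != partition_cols:
        raise ValueError(
            "SELECT must end with the partition columns %s, got %s"
            % (partition_cols, tail))
    return "INSERT OVERWRITE TABLE %s PARTITION (%s) SELECT %s FROM %s" % (
        table, ", ".join(partition_cols), ", ".join(select_exprs), source_view)


# Same column layout as the job above, split into a list of expressions.
exprs = "col1, NULL as col2, col2, col4, NULL as col5, partition_col1, partition_col2".split(",")
statement = build_insert_overwrite(
    "my_db.my_table", ["partition_col1", "partition_col2"], exprs, "TEMP_VIEW")
```

The resulting string would then be passed to `self.session.sql(statement)` as in the job.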


On Sun, Nov 4, 2018 at 11:08 PM Jörn Franke <[hidden email]> wrote:
> Can you share some relevant source code?

Re: [Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

ehbhaskar
Here's the code with the correct DataFrame.

from pyspark.sql import SparkSession

self.session = SparkSession \
    .builder \
    .appName(self.app_name) \
    .config("spark.dynamicAllocation.enabled", "false") \
    .config("hive.exec.dynamic.partition.mode", "nonstrict") \
    .config("mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("hive.load.dynamic.partitions.thread", "10") \
    .config("hive.mv.files.thread", "30") \
    .config("fs.trash.interval", "0") \
    .enableHiveSupport() \
    .getOrCreate()

columns_with_default = "col1, NULL as col2, col2, col4, NULL as col5, partition_col1, partition_col2"
source_data_df_to_write = self.session.sql(
    "SELECT %s FROM TEMP_VIEW" % columns_with_default)


Re: [Spark SQL] INSERT OVERWRITE to a hive partitioned table (pointing to s3) from spark is too slow.

ehbhaskar
Posted to the mailing list too.

My process generates at most 150 files. As I said, moving the files from the temp folder to the target path takes longer for a table with many partitions than for a table with fewer partitions. I'm not sure what the reason for this behavior is.
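Why the move slows down as partitions accumulate isn't established in this thread. If the files are written outside Hive, though, one way to keep the metastore work proportional to a single run (a sketch, not a measured fix) is to register only the partitions that run wrote with `ALTER TABLE ... ADD IF NOT EXISTS PARTITION`, rather than running `MSCK REPAIR TABLE`, which scans every partition directory under the table location. `add_partitions_sql` is a hypothetical helper; it assumes the files sit under `<base_path>/<col>=<value>/...` as Spark's `partitionBy` writes them, and that the values need no quoting or path escaping.

```python
def add_partitions_sql(table, partitions, base_path):
    """Build one ALTER TABLE ... ADD IF NOT EXISTS PARTITION statement.

    partitions is a list of partition specs; each spec is an ordered list of
    (column, value) pairs. Values are assumed to need no escaping.
    """
    if not partitions:
        raise ValueError("no partitions to add")
    base = base_path.rstrip("/")
    clauses = []
    for spec in partitions:
        for _, value in spec:
            # Reject values this simple sketch cannot quote or escape safely.
            if "'" in str(value) or "/" in str(value):
                raise ValueError("value needs escaping: %r" % (value,))
        values = ", ".join("%s='%s'" % (col, val) for col, val in spec)
        subdir = "/".join("%s=%s" % (col, val) for col, val in spec)
        clauses.append("PARTITION (%s) LOCATION '%s/%s'" % (values, base, subdir))
    return "ALTER TABLE %s ADD IF NOT EXISTS %s" % (table, " ".join(clauses))
```

With PySpark, the specs could come from `df.select(*cols).distinct().collect()`, turning each `Row` into `list(zip(cols, row))`, and the statement would run via `spark.sql(...)`.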

I tried writing the files directly to S3 and then adding the partitions to the Hive table, but the Spark job doesn't save a DataFrame with null values: I get an IllegalArgumentException stating "found `null` instead of <datatype>".
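On the `found null` error: the full stack trace isn't shown, so this is an assumption, but a common cause is that a bare `NULL` literal has Spark's `NullType`, which some file formats cannot write. Casting each placeholder gives the column a concrete type; in the DataFrame API the equivalent is `F.lit(None).cast("string")` with `pyspark.sql.functions` imported as `F`. The helper `typed_null_select` and the column types below are hypothetical and must match the target table's schema.

```python
def typed_null_select(columns, null_types):
    """Build a SELECT list in which each column named in null_types is a
    NULL cast to that SQL type instead of an untyped NULL literal.

    columns: target column names in table order (partition columns last).
    null_types: {column name: SQL type} for columns with no source value.
    """
    exprs = []
    for col in columns:
        if col in null_types:
            exprs.append("CAST(NULL AS %s) AS %s" % (null_types[col], col))
        else:
            exprs.append(col)
    return ", ".join(exprs)


select_list = typed_null_select(
    ["col1", "col2", "col3", "partition_col1"],
    {"col2": "STRING", "col3": "BIGINT"},
)
# Used as: "SELECT %s FROM TEMP_VIEW" % select_list
```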


On Mon, Nov 5, 2018 at 2:41 AM Jörn Franke <[hidden email]> wrote:
> Can you share it with the mailing list?
>
> I believe it would be more efficient to work in Spark just at the file level (without using Hive) and at the end let Hive discover the new files via MSCK repair.
> It could be that your process generates a lot of small files, which is very inefficient on Hadoop (try to have larger partitions, at least 128 MB in size).

> On 05.11.2018 at 08:58, Bhaskar Ebbur <[hidden email]> wrote:
>> Here's code with correct data frame. [...]
>>
>> On Sun, Nov 4, 2018 at 11:30 PM Bhaskar Ebbur <[hidden email]> wrote:
>>> Here's some sample code. [...]
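Jörn's file-level suggestion can be sketched as follows, under several assumptions: the table is stored as Parquet, `path` is the table's own S3 location, and `write_then_repair` is a hypothetical helper. Repartitioning by the partition columns sends all rows of a Hive partition to one task, so each partition gets a single file by default instead of many small ones. Setting `spark.sql.sources.partitionOverwriteMode` to `dynamic` (available since Spark 2.3.0) limits the overwrite to the partitions present in the DataFrame. In that mode Spark still stages the output and renames it into place, so on S3 this is not guaranteed to be faster than the Hive path; it needs measuring.

```python
def write_then_repair(spark, df, table, path, partition_cols):
    """Write partitioned Parquet files with Spark, then let Hive discover them.

    Assumes `path` is the location of the (external) Hive table `table`.
    """
    # Replace only the partitions present in df; with the default "static"
    # mode, an overwrite would delete every existing partition under path.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    # All rows of one Hive partition land in the same task, so each
    # partition directory receives one file by default.
    (df.repartition(*partition_cols)
       .write
       .mode("overwrite")
       .partitionBy(*partition_cols)
       .parquet(path))

    # Register new partition directories in the metastore. This scans every
    # partition directory under the table location, so it grows with the table.
    spark.sql("MSCK REPAIR TABLE %s" % table)
```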