Performance tuning on the Databricks pyspark 2.4.4


anbutech
This post was updated.
Hi sir,

Could you please help me with the two cases below? I'm using Databricks
PySpark to process terabytes of JSON data read from an AWS S3 bucket.

case 1:

Currently I'm reading multiple tables sequentially to get the day count
from each table.

For example, table_list.csv has one column containing multiple table names.

from pyspark.sql.functions import expr, lit

year = 2019
month = 12

# read the list of table names from the CSV file on S3
tablesDF = spark.read.format("csv").option("header", "false").load("s3a://bucket//source/table_list.csv")
tabList = tablesDF.toPandas().values.tolist()

for table in tabList:
    tab_name = table[0]

    # Snowflake connection settings
    sfOptions = {
        "sfURL": "",
        "sfAccount": "",
        "sfUser": "",
        "sfPassword": "",
        "sfDatabase": "",
        "sfSchema": "",
        "sfWarehouse": "",
    }

    # Snowflake count, read as a DataFrame
    sfxdf = (spark.read
        .format("snowflake")
        .options(**sfOptions)
        .option("query",
                "select y as year, m as month, count(*) as sCount "
                "from {} where y = {} and m = {} "
                "group by year, month".format(tab_name, year, month))
        .load())

    # Databricks Delta Lake count
    dbxDF = spark.sql(
        "select y as year, m as month, count(*) as dCount "
        "from db.{} where y = {} and m = {} "
        "group by year, month".format(tab_name, year, month))

    # compare the two counts per (year, month)
    resultDF = (dbxDF.join(sfxdf, on=["year", "month"], how="left_outer")
                .na.fill(0)
                .withColumn("flag_col", expr("dCount == sCount")))

    finalDF = (resultDF.withColumn("table_name", lit(tab_name))
               .select("table_name", "year", "month", "dCount", "sCount", "flag_col"))

    # append so the second iteration does not fail because the output path already exists
    finalDF.coalesce(1).write.format("csv").option("header", "true").mode("append").save("s3a://outputs/reportcsv")
       
Questions:

1) Instead of running the count query sequentially, one table at a time, how
can I read all the tables listed in the CSV file on S3 in parallel and
distribute the jobs across the cluster?

2) Could you please suggest how to optimize the above PySpark code so that
all the count queries run in parallel at the same time?
--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org

Re: Performance tuning on the Databricks pyspark 2.4.4

ayan guha
For case 1, you can create 3 notebooks and 3 jobs in Databricks. Then you can run them in parallel.
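A minimal sketch of that idea, assuming the per-table count/compare logic is moved into a separate worker notebook. The notebook path, timeout, and parameter name below are assumptions for illustration, not from the original post; dbutils.notebook.run launches the worker as its own job on the same cluster, and a thread pool lets several runs proceed at once.

from concurrent.futures import ThreadPoolExecutor

# hypothetical worker notebook that runs the count/compare logic for one
# table; inside it, read the table name with dbutils.widgets.get("table_name")
WORKER_NOTEBOOK = "/Shared/count_table"   # assumed path

def run_for_table(tab_name):
    # dbutils.notebook.run(path, timeout_seconds, arguments) runs the worker
    # notebook as a separate job and returns its exit value
    return dbutils.notebook.run(WORKER_NOTEBOOK, 3600, {"table_name": tab_name})

table_names = [row[0] for row in tabList]

# run up to 4 worker notebooks at a time; tune the pool size to cluster capacity
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(run_for_table, table_names))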

On Wed, 22 Jan 2020 at 3:50 am, anbutech <[hidden email]> wrote:



Case 2:

Multiprocessing case:
---------------------

Could you please help me achieve multiprocessing for the above PySpark
queries, so that they run in parallel in the distributed environment?

Is there any way, using the snippet below, to run the PySpark code in
parallel on the cluster?

import time
import multiprocessing

import pandas as pd

# Create a pool of 20 processes. Set this according to your intended
# parallelism and your available resources.
start = time.time()
pool = multiprocessing.Pool(20)

# This executes get_counts() in parallel, once for each element of
# input_paths. result (a list of dictionaries) is built once all
# executions have completed.
result = pool.map(get_counts, input_paths)

end = time.time()

result_df = pd.DataFrame(result)
# You can use result_df.to_csv() to store the results in a csv.
print(result_df)
print('Time taken: {}'.format(end - start))
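Note that multiprocessing.Pool starts separate Python processes that cannot share the driver's SparkSession, so get_counts() generally cannot submit Spark jobs from inside the pool workers. A common alternative is a thread pool on the driver: Spark's scheduler accepts concurrent jobs from different threads, and setting spark.scheduler.mode to FAIR helps them share executors. A minimal sketch along those lines, reusing the get_counts and input_paths names from the snippet above:

import time
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

start = time.time()

# threads share the same SparkSession, so each get_counts() call can submit
# its own Spark job and the scheduler runs them concurrently
with ThreadPoolExecutor(max_workers=20) as pool:
    result = list(pool.map(get_counts, input_paths))

end = time.time()

result_df = pd.DataFrame(result)
print(result_df)
print('Time taken: {}'.format(end - start))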




--
Best Regards,
Ayan Guha