delta lake implementation issue in large data set


delta lake implementation issue in large data set

Bimal Chandra Naresh

Hi All,

 

We have implemented Delta Lake in Spark (Scala). It works fine for small data sets.

We need to update approximately 20 million records daily in the Delta Lake table using Spark jobs on EMR (a 5-node m5.2xlarge cluster). The jobs have been failing with a memory error.

 

Error: Terminated with errors. All slaves in the job flow were terminated.

 

 

Source code:

 

import io.delta.tables.DeltaTable

val source_location_distination_data_lake = "s3://test/deltalake"

DeltaTable.forPath(spark, source_location_distination_data_lake)
  .as("events")
  .merge(
    df_reader_update.as("updates"),
    "events.id = updates.id")
  .whenMatched
  .updateExpr(Map(
    "action_name" -> "updates.action_name",
    "action_date" -> "updates.action_date"))
  .whenNotMatched
  .insertExpr(Map(
    "id" -> "updates.id",
    "package_name" -> "updates.package_name",
    "action_name" -> "updates.action_name",
    "action_date" -> "updates.action_date"))
  .execute()

 

Please suggest how we can better optimize this Delta Lake merge for large data sets.

 

Thanks,

Bimal Naresh


Re: delta lake implementation issue in large data set

Gourav Sengupta
Hi Bimal,

Please check your code; Delta is massively scalable, and 20 million records is quite small. We use it for updates of several million records.

Can you check whether the entire data set is being shipped to a single node in the cluster? If it is, try to increase the parallelism by adding extra conditions to the match clause of your merge, as in the sketch below.
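
A minimal sketch of such a pruned merge, assuming the table is partitioned by action_date and that each daily batch only touches a small set of dates (both assumptions; adapt the column to your actual partitioning):

import io.delta.tables.DeltaTable

// Collect the partition values present in today's batch (assumed to be few).
val touchedDates = df_reader_update
  .select("action_date").distinct().collect()
  .map(r => s"'${r.get(0)}'")
  .mkString(", ")

DeltaTable.forPath(spark, source_location_distination_data_lake)
  .as("events")
  .merge(
    df_reader_update.as("updates"),
    // the extra predicate lets Delta skip files in partitions the batch never touches
    s"events.action_date IN ($touchedDates) AND events.id = updates.id")
  .whenMatched
  .updateExpr(Map(
    "action_name" -> "updates.action_name",
    "action_date" -> "updates.action_date"))
  .whenNotMatched
  .insertExpr(Map(
    "id" -> "updates.id",
    "package_name" -> "updates.package_name",
    "action_name" -> "updates.action_name",
    "action_date" -> "updates.action_date"))
  .execute()

The tighter match condition means Delta only has to scan and rewrite files in the partitions affected by the batch, instead of shuffling the whole table.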


Regards,
Gourav Sengupta
