How to optimize the configuration and/or code to solve the cache overloading issue?

spark-learner
I ran into a problem with Spark Structured Streaming: the YARN usercache is continuously consumed by a join operation and never released. How can I optimize the configuration and/or the code to solve this problem?


Spark cluster on AWS EMR:

1 master node, m4.xlarge, 4 cores, 16 GB
2 core nodes, m4.xlarge, 4 cores, 16 GB

YARN configuration:
    'yarn.nodemanager.disk-health-checker.enable': 'true',
    'yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage': '95.0',
    'yarn.nodemanager.localizer.cache.cleanup.interval-ms': '100000',
    'yarn.nodemanager.localizer.cache.target-size-mb': '1024',
    'yarn.nodemanager.pmem-check-enabled': 'false',
    'yarn.nodemanager.vmem-check-enabled': 'false',
    'yarn.log-aggregation.retain-seconds': '12000'
 
spark-submit
--deploy-mode cluster
--num-executors 3 --executor-memory 8G --executor-cores 2



Code snippet:

// Disable broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val monitoring_stream = volume_df_filtered.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      // This join operation consumes the usercache continuously.
      monitoring_df = monitoring_df
        .join(batchDF, monitoring_df("id") === batchDF("id"), "left")
        .select(monitoring_df("id"), monitoring_df("total_volume"), batchDF("volume"))
        .na.fill(0)
      monitoring_df = monitoring_df.withColumn("total_volume", monitoring_df("total_volume") + monitoring_df("volume"))
      monitoring_df = monitoring_df.repartition(6)
      batchDF.unpersist()
      spark.catalog.clearCache()
    }
  }
  .start()
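
One idea I have seen suggested, but have not verified on my cluster, is that the growth comes from the lineage of monitoring_df: it gets longer with every micro-batch, so each batch re-executes all previous joins and keeps writing new shuffle files into the appcache/usercache directories. Would truncating the lineage once per batch, e.g. with Dataset.localCheckpoint(), be the right approach? A minimal sketch of what I mean (an assumption, not a verified fix), reusing the var monitoring_df and the stream volume_df_filtered from the snippet above:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger

val monitoring_stream = volume_df_filtered.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      val updated = monitoring_df
        .join(batchDF, monitoring_df("id") === batchDF("id"), "left")
        .select(monitoring_df("id"), monitoring_df("total_volume"), batchDF("volume"))
        .na.fill(0)
        .withColumn("total_volume", col("total_volume") + col("volume"))
        .repartition(6)

      // localCheckpoint materializes the result and truncates the accumulated
      // plan, so the next batch no longer depends on every previous join and
      // its shuffle files.
      monitoring_df = updated.localCheckpoint(eager = true)
    }
  }
  .start()

Or would a reliable checkpoint() to an S3/HDFS directory (set with spark.sparkContext.setCheckpointDir) be preferable here, so the materialized state does not live in executor-local storage?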