Are there some pitfalls in my spark structured streaming code which causes slow response after several hours running?



spark-learner
The Spark job is functionally correct. However, after several hours of running it becomes slower and slower. Are there any pitfalls in the code below? Thanks!


val query = "(select * from meta_table) as meta_data"
val meta_schema = new StructType()
  .add("config_id", BooleanType)
  .add("threshold", LongType)
var meta_df = spark.read.jdbc(url, query, connectionProperties)
var meta_df_explode = meta_df
  .select(col("id"), from_json(col("config"), meta_schema).as("config"))
  .select("config_id", "threshold", "config.*")
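For context, here is a self-contained check of what the from_json call above produces; the sample config value is hypothetical:

import spark.implicits._
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical sample row, only to illustrate the parsing; real rows
// come from meta_table's config column.
val sample = Seq((1, """{"config_id": true, "threshold": 100}""")).toDF("id", "config")
sample
  .select(col("id"), from_json(col("config"), meta_schema).as("config"))
  .select(col("id"), col("config.*"))
  .show()
// +---+---------+---------+
// | id|config_id|threshold|
// +---+---------+---------+
// |  1|     true|      100|
// +---+---------+---------+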

// rules_imsi_df: the Kafka ingestion stream joined with meta_df_explode

// rules_monitoring_df: a static DataFrame kept for monitoring purposes

val rules_monitoring_stream = rules_imsi_df.writeStream
  .outputMode("append")
  .format("memory")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      printf("At %d, the microbatch has %d records \n",
        Instant.now.getEpochSecond, batchDF.count())
      batchDF.show()
      batchDF.persist()
      var batchDF_group = batchDF.groupBy("id").sum("volume")
        .withColumnRenamed("sum(volume)", "total_volume_id")
      rules_monitoring_df = rules_monitoring_df
        .join(batchDF_group, rules_monitoring_df("id") === batchDF_group("id"), "left")
        .select(rules_monitoring_df("id"), batchDF_group("total_volume_id"))
        .na.fill(0)
      rules_monitoring_df = rules_monitoring_df
        .withColumn("volume", rules_monitoring_df("volume") + rules_monitoring_df("total_volume_id"))
      batchDF.unpersist()
    }
  }
  .start()


while (rules_monitoring_stream.isActive) {
  Thread.sleep(240000)
  ... // Periodically reload the meta data from the database
  meta_df = spark.read.jdbc(url, query, connectionProperties)
  meta_df_explode = meta_df
    .select(col("id"), from_json(col("config"), meta_schema).as("config"))
    .select("config_id", "threshold", "config.*")
}
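A pattern sometimes recommended for this kind of accumulating DataFrame: because rules_monitoring_df is rebuilt on top of itself every micro-batch, its logical plan grows without bound, and plan analysis alone can slow each batch down over time. Below is a minimal sketch of truncating the lineage with localCheckpoint; mergeBatch is a hypothetical helper, and that plan growth is the cause here is only an assumption:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: fold one micro-batch into the running totals and
// cut the lineage so the accumulated plan stops growing batch after batch.
def mergeBatch(monitoring: DataFrame, batchDF: DataFrame): DataFrame = {
  val batchGroup = batchDF
    .groupBy("id")
    .sum("volume")
    .withColumnRenamed("sum(volume)", "total_volume_id")
  monitoring
    .join(batchGroup, Seq("id"), "left")
    .na.fill(0)
    .withColumn("volume", col("volume") + col("total_volume_id"))
    .drop("total_volume_id")
    .localCheckpoint() // materializes the data and truncates the logical plan
}

Inside foreachBatch this would replace the two reassignments above with a single rules_monitoring_df = mergeBatch(rules_monitoring_df, batchDF).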




In addition to the code, yarn-site.xml is configured as below.

yarn.nodemanager.pmem-check-enabled, false
yarn.nodemanager.localizer.cache.target-size-mb, 5120
yarn.nodemanager.localizer.cache.cleanup.interval-ms, 400000
yarn.nodemanager.vmem-check-enabled, false
yarn.nodemanager.disk-health-checker.enable, true
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage, 95.0
yarn.log-aggregation.retain-seconds, 36000



The spark-submit command is as follows.

spark-submit \
  --driver-memory 5G \
  --num-executors 3 \
  --executor-memory 6G \
  --executor-cores 2 \
  --files client_jaas.conf,cacerts,krb5.conf,service.keytab \
  --driver-java-options "-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
  sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar


I am running the job on AWS EMR with two m4.xlarge instances.

Thanks!
Re: Are there some pitfalls in my spark structured streaming code which causes slow response after several hours running?

Jörn Franke
It depends a bit on the data as well, but have you investigated in the Spark UI which executor/task becomes slow?
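If the UI is hard to correlate over several hours, the running query also exposes per-batch timings programmatically; a minimal sketch, assuming the rules_monitoring_stream handle from the original post (the polling interval is arbitrary):

// Poll the streaming query's own metrics to see whether the micro-batches
// themselves are getting slower (durations are reported in milliseconds).
while (rules_monitoring_stream.isActive) {
  Thread.sleep(120000)
  val p = rules_monitoring_stream.lastProgress
  if (p != null) {
    println(s"batch=${p.batchId} durationMs=${p.durationMs} " +
      s"inputRows/s=${p.inputRowsPerSecond}")
  }
}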

Could it also be the database from which you load the data?
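One way to check that, assuming the same url, query, and connectionProperties as in the original post, is to time each periodic reload:

// Time the JDBC reload; if this number creeps up over hours, the
// database side (or its connection handling) is a suspect.
val t0 = System.nanoTime()
val fresh = spark.read.jdbc(url, query, connectionProperties)
val rows = fresh.count() // forces the read to actually run
println(f"meta reload: $rows rows in ${(System.nanoTime() - t0) / 1e9}%.1f s")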

