Spark is only using one worker machine when more are available


Spark is only using one worker machine when more are available

宋源栋


Hi all,

I have a standalone-mode Spark cluster without HDFS, made up of 10 machines, each with 40 CPU cores and 128 GB of RAM.

My application is a Spark SQL application that reads data from the database "tpch_100g" in MySQL and runs TPC-H queries. When loading tables from MySQL into Spark, I split the biggest table, "lineitem", into 600 partitions.

When my application runs, there are only 40 executors (spark.executor.memory = 1g, spark.executor.cores = 1) on the executors page of the Spark application web UI, and all of them are on the same machine. It is too slow because all tasks run in parallel on only that one machine.




Re: Spark is only using one worker machine when more are available

Jhon Anderson Cardenas Diaz
Hi, could you please share more details: the environment variable values you are setting when you run the jobs, the Spark version, etc.?
By the way, you should take a look at SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES if you are using Spark 2.0.0.

Regards.



Re: Spark is only using one worker machine when more are available

宋源栋
Hi,
 1. Spark version: 2.3.0
 2. JDK: Oracle JDK 1.8
 3. OS version: CentOS 6.8
 4. spark-env.sh: not set (empty)
 5. Spark session config:
    
    SparkSession.builder().appName("DBScale")
            .config("spark.sql.crossJoin.enabled", "true")
            .config("spark.sql.adaptive.enabled", "true")
            .config("spark.scheduler.mode", "FAIR")
            .config("spark.executor.memory", "1g")
            .config("spark.executor.cores", 1)
            .config("spark.driver.memory", "20")
            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .config("spark.executor.extraJavaOptions",
                    "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC " +
                    "-verbose:gc -XX:+PrintGCDetails " +
                    "-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy")
            .master(this.spark_master)
            .getOrCreate();
  6. core code:
    
    for (SparksqlTableInfo tableInfo : this.dbtable) { // this loop reads data from mysql
        String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
        String[] pred = new String[tableInfo.partition_num];
        if (tableInfo.partition_num > 0) {
            for (int j = 0; j < tableInfo.partition_num; j++) {
                String str = "some where clause to split mysql table into many partitions";
                pred[j] = str;
            }
            Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, connProp); // this.url is mysql-jdbc-url (mysql://XX.XX.XX.XX:XXXX)
            jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
        } else {
            logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
            Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
            jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
        }
    }

        // Then run a query and write the result set to mysql

    Dataset<Row> result = ss.sql(this.sql);
    result.explain(true);
    connProp.put("rewriteBatchedStatements", "true");
    connProp.put("sessionVariables", "sql_log_bin=off");
    result.write().jdbc(this.dst_url, this.dst_table, connProp);
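
For illustration, the placeholder where clause above stands for one range predicate per partition. A hypothetical sketch of how such predicates could be built for the "lineitem" table follows; the column name and rows-per-range value are made up for illustration, not what the real job uses:

    // Hypothetical: split "lineitem" into partition_num ranges on a numeric key.
    // The last predicate is left open-ended so every row is covered exactly once.
    long rowsPerRange = 1_000_000L;                      // assumed range width
    for (int j = 0; j < tableInfo.partition_num; j++) {
        long lo = (long) j * rowsPerRange;
        if (j == tableInfo.partition_num - 1) {
            pred[j] = "l_orderkey >= " + lo;
        } else {
            pred[j] = "l_orderkey >= " + lo + " AND l_orderkey < " + (lo + rowsPerRange);
        }
    }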





Re: Spark is only using one worker machine when more are available

Jhon Anderson Cardenas Diaz
Hi.

On Spark standalone I don't think you can specify the number of worker machines to use directly, but you can achieve that this way: https://stackoverflow.com/questions/39399205/spark-standalone-number-executors-cores-control.

For example, if you want your jobs to run on all 10 machines using all of their cores (10 executors, each on a different machine with 40 cores), you can use this configuration:

spark.cores.max        = 400
spark.executor.cores  = 40

If you want more executors with fewer cores each (let's say 20 executors, each with 20 cores):

spark.cores.max        = 400
spark.executor.cores  = 20

Note that in the last case each worker machine will run two executors.

In summary, use this rule:

number-of-executors = spark.cores.max / spark.executor.cores

And keep in mind that the executors will be divided among the available workers.
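
For instance, in your SparkSession builder this is a small change; a minimal sketch assuming you keep the rest of your configuration as posted (the executor memory value below is an assumption, sized for a 40-core executor on a 128 GB machine):

    SparkSession ss = SparkSession.builder().appName("DBScale")
            // Cap the application's total cores and give each executor a whole machine:
            // 400 / 40 = 10 executors, which standalone should spread one per worker.
            .config("spark.cores.max", 400)
            .config("spark.executor.cores", 40)
            .config("spark.executor.memory", "100g")  // assumption: leaves headroom for the OS and worker daemon
            .master(this.spark_master)
            .getOrCreate();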

Regards.





Spark Kubernetes Volumes

Marius
Hey,

I have a question regarding the Spark on Kubernetes feature. I would like to mount a pre-populated Kubernetes volume into the executor pods of Spark. One of the tools that I invoke using Spark's pipe command requires these files to be available on a POSIX-compatible file system, and they are too large to justify copying them around using addFile. If this is not possible, I would like to know whether the community would be interested in such a feature.
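
For context, the invocation looks roughly like this; the tool path, mount point, and input/output locations are illustrative, not the real ones:

    // Given an existing SparkSession `spark`: each partition's records are streamed
    // to the external tool's stdin, and the tool also reads large reference files
    // from a local POSIX path, which is where the pre-populated volume would be
    // mounted (all paths here are hypothetical).
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
    JavaRDD<String> records = jsc.textFile("s3a://bucket/input/");
    JavaRDD<String> results = records.pipe("/opt/tools/analyze --reference /mnt/refdata");
    results.saveAsTextFile("s3a://bucket/output/");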

Cheers

Marius



Re: Spark is only using one worker machine when more are available

Gourav Sengupta
In reply to this post by Jhon Anderson Cardenas Diaz
Hi,

Just for the sake of clarity, can you please give the full statement used for reading the data from the largest table? I mean not the programmatic one, but the one with the complete SQL statement in it.


Regards,
Gourav Sengupta






Re: Spark Kubernetes Volumes

Yinan Li
In reply to this post by Marius
Hi Marius,

Spark on Kubernetes does not yet support mounting user-specified volumes natively, but mounting volumes is supported by https://github.com/GoogleCloudPlatform/spark-on-k8s-operator. Please see https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md#mounting-volumes.




Re: Spark Kubernetes Volumes

Anirudh Ramanathan
There's a JIRA, SPARK-23529, that deals with mounting hostPath volumes.
I propose we extend that PR/JIRA to encompass all the different volume types and allow mounting them into the driver/executors.





--
Anirudh Ramanathan