[Help] Set nThread in Spark cluster

Aakash Basu-2

API = ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2)

I'm running a job that will not work unless the number of threads available to the API matches the num_workers value it is configured with.

So, in local mode, when I run with --master local[n] and also set the API's num_workers to the same value n, it works.
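For reference, the local-mode run that works looks roughly like this (the jar name is a placeholder):

```shell
# Hedged sketch of the working local-mode submission.
# local[2] gives Spark 2 cores, matching "num_workers" -> 2 in the code.
spark-submit \
  --master local[2] \
  my-xgb-iris-job.jar
```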

But in cluster mode I do not know which parameter actually controls the number of threads. I tried:

1) spark.task.cpus
2) spark.default.parallelism
3) executor cores

But none of them works, and the peculiar thing about this issue is that the job simply hangs while distributing the XGBoost model if the above condition is not met.
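For context, a sketch of the kind of cluster submission I have been trying, covering the three parameters listed above (the master URL and resource values are just examples):

```shell
# Hedged sketch of a cluster-mode submission; jar name and sizes are placeholders.
# These are the knobs tried above: spark.task.cpus, spark.default.parallelism,
# and executor cores -- none of which unblocked the XGBoost distribution step.
spark-submit \
  --master yarn \
  --num-executors 2 \
  --executor-cores 2 \
  --conf spark.task.cpus=2 \
  --conf spark.default.parallelism=4 \
  my-xgb-iris-job.jar
```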

My code is as follows; it works in local mode but not in cluster mode. Any help?


import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

val schema = new StructType(Array(
  StructField("sepal length", DoubleType, true),
  StructField("sepal width", DoubleType, true),
  StructField("petal length", DoubleType, true),
  StructField("petal width", DoubleType, true),
  StructField("class", StringType, true)))
val rawInput = spark.read.schema(schema).csv("file:///appdata/bblite-data/iris.csv")
import org.apache.spark.ml.feature.StringIndexer

val stringIndexer = new StringIndexer().
  setInputCol("class").
  setOutputCol("classIndex").
  fit(rawInput)
val labelTransformed = stringIndexer.transform(rawInput).drop("class")

import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler().
  setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).
  setOutputCol("features")
val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2)
val xgbClassifier = new XGBoostClassifier(xgbParam).
  setFeaturesCol("features").
  setLabelCol("classIndex")
val xgbClassificationModel = xgbClassifier.fit(xgbInput)