[Help] Set nThread in Spark cluster



Aakash Basu-2
Hi,

API = ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2)


I'm running a job that does not work unless the number of threads available to Spark matches the num_workers value set for the API.

So in local mode, when I run with --master local[n] and also set the API's num_workers to the same value n, it works.

But on a cluster I do not know which parameter actually controls the number of threads available to the tasks. I tried:

1) spark.task.cpus
2) spark.default.parallelism
3) executor cores

None of them works, and what makes this issue peculiar is that the job simply hangs while distributing the XGBoost model if the condition above is not met.
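For reference, here is the shape of the alignment I was expecting to express. The "nthread" parameter is documented for XGBoost4J-Spark, but whether pairing it with spark.task.cpus is the right fix on a cluster is exactly what I am unsure about, so treat this as a sketch, not a confirmed solution:

```scala
// Sketch only: "nthread" is XGBoost4J-Spark's threads-per-worker parameter;
// the usual guidance is to keep it equal to spark.task.cpus so that each
// XGBoost worker uses exactly the cores Spark reserves for its task.
val xgbParam = Map(
  "eta" -> 0.1f,
  "max_depth" -> 2,
  "objective" -> "multi:softprob",
  "num_class" -> 3,
  "num_round" -> 100,
  "num_workers" -> 2, // one XGBoost worker per Spark task
  "nthread" -> 2      // threads per worker; intended to match spark.task.cpus
)
```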


My code is as follows; it works in local mode but not on the cluster. Any help?
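For context, the cluster submission looks roughly like this. The values and the class/jar names are placeholders, not my exact command; spark.task.cpus=2 here is one of the attempts to match the thread count, which may or may not be the right knob:

```shell
spark-submit \
  --master yarn \
  --conf spark.task.cpus=2 \
  --executor-cores 2 \
  --num-executors 2 \
  --class <main-class> <application-jar>
```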

Code:

import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType

// Schema for the Iris CSV (the file has no header row)
val schema = new StructType(Array(
  StructField("sepal length", DoubleType, true),
  StructField("sepal width", DoubleType, true),
  StructField("petal length", DoubleType, true),
  StructField("petal width", DoubleType, true),
  StructField("class", StringType, true)))
val rawInput = spark.read.schema(schema).csv("file:///appdata/bblite-data/iris.csv")
import org.apache.spark.ml.feature.StringIndexer

// Index the string label column into a numeric classIndex column
val stringIndexer = new StringIndexer().
  setInputCol("class").
  setOutputCol("classIndex").
  fit(rawInput)
val labelTransformed = stringIndexer.transform(rawInput).drop("class")

import org.apache.spark.ml.feature.VectorAssembler
// Assemble the four feature columns into a single vector column
val vectorAssembler = new VectorAssembler().
  setInputCols(Array("sepal length", "sepal width", "petal length", "petal width")).
  setOutputCol("features")
val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
val xgbParam = Map("eta" -> 0.1f,
      "max_depth" -> 2,
      "objective" -> "multi:softprob",
      "num_class" -> 3,
      "num_round" -> 100,
      "num_workers" -> 2)
// Train the classifier; this is the step that hangs on the cluster
val xgbClassifier = new XGBoostClassifier(xgbParam).
      setFeaturesCol("features").
      setLabelCol("classIndex")
val xgbClassificationModel = xgbClassifier.fit(xgbInput)



Thanks,
Aakash.