Spark launcher listener not getting invoked k8s Spark 2.3

Spark launcher listener not getting invoked k8s Spark 2.3

purna m

Hi, I'm using the code below to submit a Spark 2.3 application to a Kubernetes cluster, in Scala, using the Play framework.

I have also tried it as a plain Scala program without Play.

I'm trying to do programmatically what this spark-submit invocation from https://spark.apache.org/docs/latest/running-on-kubernetes.html does:



$ bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

  import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
  import scala.util.control.NonFatal

  def index = Action {
    try {
      val handle = new SparkLauncher()
        // the master must use the k8s:// scheme, as in the spark-submit example above
        .setMaster("k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>")
        .setVerbose(true)
        .addSparkArg("--verbose")
        .setAppResource("hdfs://server/inputs/my.jar")
        .setConf("spark.app.name", "myapp")
        .setConf("spark.executor.instances", "5")
        .setConf("spark.kubernetes.container.image", "mydockerimage")
        .setDeployMode("cluster")
        .startApplication(new SparkAppHandle.Listener() {
          override def infoChanged(handle: SparkAppHandle): Unit = {
            println(s"Spark App Id [${handle.getAppId}] Info Changed. State [${handle.getState}]")
          }
          override def stateChanged(handle: SparkAppHandle): Unit = {
            println(s"Spark App Id [${handle.getAppId}] State Changed. State [${handle.getState}]")
            // compare against the enum rather than its string form
            if (handle.getState == SparkAppHandle.State.FINISHED) System.exit(0)
          }
        })
      Ok(handle.getState().toString)
    } catch {
      case NonFatal(e) =>
        println("failed with exception: " + e)
        Ok
    }
  }

Re: Spark launcher listener not getting invoked k8s Spark 2.3

Marcelo Vanzin
Please include the mailing list in your replies.

Yes, you'll be able to launch the jobs, but the k8s backend isn't
hooked up to the listener functionality.
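Until the handle is wired up, one rough workaround is to watch the driver pod's phase directly. A minimal sketch, assuming kubectl is configured against the same cluster and relying on the `spark-role=driver` label that the k8s backend puts on driver pods (the polling interval and attempt count are arbitrary):

```shell
# Poll the driver pod's phase instead of relying on SparkAppHandle callbacks.
# If no driver pod is visible yet, the phase is reported as "unknown".
for attempt in 1 2 3; do
  phase=$(kubectl get pods -l spark-role=driver \
            -o jsonpath='{.items[0].status.phase}' 2>/dev/null || true)
  echo "driver pod phase: ${phase:-unknown}"
  # Succeeded / Failed are the terminal pod phases
  case "$phase" in
    Succeeded|Failed) break ;;
  esac
  sleep 2
done
```

In a real deployment you would narrow the label selector further (e.g. by app name) and loop until a terminal phase rather than a fixed attempt count.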

On Mon, Apr 30, 2018 at 8:13 PM, purna m <[hidden email]> wrote:

> I'm able to submit the job, though! I mean, the Spark application is running on
> k8s but the listener is not getting invoked.
>
>
> On Monday, April 30, 2018, Marcelo Vanzin <[hidden email]> wrote:
>>
>> I'm pretty sure this feature hasn't been implemented for the k8s backend.
>>
>> On Mon, Apr 30, 2018 at 4:51 PM, purna m <[hidden email]> wrote:
>> > [snip]
>>
>>
>>
>> --
>> Marcelo



--
Marcelo

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]