spark on k8s - can driver and executor have separate checkpoint location?


spark on k8s - can driver and executor have separate checkpoint location?

wzhan
Hi guys,

I'm running Spark applications on Kubernetes. According to the Spark
documentation
https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
Spark needs a distributed file system to store its checkpoint data so that,
in case of failure, it can recover from the checkpoint directory.

My question is: can the driver and executors have separate checkpoint
locations? I'm asking because the driver and executor pods might be
scheduled on different nodes, and a shared checkpoint location would require
the ReadWriteMany access mode. Since I only have a storage class that
supports ReadWriteOnce, I'm trying to find a workaround.


The Spark Streaming guide mentions that a "Failed driver can be restarted
from checkpoint information" and that when an executor fails, "Tasks and
receivers restarted by Spark automatically, no config needed".

Given this, I tried configuring a checkpoint location for the driver pod
only. It immediately failed with the exception below:

2020-05-15T14:20:17.142 [stream execution thread for baseline_windower [id =
b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
f246774e-5a20-4bfb-b997-4d06c344bb0f]] ERROR
org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query
baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
f246774e-5a20-4bfb-b997-4d06c344bb0f] terminated with error
org.apache.spark.SparkException: Writing job aborted.
        at
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
...
        at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost task
0.3 in stage 11.0 (TID 420, 10.233.124.162, executor 1):
java.io.IOException: mkdir of
file:/opt/window-data/baseline_windower/state/0/0 failed
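
From the mkdir failure it looks like the executors also write state store
files under the configured path, so the directory has to exist and be
writable on every executor pod, not only on the driver. For reference,
Spark's Kubernetes backend can also mount PVCs into pods directly via
configuration, without going through the chart's volumeMounts. A sketch in
spark-defaults.conf style, assuming Spark 2.4+ on Kubernetes; the volume
name "checkpoint" and the claim names are illustrative:

```properties
# Illustrative sketch: mount a PVC at the checkpoint path on both the
# driver and the executors. Volume name and claim names are assumptions.
spark.sql.streaming.checkpointLocation=file:///opt/checkpoint-data
spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpoint.mount.path=/opt/checkpoint-data
spark.kubernetes.driver.volumes.persistentVolumeClaim.checkpoint.options.claimName=checkpoint-driver-pvc
spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpoint.mount.path=/opt/checkpoint-data
spark.kubernetes.executor.volumes.persistentVolumeClaim.checkpoint.options.claimName=checkpoint-executor-pvc
```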


So I tried giving separate checkpoint locations to the driver and
executors.

In the Spark application Helm chart I have a checkpointLocation
configuration:

spec:
   sparkConf:
     "spark.sql.streaming.checkpointLocation": "file:///opt/checkpoint-data"

I created two checkpoint PVCs and mounted a volume into the driver and
executor pods:

  volumes:
    - name: checkpoint-driver-volume
      persistentVolumeClaim:
        claimName: checkpoint-driver-pvc
    - name: checkpoint-executor-volume
      persistentVolumeClaim:
        claimName: checkpoint-executor-pvc

driver:
    volumeMounts:
      - name: checkpoint-driver-volume
        mountPath: "/opt/checkpoint-data"
...
executor:
    volumeMounts:
      - name: checkpoint-executor-volume
        mountPath: "/opt/checkpoint-data"

After deployment it seems to be working. I tried restarting the driver pod
and it did recover from the checkpoint directory. But I'm not sure whether
this is actually supported by design.

Thanks,
wzhan



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: spark on k8s - can driver and executor have separate checkpoint location?

AliGouta
Hello,

I am wondering whether, if you do this, all your executor pods would have to run on the same Kubernetes worker node, since they mount a single volume with a ReadWriteOnce policy. By design that does not seem good, I assume. You may need a ReadWriteMany policy on the volume, and then pod anti-affinity to make sure the pods are not running on the same node. You could achieve this by running an NFS filesystem and creating a PV/PVC that mounts that shared filesystem. The persistentVolumeClaim defined in your YAML should then reference the PVC you created.
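
A minimal sketch of what that could look like, assuming an NFS server is
already reachable in the cluster; the server address, export path, capacity
and names are all illustrative:

```yaml
# Sketch: an NFS-backed PV plus a ReadWriteMany PVC that both the driver
# and executor pods could mount at the same checkpoint path.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: checkpoint-nfs-pv
spec:
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: nfs-server.example.com   # assumption: your NFS server address
    path: /exports/spark-checkpoints # assumption: your NFS export
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: checkpoint-pvc
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""               # bind directly to the PV above
  volumeName: checkpoint-nfs-pv
  resources:
    requests:
      storage: 10Gi
```

Both the driver and executor volumeMounts in the chart would then reference
this single checkpoint-pvc claim instead of two separate RWO claims.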

Best regards,
Ali Gouta.
