Spark2 DynamicAllocation doesn't release executors that used cache


Sergejs Andrejevs

Hi,

 

We're starting to use Spark 2 with use cases that rely on Dynamic Allocation.

However, we noticed that it doesn't work as expected when a dataset is cached and then uncached (persist & unpersist).

The cluster runs with:

CDH 5.15.0

Spark 2.3.0

Oracle Java 8.131

 

The following configs are passed to Spark (and are also set on the cluster):

# Dynamic Allocation
spark.shuffle.service.enabled                                      true
spark.dynamicAllocation.enabled                                    true

spark.dynamicAllocation.schedulerBacklogTimeout                    1
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout           1
spark.dynamicAllocation.executorIdleTimeout                        90

spark.dynamicAllocation.initialExecutors                           1
spark.dynamicAllocation.minExecutors                               1
spark.dynamicAllocation.maxExecutors                               30

 

The cluster also has these configs enabled: spark_shuffle is set up as a YARN auxiliary service and the YARN application classpath is populated (following https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service). The executors' storage is freed when the application finishes.
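For reference, the settings above can also be supplied per job at submit time. A sketch, where the application class and jar name are placeholders of ours, not part of the actual setup:

```shell
# Hypothetical submit command; only the dynamic-allocation flags come from the config above.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.schedulerBacklogTimeout=1 \
  --conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1 \
  --conf spark.dynamicAllocation.executorIdleTimeout=90 \
  --conf spark.dynamicAllocation.initialExecutors=1 \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=30 \
  --class com.example.CacheRepro \
  cache-repro.jar
```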

Here is simplified code that reproduces the issue in our cluster (HA YARN).

 

When the following code is executed with "cache=false", the executors are created, used, and then killed by the idle timeout.

When "cache=true", the executors are created and used, but never killed; they remain hanging.

 

The storage in both cases was cleaned up.

// Snippet from a larger class: sparkSession and the boolean flag "cache"
// are fields of the enclosing class.
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.storage.StorageLevel;

void run() {
    List<O1> objList = new ArrayList<>();
    for (long i = 0; i < 1000; i++) {
        objList.add(new O1(i, "test"));
    }

    Dataset<O1> ds = sparkSession.createDataset(objList, Encoders.bean(O1.class));
    ds = ds.repartition(4); // force a shuffle so multiple executors are used

    if (cache) {
        ds.persist(StorageLevel.MEMORY_AND_DISK());
        try {
            ds.show(100, false);
        } finally {
            ds.unpersist();
        }
    } else {
        ds.show(100, false);
    }
}
 
//O1 POJO class:
public class O1 {                               
    private Long transactionDate;
    private String name;
 
    public O1() {
    }
    public O1(Long transactionDate, String name) {
        this.transactionDate = transactionDate;
        this.name = name;
    }
 
    public Long getTransactionDate() {
        return transactionDate;
    }
    public void setTransactionDate(Long transactionDate) {
        this.transactionDate = transactionDate;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}
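As an aside, Encoders.bean relies on the class following the JavaBean convention (a public no-arg constructor plus matching getter/setter pairs), which O1 above satisfies. A small stdlib-only sketch of that check; the helper name is ours, not a Spark API:

```java
import java.lang.reflect.Method;

public class BeanCheck {
    // Minimal JavaBean check: a public no-arg constructor and, for every
    // getter, a matching setter taking the same property type.
    static boolean looksLikeBean(Class<?> cls) {
        try {
            cls.getConstructor(); // public no-arg constructor must exist
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Method m : cls.getMethods()) {
            String name = m.getName();
            if (name.startsWith("get") && !name.equals("getClass")
                    && m.getParameterCount() == 0) {
                try {
                    cls.getMethod("set" + name.substring(3), m.getReturnType());
                } catch (NoSuchMethodException e) {
                    return false; // getter without a matching setter
                }
            }
        }
        return true;
    }

    // Same shape as the O1 class in the reproducer above.
    public static class O1 {
        private Long transactionDate;
        private String name;
        public O1() {}
        public Long getTransactionDate() { return transactionDate; }
        public void setTransactionDate(Long d) { this.transactionDate = d; }
        public String getName() { return name; }
        public void setName(String n) { this.name = n; }
    }

    public static void main(String[] args) {
        System.out.println(looksLikeBean(O1.class)); // prints "true"
    }
}
```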

 

Moreover, when spark.dynamicAllocation.cachedExecutorIdleTimeout is set to a finite value, the containers are killed successfully, even if they held cached data (this check was inspired by: https://spark.apache.org/docs/latest/job-scheduling.html#graceful-decommission-of-executors ).
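For instance, a finite cached-executor timeout alongside the regular idle timeout (the 600s value is an arbitrary illustration of ours, not a recommendation):

```
spark.dynamicAllocation.executorIdleTimeout          90
spark.dynamicAllocation.cachedExecutorIdleTimeout    600
```

With this, executors still holding cached blocks are reclaimed after 600s of idleness, while cache-free idle executors go after 90s as before.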

 

Unfortunately, in the future we will have containers that keep their cache and may live for a long time, alongside containers that free their cache (unpersist) and are expected to be killed (together with idling executors).

 

Is this a bug, or is some configuration missing?

 

Best regards,

Sergejs Andrejevs