[Spark scheduling] Spark schedules single task although rdd has 48 partitions?


Paul Borgmans

(Please note: this question was previously posted to https://stackoverflow.com/questions/49943655/spark-schedules-single-task-although-rdd-has-48-partitions)

We are running Spark 2.3 / Python 3.5.2. For a job we run the following code. (Note that the input txt files are just a simplified example; in fact these are large binary files, and sc.binaryFiles(...) runs out of memory loading their content, therefore only the filenames are parallelized and the executors open/read the content themselves.)

files = [u'foo.txt', u'bar.txt', u'baz.txt', etc....]  # len(files) == 155

def func(filename):
    from app import generate_rows
    return list(generate_rows(filename))

rdd = sc.parallelize(files, numSlices=48)
rdd2 = rdd.flatMap(func)
rdd3 = rdd2.map(lambda d: Row(**d))
df = spark.createDataFrame(rdd3)
df.write.mode(u'append').partitionBy(u'foo').parquet(output_path)
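(For reference, with 155 filenames over numSlices=48 we expect each partition to hold 3-4 filenames. A simplified local sketch of how Spark slices a parallelized collection into contiguous ranges, modeled on ParallelCollectionRDD.slice; not the actual Spark code:)

```python
def slice_positions(num_items, num_slices):
    # Contiguous ranges whose sizes differ by at most one element,
    # mirroring Spark's integer-division slicing of a local collection.
    return [(i * num_items // num_slices, (i + 1) * num_items // num_slices)
            for i in range(num_slices)]

slices = slice_positions(155, 48)
sizes = [hi - lo for lo, hi in slices]
# 155 filenames over 48 slices -> partitions of 3 or 4 filenames each
```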

 

where app is a Python module (added to Spark using --py-files app.egg); the simplified code looks like this:

def generate_rows(filename):
    # <opens the file and performs compute-intensive operations,
    #  taking +/- 1 min, ultimately building a dict>
    yield OrderedDict([
        (u'filename', filename),
        (u'item1', u'item1'),
        ....etc
    ])
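(For clarity, func materializes the generator into a list per filename, and flatMap then flattens those lists into one sequence of rows. A minimal local sketch of that behavior, with a toy generate_rows standing in for the real compute-intensive one:)

```python
from collections import OrderedDict

def generate_rows(filename):
    # Toy stand-in for the real reader; yields one row dict per file
    yield OrderedDict([(u'filename', filename), (u'item1', u'item1')])

def func(filename):
    return list(generate_rows(filename))

# flatMap semantics: one list per filename, flattened into a single sequence
rows = [row for fn in [u'foo.txt', u'bar.txt'] for row in func(fn)]
```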

 

We notice that the cluster is not fully utilized during the first stages, which we don't understand, and we are looking for ways to control this behavior:

Job 0  Stage 0   1 task    1 min  parallelize
Job 1  Stage 1   1 task    2 min  parallelize
Job 2  Stage 2   1 task    1 min  parallelize
Job 3  Stage 3  48 tasks   5 min  parallelize|mapPartitions|map|mapPartitions|existingRDD|sort

What are the first 3 jobs? And why isn't there a single job/stage with the 48 tasks, as expected given the second parameter of parallelize being set to 48?

 

Excerpt from DEBUG logging:


18/05/02 10:09:07 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
...
18/05/02 10:09:58 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:09:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
18/05/02 10:10:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/05/02 10:10:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0
18/05/02 10:10:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:10:02 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
...
18/05/02 10:12:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:12:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0
18/05/02 10:12:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/05/02 10:12:05 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:12:06 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
...
18/05/02 10:12:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:13:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:13:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0
18/05/02 10:13:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/05/02 10:13:03 INFO TaskSchedulerImpl: Adding task set 3.0 with 48 tasks
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48
18/05/02 10:13:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48
...
18/05/02 10:17:16 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1
18/05/02 10:17:17 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1
18/05/02 10:17:18 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
18/05/02 10:17:18 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool

 
