executors processing tasks sequentially

Ramanan, Buvana (Nokia - US/Murray Hill)

Hi All,

 

I am using Spark 2.2.0 on a YARN cluster.

 

I am running the Kafka direct stream wordcount example code (pasted below my signature). My topic has 400 partitions, and the Spark job tracker page shows 26 executors processing the corresponding 400 tasks.
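(The 400 tasks per batch line up with the Kafka partition count. As a sanity check, the per-batch partition count can be printed from the driver with something like the one-liner below; this is just a diagnostic sketch, not part of the job, and it relies on the print_function import already at the top of the script.)

kvs.foreachRDD(lambda rdd: print("partitions in this batch:", rdd.getNumPartitions()))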

 

When I check the execution timeline for each job (i.e., one 2-second micro-batch worth of records), it shows the tasks being executed serially by each executor. I am attaching a screenshot for reference (it shows only two of the 26 executors).

 

I increased total-executor-cores to 200, hoping to see 4 tasks processed in parallel by each executor, but the behavior persists.
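(For context, my understanding is that spark.executor.cores controls how many tasks a single executor runs concurrently, and spark.executor.instances controls the executor count on YARN. Below is a sketch of the driver-side configuration I was expecting would give 4-way parallelism per executor; the values are illustrative only, not what I actually submitted with.)

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("Wordcount_python_DS_Kafka")
        .set("spark.executor.instances", "26")   # number of executors requested from YARN
        .set("spark.executor.cores", "4"))       # concurrent tasks per executor
sc = SparkContext(conf=conf)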

I also ran the Scala wordcount example that uses the direct Kafka stream (presumably kafka010), reading from the same topic. Once again, the tasks are executed serially rather than in parallel within an executor.

 

Can someone please explain why each executor processes its tasks serially? Is this expected? Does it have something to do with YARN?

 

Thanks,

Buvana

 

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


# This keeps a running count of the total service.instances
def updateFunc(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        sys.exit(-1)

    sc = SparkContext(appName="Wordcount_python_DS_Kafka")
    ssc = StreamingContext(sc, 2)  # 2-second micro-batches

    brokers, topic = sys.argv[1:]
    # Direct stream: one RDD partition (and hence one task) per Kafka topic partition
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.map(lambda line: line.split("|")[1]).flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
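(Side note: updateFunc is defined above but never used. If I were keeping the running count, it would be wired in roughly as below, inserted before ssc.start(). Sketch only: updateStateByKey requires a checkpoint directory, and the path here is just illustrative.)

    ssc.checkpoint("/tmp/wordcount_checkpoint")          # required by updateStateByKey; path illustrative
    runningCounts = counts.updateStateByKey(updateFunc)  # stateful running total per word
    runningCounts.pprint()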




[Attachment: tasks_timeline_400partitions_Capture.JPG (93K)]