Reading Large SequenceFile into RDD Results in Imbalanced Tasks

phonchi
Dear all:

I have generated a SequenceFile of about 900MB that contains 15 records; 13 of those records are around 64MB each (the block size of our HDFS).

In PySpark, I read it as follows (the key and value are both custom Java/Scala classes):

rdd = sc.sequenceFile("hdfs:///Test.seq", keyClass = "ChunkID", valueClass="ChunkData", keyConverter="KeyToChunkConverter", valueConverter="DataToChunkConverter")

rdd.getNumPartitions() shows that there are 14 partitions, and I run the following algorithm on it:

import logging

def open_map():
    def open_map_nested(key_value):
        try:
            # unpack (ChunkID, ChunkData)
            key, data = key_value
            #### algorithm omitted ####
            if key[0] == 0:
                return [['if', 'if', 'if']]
            else:
                return [["else", "else", "else"]]
        except Exception as e:
            # log the failure and return placeholder rows instead of failing the task
            logging.exception(e)
            return [["None", "None", "None"], ["None", "None", "None"]]
    return open_map_nested
result = rdd.flatMap(open_map()).count()

However, when I open the Spark UI, the first two records are always assigned to the same task, while the last task does not consume any records, as the diagram below shows (the reported input size also looks strange):

[Spark UI screenshot: the first task reads two records while the last task reads none]

As a result, the other tasks, each of which contains exactly one record, finish and then wait for that task to complete, so it becomes the bottleneck.
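
One workaround I can think of is to repartition the RDD before the flatMap so that every record gets its own task, along the lines of the untested sketch below, but the full shuffle of roughly 900MB that repartition() triggers seems wasteful, so I would prefer to get the splits right when the file is read.

# Untested sketch: spread the 15 records over 15 partitions before the
# expensive flatMap; note that repartition() shuffles the whole ~900MB RDD.
balanced = rdd.repartition(15)
result = balanced.flatMap(open_map()).count()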

I have also tried implementing the same function in Java, but the problem remains.

My job settings are: --master yarn --deploy-mode client --driver-memory 8G --num-executors 20 --executor-cores 1 --executor-memory 1500M.
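
I also wonder whether a Hadoop-level input-split setting matters here. For example, something like the untested sketch below (67108864 is just our 64MB block size, and I realise this may be a no-op since the default split size is already the block size, so please treat it as a guess):

from pyspark import SparkConf, SparkContext

# Untested sketch: spark.hadoop.* properties are copied into the Hadoop
# Configuration, so this would cap the input split size at 64MB.
conf = SparkConf().set(
    "spark.hadoop.mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc = SparkContext(conf=conf)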

My question is:

Is there any parameter or coding style that can keep Spark from assigning two records to the first task?
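
For example, would the minSplits argument that sc.sequenceFile accepts help? As far as I understand it is only a lower-bound hint passed to the underlying Hadoop InputFormat, so I am not sure it changes how the 15 records map to splits. An untested sketch of what I mean:

# Untested sketch: ask for at least 15 splits when reading the file.
rdd = sc.sequenceFile("hdfs:///Test.seq",
                      keyClass="ChunkID",
                      valueClass="ChunkData",
                      keyConverter="KeyToChunkConverter",
                      valueConverter="DataToChunkConverter",
                      minSplits=15)
print(rdd.getNumPartitions())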

Any suggestions will be highly appreciated!!

PS: I also posted this question on Stack Overflow: http://stackoverflow.com/questions/43358158/reading-sequencefile-into-rdd-in-spark-results-in-imbalance-task