No matter how many instances and cores configured for spark on k8s, only one executor is reading file


No matter how many instances and cores configured for spark on k8s, only one executor is reading file

沈俊
Hi

I am now trying to use Spark to do tcpdump pcap file analysis. The first step is to read the file and parse the content into a dataframe according to the analysis requirements.

I've made a shared folder for all executors so that they can access it directly like a local file system.
Here is the main code:
    filenames = ["/mdata/400m.pcap"]
#filenames = ["/mdata/400m.pcap","/mdata/400m.pcap1","/mdata/400m.pcap2","/mdata/400m.pcap3","/mdata/400m.pcap4","/mdata/400m.pcap5","/mdata/400m.pcap6",]
    tsharkFilter = conf.tsharkFilter
    tsharkArgs = conf.tsharkArgs
    workerAmount = conf.workerAmount

    parallelTasks = spark.sparkContext.parallelize(filenames)
    allSplitedTasks = parallelTasks.flatMap(lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
    allSplitedTasks = allSplitedTasks.map(lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
    out = allSplitedTasks.flatMap(readPieces)

Then, the file parsing part is here.
from subprocess import Popen, PIPE

def readPieces(param):
    try:
        filename = param['filename']
        # batchID = param['batchID']
        startPos = param['startPos']
        endPos = param['endPos']
        count = param['count']
        tsharkFilter = param['tsharkFilter'] if 'tsharkFilter' in param else None
        tsharkArgs = param['tsharkArgs'] if 'tsharkArgs' in param else None

        with open(filename, "rb") as f:
            if endPos == 0:
                # endPos == 0 means "read to end of file"
                endPos = f.seek(0, 2)
                f.seek(0)
            hdr = f.read(24)  # global pcap header, prepended to every piece
            f.seek(startPos)
            readLen = endPos - startPos
            content = f.read(readLen)
            if count:
                cmd = ["tshark", "-r", "-", "-c", str(count)]
            else:
                cmd = ["tshark", "-r", "-"]
            if tsharkArgs: cmd.extend(tsharkArgs)
            if tsharkFilter: cmd.extend(tsharkFilter)
            childProcess = Popen(cmd, stdin=PIPE, stdout=PIPE)
            raw = b''.join([hdr, content])
            outStr = childProcess.communicate(input=raw)[0]
            print(outStr)
            lines = outStr.split(b'\n')
            return [x.split(b'\t') for x in lines if x != b'']
    except Exception as e:
        return [[str(e)]]

The SplitToTasksByExecutorAmount function goes through the file and outputs a list of dictionary elements, so I suppose multiple executors would each read the file from a different startPos and only read up to endPos.
{"filename":filename, "batchID":batchID, "startPos":startPos, "endPos":endPos, "count":count}
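The original SplitToTasksByExecutorAmount is not shown, but a minimal sketch of a splitter producing task dictionaries in that shape could look like the following (the function name and logic here are illustrative assumptions; a correct pcap splitter would additionally have to align startPos/endPos on packet-record boundaries, which this sketch skips):

```python
import os

def split_to_tasks(filename, worker_amount, count=0):
    """Hypothetical splitter: divide a file's byte range into roughly
    equal pieces, emitting one task dict per piece. A real pcap splitter
    must also align piece boundaries on packet records."""
    size = os.path.getsize(filename)
    piece = max(size // worker_amount, 1)
    tasks = []
    for batch_id in range(worker_amount):
        start = batch_id * piece
        if start >= size:
            break
        # last piece absorbs any remainder
        end = size if batch_id == worker_amount - 1 else start + piece
        tasks.append({"filename": filename, "batchID": batch_id,
                      "startPos": start, "endPos": end, "count": count})
    return tasks
```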

Then, when the application is running, I can only see a single tshark process running across all my k8s nodes.
If I add more filenames in the main code, then the number of running tshark processes equals the number of filenames.

Is there some global lock somewhere in Spark so that the same file is only read by a single executor at a time? Is it possible to enable multiple executors to read the same file at the same time?


Thanks
Shen Jun



Re: No matter how many instances and cores configured for spark on k8s, only one executor is reading file

srowen
Pass more partitions to the second argument of parallelize()?
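A plain-Python sketch of the likely mechanics behind this suggestion (this mimics positional slicing of a local collection into numSlices partitions; it is an illustration, not Spark's actual implementation): with a one-element filename list, only one partition is non-empty, so all of the downstream flatMap work stays inside that single task no matter how many executors exist.

```python
def slice_positionally(seq, num_slices):
    # Mimic slicing a local collection into num_slices partitions:
    # slice i covers indices [i*n//k, (i+1)*n//k).
    n, k = len(seq), num_slices
    return [seq[(i * n) // k:((i + 1) * n) // k] for i in range(k)]

parts = slice_positionally(["/mdata/400m.pcap"], 8)
# only one of the 8 partitions holds the filename, so only one task
# (and hence one executor core) ends up doing all the flatMap work
non_empty = [p for p in parts if p]
```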

On Mon, Dec 21, 2020 at 7:39 AM 沈俊 <[hidden email]> wrote:



Re: No matter how many instances and cores configured for spark on k8s, only one executor is reading file

沈俊
Hi

Finally, I found a configuration parameter: spark.default.parallelism
Changing this parameter finally changes the number of executors running in parallel, although the log file still says "first 15 tasks ..." and so on.

Anyway, my problem is solved.
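For anyone landing here later, one way to set this is at submit time (a sketch; the same property can also be set in SparkConf or spark-defaults.conf, and the value 80 is just an example matching the task count above):

```shell
# Raise the default partition count used by RDDs that do not specify
# one explicitly (e.g. a parallelize() call without a numSlices argument).
spark-submit \
  --conf spark.default.parallelism=80 \
  ...
```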




------------------ Original ------------------
From: "沈俊" <[hidden email]>;
Date: Tue, Dec 22, 2020 10:34 AM
To: "Sean Owen"<[hidden email]>;
Cc: "user"<[hidden email]>;
Subject: Re: No matter how many instances and cores configured for spark on k8s, only one executor is reading file

Hi Sean Owen,

Many thanks for your reply and suggestion.
I tried passing 30 as the second parameter of parallelize(), but it still keeps a single executor running the tasks.
However, if I add two lines, then multiple executors run tasks:
    parallelTasks = spark.sparkContext.parallelize(filenames, 30)
    allSplitedTasks = parallelTasks.flatMap(lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
    allSplitedTasks = allSplitedTasks.map(lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
    listSplitedTasks = allSplitedTasks.collect()
    allSplitedTasks = spark.sparkContext.parallelize(listSplitedTasks)
    out = allSplitedTasks.flatMap(readPiecesNew)

The two added lines terminate the previous RDD via the collect() method, then create a new RDD from the collected data. Then I see a new issue in the logs: at most 15 tasks per batch.
20/12/22 02:07:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 8.4 KiB, free 3.4 GiB)
20/12/22 02:07:06 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 5.6 KiB, free 3.4 GiB)
20/12/22 02:07:06 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on shenjun-2ee1217688323457-driver-svc.default.svc:7079 (size: 5.6 KiB, free: 3.4 GiB)
20/12/22 02:07:06 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1223
20/12/22 02:07:06 INFO DAGScheduler: Submitting 80 missing tasks from ResultStage 1 (PythonRDD[3] at count at /mdata/iostat.py:354) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
20/12/22 02:07:06 INFO TaskSchedulerImpl: Adding task set 1.0 with 80 tasks

These 15 tasks can be seen in the second-to-last log line. Actually, no matter how many elements are in the allSplitedTasks RDD, this value is always 15.
Is there any way for a programmer to change it dynamically?




------------------ Original ------------------
From: "Sean Owen" <[hidden email]>;
Date: Mon, Dec 21, 2020 09:53 PM
To: "沈俊"<[hidden email]>;
Cc: "user"<[hidden email]>;
Subject: Re: No matter how many instances and cores configured for spark on k8s, only one executor is reading file
