spark-user mailing list archives

From "沈俊" <jun.shen....@qq.com>
Subject No matter how many instances and cores configured for spark on k8s, only one executor is reading file
Date Mon, 21 Dec 2020 09:29:58 GMT
Hi


I am now trying to use Spark to do tcpdump pcap file analysis. The first step is to read the file and parse the content into a dataframe according to the analysis requirements.


I've made a public folder that all executors can access directly like a local file system.
Here is the main code:
    filenames = ["/mdata/400m.pcap"]
    #filenames = ["/mdata/400m.pcap", "/mdata/400m.pcap1", "/mdata/400m.pcap2", "/mdata/400m.pcap3", "/mdata/400m.pcap4", "/mdata/400m.pcap5", "/mdata/400m.pcap6"]
    tsharkFilter = conf.tsharkFilter
    tsharkArgs = conf.tsharkArgs
    workerAmount = conf.workerAmount

    # split each file into (startPos, endPos) tasks, one per expected worker
    parallelTasks = spark.sparkContext.parallelize(filenames)
    allSplitedTasks = parallelTasks.flatMap(lambda x: SplitToTasksByExecutorAmount(x, workerAmount))
    allSplitedTasks = allSplitedTasks.map(lambda x: addTsharkArgs(x, tsharkFilter=tsharkFilter, tsharkArgs=tsharkArgs))
    out = allSplitedTasks.flatMap(readPieces)
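For context, downstream I turn the output into a dataframe roughly along these lines (a sketch only; the column names here are placeholders, the real ones depend on the tshark fields I ask for):

    from pyspark.sql import SparkSession, Row

    spark = SparkSession.builder.getOrCreate()

    # Placeholder column names -- substitute the real tshark output fields.
    columns = ["frame_time", "ip_src", "ip_dst", "frame_len"]

    # Each element of `out` is one tshark output line split on tabs (byte strings).
    rows = out.map(lambda fields: Row(**{c: f.decode("utf-8", "replace")
                                         for c, f in zip(columns, fields)}))
    df = spark.createDataFrame(rows)
    df.show(5, truncate=False)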




Then the file parsing part is here:
from subprocess import Popen, PIPE

def readPieces(param):
    try:
        filename = param['filename']
        #batchID = param['batchID']
        startPos = param['startPos']
        endPos = param['endPos']
        count = param['count']
        tsharkFilter = param['tsharkFilter'] if 'tsharkFilter' in param else None
        tsharkArgs = param['tsharkArgs'] if 'tsharkArgs' in param else None

        with open(filename, "rb") as f:
            if endPos == 0:
                # endPos == 0 means "read to end of file"
                endPos = f.seek(0, 2)
                f.seek(0)
            hdr = f.read(24)            # pcap global header
            f.seek(startPos)
            readLen = endPos - startPos
            content = f.read(readLen)
            if count:
                cmd = ["tshark", "-r", "-", "-c", str(count)]
            else:
                cmd = ["tshark", "-r", "-"]
            if tsharkArgs: cmd.extend(tsharkArgs)
            if tsharkFilter: cmd.extend(tsharkFilter)
            childProcess = Popen(cmd, stdin=PIPE, stdout=PIPE)
            # prepend the pcap header so tshark can parse the slice from stdin
            raw = b''.join([hdr, content])
            outStr = childProcess.communicate(input=raw)[0]
            print(outStr)
            lines = outStr.split(b'\n')
            return [x.split(b'\t') for x in lines if x != b'']
    except Exception as e:
        return [[str(e)]]




The SplitToTasksByExecutorAmount function goes through the file and outputs a list of dictionary elements, so I suppose multiple executors would each read the file from a different startPos and only read up to their endPos. Each element looks like this:
{"filename": filename, "batchID": batchID, "startPos": startPos, "endPos": endPos, "count": count}



Then, when the application is running, I can only see a single tshark process running across all my k8s nodes. If I add more filenames to the main code, the number of running tshark processes equals the number of filenames.
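For reference, this is the kind of quick check I can run on the driver to see how the split tasks are laid out over partitions (glom() just groups each partition's elements, so the last line prints how many tasks sit in each partition):

    print(parallelTasks.getNumPartitions())
    print(allSplitedTasks.getNumPartitions())
    print(allSplitedTasks.glom().map(len).collect())  # number of tasks per partition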


Is there some global lock somewhere in Spark so that the same file can only be read by a single executor at a time? Is it possible to enable multiple executors to read the same file at the same time?
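To illustrate what I expected to happen: I assumed each dictionary task would end up in its own partition and therefore run its own tshark on its own executor, something like the sketch below (untested, tshark argument handling omitted; it just precomputes the splits on the driver and parallelizes them with one slice per task):

    # Driver side: compute all splits up front, then spread them over partitions.
    splits = []
    for fn in filenames:
        splits.extend(SplitToTasksByExecutorAmount(fn, workerAmount))

    splitTasks = spark.sparkContext.parallelize(splits, numSlices=len(splits))
    out = splitTasks.flatMap(readPieces)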




Thanks
Shen Jun