spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andy Davidson <A...@SantaCruzIntegration.com>
Subject getting more concurrency best practices
Date Tue, 26 Jul 2016 17:44:55 GMT
Bellow is a very simple application. It runs very slowly. It does not look
like I am getting a lot of parallel execution. I image this is a very common
work flow. Periodically I want to runs some standard summary statistics
across several different data sets.

Any suggestions would be greatly appreciated.

Andy

Overview 
All the sets use the same data format. The data is twitter tweet stored in
JSON. The JSON is very complicated. Each record could be as large as 4k. The
data is collected using spark streaming. Every mini batch is stored in S3 as
separate object. E.G. s3n://buckName/date/timestampMS/parts*. I only select
one col. From the data frame. The column is “top” level key in the JSON
structure

The program is simple

For each data set
1. Find all the part files
2. Load them into a data frame
3. Calculate the summary stat and print
4. Free memory
In my example bellow the data sets are not very big.

# fullPath is list of part files.
sqlContext.read.format('json').load(fullPath).select("body") #.cache()


1
%%timeit -n 1 -r 1
2
# %timeit # line magic
3
# %%timeit # cell magic
4
# -n 1 -r 1 # run cell once
5
​
6
for prefix in districtDataSets:
7
    dataSet = [name for name in constituentDataSets if
name.startswith(prefix)]
8
    # print(dataSets)
9
    # would be nice if we could have this loop run in parallel
10
    constituentDFS = getDataFrames(dataSet) # returns a dictionary
11
    # we could union but would probably be slower
12
    total = 0
13
    for name in constituentDFS:
14
        c = constituentDFS[name].count();
15
        total = total + c;
16
    print("{} {:15,}".format(prefix, total))
17
    # free memory
18
    del constituentDFS
19
    
ne-2 110169
fl-8 12
mi-1 2552
ny-19 27657
ny-24 59739
pa-8 42867
wi-8 7352
ny-3 51136
ny-1 105296
ny-22 5671287
mn-2 34834
tx-8 5246
il-6 12772
co-6 24700
1 loop, best of 1: 2h 41min 8s per loop
Environment

I am using spark-1.6.1

My app is using
10 cores, 
6GB per node
5 executors
1 driver

Each executor has at most 2 active tasks


Over all the resources do not seem to be utilized well. I do not think
adding machines would improve performance.

I launch the notebook server as follows

#

# allow notebooks to use all avalible resources

# 

export PYSPARK_PYTHON=python3.4

export PYSPARK_DRIVER_PYTHON=python3.4

export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"

$SPARK_ROOT/bin/pyspark \

    --master $MASTER_URL \

    --driver-memory 2G \

    --executor-memory 6G \

    $extraPkgs \

    $*



All my data is being read from s3
- Is there an easy way to figure out how much time I am spending reading?
- I am guessing S3 is really slow. I have lot of objects to read.
- I image copying the data to HDFS would run faster how ever I have not
found an easy way to copy the data. I am using ec2. Looks like I would have
to copy from s3 to a file partition in my cluster and then copy to HDFS



Looking at the stages It does not look like shuffle is a major problem






Mime
View raw message