spark-user mailing list archives

From Paul Borgmans <paul.borm...@asml.com>
Subject [Spark scheduling] Spark schedules single task although rdd has 48 partitions?
Date Wed, 02 May 2018 09:31:51 GMT
(Please note that this question was previously posted at https://stackoverflow.com/questions/49943655/spark-schedules-single-task-although-rdd-has-48-partitions)
We are running Spark 2.3 with Python 3.5.2. For one job we run the code below. The input
txt files are a simplified example; in fact they are large binary files, and
sc.binaryFiles(...) runs out of memory when loading their content. Therefore only the
filenames are parallelized, and the executors open and read the content themselves:
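from pyspark.sql import Row

# sc (SparkContext), spark (SparkSession) and output_path are defined elsewhere.
files = [u'foo.txt', u'bar.txt', u'baz.txt']  # ...etc.; len(files) == 155 in the real job

def func(filename):
    # Imported inside the function so the import happens on the executors.
    from app import generate_rows
    return list(generate_rows(filename))

rdd = sc.parallelize(files, numSlices=48)
rdd2 = rdd.flatMap(func)
rdd3 = rdd2.map(lambda d: Row(**d))
df = spark.createDataFrame(rdd3)
df.write.mode(u'append').partitionBy(u'foo').parquet(output_path)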

Here app is a Python module (added to Spark using --py-files app.egg). Simplified, its code
looks like this:
from collections import OrderedDict

def generate_rows(filename):
    # Opens the file and performs compute-intensive operations (taking +/- 1 min),
    # ultimately yielding a dict.
    yield OrderedDict([
        (u'filename', filename),
        (u'item1', u'item1'),
        # ...etc.
    ])

We notice that the cluster is not fully utilized during the first stages, which we don't
understand, and we are looking for ways to control this behavior.

Job   Stage   Tasks  Duration  Stage name
Job0  Stage0  1      1 min     parallelize
Job1  Stage1  1      2 min     parallelize
Job2  Stage2  1      1 min     parallelize
Job3  Stage3  48     5 min     parallelize|mappartitions|map|mappartitions|existingRDD|sort

What are the first 3 jobs? And why isn't there a single job/stage with the 48 tasks (as
expected, given that the second parameter of parallelize is set to 48)?
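
To make the expectation behind numSlices=48 concrete, here is a minimal pure-Python sketch of how 155 filenames would spread over 48 partitions. It assumes contiguous, near-even slicing; slice_bounds is a hypothetical helper written for this illustration, not a Spark API, and PySpark's actual batching of the list may distribute the elements somewhat differently.

```python
from collections import Counter

def slice_bounds(length, num_slices):
    """Return (start, end) index pairs for contiguous, near-even slices.

    Assumption for illustration: slice i covers
    [i * length // num_slices, (i + 1) * length // num_slices).
    """
    return [
        (i * length // num_slices, (i + 1) * length // num_slices)
        for i in range(num_slices)
    ]

# 155 files and 48 slices, as in the post.
bounds = slice_bounds(155, 48)
sizes = [end - start for start, end in bounds]
size_counts = Counter(sizes)

# Number of partitions holding each number of files.
print(dict(size_counts))
```

Under that assumption every partition holds 3 or 4 files, so at roughly 1 minute per file the slowest task in the 48-task stage would need about 4 minutes, which is in the same range as the 5 minutes reported for Stage3.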

Excerpt from DEBUG logging:
________________________________
18/05/02 10:09:07 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:09:07 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
...
18/05/02 10:09:58 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:09:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 1
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0.0, runningTasks: 0
18/05/02 10:10:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/05/02 10:10:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/05/02 10:10:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0
18/05/02 10:10:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:10:02 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
...
18/05/02 10:12:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:12:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 1
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_1.0, runningTasks: 0
18/05/02 10:12:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/05/02 10:12:05 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0
18/05/02 10:12:05 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:12:06 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
...
18/05/02 10:12:59 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:13:00 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 1
18/05/02 10:13:01 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 0
18/05/02 10:13:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/05/02 10:13:03 INFO TaskSchedulerImpl: Adding task set 3.0 with 48 tasks
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
18/05/02 10:13:03 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48
18/05/02 10:13:04 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 48
...
18/05/02 10:17:16 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1
18/05/02 10:17:17 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 1
18/05/02 10:17:18 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_3.0, runningTasks: 0
18/05/02 10:17:18 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
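
The wall-clock time of each task set can be read off the INFO lines in the excerpt. The following is a small sketch in plain Python (no Spark needed) that pairs each "Adding task set" line with its "Removed TaskSet" line; the function and variable names are made up for this illustration, and the timestamp format is assumed to be yy/MM/dd HH:mm:ss.

```python
import re
from datetime import datetime

# INFO lines copied from the excerpt above, with wrapped lines joined.
LOG = """\
18/05/02 10:09:07 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
18/05/02 10:10:00 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
18/05/02 10:10:00 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
18/05/02 10:12:05 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
18/05/02 10:12:05 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks
18/05/02 10:13:01 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
18/05/02 10:13:03 INFO TaskSchedulerImpl: Adding task set 3.0 with 48 tasks
18/05/02 10:17:18 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
"""

ADDED = re.compile(r"^(\S+ \S+) INFO TaskSchedulerImpl: Adding task set ([\d.]+) with (\d+) tasks")
REMOVED = re.compile(r"^(\S+ \S+) INFO TaskSchedulerImpl: Removed TaskSet ([\d.]+),")

def parse_ts(text):
    # Assumed log timestamp layout: two-digit year/month/day, then time.
    return datetime.strptime(text, "%y/%m/%d %H:%M:%S")

def task_set_durations(log_text):
    """Map task set id -> (number of tasks, seconds between 'Adding' and 'Removed')."""
    started = {}
    result = {}
    for line in log_text.splitlines():
        match = ADDED.match(line)
        if match:
            started[match.group(2)] = (parse_ts(match.group(1)), int(match.group(3)))
            continue
        match = REMOVED.match(line)
        if match and match.group(2) in started:
            start, num_tasks = started.pop(match.group(2))
            elapsed = parse_ts(match.group(1)) - start
            result[match.group(2)] = (num_tasks, int(elapsed.total_seconds()))
    return result

durations = task_set_durations(LOG)
for task_set_id, (num_tasks, seconds) in sorted(durations.items()):
    print("TaskSet %s: %d task(s), %d s" % (task_set_id, num_tasks, seconds))
```

For this excerpt the three single-task sets take 53 s, 125 s and 56 s, and the 48-task set takes 255 s, which lines up with the 1 min, 2 min, 1 min and 5 min listed per stage above.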
