spark-user mailing list archives

From Sidney Feiner <>
Subject RE: [PySpark] - running processes and computing time
Date Tue, 04 Jul 2017 12:15:44 GMT
To initialize it per executor, I used a class with only class attributes and class methods (like
an `object` in Scala), but because I was using PySpark, it was actually being initialized per
process ☹
What I went for instead was the broadcast variable, but there is still something suspicious
about my application – the processing time of each batch.

In my logs, I see that processing a partition takes under a second. But in the Spark
UI I see that a task takes between 3 and 6 seconds.
Shouldn't the partition processing time and the task computing time be the same?

My code:

def process_func(obj, records):
    start = time.time()
    processed_records = # Some processing
    # (logging call below reconstructed from the garbled original; "logger" name assumed)
    logger.info("It took {0} seconds to handle {1} records".format(time.time() - start, events_amount))
    # This logs very small numbers (around 0.05 seconds)
    return processed_records

def handle_rdd(rdd: RDD):
    start_time = time.time()
    rdd.foreachPartition(lambda records: process_func(object_broadcast.value, records))
    # (logging call below reconstructed from the garbled original)
    logger.info("Handle RDD took: {0} seconds".format(time.time() - start_time))
    # This logs much bigger numbers (around 3-6 seconds)

# (the DStream variable this chain is called on was cut off; "stream" assumed)
stream\
    .filter(lambda x: x[1] is not None)\
    .map(lambda x: x[1])\
    .foreachRDD(handle_rdd)  # Keep only values and cast them to TextAnalysis

Each RDD has at most 10 partitions, so even if the tasks were processed one after another, it
should take around 0.5 seconds for all of them to finish.
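A back-of-the-envelope check with the numbers above (the 4 s figure is just an assumed midpoint of the observed 3-6 s range) suggests the gap is per-task overhead — e.g. Python worker startup, task/closure serialization and scheduler delay — rather than the processing itself:

```python
# Assumed numbers from this thread; 4.0 s is the midpoint of the 3-6 s range.
measured_partition_s = 0.05  # time logged inside process_func, per partition
ui_task_s = 4.0              # task time reported in the Spark UI
overhead_s = ui_task_s - measured_partition_s
print("implied per-task overhead: {0:.2f} s".format(overhead_s))
```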

Does anyone know what's happening here? The time difference is too big to be just networking.

From: Sudev A C []
Sent: Monday, July 3, 2017 7:48 PM
To: Sidney Feiner <>;
Subject: Re: [PySpark] - running processes

You might want to do the initialisation per partition (I'm not sure how you are achieving the
per-executor initialisation in your code).

To initialise something per partition, you may use something like rdd.foreachPartition.

Or if you want something global, like a variable for further processing, you might want to
initialise it once as a broadcast variable and access the data structure through the broadcast.

AFAIK a Python process will be initiated per partition's tasks.
On Mon, 3 Jul 2017 at 5:23 PM, Sidney Feiner <> wrote:

In my Spark Streaming application, I need to build a graph from a file, and initializing that
graph takes between 5 and 10 seconds.

So I tried initializing it once per executor so it'll be initialized only once.

After running the application, I noticed that it's initialized much more than once per executor,
every time with a different process id (every process has its own logger).

Doesn't every executor have its own JVM and its own process? Or is that only relevant when
I develop in JVM languages like Scala/Java? Do executors in PySpark spawn new processes for
new tasks?

And if they do, how can I make sure that my graph object will really only be initialized once?
Thanks :)

Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp




This message is intended only for the use of the addressee and may contain information that
is privileged, confidential and exempt from disclosure under applicable law. If the reader
of this message is not the intended recipient, or the employee or agent responsible for delivering
the message to the intended recipient, you are hereby notified that any dissemination, distribution
or copying of this communication is strictly prohibited. If you have received this e-mail
in error, please notify us immediately by return e-mail and delete this e-mail and all attachments
from your system.