spark-dev mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: Python UDF performance at large scale
Date Thu, 25 Jun 2015 07:05:46 GMT
I suspect the batched synchronous version will be either too slow
(with a small batch size) or prone to OOM (with a large batch size).
If it's not that hard to implement, you can give it a try.

On Wed, Jun 24, 2015 at 4:39 PM, Justin Uang <justin.uang@gmail.com> wrote:
> Correct, I was running with a batch size of about 100 when I did the tests,
> because I was worried about deadlocks. Do you have any concerns regarding
> the batched synchronous version of communication between the Java and Python
> processes, and if not, should I file a ticket and start writing it?
>
> On Wed, Jun 24, 2015 at 7:27 PM Davies Liu <davies@databricks.com> wrote:
>>
>> From your comment, the 2x improvement only happens when you have the
>> batch size as 1, right?
>>
>> On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang <justin.uang@gmail.com>
>> wrote:
>> > FYI, just submitted a PR to Pyrolite to remove their StopException.
>> > https://github.com/irmen/Pyrolite/pull/30
>> >
>> > With my benchmark, removing it basically made it about 2x faster.
>> >
>> > On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal
>> > <punya.biswal@gmail.com>
>> > wrote:
>> >>
>> >> Hi Davies,
>> >>
>> >> In general, do we expect people to use CPython only for "heavyweight"
>> >> UDFs that invoke an external library? Are there any examples of using
>> >> Jython, especially performance comparisons to Java/Scala and CPython?
>> >> When using Jython, do you expect the driver to send code to the
>> >> executor as a string, or is there a good way to serialize Jython
>> >> lambdas?
>> >>
>> >> (For context, I was unable to serialize Nashorn lambdas when I tried to
>> >> use them in Spark.)
>> >>
>> >> Punya
>> >> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <davies@databricks.com>
>> >> wrote:
>> >>>
>> >>> Fair points, I also like simpler solutions.
>> >>>
>> >>> The overhead of a Python task could be a few milliseconds, which
>> >>> means we should also eval them in batches (one Python task per batch).
>> >>>
>> >>> Decreasing the batch size for UDFs sounds reasonable to me, together
>> >>> with other tricks to reduce the data in the socket/pipe buffer.
>> >>>
>> >>> BTW, what does your UDF look like? How about using Jython to run
>> >>> simple Python UDFs (those without external libraries)?
>> >>>
>> >>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.uang@gmail.com>
>> >>> wrote:
>> >>> > // + punya
>> >>> >
>> >>> > Thanks for your quick response!
>> >>> >
>> >>> > I'm not sure that using an unbounded buffer is a good solution to the
>> >>> > locking problem. For example, in the situation where I had 500
>> >>> > columns, I am in fact storing 499 extra columns on the java side,
>> >>> > which might make me OOM if I have to store many rows. In addition, if
>> >>> > I am using an AutoBatchedSerializer, the java side might have to write
>> >>> > 1 << 16 == 65536 rows before python starts outputting elements, in
>> >>> > which case, the Java side has to buffer 65536 complete rows. In
>> >>> > general it seems fragile to rely on blocking behavior in the Python
>> >>> > coprocess. By contrast, it's very easy to verify the correctness and
>> >>> > performance characteristics of the synchronous blocking solution.
>> >>> >
>> >>> >
>> >>> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <davies@databricks.com>
>> >>> > wrote:
>> >>> >>
>> >>> >> Thanks for looking into it. I like the idea of having a
>> >>> >> ForkingIterator. If it has an unlimited buffer, then we will not
>> >>> >> have the deadlock problem, I think. The writing thread will be
>> >>> >> blocked by the Python process, so not many rows will be buffered
>> >>> >> (though that could still be a cause of OOM). At least this approach
>> >>> >> is better than the current one.
>> >>> >>
>> >>> >> Could you create a JIRA and send out a PR?
>> >>> >>
>> >>> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang
>> >>> >> <justin.uang@gmail.com>
>> >>> >> wrote:
>> >>> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
>> >>> >> > scale, but I have a proof-of-concept implementation that avoids
>> >>> >> > caching the entire dataset.
>> >>> >> >
>> >>> >> > Hi,
>> >>> >> >
>> >>> >> > We have been running into performance problems using Python UDFs
>> >>> >> > with DataFrames at large scale.
>> >>> >> >
>> >>> >> > From the implementation of BatchPythonEvaluation, it looks like the
>> >>> >> > goal was to reuse the PythonRDD code. It caches the entire child RDD
>> >>> >> > so that it can do two passes over the data: one to give to the
>> >>> >> > PythonRDD, then one to join the Python lambda results with the
>> >>> >> > original row (which may have Java objects that should be passed
>> >>> >> > through).
>> >>> >> >
>> >>> >> > In addition, it caches all the columns, even the ones that don't
>> >>> >> > need to be processed by the Python UDF. In the case I was working
>> >>> >> > with, I had a 500-column table and wanted to use a Python UDF for
>> >>> >> > one column, and it ended up caching all 500 columns.
>> >>> >> >
>> >>> >> > I have a working solution over here that does it in one pass over
>> >>> >> > the data, avoiding caching
>> >>> >> > (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
>> >>> >> > With this patch, I go from a job that takes 20 minutes then OOMs, to
>> >>> >> > a job that finishes completely in 3 minutes. It is indeed quite hacky
>> >>> >> > and prone to deadlocks since there is buffering in many locations:
>> >>> >> >
>> >>> >> >     - NEW: the ForkingIterator LinkedBlockingDeque
>> >>> >> >     - batching the rows before pickling them
>> >>> >> >     - os buffers on both sides
>> >>> >> >     - pyspark.serializers.BatchedSerializer
>> >>> >> >
>> >>> >> > We can avoid deadlock by being very disciplined. For example, we can
>> >>> >> > have the ForkingIterator instead always do a check of whether the
>> >>> >> > LinkedBlockingDeque is full and if so:
>> >>> >> >
>> >>> >> > Java
>> >>> >> >     - flush the java pickling buffer
>> >>> >> >     - send a flush command to the python process
>> >>> >> >     - os.flush the java side
>> >>> >> >
>> >>> >> > Python
>> >>> >> >     - flush BatchedSerializer
>> >>> >> >     - os.flush()
>> >>> >> >
>> >>> >> > I haven't added this yet. This is getting very complex, however.
>> >>> >> > Another model would just be to change the protocol between the Java
>> >>> >> > side and the worker to be a synchronous request/response. This has
>> >>> >> > the disadvantage that the CPU isn't doing anything when the batch is
>> >>> >> > being sent across, but it has the huge advantage of simplicity. In
>> >>> >> > addition, I imagine that the slow part isn't the actual IO between
>> >>> >> > the processes, but rather the serialization of Java objects into
>> >>> >> > pickled bytes, and the deserialization/serialization + Python loops
>> >>> >> > on the Python side. Another advantage is that we won't be taking more
>> >>> >> > than 100% CPU since only one thread is doing CPU work at a time
>> >>> >> > between the executor and the Python interpreter.
>> >>> >> >
>> >>> >> > Any thoughts would be much appreciated =)
>> >>> >> >
>> >>> >> > Other improvements:
>> >>> >> >     - extract some code of the worker out of PythonRDD so that we can
>> >>> >> >       do a mapPartitions directly in BatchPythonEvaluation without
>> >>> >> >       resorting to the hackery in ForkedRDD.compute(), which uses a
>> >>> >> >       cache to ensure that the other RDD can get a handle to the same
>> >>> >> >       iterator
>> >>> >> >     - read elements and use a size estimator to create the
>> >>> >> >       BlockingQueue to make sure that we don't store too many things
>> >>> >> >       in memory when batching
>> >>> >> >     - patch Unpickler to not use StopException for control flow, which
>> >>> >> >       is slowing down the Java side
>> >>> >> >
>> >>> >> >
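
The synchronous request/response model discussed in this thread can be
sketched roughly as below. This is only an illustration under stated
assumptions, not the code from the linked commit and not Spark's actual
worker protocol: the names send_msg, recv_msg, worker, and eval_udf are
hypothetical, a thread on a local socket pair stands in for the Python
worker process, and error handling is omitted. It sends only the column the
UDF needs, holds at most batch_size rows in memory, and waits for each reply
before sending the next batch, so neither side can fill a buffer the other
is not draining.

```python
import pickle
import socket
import struct
import threading


def send_msg(sock, obj):
    # Length-prefixed pickle frame (hypothetical framing, chosen for the sketch).
    payload = pickle.dumps(obj, protocol=2)
    sock.sendall(struct.pack(">I", len(payload)) + payload)


def recv_exact(sock, n):
    # Read exactly n bytes or fail if the peer goes away.
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise EOFError("peer closed the connection")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def recv_msg(sock):
    (length,) = struct.unpack(">I", recv_exact(sock, 4))
    return pickle.loads(recv_exact(sock, length))


def worker(sock, udf):
    # Stand-in for the Python worker: exactly one reply per request.
    # A None request ends the session.
    while True:
        batch = recv_msg(sock)
        if batch is None:
            break
        send_msg(sock, [udf(value) for value in batch])


def _round_trip(sock, batch, column):
    # Send only the UDF's input column, then block until the reply arrives.
    send_msg(sock, [row[column] for row in batch])
    results = recv_msg(sock)
    return zip(batch, results)


def eval_udf(rows, column, udf, batch_size=100):
    """Yield (row, udf(row[column])) in a single pass over rows."""
    parent, child = socket.socketpair()
    thread = threading.Thread(target=worker, args=(child, udf))
    thread.start()
    try:
        batch = []
        for row in rows:
            batch.append(row)
            if len(batch) == batch_size:
                yield from _round_trip(parent, batch, column)
                batch = []
        if batch:  # final partial batch
            yield from _round_trip(parent, batch, column)
    finally:
        send_msg(parent, None)
        thread.join()
        parent.close()
        child.close()
```

For example, list(eval_udf(rows, 0, lambda v: v * 2, batch_size=100)) pairs
every original row with the doubled value of its first column. The batch
size is where the trade-off raised at the top of the thread shows up: small
batches pay the round-trip cost more often, while large batches hold more
rows in memory at once.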
