spark-dev mailing list archives

From Justin Uang <justin.u...@gmail.com>
Subject Re: Python UDF performance at large scale
Date Wed, 24 Jun 2015 19:11:08 GMT
FYI, just submitted a PR to Pyrolite to remove their StopException.
https://github.com/irmen/Pyrolite/pull/30

In my benchmark, removing it made things roughly 2x faster.

On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal <punya.biswal@gmail.com>
wrote:

> Hi Davies,
>
> In general, do we expect people to use CPython only for "heavyweight" UDFs
> that invoke an external library? Are there any examples of using Jython,
> especially performance comparisons to Java/Scala and CPython? When using
> Jython, do you expect the driver to send code to the executor as a string,
> or is there a good way to serialize Jython lambdas?
>
> (For context, I was unable to serialize Nashorn lambdas when I tried to
> use them in Spark.)
>
> Punya
> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <davies@databricks.com> wrote:
>
>> Fair points, I also like simpler solutions.
>>
>> The overhead of a Python task can be a few milliseconds, which
>> means we should still evaluate them in batches (one Python task per batch).
>>
>> Decreasing the batch size for UDF sounds reasonable to me, together
>> with other tricks to reduce the data in socket/pipe buffer.
>>
>> BTW, what does your UDF look like? How about using Jython to run
>> simple Python UDFs (those without external libraries)?
>>
>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.uang@gmail.com>
>> wrote:
>> > // + punya
>> >
>> > Thanks for your quick response!
>> >
>> > I'm not sure that using an unbounded buffer is a good solution to the
>> > locking problem. For example, in the situation where I had 500 columns,
>> > I am in fact storing 499 extra columns on the Java side, which might
>> > make me OOM if I have to store many rows. In addition, if I am using an
>> > AutoBatchedSerializer, the Java side might have to write 1 << 16 ==
>> > 65536 rows before Python starts outputting elements, in which case the
>> > Java side has to buffer 65536 complete rows. In general it seems fragile
>> > to rely on blocking behavior in the Python coprocess. By contrast, it's
>> > very easy to verify the correctness and performance characteristics of
>> > the synchronous blocking solution.
>> >
>> >
>> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <davies@databricks.com> wrote:
>> >>
>> >> Thanks for looking into it, I like the idea of having a
>> >> ForkingIterator. If it has an unlimited buffer, then I think we will
>> >> not have the deadlock problem. The writing thread will be blocked
>> >> by the Python process, so not many rows will be buffered (though it
>> >> could still cause an OOM). At least this approach is better than the
>> >> current one.
>> >>
>> >> Could you create a JIRA and send out the PR?
>> >>
>> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.uang@gmail.com>
>> >> wrote:
>> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
>> >> > scale, but I have a proof-of-concept implementation that avoids
>> >> > caching the entire dataset.
>> >> >
>> >> > Hi,
>> >> >
>> >> > We have been running into performance problems using Python UDFs with
>> >> > DataFrames at large scale.
>> >> >
>> >> > From the implementation of BatchPythonEvaluation, it looks like the
>> >> > goal was to reuse the PythonRDD code. It caches the entire child RDD
>> >> > so that it can do two passes over the data: one to feed the
>> >> > PythonRDD, then one to join the Python lambda results with the
>> >> > original row (which may have Java objects that should be passed
>> >> > through).
>> >> >
>> >> > In addition, it caches all the columns, even the ones that don't
>> >> > need to be processed by the Python UDF. In the case I was working
>> >> > with, I had a 500-column table and wanted to use a Python UDF on one
>> >> > column, and it ended up caching all 500 columns.
>> >> >
>> >> > I have a working solution over here that does it in one pass over the
>> >> > data, avoiding caching
>> >> > (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
>> >> > With this patch, I go from a job that takes 20 minutes and then OOMs to
>> >> > a job that finishes completely in 3 minutes. It is indeed quite hacky
>> >> > and prone to deadlocks since there is buffering in many locations:
>> >> >
>> >> >     - NEW: the ForkingIterator LinkedBlockingDeque
>> >> >     - batching the rows before pickling them
>> >> >     - os buffers on both sides
>> >> >     - pyspark.serializers.BatchedSerializer
>> >> >
>> >> > We can avoid deadlock by being very disciplined. For example, we can
>> >> > have the ForkingIterator always check whether the
>> >> > LinkedBlockingDeque is full and, if so:
>> >> >
>> >> > Java
>> >> >     - flush the java pickling buffer
>> >> >     - send a flush command to the python process
>> >> >     - os.flush the java side
>> >> >
>> >> > Python
>> >> >     - flush BatchedSerializer
>> >> >     - os.flush()
>> >> >
>> >> > I haven't added this yet. This is getting very complex, however.
>> >> > Another model would just be to change the protocol between the Java
>> >> > side and the worker to be a synchronous request/response. This has the
>> >> > disadvantage that the CPU isn't doing anything when the batch is being
>> >> > sent across, but it has the huge advantage of simplicity. In addition,
>> >> > I imagine that the actual IO between the processes isn't that slow;
>> >> > rather, the slow parts are the serialization of Java objects into
>> >> > pickled bytes and the deserialization/serialization + Python loops on
>> >> > the Python side. Another advantage is that we won't be taking more
>> >> > than 100% CPU, since only one thread is doing CPU work at a time
>> >> > between the executor and the Python interpreter.
>> >> >
>> >> > Any thoughts would be much appreciated =)
>> >> >
>> >> > Other improvements:
>> >> >     - extract some code of the worker out of PythonRDD so that we can
>> >> >       do a mapPartitions directly in BatchPythonEvaluation without
>> >> >       resorting to the hackery in ForkedRDD.compute(), which uses a
>> >> >       cache to ensure that the other RDD can get a handle to the same
>> >> >       iterator
>> >> >     - read elements and use a size estimator to create the
>> >> >       BlockingQueue to make sure that we don't store too many things in
>> >> >       memory when batching
>> >> >     - patch Unpickler to not use StopException for control flow, which
>> >> >       is slowing down the Java side
>> >> >
>> >> >
>>
>
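
Editor's sketch of the synchronous request/response model discussed in the thread. This is not the code from the linked commit or from Spark. It is a minimal illustration, assuming a length-prefixed pickle framing and a worker that answers exactly one batch per request. The names `send_msg`, `recv_msg`, `worker`, `eval_udf`, and the `None` end-of-stream marker are all hypothetical, and a thread connected through `socket.socketpair()` stands in for the separate Python worker process.

```python
import pickle
import socket
import struct
import threading
from itertools import islice


def send_msg(sock, obj):
    """Pickle obj and send it with a 4-byte big-endian length prefix."""
    payload = pickle.dumps(obj)
    sock.sendall(struct.pack(">I", len(payload)) + payload)


def recv_exact(sock, n):
    """Read exactly n bytes, or raise if the peer closed early."""
    chunks = []
    while n:
        chunk = sock.recv(n)
        if not chunk:
            raise EOFError("peer closed the connection")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def recv_msg(sock):
    """Read one length-prefixed pickled message."""
    (length,) = struct.unpack(">I", recv_exact(sock, 4))
    return pickle.loads(recv_exact(sock, length))


def worker(sock, udf):
    """Stand-in for the Python worker: one response per request."""
    while True:
        batch = recv_msg(sock)
        if batch is None:  # hypothetical end-of-stream marker
            break
        send_msg(sock, [udf(x) for x in batch])


def eval_udf(rows, udf_input, udf, batch_size=100):
    """Yield (row, result) pairs in one pass, holding at most one batch."""
    caller, callee = socket.socketpair()
    t = threading.Thread(target=worker, args=(callee, udf), daemon=True)
    t.start()
    it = iter(rows)
    try:
        while True:
            batch = list(islice(it, batch_size))
            if not batch:
                break
            # Ship only the UDF's input, not the whole row.
            send_msg(caller, [udf_input(r) for r in batch])
            results = recv_msg(caller)  # block until the worker answers
            yield from zip(batch, results)
    finally:
        send_msg(caller, None)
        t.join()
        caller.close()
        callee.close()
```

Because the caller never sends a second batch before it has read the first response, the only rows buffered on the caller's side are the `batch_size` rows of the current batch, and neither side can block on a full pipe while the other waits for output. The cost, as noted in the thread, is that one side idles while the other works.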

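Editor's sketch of the bounded-buffer "forking" idea from the thread, for comparison. It is not the ForkingIterator from the linked commit. It is a simplified illustration, assuming the worker emits one result per input row with no batching. `run_forked` and `_END` are hypothetical names, and the `results` queue stands in for the worker's output pipe.

```python
import queue
import threading

_END = object()  # sentinel marking the end of the result stream


def run_forked(rows, project, udf, maxsize=4):
    """Join each row with udf(project(row)) in one pass over rows."""
    pending = queue.Queue(maxsize=maxsize)  # original rows awaiting results
    results = queue.Queue()                 # stand-in for the worker's output

    def writer():
        for row in rows:
            pending.put(row)                # blocks while maxsize rows are pending
            results.put(udf(project(row)))  # stand-in worker answers row by row
        results.put(_END)

    threading.Thread(target=writer, daemon=True).start()

    while True:
        result = results.get()
        if result is _END:
            break
        # Results arrive in input order, so the oldest pending row matches.
        yield pending.get(), result
```

This stays deadlock-free only because the stand-in worker produces a result for every row immediately. If the worker instead held back output until it had read more than `maxsize` rows (for example, because of serializer batching or unflushed OS buffers), the writer would block on the full `pending` queue while the reader waited for a result that never arrives, which is the fragility described in the thread.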