spark-dev mailing list archives

From Graham Dennis <graham.den...@gmail.com>
Subject Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing
Date Thu, 14 Aug 2014 09:35:50 GMT
That should work, but would you also make these changes to the
JavaSerializer?  The two serializers share the same API so that you can
select one or the other (or, in theory, a custom serializer).  This also
wouldn't address the problem of shipping custom *serializers* (not Kryo
registrators) in user jars.

On 14 August 2014 19:23, Reynold Xin <rxin@databricks.com> wrote:

> Graham,
>
> SparkEnv only creates a KryoSerializer, but as I understand it, that
> serializer doesn't actually initialize the registrator, since the
> registrator is only invoked from newKryo(), which is called when a
> KryoSerializerInstance is initialized.
>
> Basically I'm thinking a quick fix for 1.2:
>
> 1. Add a classLoader field to KryoSerializer; initialize new
> KryoSerializerInstance with that class loader
>
> 2. Set that classLoader to the executor's class loader when Executor is
> initialized.
>
> Then all deser calls should be using the executor's class loader.
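>
> For concreteness, a minimal sketch of that quick fix (the field and
> wiring names here are illustrative, not an actual patch):
>
>     class KryoSerializer(conf: SparkConf) extends Serializer {
>       // Overwritten by the Executor once the class loader that
>       // includes the user jars exists.
>       @volatile var defaultClassLoader: ClassLoader =
>         Thread.currentThread.getContextClassLoader
>
>       override def newInstance(): SerializerInstance =
>         new KryoSerializerInstance(this, defaultClassLoader)
>     }
>
>     // In Executor initialization:
>     env.serializer match {
>       case k: KryoSerializer => k.defaultClassLoader = urlClassLoader
>       case _ => // the JavaSerializer would need the same hook
>     }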
>
>
>
>
> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <graham.dennis@gmail.com>
> wrote:
>
>> Hi Reynold,
>>
>> That would solve this specific issue, but you'd need to be careful
>> that a serialiser instance is never created before the first task is
>> received.  Currently, in Executor.TaskRunner.run, a closure serialiser
>> instance is created before any application jars are downloaded, but
>> that could be moved.  To me, this seems a little fragile.
>>
>> However, there is a related issue: you can't ship a custom serialiser
>> in an application jar, because the serialiser is instantiated when the
>> SparkEnv object is created, which is before any tasks are received by
>> the executor.  The above approach wouldn't help with this problem.
>> Additionally, the YARN scheduler already adds the application jar to
>> the Executor classpath at launch, so taking the same approach here
>> would make things a bit more uniform.
>>
>> Cheers,
>> Graham
>>
>>
>> On 14 August 2014 17:37, Reynold Xin <rxin@databricks.com> wrote:
>>
>>> Graham,
>>>
>>> Thanks for working on this. This is an important bug to fix.
>>>
>>> I don't have the whole context and obviously I haven't spent nearly
>>> as much time on this as you have, but I'm wondering: what if we
>>> always pass the executor's ClassLoader to the Kryo serializer?  Would
>>> that solve this problem?
>>>
>>>
>>>
>>>
>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis
>>> <graham.dennis@gmail.com> wrote:
>>>
>>>> Hi Deb,
>>>>
>>>> The only alternative serialiser is the JavaSerialiser (the default).
>>>>  Theoretically Spark supports custom serialisers, but due to a related
>>>> issue, custom serialisers currently can't live in application jars and must
>>>> be available to all executors at launch.  My PR fixes this issue as well,
>>>> allowing custom serialisers to be shipped in application jars.
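>>>>
>>>> (Until then, switching back is just a conf change; the class name
>>>> below is the stock Java serialiser:
>>>>
>>>>     val conf = new org.apache.spark.SparkConf()
>>>>       .set("spark.serializer",
>>>>            "org.apache.spark.serializer.JavaSerializer")
>>>>
>>>> or simply leave spark.serializer unset, since it's the default.)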
>>>>
>>>> Graham
>>>>
>>>>
>>>> On 14 August 2014 16:56, Debasish Das <debasish.das83@gmail.com> wrote:
>>>>
>>>>> Sorry I just saw Graham's email after sending my previous email about
>>>>> this bug...
>>>>>
>>>>> I have been seeing this same issue on our ALS runs last week, but I
>>>>> thought it was due to my hacky way of running the mllib 1.1
>>>>> snapshot on core 1.0...
>>>>>
>>>>> What's the status of this PR?  Will this fix be back-ported to
>>>>> 1.0.1, as we are running a 1.0.1 stable standalone cluster?
>>>>>
>>>>> Until the PR merges, does it make sense to not use Kryo?  What are
>>>>> the other recommended efficient serializers?
>>>>>
>>>>> Thanks.
>>>>> Deb
>>>>>
>>>>>
>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis
>>>>> <graham.dennis@gmail.com> wrote:
>>>>>
>>>>>> I now have a complete pull request for this issue that I'd like to
>>>>>> get reviewed and committed.  The PR is available here:
>>>>>> https://github.com/apache/spark/pull/1890 and includes a test case
>>>>>> for the issue I described.  I've also submitted a related PR
>>>>>> (https://github.com/apache/spark/pull/1827) that causes exceptions
>>>>>> raised while attempting to run the custom Kryo registrator not to
>>>>>> be swallowed.
>>>>>>
>>>>>> Thanks,
>>>>>> Graham
>>>>>>
>>>>>>
>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.dennis@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > I've submitted a work-in-progress pull request for this issue
>>>>>> > that I'd like feedback on.  See
>>>>>> > https://github.com/apache/spark/pull/1890.  I've also submitted a
>>>>>> > pull request for the related issue that the exceptions hit when
>>>>>> > trying to use a custom Kryo registrator are being swallowed:
>>>>>> > https://github.com/apache/spark/pull/1827
>>>>>> >
>>>>>> > The approach in my pull request is to get the Worker processes to
>>>>>> > download the application jars and add them to the Executor class
>>>>>> > path at launch time.  There are a couple of things that still
>>>>>> > need to be done before this can be merged:
>>>>>> > 1. At the moment, the first time a task runs in the executor, the
>>>>>> > application jars are downloaded again.  My solution here would be
>>>>>> > to make the executor not download any jars that already exist
>>>>>> > (see the sketch after this list).  Previously, the driver &
>>>>>> > executor kept track of the timestamp of jar files and would
>>>>>> > redownload 'updated' jars; however, this never made sense, as the
>>>>>> > previous version of the updated jar may have already been loaded
>>>>>> > into the executor, so the updated jar may have no effect.  As my
>>>>>> > current pull request removes the timestamp for jars, just
>>>>>> > checking whether the jar exists will allow us to avoid
>>>>>> > downloading the jars again.
>>>>>> > 2. Tests. :-)
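>>>>>> >
>>>>>> > A minimal sketch of the "don't re-download" check (helper and
>>>>>> > variable names here are hypothetical, not from the PR):
>>>>>> >
>>>>>> >     import java.io.File
>>>>>> >     import java.net.URI
>>>>>> >
>>>>>> >     // True if the executor's jar directory already holds this
>>>>>> >     // jar, in which case the fetch can be skipped entirely.
>>>>>> >     def alreadyFetched(jarUrl: String, jarDir: File): Boolean = {
>>>>>> >       val jarName = new File(new URI(jarUrl).getPath).getName
>>>>>> >       new File(jarDir, jarName).exists()
>>>>>> >     }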
>>>>>> >
>>>>>> > A side-benefit of my pull request is that you will be able to use
>>>>>> > custom serialisers that are distributed in a user jar.  Currently,
>>>>>> > the serialiser instance is created in the Executor process before
>>>>>> > the first task is received, and therefore before any user jars
>>>>>> > are downloaded.  As this PR adds user jars to the Executor
>>>>>> > process at launch time, this won't be an issue.
>>>>>> >
>>>>>> >
>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.dennis@gmail.com>
>>>>>> > wrote:
>>>>>> >
>>>>>> >> See my comment on
>>>>>> >> https://issues.apache.org/jira/browse/SPARK-2878 for the full
>>>>>> >> stacktrace, but it's in the BlockManager/BlockManagerWorker
>>>>>> >> where it's trying to fulfil a "getBlock" request for another
>>>>>> >> node.  The objects that would be in the block haven't yet been
>>>>>> >> serialised, and that then causes the deserialisation to happen
>>>>>> >> on that thread.  See MemoryStore.scala:102.
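>>>>>> >>
>>>>>> >> The shape of the hazard, roughly (illustrative Scala, not
>>>>>> >> MemoryStore's actual code): a block cached as live objects has
>>>>>> >> to be serialised on demand when another node asks for its bytes,
>>>>>> >> on whatever thread happens to service the request.
>>>>>> >>
>>>>>> >>     import java.nio.ByteBuffer
>>>>>> >>
>>>>>> >>     sealed trait Entry
>>>>>> >>     case class DeserializedEntry(values: Seq[Any]) extends Entry
>>>>>> >>     case class SerializedEntry(bytes: ByteBuffer) extends Entry
>>>>>> >>
>>>>>> >>     def getBytes(entry: Entry,
>>>>>> >>                  serialize: Seq[Any] => ByteBuffer): ByteBuffer =
>>>>>> >>       entry match {
>>>>>> >>         case DeserializedEntry(values) =>
>>>>>> >>           // On a ConnectionManager thread this is where Kryo
>>>>>> >>           // gets initialised without the application jar on the
>>>>>> >>           // class loader, so the registrator lookup fails.
>>>>>> >>           serialize(values)
>>>>>> >>         case SerializedEntry(bytes) =>
>>>>>> >>           bytes.duplicate()
>>>>>> >>       }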
>>>>>> >>
>>>>>> >>
>>>>>> >> On 7 August 2014 11:53, Reynold Xin <rxin@databricks.com>
>>>>>> >> wrote:
>>>>>> >>
>>>>>> >>> I don't think it was a conscious design decision to not include
>>>>>> >>> the application classes in the connection manager serializer.
>>>>>> >>> We should fix that. Where is it deserializing data in that
>>>>>> >>> thread?
>>>>>> >>>
>>>>>> >>> Option 4 might make sense in the long run, but it adds a lot of
>>>>>> >>> complexity to the code base (whole separate code base, task
>>>>>> >>> queue, blocking/non-blocking logic within task threads) that
>>>>>> >>> can be error prone, so I think it is best to stay away from
>>>>>> >>> that right now.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis
>>>>>> >>> <graham.dennis@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>>> Hi Spark devs,
>>>>>> >>>>
>>>>>> >>>> I've posted an issue on JIRA
>>>>>> >>>> (https://issues.apache.org/jira/browse/SPARK-2878) which
>>>>>> >>>> occurs when using Kryo serialisation with a custom Kryo
>>>>>> >>>> registrator to register custom classes with Kryo.  This is an
>>>>>> >>>> insidious issue that non-deterministically causes Kryo to have
>>>>>> >>>> different ID number => class name maps on different nodes,
>>>>>> >>>> which then causes weird exceptions (ClassCastException,
>>>>>> >>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
>>>>>> >>>> deserialisation time.  I've created a reliable reproduction
>>>>>> >>>> for the issue here:
>>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>>> >>>>
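>>>>>> >>>> To see why the maps diverge (a minimal sketch; the classes
>>>>>> >>>> registered here are arbitrary examples): Kryo assigns class
>>>>>> >>>> IDs in registration order, so two instances only agree if they
>>>>>> >>>> perform the same registrations in the same order.
>>>>>> >>>>
>>>>>> >>>>     import com.esotericsoftware.kryo.Kryo
>>>>>> >>>>
>>>>>> >>>>     val good = new Kryo()
>>>>>> >>>>     good.register(classOf[Array[Double]]) // next free ID, say N
>>>>>> >>>>     good.register(classOf[List[Int]])     // N + 1
>>>>>> >>>>
>>>>>> >>>>     // If the registrator silently fails on another node, that
>>>>>> >>>>     // node's Kryo has no registrations: bytes tagged with ID N
>>>>>> >>>>     // elsewhere resolve to a different class (or none) there,
>>>>>> >>>>     // hence the ClassCastException et al.
>>>>>> >>>>     val bad = new Kryo()
>>>>>> >>>>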
>>>>>> >>>> I'm happy to put a pull request together to try to address
>>>>>> >>>> this, but the right way to solve it isn't obvious to me, and
>>>>>> >>>> I'd like to get feedback / ideas on how to address it.
>>>>>> >>>>
>>>>>> >>>> The root cause of the problem is a "Failed to run
>>>>>> >>>> spark.kryo.registrator" error which non-deterministically
>>>>>> >>>> occurs in some executor processes during operation.  My custom
>>>>>> >>>> Kryo registrator is in the application jar, and it is
>>>>>> >>>> accessible on the worker nodes.  This is demonstrated by the
>>>>>> >>>> fact that most of the time the custom Kryo registrator is
>>>>>> >>>> successfully run.
>>>>>> >>>>
>>>>>> >>>> What's happening is that Kryo serialisation/deserialisation
>>>>>> >>>> happens most of the time on an "Executor task launch worker"
>>>>>> >>>> thread, which has its class loader set to contain the
>>>>>> >>>> application jar.  This happens in
>>>>>> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from
>>>>>> >>>> what I can tell, it is only these threads that have access to
>>>>>> >>>> the application jar (which contains the custom Kryo
>>>>>> >>>> registrator).  However, the ConnectionManager threads
>>>>>> >>>> sometimes need to serialise/deserialise objects to satisfy
>>>>>> >>>> "getBlock" requests when the objects haven't previously been
>>>>>> >>>> serialised.  As the ConnectionManager threads don't have the
>>>>>> >>>> application jar available from their class loader, the lookup
>>>>>> >>>> of the custom Kryo registrator fails.  Spark then swallows
>>>>>> >>>> this exception, which results in a different ID number =>
>>>>>> >>>> class mapping for this Kryo instance, and this then causes
>>>>>> >>>> deserialisation errors later on a different node.
>>>>>> >>>>
>>>>>> >>>> An issue related to the one reported in SPARK-2878 is that
>>>>>> >>>> Spark probably shouldn't swallow the ClassNotFound exception
>>>>>> >>>> for custom Kryo registrators.  The user has explicitly
>>>>>> >>>> specified this class, and if it deterministically can't be
>>>>>> >>>> found, then it may cause problems at serialisation /
>>>>>> >>>> deserialisation time.  If it can't be found only some of the
>>>>>> >>>> time (as in this case), then it leads to a data corruption
>>>>>> >>>> issue later on.  Either way, we're better off dying due to the
>>>>>> >>>> ClassNotFound exception earlier than hitting the weirder
>>>>>> >>>> errors later on.
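>>>>>> >>>>
>>>>>> >>>> Concretely, the fail-fast behaviour could look something like
>>>>>> >>>> this (a sketch only; `registratorClass`, `classLoader` and
>>>>>> >>>> `kryo` stand in for whatever newKryo() has in scope):
>>>>>> >>>>
>>>>>> >>>>     import com.esotericsoftware.kryo.Kryo
>>>>>> >>>>     import org.apache.spark.SparkException
>>>>>> >>>>     import org.apache.spark.serializer.KryoRegistrator
>>>>>> >>>>
>>>>>> >>>>     try {
>>>>>> >>>>       val clazz =
>>>>>> >>>>         Class.forName(registratorClass, true, classLoader)
>>>>>> >>>>       val reg = clazz.newInstance().asInstanceOf[KryoRegistrator]
>>>>>> >>>>       reg.registerClasses(kryo)
>>>>>> >>>>     } catch {
>>>>>> >>>>       // Propagate instead of logging-and-continuing, so a node
>>>>>> >>>>       // with a divergent Kryo dies loudly rather than shipping
>>>>>> >>>>       // undecodable bytes.
>>>>>> >>>>       case e: Exception =>
>>>>>> >>>>         throw new SparkException(
>>>>>> >>>>           "Failed to run " + registratorClass, e)
>>>>>> >>>>     }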
>>>>>> >>>>
>>>>>> >>>> I have some ideas on potential solutions to this issue, but
>>>>>> >>>> I'm keen for experienced eyes to critique these approaches:
>>>>>> >>>>
>>>>>> >>>> 1. The simplest approach to fixing this would be to just make
>>>>>> >>>> the application jar available to the connection manager
>>>>>> >>>> threads, but I'm guessing it's a design decision to isolate
>>>>>> >>>> the application jar to just the executor task runner threads.
>>>>>> >>>> Also, I don't know if there are any other threads that might
>>>>>> >>>> be interacting with Kryo serialisation / deserialisation.
>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the
>>>>>> >>>> thread's class loader to include the application jar, then
>>>>>> >>>> restore the class loader after the Kryo registrator has been
>>>>>> >>>> run (see the sketch after this list).  I don't know if this
>>>>>> >>>> would have any other side-effects.
>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner
>>>>>> >>>> threads, rather than delaying serialisation until later, when
>>>>>> >>>> it can be done only if needed.  This approach would probably
>>>>>> >>>> have negative performance consequences.
>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation /
>>>>>> >>>> deserialisation that has the application jar on the class
>>>>>> >>>> path.  Serialisation / deserialisation would be the only thing
>>>>>> >>>> these threads do, and this would minimise conflicts /
>>>>>> >>>> interactions between the application jar and other jars.
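>>>>>> >>>>
>>>>>> >>>> A minimal sketch of the class-loader swap in option 2 (the
>>>>>> >>>> helper name is hypothetical):
>>>>>> >>>>
>>>>>> >>>>     // Run `body` with `loader` as the context class loader,
>>>>>> >>>>     // restoring the previous loader afterwards.
>>>>>> >>>>     def withClassLoader[T](loader: ClassLoader)(body: => T): T = {
>>>>>> >>>>       val thread = Thread.currentThread()
>>>>>> >>>>       val previous = thread.getContextClassLoader
>>>>>> >>>>       thread.setContextClassLoader(loader)
>>>>>> >>>>       try body finally thread.setContextClassLoader(previous)
>>>>>> >>>>     }
>>>>>> >>>>
>>>>>> >>>>     // e.g. withClassLoader(appJarLoader) {
>>>>>> >>>>     //   registrator.registerClasses(kryo)
>>>>>> >>>>     // }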
>>>>>> >>>>
>>>>>> >>>> #4 sounds like the best approach to me, but I think it would
>>>>>> >>>> require considerable knowledge of Spark internals, which is
>>>>>> >>>> beyond me at present.  Does anyone have any better (and
>>>>>> >>>> ideally simpler) ideas?
>>>>>> >>>>
>>>>>> >>>> Cheers,
>>>>>> >>>>
>>>>>> >>>> Graham
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
