spark-dev mailing list archives

From Debasish Das <debasish.da...@gmail.com>
Subject Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing
Date Fri, 15 Aug 2014 15:58:00 GMT
I am still a bit confused about why this issue did not show up in 0.9... at
that time there was no spark-submit, and the context was constructed with
low-level calls...

The Kryo registrator for ALS was always in my application code...

Was this bug introduced in 1.0, or was it always there?
 On Aug 14, 2014 5:48 PM, "Reynold Xin" <rxin@databricks.com> wrote:

> Here: https://github.com/apache/spark/pull/1948
>
>
>
> On Thu, Aug 14, 2014 at 5:45 PM, Debasish Das <debasish.das83@gmail.com>
> wrote:
>
>> Is there a fix that I can test? I have the flows set up for both
>> standalone and YARN runs...
>>
>> Thanks.
>> Deb
>>
>>
>>
>> On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin <rxin@databricks.com>
>> wrote:
>>
>>> Yes, I understand it might not work for a custom serializer, but that is a
>>> much less common path.
>>>
>>> Basically I want a quick fix for the 1.1 release (which is coming up soon).
>>> I would not be comfortable making big changes to classpath handling late in
>>> the release cycle. We can do that for 1.2.
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.dennis@gmail.com>
>>> wrote:
>>>
>>>> That should work, but would you also make these changes to the
>>>> JavaSerializer?  The API of these is the same so that you can select one or
>>>> the other (or in theory a custom serializer)?  This also wouldn't address
>>>> the problem of shipping custom *serializers* (not kryo registrators) in
>>>> user jars.
>>>>
>>>> On 14 August 2014 19:23, Reynold Xin <rxin@databricks.com> wrote:
>>>>
>>>>> Graham,
>>>>>
>>>>> SparkEnv only creates a KryoSerializer, but as I understand it, that
>>>>> serializer doesn't actually initialize the registrator, since that is only
>>>>> called when newKryo() is called as KryoSerializerInstance is initialized.
>>>>>
>>>>> Basically I'm thinking of a quick fix for 1.1:
>>>>>
>>>>> 1. Add a classLoader field to KryoSerializer; initialize new
>>>>> KryoSerializerInstance with that class loader.
>>>>>
>>>>> 2. Set that classLoader to the executor's class loader when Executor
>>>>> is initialized.
>>>>>
>>>>> Then all deser calls should be using the executor's class loader.
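Sketched in Scala, the two steps above might look roughly like this. This is a simplified model, not the actual Spark source: the real KryoSerializerInstance configures Kryo itself, and the field and constructor shapes here are illustrative.

```scala
// Step 1: KryoSerializer carries a class loader that every new
// serializer instance uses for class resolution.
class KryoSerializer {
  // Starts as the loader that loaded this class; the executor overrides it.
  @volatile var defaultClassLoader: ClassLoader = getClass.getClassLoader

  def newInstance(): KryoSerializerInstance =
    new KryoSerializerInstance(defaultClassLoader)
}

// Simplified stand-in: a real instance would make Kryo resolve all
// classes through `loader` during deserialization.
class KryoSerializerInstance(loader: ClassLoader) {
  def resolveClass(name: String): Class[_] = Class.forName(name, true, loader)
}

// Step 2: at executor startup, point the serializer at the class loader
// that has the application jars, so later (de)serialisation on any
// thread resolves classes through it.
class Executor(serializer: KryoSerializer, executorLoader: ClassLoader) {
  serializer.defaultClassLoader = executorLoader
}
```

With this in place, even deserialisation triggered from non-task threads resolves classes through the executor's loader, which is the point of the fix.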
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <
>>>>> graham.dennis@gmail.com> wrote:
>>>>>
>>>>>> Hi Reynold,
>>>>>>
>>>>>> That would solve this specific issue, but you'd need to be careful
>>>>>> that you never created a serialiser instance before the first task is
>>>>>> received.  Currently in Executor.TaskRunner.run a closure serialiser
>>>>>> instance is created before any application jars are downloaded, but that
>>>>>> could be moved.  To me, this seems a little fragile.
>>>>>>
>>>>>> However there is a related issue where you can't ship a custom
>>>>>> serialiser in an application jar, because the serialiser is instantiated
>>>>>> when the SparkEnv object is created, which is before any tasks are
>>>>>> received by the executor.  The above approach wouldn't help with this
>>>>>> problem.  Additionally, the YARN scheduler currently uses this approach
>>>>>> of adding the application jar to the Executor classpath, so it would
>>>>>> make things a bit more uniform.
>>>>>>
>>>>>> Cheers,
>>>>>> Graham
>>>>>>
>>>>>>
>>>>>> On 14 August 2014 17:37, Reynold Xin <rxin@databricks.com> wrote:
>>>>>>
>>>>>>> Graham,
>>>>>>>
>>>>>>> Thanks for working on this. This is an important bug to fix.
>>>>>>>
>>>>>>> I don't have the whole context, and obviously I haven't spent
>>>>>>> nearly as much time on this as you have, but I'm wondering: what if we
>>>>>>> always pass the executor's ClassLoader to the Kryo serializer? Will that
>>>>>>> solve this problem?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <
>>>>>>> graham.dennis@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Deb,
>>>>>>>>
>>>>>>>> The only alternative serialiser is the JavaSerialiser (the
>>>>>>>> default).  Theoretically Spark supports custom serialisers, but due
>>>>>>>> to a related issue, custom serialisers currently can't live in
>>>>>>>> application jars and must be available to all executors at launch.
>>>>>>>> My PR fixes this issue as well, allowing custom serialisers to be
>>>>>>>> shipped in application jars.
>>>>>>>>
>>>>>>>> Graham
>>>>>>>>
>>>>>>>>
>>>>>>>> On 14 August 2014 16:56, Debasish Das <debasish.das83@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sorry, I just saw Graham's email after sending my previous email
>>>>>>>>> about this bug...
>>>>>>>>>
>>>>>>>>> I have been seeing this same issue on our ALS runs last week, but I
>>>>>>>>> thought it was due to my hacky way of running the mllib 1.1 snapshot
>>>>>>>>> on core 1.0...
>>>>>>>>>
>>>>>>>>> What's the status of this PR? Will this fix be back-ported to
>>>>>>>>> 1.0.1, as we are running a 1.0.1 stable standalone cluster?
>>>>>>>>>
>>>>>>>>> Until the PR merges, does it make sense to not use Kryo? What are
>>>>>>>>> the other recommended efficient serializers?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>> Deb
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <
>>>>>>>>> graham.dennis@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I now have a complete pull request for this issue that I'd like
>>>>>>>>>> to get reviewed and committed.  The PR is available here:
>>>>>>>>>> https://github.com/apache/spark/pull/1890 and includes a testcase
>>>>>>>>>> for the issue I described.  I've also submitted a related PR
>>>>>>>>>> (https://github.com/apache/spark/pull/1827) that causes exceptions
>>>>>>>>>> raised while attempting to run the custom kryo registrator not to
>>>>>>>>>> be swallowed.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Graham
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.dennis@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> > I've submitted a work-in-progress pull request for this issue
>>>>>>>>>> > that I'd like feedback on.  See
>>>>>>>>>> > https://github.com/apache/spark/pull/1890 . I've also submitted
>>>>>>>>>> > a pull request for the related issue, where exceptions hit when
>>>>>>>>>> > trying to use a custom kryo registrator are being swallowed:
>>>>>>>>>> > https://github.com/apache/spark/pull/1827
>>>>>>>>>> >
>>>>>>>>>> > The approach in my pull request is to get the Worker processes
>>>>>>>>>> > to download the application jars and add them to the Executor
>>>>>>>>>> > class path at launch time. There are a couple of things that
>>>>>>>>>> > still need to be done before this can be merged:
>>>>>>>>>> > 1. At the moment, the first time a task runs in the executor,
>>>>>>>>>> > the application jars are downloaded again.  My solution here
>>>>>>>>>> > would be to make the executor not download any jars that already
>>>>>>>>>> > exist.  Previously, the driver & executor kept track of the
>>>>>>>>>> > timestamp of jar files and would redownload 'updated' jars;
>>>>>>>>>> > however, this never made sense, as the previous version of the
>>>>>>>>>> > updated jar may have already been loaded into the executor, so
>>>>>>>>>> > the updated jar may have no effect.  As my current pull request
>>>>>>>>>> > removes the timestamp for jars, just checking whether the jar
>>>>>>>>>> > exists will allow us to avoid downloading the jars again.
>>>>>>>>>> > 2. Tests. :-)
>>>>>>>>>> >
>>>>>>>>>> > A side-benefit of my pull request is that you will be able to
>>>>>>>>>> > use custom serialisers that are distributed in a user jar.
>>>>>>>>>> > Currently, the serialiser instance is created in the Executor
>>>>>>>>>> > process before the first task is received, and therefore before
>>>>>>>>>> > any user jars are downloaded.  As this PR adds user jars to the
>>>>>>>>>> > Executor process at launch time, this won't be an issue.
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.dennis@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>> >
>>>>>>>>>> >> See my comment on
>>>>>>>>>> >> https://issues.apache.org/jira/browse/SPARK-2878 for the full
>>>>>>>>>> >> stacktrace, but it's in the BlockManager/BlockManagerWorker
>>>>>>>>>> >> where it's trying to fulfil a "getBlock" request for another
>>>>>>>>>> >> node.  The objects that would be in the block haven't yet been
>>>>>>>>>> >> serialised, and that then causes the deserialisation to happen
>>>>>>>>>> >> on that thread.  See MemoryStore.scala:102.
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> On 7 August 2014 11:53, Reynold Xin <rxin@databricks.com>
>>>>>>>>>> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >>> I don't think it was a conscious design decision to not
>>>>>>>>>> >>> include the application classes in the connection manager
>>>>>>>>>> >>> serializer. We should fix that. Where is it deserializing data
>>>>>>>>>> >>> in that thread?
>>>>>>>>>> >>>
>>>>>>>>>> >>> 4 might make sense in the long run, but it adds a lot of
>>>>>>>>>> >>> complexity to the code base (whole separate code base, task
>>>>>>>>>> >>> queue, blocking/non-blocking logic within task threads) that
>>>>>>>>>> >>> can be error prone, so I think it is best to stay away from
>>>>>>>>>> >>> that right now.
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis
>>>>>>>>>> >>> <graham.dennis@gmail.com> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>>> Hi Spark devs,
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I’ve posted an issue on JIRA
>>>>>>>>>> >>>> (https://issues.apache.org/jira/browse/SPARK-2878) which
>>>>>>>>>> >>>> occurs when using Kryo serialisation with a custom Kryo
>>>>>>>>>> >>>> registrator to register custom classes with Kryo.  This is an
>>>>>>>>>> >>>> insidious issue that non-deterministically causes Kryo to
>>>>>>>>>> >>>> have different ID number => class name maps on different
>>>>>>>>>> >>>> nodes, which then causes weird exceptions
>>>>>>>>>> >>>> (ClassCastException, ClassNotFoundException,
>>>>>>>>>> >>>> ArrayIndexOutOfBoundsException) at deserialisation time.
>>>>>>>>>> >>>> I’ve created a reliable reproduction for the issue here:
>>>>>>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I’m happy to try and put a pull request together to address
>>>>>>>>>> >>>> this, but it’s not obvious to me what the right way to solve
>>>>>>>>>> >>>> it is, and I’d like to get feedback / ideas on how to
>>>>>>>>>> >>>> address this.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> The root cause of the problem is a "Failed to run
>>>>>>>>>> >>>> spark.kryo.registrator” error which non-deterministically
>>>>>>>>>> >>>> occurs in some executor processes during operation.  My
>>>>>>>>>> >>>> custom Kryo registrator is in the application jar, and it is
>>>>>>>>>> >>>> accessible on the worker nodes.  This is demonstrated by the
>>>>>>>>>> >>>> fact that most of the time the custom kryo registrator is
>>>>>>>>>> >>>> successfully run.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> What’s happening is that Kryo
>>>>>>>>>> >>>> serialisation/deserialisation happens most of the time on an
>>>>>>>>>> >>>> “Executor task launch worker” thread, which has the thread's
>>>>>>>>>> >>>> class loader set to contain the application jar.  This
>>>>>>>>>> >>>> happens in `org.apache.spark.executor.Executor.TaskRunner.run`,
>>>>>>>>>> >>>> and from what I can tell, it is only these threads that have
>>>>>>>>>> >>>> access to the application jar (which contains the custom Kryo
>>>>>>>>>> >>>> registrator).  However, the ConnectionManager threads
>>>>>>>>>> >>>> sometimes need to serialise/deserialise objects to satisfy
>>>>>>>>>> >>>> “getBlock” requests when the objects haven’t previously been
>>>>>>>>>> >>>> serialised.  As the ConnectionManager threads don’t have the
>>>>>>>>>> >>>> application jar available from their class loader, when they
>>>>>>>>>> >>>> try to look up the custom Kryo registrator, this fails.
>>>>>>>>>> >>>> Spark then swallows this exception, which results in a
>>>>>>>>>> >>>> different ID number —> class mapping for this kryo instance,
>>>>>>>>>> >>>> and this then causes deserialisation errors later on a
>>>>>>>>>> >>>> different node.
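The dependence on the calling thread described above can be seen in miniature: a lookup by name succeeds or fails based solely on the current thread's context class loader. A minimal illustration (the helper function and the registrator class name are hypothetical, not Spark's):

```scala
// Simplified model of the registrator lookup: resolve the configured
// class name through the *current thread's* context class loader.
def lookupRegistrator(className: String): Option[Class[_]] =
  try {
    Some(Class.forName(className, true,
      Thread.currentThread().getContextClassLoader))
  } catch {
    // This is the exception Spark was swallowing: on a thread whose
    // loader lacks the application jar, the lookup silently fails and
    // Kryo ends up with a different ID => class mapping on that node.
    case _: ClassNotFoundException => None
  }
```

On an "Executor task launch worker" thread the context loader includes the application jar and the lookup succeeds; on a ConnectionManager thread it does not, and the registrator is silently skipped.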
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> A related issue to the one reported in SPARK-2878 is that
>>>>>>>>>> >>>> Spark probably shouldn’t swallow the ClassNotFound exception
>>>>>>>>>> >>>> for custom Kryo registrators.  The user has explicitly
>>>>>>>>>> >>>> specified this class, and if it deterministically can’t be
>>>>>>>>>> >>>> found, then it may cause problems at serialisation /
>>>>>>>>>> >>>> deserialisation time.  If it can’t be found only sometimes
>>>>>>>>>> >>>> (as in this case), then it leads to a data corruption issue
>>>>>>>>>> >>>> later on.  Either way, we’re better off dying due to the
>>>>>>>>>> >>>> ClassNotFound exception earlier than hitting the weirder
>>>>>>>>>> >>>> errors later on.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I have some ideas on potential solutions to this issue, but
>>>>>>>>>> >>>> I’m keen for experienced eyes to critique these approaches:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> 1. The simplest approach to fixing this would be to just
>>>>>>>>>> >>>> make the application jar available to the connection manager
>>>>>>>>>> >>>> threads, but I’m guessing it’s a design decision to isolate
>>>>>>>>>> >>>> the application jar to just the executor task runner
>>>>>>>>>> >>>> threads.  Also, I don’t know if there are any other threads
>>>>>>>>>> >>>> that might be interacting with kryo serialisation /
>>>>>>>>>> >>>> deserialisation.
>>>>>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the
>>>>>>>>>> >>>> thread’s class loader to include the application jar, then
>>>>>>>>>> >>>> restore the class loader after the kryo registrator has been
>>>>>>>>>> >>>> run.  I don’t know if this would have any other
>>>>>>>>>> >>>> side-effects.
>>>>>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner
>>>>>>>>>> >>>> threads, rather than delaying serialisation until later,
>>>>>>>>>> >>>> when it can be done only if needed.  This approach would
>>>>>>>>>> >>>> probably have negative performance consequences.
>>>>>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation
>>>>>>>>>> >>>> / deserialisation that has the application jar on the class
>>>>>>>>>> >>>> path.  Serialisation / deserialisation would be the only
>>>>>>>>>> >>>> thing these threads do, and this would minimise conflicts /
>>>>>>>>>> >>>> interactions between the application jar and other jars.
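Option 2 in the list above is the standard swap-and-restore pattern for the thread context class loader. A minimal sketch (the helper name is ours, not Spark's):

```scala
// Run `body` with `loader` as the thread's context class loader,
// restoring the original loader afterwards even if `body` throws.
def withContextClassLoader[T](loader: ClassLoader)(body: => T): T = {
  val thread = Thread.currentThread()
  val saved = thread.getContextClassLoader
  thread.setContextClassLoader(loader)
  try body
  finally thread.setContextClassLoader(saved)
}
```

The registrator lookup would then be wrapped as `withContextClassLoader(appJarLoader) { runRegistrator() }`, where `appJarLoader` is a hypothetical loader that includes the application jar; the open question raised in the option itself is whether swapping the loader mid-request has other side-effects.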
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> #4 sounds like the best approach to me, but I think it would
>>>>>>>>>> >>>> require considerable knowledge of Spark internals, which is
>>>>>>>>>> >>>> beyond me at present.  Does anyone have any better (and
>>>>>>>>>> >>>> ideally simpler) ideas?
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Cheers,
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Graham
>>>>>>>>>> >>>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
