spark-dev mailing list archives

From Debasish Das <debasish.da...@gmail.com>
Subject Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing
Date Fri, 15 Aug 2014 00:45:13 GMT
Is there a fix that I can test? I have the flows set up for both standalone
and YARN runs...

Thanks.
Deb



On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin <rxin@databricks.com> wrote:

> Yes, I understand it might not work for custom serializer, but that is a
> much less common path.
>
> Basically I want a quick fix for 1.1 release (which is coming up soon). I
> would not be comfortable making big changes to class path late into the
> release cycle. We can do that for 1.2.
>
>
>
>
>
> On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.dennis@gmail.com>
> wrote:
>
>> That should work, but would you also make these changes to the
>> JavaSerializer?  The API of these is the same so that you can select one
>> or the other (or, in theory, a custom serializer).  This also wouldn't
>> address the problem of shipping custom *serializers* (not kryo
>> registrators) in user jars.
>>
>> On 14 August 2014 19:23, Reynold Xin <rxin@databricks.com> wrote:
>>
>>> Graham,
>>>
>>> SparkEnv only creates a KryoSerializer, but as I understand it, that
>>> serializer doesn't actually initialize the registrator, since that only
>>> happens when newKryo() is called as KryoSerializerInstance is initialized.
>>>
>>> Basically I'm thinking of a quick fix for 1.1:
>>>
>>> 1. Add a classLoader field to KryoSerializer; initialize new
>>> KryoSerializerInstance instances with that class loader.
>>>
>>> 2. Set that classLoader to the executor's class loader when Executor is
>>> initialized.
>>>
>>> Then all deser calls should be using the executor's class loader.
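>>>
>>> A minimal sketch of the shape of that fix (class and member names here
>>> are illustrative stand-ins, not the actual patch):
>>>
>>>   // Illustrative stand-ins for the real Spark classes.
>>>   class KryoSerializer {
>>>     // Set once by the Executor to its own class loader, which can
>>>     // resolve classes from the application jars.
>>>     @volatile var defaultClassLoader: Option[ClassLoader] = None
>>>
>>>     def newInstance(): KryoSerializerInstance =
>>>       new KryoSerializerInstance(this)
>>>   }
>>>
>>>   class KryoSerializerInstance(ks: KryoSerializer) {
>>>     // Every instance resolves classes (including the registrator)
>>>     // through the executor's loader, so deserialisation on any
>>>     // thread sees user classes.
>>>     private val loader: ClassLoader =
>>>       ks.defaultClassLoader.getOrElse(getClass.getClassLoader)
>>>
>>>     def loadClass(name: String): Class[_] =
>>>       Class.forName(name, true, loader)
>>>   }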
>>>
>>>
>>>
>>>
>>> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis
>>> <graham.dennis@gmail.com> wrote:
>>>
>>>> Hi Reynold,
>>>>
>>>> That would solve this specific issue, but you'd need to be careful that
>>>> you never created a serialiser instance before the first task is received.
>>>>  Currently in Executor.TaskRunner.run a closure serialiser instance is
>>>> created before any application jars are downloaded, but that could be
>>>> moved.  To me, this seems a little fragile.
>>>>
>>>> However there is a related issue where you can't ship a custom
>>>> serialiser in an application jar because the serialiser is instantiated
>>>> when the SparkEnv object is created, which is before any tasks are received
>>>> by the executor.  The above approach wouldn't help with this problem.
>>>>  Additionally, the YARN scheduler currently uses this approach of adding
>>>> the application jar to the Executor classpath, so it would make things a
>>>> bit more uniform.
>>>>
>>>> Cheers,
>>>> Graham
>>>>
>>>>
>>>> On 14 August 2014 17:37, Reynold Xin <rxin@databricks.com> wrote:
>>>>
>>>>> Graham,
>>>>>
>>>>> Thanks for working on this. This is an important bug to fix.
>>>>>
>>>>> I don't have the whole context and obviously I haven't spent nearly
>>>>> as much time on this as you have, but I'm wondering: what if we always
>>>>> pass the executor's ClassLoader to the Kryo serializer? Will that
>>>>> solve this problem?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <
>>>>> graham.dennis@gmail.com> wrote:
>>>>>
>>>>>> Hi Deb,
>>>>>>
>>>>>> The only alternative serialiser is the JavaSerialiser (the default).
>>>>>> Theoretically Spark supports custom serialisers, but due to a related
>>>>>> issue, custom serialisers currently can't live in application jars
>>>>>> and must be available to all executors at launch.  My PR fixes this
>>>>>> issue as well, allowing custom serialisers to be shipped in
>>>>>> application jars.
>>>>>>
>>>>>> Graham
>>>>>>
>>>>>>
>>>>>> On 14 August 2014 16:56, Debasish Das <debasish.das83@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Sorry I just saw Graham's email after sending my previous email
>>>>>>> about this bug...
>>>>>>>
>>>>>>> I have been seeing this same issue on our ALS runs last week, but I
>>>>>>> thought it was due to my hacky way of running the mllib 1.1 snapshot
>>>>>>> on core 1.0...
>>>>>>>
>>>>>>> What's the status of this PR? Will this fix be back-ported to 1.0.1,
>>>>>>> as we are running a 1.0.1 stable standalone cluster?
>>>>>>>
>>>>>>> Till the PR merges, does it make sense to not use Kryo? What are the
>>>>>>> other recommended efficient serializers?
>>>>>>>
>>>>>>> Thanks.
>>>>>>> Deb
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <
>>>>>>> graham.dennis@gmail.com> wrote:
>>>>>>>
>>>>>>>> I now have a complete pull request for this issue that I'd like to
>>>>>>>> get reviewed and committed.  The PR is available here:
>>>>>>>> https://github.com/apache/spark/pull/1890 and includes a testcase
>>>>>>>> for the issue I described.  I've also submitted a related PR (
>>>>>>>> https://github.com/apache/spark/pull/1827) that causes exceptions
>>>>>>>> raised while attempting to run the custom kryo registrator not to
>>>>>>>> be swallowed.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Graham
>>>>>>>>
>>>>>>>>
>>>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.dennis@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> > I've submitted a work-in-progress pull request for this issue
>>>>>>>> > that I'd like feedback on.  See
>>>>>>>> > https://github.com/apache/spark/pull/1890. I've also submitted a
>>>>>>>> > pull request for the related issue where exceptions hit while
>>>>>>>> > trying to run a custom kryo registrator are being swallowed:
>>>>>>>> > https://github.com/apache/spark/pull/1827
>>>>>>>> >
>>>>>>>> > The approach in my pull request is to get the Worker processes
>>>>>>>> > to download the application jars and add them to the Executor
>>>>>>>> > class path at launch time. There are a couple of things that
>>>>>>>> > still need to be done before this can be merged:
>>>>>>>> > 1. At the moment, the first time a task runs in the executor, the
>>>>>>>> > application jars are downloaded again.  My solution here would be
>>>>>>>> > to make the executor not download any jars that already exist
>>>>>>>> > (see the sketch after this list).  Previously, the driver &
>>>>>>>> > executor kept track of the timestamp of jar files and would
>>>>>>>> > redownload 'updated' jars; however, this never made sense, as the
>>>>>>>> > previous version of the updated jar may have already been loaded
>>>>>>>> > into the executor, so the updated jar may have no effect.  As my
>>>>>>>> > current pull request removes the timestamp for jars, just
>>>>>>>> > checking whether the jar exists will allow us to avoid
>>>>>>>> > downloading the jars again.
>>>>>>>> > 2. Tests. :-)
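>>>>>>>> >
>>>>>>>> > A hypothetical sketch of the "skip existing jars" check from
>>>>>>>> > point 1 (names and structure are illustrative, not the PR
>>>>>>>> > itself):
>>>>>>>> >
>>>>>>>> >   import java.io.File
>>>>>>>> >
>>>>>>>> >   // Fetch a jar only if it is not already on local disk.  With
>>>>>>>> >   // jar timestamps gone, presence is the only freshness signal;
>>>>>>>> >   // a jar already loaded into the JVM couldn't be swapped anyway.
>>>>>>>> >   def fetchIfMissing(jarName: String, jarDir: File)(fetch: File => Unit): Unit = {
>>>>>>>> >     val localJar = new File(jarDir, jarName)
>>>>>>>> >     if (!localJar.exists()) fetch(localJar)
>>>>>>>> >   }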
>>>>>>>> >
>>>>>>>> > A side-benefit of my pull request is that you will be able to
>>>>>>>> > use custom serialisers that are distributed in a user jar.
>>>>>>>> > Currently, the serialiser instance is created in the Executor
>>>>>>>> > process before the first task is received and therefore before
>>>>>>>> > any user jars are downloaded.  As this PR adds user jars to the
>>>>>>>> > Executor process at launch time, this won't be an issue.
>>>>>>>> >
>>>>>>>> >
>>>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.dennis@gmail.com>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> >> See my comment on
>>>>>>>> >> https://issues.apache.org/jira/browse/SPARK-2878 for the full
>>>>>>>> >> stacktrace, but it's in the BlockManager/BlockManagerWorker,
>>>>>>>> >> where it's trying to fulfil a "getBlock" request for another
>>>>>>>> >> node.  The objects that would be in the block haven't yet been
>>>>>>>> >> serialised, and that then causes the serialisation to happen on
>>>>>>>> >> that thread.  See MemoryStore.scala:102.
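>>>>>>>> >>
>>>>>>>> >> Roughly what happens there (a simplified paraphrase, not the
>>>>>>>> >> actual source): a block held as deserialised objects is
>>>>>>>> >> serialised on demand, on whatever thread services the remote
>>>>>>>> >> getBlock request.
>>>>>>>> >>
>>>>>>>> >>   // Simplified sketch: a block is either raw bytes or live objects.
>>>>>>>> >>   def getBytes(entry: Either[Array[Byte], Iterator[Any]],
>>>>>>>> >>                serialize: Iterator[Any] => Array[Byte]): Array[Byte] =
>>>>>>>> >>     entry match {
>>>>>>>> >>       case Left(bytes)   => bytes              // already serialised
>>>>>>>> >>       case Right(values) => serialize(values)  // runs on the caller's
>>>>>>>> >>                                                // (ConnectionManager) thread
>>>>>>>> >>     }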
>>>>>>>> >>
>>>>>>>> >>
>>>>>>>> >> On 7 August 2014 11:53, Reynold Xin <rxin@databricks.com>
wrote:
>>>>>>>> >>
>>>>>>>> >>> I don't think it was a conscious design decision to not
>>>>>>>> >>> include the application classes in the connection manager
>>>>>>>> >>> serializer. We should fix that. Where is it deserializing data
>>>>>>>> >>> in that thread?
>>>>>>>> >>>
>>>>>>>> >>> 4 might make sense in the long run, but it adds a lot of
>>>>>>>> >>> complexity to the code base (whole separate code base, task
>>>>>>>> >>> queue, blocking/non-blocking logic within task threads) that
>>>>>>>> >>> can be error prone, so I think it is best to stay away from
>>>>>>>> >>> that right now.
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis
>>>>>>>> >>> <graham.dennis@gmail.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>>> Hi Spark devs,
>>>>>>>> >>>>
>>>>>>>> >>>> I’ve posted an issue on JIRA (
>>>>>>>> >>>> https://issues.apache.org/jira/browse/SPARK-2878) which
>>>>>>>> >>>> occurs when using Kryo serialisation with a custom Kryo
>>>>>>>> >>>> registrator to register custom classes with Kryo.  This is an
>>>>>>>> >>>> insidious issue that non-deterministically causes Kryo to
>>>>>>>> >>>> have different ID number => class name maps on different
>>>>>>>> >>>> nodes, which then causes weird exceptions (ClassCastException,
>>>>>>>> >>>> ClassNotFoundException, ArrayIndexOutOfBoundsException) at
>>>>>>>> >>>> deserialisation time.  I’ve created a reliable reproduction
>>>>>>>> >>>> for the issue here:
>>>>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>>>>> >>>>
>>>>>>>> >>>> I’m happy to try to put a pull request together to address
>>>>>>>> >>>> this, but the right way to solve it isn’t obvious to me, and
>>>>>>>> >>>> I’d like to get feedback / ideas on approaches.
>>>>>>>> >>>>
>>>>>>>> >>>> The root cause of the problem is a "Failed to run
>>>>>>>> >>>> spark.kryo.registrator" error which non-deterministically
>>>>>>>> >>>> occurs in some executor processes during operation.  My
>>>>>>>> >>>> custom Kryo registrator is in the application jar, and it is
>>>>>>>> >>>> accessible on the worker nodes.  This is demonstrated by the
>>>>>>>> >>>> fact that most of the time the custom kryo registrator is
>>>>>>>> >>>> successfully run.
>>>>>>>> >>>>
>>>>>>>> >>>> What’s happening is that Kryo serialisation/deserialisation
>>>>>>>> >>>> is happening most of the time on an “Executor task launch
>>>>>>>> >>>> worker” thread, which has the thread's class loader set to
>>>>>>>> >>>> contain the application jar.  This happens in
>>>>>>>> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and from
>>>>>>>> >>>> what I can tell, it is only these threads that have access to
>>>>>>>> >>>> the application jar (that contains the custom Kryo
>>>>>>>> >>>> registrator).  However, the ConnectionManager threads
>>>>>>>> >>>> sometimes need to serialise/deserialise objects to satisfy
>>>>>>>> >>>> “getBlock” requests when the objects haven’t previously been
>>>>>>>> >>>> serialised.  As the ConnectionManager threads don’t have the
>>>>>>>> >>>> application jar available from their class loader, when such a
>>>>>>>> >>>> thread tries to look up the custom Kryo registrator, the
>>>>>>>> >>>> lookup fails.  Spark then swallows this exception, which
>>>>>>>> >>>> results in a different ID number => class mapping for this
>>>>>>>> >>>> kryo instance, and this then causes deserialisation errors
>>>>>>>> >>>> later on a different node.
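>>>>>>>> >>>>
>>>>>>>> >>>> To illustrate why a swallowed registrator failure corrupts
>>>>>>>> >>>> data (a toy sketch, not Spark code): Kryo assigns class IDs
>>>>>>>> >>>> in registration order, so a node whose registrator silently
>>>>>>>> >>>> failed ends up with a different ID => class map than a node
>>>>>>>> >>>> where it ran.
>>>>>>>> >>>>
>>>>>>>> >>>>   import com.esotericsoftware.kryo.Kryo
>>>>>>>> >>>>
>>>>>>>> >>>>   val healthy = new Kryo()
>>>>>>>> >>>>   healthy.register(classOf[Array[Double]])  // gets the next free ID
>>>>>>>> >>>>   healthy.register(classOf[Array[Int]])     // and the one after
>>>>>>>> >>>>
>>>>>>>> >>>>   val broken = new Kryo()   // registrator "failed": nothing was
>>>>>>>> >>>>                             // registered, so IDs written by a
>>>>>>>> >>>>                             // healthy node no longer line up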
>>>>>>>> >>>>
>>>>>>>> >>>> An issue related to the one reported in SPARK-2878 is that
>>>>>>>> >>>> Spark probably shouldn’t swallow the ClassNotFound exception
>>>>>>>> >>>> for custom Kryo registrators.  The user has explicitly
>>>>>>>> >>>> specified this class, and if it deterministically can’t be
>>>>>>>> >>>> found, then it may cause problems at serialisation /
>>>>>>>> >>>> deserialisation time.  If it only sometimes can’t be found (as
>>>>>>>> >>>> in this case), then it leads to a data corruption issue later
>>>>>>>> >>>> on.  Either way, we’re better off dying from the ClassNotFound
>>>>>>>> >>>> exception earlier than hitting the weirder errors later on.
>>>>>>>> >>>>
>>>>>>>> >>>> I have some ideas on potential solutions to this issue, but
>>>>>>>> >>>> I’m keen for experienced eyes to critique these approaches:
>>>>>>>> >>>>
>>>>>>>> >>>> 1. The simplest approach to fixing this would be to just
>>>>>>>> >>>> make the application jar available to the connection manager
>>>>>>>> >>>> threads, but I’m guessing it’s a design decision to isolate
>>>>>>>> >>>> the application jar to just the executor task runner threads.
>>>>>>>> >>>> Also, I don’t know if there are any other threads that might
>>>>>>>> >>>> be interacting with kryo serialisation / deserialisation.
>>>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the
>>>>>>>> >>>> thread’s class loader to include the application jar, then
>>>>>>>> >>>> restore the class loader after the kryo registrator has been
>>>>>>>> >>>> run (see the sketch after this list).  I don’t know if this
>>>>>>>> >>>> would have any other side-effects.
>>>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner
>>>>>>>> >>>> threads, rather than delaying serialisation until later, when
>>>>>>>> >>>> it can be done only if needed.  This approach would probably
>>>>>>>> >>>> have negative performance consequences.
>>>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation
>>>>>>>> >>>> / deserialisation that has the application jar on the class
>>>>>>>> >>>> path.  Serialisation / deserialisation would be the only
>>>>>>>> >>>> thing these threads do, and this would minimise conflicts /
>>>>>>>> >>>> interactions between the application jar and other jars.
>>>>>>>> >>>>
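>>>>>>>> >>>> A sketch of option 2 (illustrative; `appJarLoader` stands in
>>>>>>>> >>>> for whatever jar-aware loader the executor builds):
>>>>>>>> >>>>
>>>>>>>> >>>>   // Swap the context class loader just for the registrator
>>>>>>>> >>>>   // lookup, restoring the original even if the body throws.
>>>>>>>> >>>>   def withAppClassLoader[T](appJarLoader: ClassLoader)(body: => T): T = {
>>>>>>>> >>>>     val thread = Thread.currentThread()
>>>>>>>> >>>>     val original = thread.getContextClassLoader
>>>>>>>> >>>>     thread.setContextClassLoader(appJarLoader)
>>>>>>>> >>>>     try body
>>>>>>>> >>>>     finally thread.setContextClassLoader(original)
>>>>>>>> >>>>   }
>>>>>>>> >>>>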
>>>>>>>> >>>> #4 sounds like the best approach to me, but I think it would
>>>>>>>> >>>> require considerable knowledge of Spark internals, which is
>>>>>>>> >>>> beyond me at present.  Does anyone have any better (and
>>>>>>>> >>>> ideally simpler) ideas?
>>>>>>>> >>>>
>>>>>>>> >>>> Cheers,
>>>>>>>> >>>>
>>>>>>>> >>>> Graham
>>>>>>>> >>>>
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
