spark-dev mailing list archives

From Debasish Das <debasish.da...@gmail.com>
Subject Re: [SPARK-2878] Kryo serialisation with custom Kryo registrator failing
Date Tue, 19 Aug 2014 07:19:58 GMT
@rxin With the fixes, I could run it fine on top of branch-1.0

On master when running using YARN I am getting another KryoException:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 247 in stage 52.0 failed 4 times, most recent
failure: Lost task 247.3 in stage 52.0 (TID 10010,
tblpmidn05adv-hdp.tdc.vzwcorp.com):
com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException

Serialization trace:
shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
        com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)

OutLinkBlock does not extend Serializable... Also, I did not see this failure
before...

Was the fix tested on YARN?

@dbtsai, did your assembly on YARN run fine, or are you still noticing these
exceptions?

Thanks.

Deb


On Thu, Aug 14, 2014 at 5:48 PM, Reynold Xin <rxin@databricks.com> wrote:

> Here: https://github.com/apache/spark/pull/1948
>
>
>
> On Thu, Aug 14, 2014 at 5:45 PM, Debasish Das <debasish.das83@gmail.com>
> wrote:
>
>> Is there a fix that I can test?  I have the flows set up for both
>> standalone and YARN runs...
>>
>> Thanks.
>> Deb
>>
>>
>>
>> On Thu, Aug 14, 2014 at 10:59 AM, Reynold Xin <rxin@databricks.com>
>> wrote:
>>
>>> Yes, I understand it might not work for a custom serializer, but that is a
>>> much less common path.
>>>
>>> Basically I want a quick fix for the 1.1 release (which is coming up soon).
>>> I would not be comfortable making big changes to the class path handling
>>> this late in the release cycle. We can do that for 1.2.
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Aug 14, 2014 at 2:35 AM, Graham Dennis <graham.dennis@gmail.com>
>>> wrote:
>>>
>>>> That should work, but would you also make these changes to the
>>>> JavaSerializer?  The API of these is the same so that you can select one
>>>> or the other (or in theory a custom serializer)?  This also wouldn't address
>>>> the problem of shipping custom *serializers* (not kryo registrators) in
>>>> user jars.
>>>>
>>>> On 14 August 2014 19:23, Reynold Xin <rxin@databricks.com> wrote:
>>>>
>>>>> Graham,
>>>>>
>>>>> SparkEnv only creates a KryoSerializer, but as I understand it, that
>>>>> serializer doesn't actually initialize the registrator, since that is only
>>>>> called when newKryo() is invoked as KryoSerializerInstance is initialized.
>>>>>
>>>>> Basically I'm thinking of a quick fix for 1.2:
>>>>>
>>>>> 1. Add a classLoader field to KryoSerializer; initialize each new
>>>>> KryoSerializerInstance with that class loader.
>>>>>
>>>>> 2. Set that classLoader to the executor's class loader when Executor
>>>>> is initialized.
>>>>>
>>>>> Then all deser calls should be using the executor's class loader.
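The proposal above can be sketched in plain-JDK terms (an illustrative sketch only, not the actual Spark KryoSerializer code; the class and method names here are hypothetical): a serializer that is handed an explicit ClassLoader at construction time and resolves every deserialized class through that loader, rather than through whatever loader the calling thread happens to carry.

```java
import java.io.*;

// Hypothetical sketch: a serializer constructed with an explicit ClassLoader
// (e.g. the executor's loader, which can see the application jar) so that
// deserialization never depends on the calling thread's context loader.
public class LoaderAwareSerializer {
    private final ClassLoader classLoader;

    public LoaderAwareSerializer(ClassLoader classLoader) {
        this.classLoader = classLoader;
    }

    public byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
        }
        return bos.toByteArray();
    }

    public Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                // Always resolve through the supplied loader, never the
                // current thread's loader.
                return Class.forName(desc.getName(), false, classLoader);
            }
        }) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader executorLoader = Thread.currentThread().getContextClassLoader();
        LoaderAwareSerializer ser = new LoaderAwareSerializer(executorLoader);
        String roundTripped = (String) ser.deserialize(ser.serialize("hello"));
        System.out.println(roundTripped);
    }
}
```

With this shape, whichever thread performs the deserialisation (task runner or connection manager), resolution goes through the one loader that was injected at construction time.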
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Aug 14, 2014 at 12:53 AM, Graham Dennis <
>>>>> graham.dennis@gmail.com> wrote:
>>>>>
>>>>>> Hi Reynold,
>>>>>>
>>>>>> That would solve this specific issue, but you'd need to be careful
>>>>>> that you never created a serialiser instance before the first task is
>>>>>> received.  Currently in Executor.TaskRunner.run a closure serialiser
>>>>>> instance is created before any application jars are downloaded, but that
>>>>>> could be moved.  To me, this seems a little fragile.
>>>>>>
>>>>>> However there is a related issue where you can't ship a custom
>>>>>> serialiser in an application jar because the serialiser is instantiated
>>>>>> when the SparkEnv object is created, which is before any tasks are received
>>>>>> by the executor.  The above approach wouldn't help with this problem.
>>>>>> Additionally, the YARN scheduler currently uses this approach of adding
>>>>>> the application jar to the Executor classpath, so it would make things a
>>>>>> bit more uniform.
>>>>>>
>>>>>> Cheers,
>>>>>> Graham
>>>>>>
>>>>>>
>>>>>> On 14 August 2014 17:37, Reynold Xin <rxin@databricks.com> wrote:
>>>>>>
>>>>>>> Graham,
>>>>>>>
>>>>>>> Thanks for working on this. This is an important bug to fix.
>>>>>>>
>>>>>>> I don't have the whole context and obviously I haven't spent
>>>>>>> nearly as much time on this as you have, but I'm wondering: what if we
>>>>>>> always pass the executor's ClassLoader to the Kryo serializer?  Will that
>>>>>>> solve this problem?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Aug 13, 2014 at 11:59 PM, Graham Dennis <
>>>>>>> graham.dennis@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Deb,
>>>>>>>>
>>>>>>>> The only alternative serialiser is the JavaSerialiser (the
>>>>>>>> default).  Theoretically Spark supports custom serialisers, but due to a
>>>>>>>> related issue, custom serialisers currently can't live in application jars
>>>>>>>> and must be available to all executors at launch.  My PR fixes this issue
>>>>>>>> as well, allowing custom serialisers to be shipped in application jars.
>>>>>>>>
>>>>>>>> Graham
>>>>>>>>
>>>>>>>>
>>>>>>>> On 14 August 2014 16:56, Debasish Das <debasish.das83@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sorry, I just saw Graham's email after sending my previous email
>>>>>>>>> about this bug...
>>>>>>>>>
>>>>>>>>> I have been seeing this same issue on our ALS runs last week, but I
>>>>>>>>> thought it was due to my hacky way of running the mllib 1.1 snapshot
>>>>>>>>> on core 1.0...
>>>>>>>>>
>>>>>>>>> What's the status of this PR?  Will this fix be back-ported to
>>>>>>>>> 1.0.1, as we are running a 1.0.1 stable standalone cluster?
>>>>>>>>>
>>>>>>>>> Until the PR merges, does it make sense to not use Kryo?  What are
>>>>>>>>> the other recommended efficient serializers?
>>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>> Deb
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Aug 13, 2014 at 2:47 PM, Graham Dennis <
>>>>>>>>> graham.dennis@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I now have a complete pull request for this issue that I'd like
>>>>>>>>>> to get reviewed and committed.  The PR is available here:
>>>>>>>>>> https://github.com/apache/spark/pull/1890 and includes a
>>>>>>>>>> testcase for the issue I described.  I've also submitted a related
>>>>>>>>>> PR (https://github.com/apache/spark/pull/1827) that causes
>>>>>>>>>> exceptions raised while attempting to run the custom Kryo
>>>>>>>>>> registrator not to be swallowed.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Graham
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 12 August 2014 18:44, Graham Dennis <graham.dennis@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> > I've submitted a work-in-progress pull request for this issue
>>>>>>>>>> > that I'd like feedback on.  See
>>>>>>>>>> > https://github.com/apache/spark/pull/1890 . I've also submitted
>>>>>>>>>> > a pull request for the related issue that the exceptions hit
>>>>>>>>>> > when trying to use a custom kryo registrator are being
>>>>>>>>>> > swallowed: https://github.com/apache/spark/pull/1827
>>>>>>>>>> >
>>>>>>>>>> > The approach in my pull request is to get the Worker processes
>>>>>>>>>> > to download the application jars and add them to the Executor
>>>>>>>>>> > class path at launch time. There are a couple of things that
>>>>>>>>>> > still need to be done before this can be merged:
>>>>>>>>>> > 1. At the moment, the first time a task runs in the executor,
>>>>>>>>>> > the application jars are downloaded again.  My solution here
>>>>>>>>>> > would be to make the executor not download any jars that
>>>>>>>>>> > already exist.  Previously, the driver & executor kept track of
>>>>>>>>>> > the timestamp of jar files and would redownload 'updated' jars;
>>>>>>>>>> > however, this never made sense, as the previous version of the
>>>>>>>>>> > updated jar may have already been loaded into the executor, so
>>>>>>>>>> > the updated jar may have no effect.  As my current pull request
>>>>>>>>>> > removes the timestamp for jars, just checking whether the jar
>>>>>>>>>> > exists will allow us to avoid downloading the jars again.
>>>>>>>>>> > 2. Tests. :-)
>>>>>>>>>> >
>>>>>>>>>> > A side-benefit of my pull request is that you will be able to
>>>>>>>>>> > use custom serialisers that are distributed in a user jar.
>>>>>>>>>> > Currently, the serialiser instance is created in the Executor
>>>>>>>>>> > process before the first task is received and therefore before
>>>>>>>>>> > any user jars are downloaded.  As this PR adds user jars to the
>>>>>>>>>> > Executor process at launch time, this won't be an issue.
>>>>>>>>>> >
>>>>>>>>>> >
>>>>>>>>>> > On 7 August 2014 12:01, Graham Dennis <graham.dennis@gmail.com>
>>>>>>>>>> > wrote:
>>>>>>>>>> >
>>>>>>>>>> >> See my comment on
>>>>>>>>>> >> https://issues.apache.org/jira/browse/SPARK-2878 for the full
>>>>>>>>>> >> stacktrace, but it's in the BlockManager/BlockManagerWorker
>>>>>>>>>> >> where it's trying to fulfil a "getBlock" request for another
>>>>>>>>>> >> node.  The objects that would be in the block haven't yet been
>>>>>>>>>> >> serialised, and that then causes the deserialisation to happen
>>>>>>>>>> >> on that thread.  See MemoryStore.scala:102.
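The failure mode described in this thread comes down to each JVM thread carrying its own context class loader. A minimal plain-JDK demonstration (not Spark code; the "registrator" class and the filtering loader are stand-ins for the application-jar situation) shows how the same class lookup can succeed on a task-runner thread and fail on a connection-manager thread:

```java
import java.util.concurrent.atomic.AtomicReference;

// Demonstrates that class resolution via the thread context class loader is
// per-thread: a thread whose loader cannot see a class gets
// ClassNotFoundException, while another thread resolves it fine.
public class ContextLoaderDemo {
    // Simulates a loader without the application jar: it refuses to load
    // one specific class name.
    static class FilteringLoader extends ClassLoader {
        private final String blocked;
        FilteringLoader(ClassLoader parent, String blocked) {
            super(parent);
            this.blocked = blocked;
        }
        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(blocked)) {
                throw new ClassNotFoundException(name + " (not visible to this loader)");
            }
            return super.loadClass(name, resolve);
        }
    }

    static String tryLoad(String className) {
        try {
            Class.forName(className, false, Thread.currentThread().getContextClassLoader());
            return "loaded";
        } catch (ClassNotFoundException e) {
            return "ClassNotFoundException";
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a custom registrator class shipped in the app jar.
        String registrator = "java.util.ArrayList";

        // "Task runner" thread: its context loader can see the class.
        System.out.println("task runner thread: " + tryLoad(registrator));

        // "Connection manager" thread: its context loader cannot.
        AtomicReference<String> result = new AtomicReference<>();
        Thread connMgr = new Thread(() -> result.set(tryLoad(registrator)));
        connMgr.setContextClassLoader(
            new FilteringLoader(ContextLoaderDemo.class.getClassLoader(), registrator));
        connMgr.start();
        connMgr.join();
        System.out.println("connection manager thread: " + result.get());
    }
}
```

In Spark's case, the roles are reversed (the connection manager thread is the one *missing* the application jar), but the mechanism is the same: whichever thread performs the lazy deserialisation determines which classes can be resolved.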
>>>>>>>>>> >>
>>>>>>>>>> >>
>>>>>>>>>> >> On 7 August 2014 11:53, Reynold Xin <rxin@databricks.com>
>>>>>>>>>> >> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >>> I don't think it was a conscious design decision to not
>>>>>>>>>> >>> include the application classes in the connection manager
>>>>>>>>>> >>> serializer. We should fix that. Where is it deserializing
>>>>>>>>>> >>> data in that thread?
>>>>>>>>>> >>>
>>>>>>>>>> >>> #4 might make sense in the long run, but it adds a lot of
>>>>>>>>>> >>> complexity to the code base (whole separate code base, task
>>>>>>>>>> >>> queue, blocking/non-blocking logic within task threads) that
>>>>>>>>>> >>> can be error prone, so I think it is best to stay away from
>>>>>>>>>> >>> that right now.
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> On Wed, Aug 6, 2014 at 6:47 PM, Graham Dennis <
>>>>>>>>>> >>> graham.dennis@gmail.com> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>>> Hi Spark devs,
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I’ve posted an issue on JIRA
>>>>>>>>>> >>>> (https://issues.apache.org/jira/browse/SPARK-2878) which
>>>>>>>>>> >>>> occurs when using Kryo serialisation with a custom Kryo
>>>>>>>>>> >>>> registrator to register custom classes with Kryo.  This is
>>>>>>>>>> >>>> an insidious issue that non-deterministically causes Kryo to
>>>>>>>>>> >>>> have different ID number => class name maps on different
>>>>>>>>>> >>>> nodes, which then causes weird exceptions
>>>>>>>>>> >>>> (ClassCastException, ClassNotFoundException,
>>>>>>>>>> >>>> ArrayIndexOutOfBoundsException) at deserialisation time.
>>>>>>>>>> >>>> I’ve created a reliable reproduction for the issue here:
>>>>>>>>>> >>>> https://github.com/GrahamDennis/spark-kryo-serialisation
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I’m happy to try to put a pull request together to address
>>>>>>>>>> >>>> this, but it’s not obvious to me what the right way to solve
>>>>>>>>>> >>>> this is, and I’d like to get feedback / ideas on how to
>>>>>>>>>> >>>> address it.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> The root cause of the problem is a "Failed to run
>>>>>>>>>> >>>> spark.kryo.registrator" error which non-deterministically
>>>>>>>>>> >>>> occurs in some executor processes during operation.  My
>>>>>>>>>> >>>> custom Kryo registrator is in the application jar, and it is
>>>>>>>>>> >>>> accessible on the worker nodes.  This is demonstrated by the
>>>>>>>>>> >>>> fact that most of the time the custom Kryo registrator is
>>>>>>>>>> >>>> successfully run.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> What’s happening is that Kryo serialisation/deserialisation
>>>>>>>>>> >>>> happens most of the time on an “Executor task launch worker”
>>>>>>>>>> >>>> thread, which has the thread's class loader set to contain
>>>>>>>>>> >>>> the application jar.  This happens in
>>>>>>>>>> >>>> `org.apache.spark.executor.Executor.TaskRunner.run`, and
>>>>>>>>>> >>>> from what I can tell, it is only these threads that have
>>>>>>>>>> >>>> access to the application jar (which contains the custom
>>>>>>>>>> >>>> Kryo registrator).  However, the ConnectionManager threads
>>>>>>>>>> >>>> sometimes need to serialise/deserialise objects to satisfy
>>>>>>>>>> >>>> “getBlock” requests when the objects haven’t previously been
>>>>>>>>>> >>>> serialised.  As the ConnectionManager threads don’t have the
>>>>>>>>>> >>>> application jar available from their class loader, when one
>>>>>>>>>> >>>> tries to look up the custom Kryo registrator, this fails.
>>>>>>>>>> >>>> Spark then swallows this exception, which results in a
>>>>>>>>>> >>>> different ID number => class mapping for this Kryo instance,
>>>>>>>>>> >>>> and this then causes deserialisation errors later on a
>>>>>>>>>> >>>> different node.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> A related issue to the one reported in SPARK-2878 is that
>>>>>>>>>> >>>> Spark probably shouldn’t swallow the ClassNotFound exception
>>>>>>>>>> >>>> for custom Kryo registrators.  The user has explicitly
>>>>>>>>>> >>>> specified this class, and if it deterministically can’t be
>>>>>>>>>> >>>> found, then it may cause problems at serialisation /
>>>>>>>>>> >>>> deserialisation time.  If it can’t be found only sometimes
>>>>>>>>>> >>>> (as in this case), then it leads to a data corruption issue
>>>>>>>>>> >>>> later on.  Either way, we’re better off dying due to the
>>>>>>>>>> >>>> ClassNotFound exception earlier than hitting the weirder
>>>>>>>>>> >>>> errors later on.
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> I have some ideas on potential solutions to this issue, but
>>>>>>>>>> >>>> I’m keen for experienced eyes to critique these approaches:
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> 1. The simplest approach to fixing this would be to just
>>>>>>>>>> >>>> make the application jar available to the connection manager
>>>>>>>>>> >>>> threads, but I’m guessing it’s a design decision to isolate
>>>>>>>>>> >>>> the application jar to just the executor task runner
>>>>>>>>>> >>>> threads.  Also, I don’t know if there are any other threads
>>>>>>>>>> >>>> that might be interacting with Kryo serialisation /
>>>>>>>>>> >>>> deserialisation.
>>>>>>>>>> >>>> 2. Before looking up the custom Kryo registrator, change the
>>>>>>>>>> >>>> thread’s class loader to include the application jar, then
>>>>>>>>>> >>>> restore the class loader after the Kryo registrator has been
>>>>>>>>>> >>>> run.  I don’t know if this would have any other
>>>>>>>>>> >>>> side-effects.
>>>>>>>>>> >>>> 3. Always serialise / deserialise on the existing TaskRunner
>>>>>>>>>> >>>> threads, rather than delaying serialisation until later,
>>>>>>>>>> >>>> when it can be done only if needed.  This approach would
>>>>>>>>>> >>>> probably have negative performance consequences.
>>>>>>>>>> >>>> 4. Create a new dedicated thread pool for lazy serialisation
>>>>>>>>>> >>>> / deserialisation that has the application jar on the class
>>>>>>>>>> >>>> path.  Serialisation / deserialisation would be the only
>>>>>>>>>> >>>> thing these threads do, and this would minimise conflicts /
>>>>>>>>>> >>>> interactions between the application jar and other jars.
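Option 2 in the list above (temporarily swapping the thread's class loader and restoring it afterwards) can be sketched with a small plain-JDK helper. This is a hypothetical illustration, not Spark code; `withContextClassLoader` and `appJarLoader` are made-up names, and the body of the callable is where something like the Kryo registrator lookup would run:

```java
import java.util.concurrent.Callable;

// Hypothetical helper: run a block of work with a given context class
// loader installed on the current thread, restoring the previous loader
// afterwards even if the work throws.
public class WithClassLoader {
    static <T> T withContextClassLoader(ClassLoader loader, Callable<T> body) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return body.call();  // e.g. look up and run the Kryo registrator here
        } finally {
            current.setContextClassLoader(previous);  // always restore
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a loader that includes the application jar.
        ClassLoader appJarLoader = new ClassLoader(WithClassLoader.class.getClassLoader()) {};
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        // Inside the body, the swapped-in loader is the context loader.
        ClassLoader seenInside = withContextClassLoader(appJarLoader,
            () -> Thread.currentThread().getContextClassLoader());

        System.out.println("swapped during body: " + (seenInside == appJarLoader));
        System.out.println("restored afterwards: " +
            (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

The try/finally restore is what guards against the side-effects the option worries about: any code running on the thread after the registrator call sees the loader it started with.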
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> #4 sounds like the best approach to me, but I think it
>>>>>>>>>> >>>> would require considerable knowledge of Spark internals,
>>>>>>>>>> >>>> which is beyond me at present.  Does anyone have any better
>>>>>>>>>> >>>> (and ideally simpler) ideas?
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Cheers,
>>>>>>>>>> >>>>
>>>>>>>>>> >>>> Graham
>>>>>>>>>> >>>>
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
