flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timo Walther <twal...@apache.org>
Subject Re: Managing state migrations with Flink and Avro
Date Wed, 18 Apr 2018 12:21:15 GMT
Thank you. Maybe we already identified the issue (see 
https://issues.apache.org/jira/browse/FLINK-9202). I will use your code 
to verify it.

Regards,
Timo


Am 18.04.18 um 14:07 schrieb Petter Arvidsson:
> Hi Timo,
>
> Please find the generated class (for the second schema) attached.
>
> Regards,
> Petter
>
> On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther <twalthr@apache.org 
> <mailto:twalthr@apache.org>> wrote:
>
>     Hi Petter,
>
>     could you share the source code of the class that Avro generates
>     out of this schema?
>
>     Thank you.
>
>     Regards,
>     Timo
>
>     Am 18.04.18 um 11:00 schrieb Petter Arvidsson:
>>     Hello everyone,
>>
>>     I am trying to figure out how to set up Flink with Avro for state
>>     management (especially the content of snapshots) to enable state
>>     migrations (e.g. adding a nullable fields to a class). So far, in
>>     version 1.4.2, I tried to explicitly provide an instance of "new
>>     AvroTypeInfo(Accumulator.getClass())" where accumulator is a very
>>     simple Avro generated SpecificRecordBase of the following schema:
>>
>>     {"namespace": "io.relayr.flink",
>>      "type": "record",
>>      "name": "Accumulator",
>>      "fields": [
>>          {"name": "accumulator", "type": "int"}
>>      ]
>>     }
>>
>>     This successfully saves the state to the snapshot. When I then
>>     try to load the snapshot with an updated schema (adding the
>>     nullable field) it fails. Schema looks like this:
>>
>>     {"namespace": "io.relayr.flink",
>>      "type": "record",
>>      "name": "Accumulator",
>>      "fields": [
>>          {"name": "accumulator", "type": "int"},
>>          {"name": "newStuff", "type": ["int", "null"]}
>>      ]
>>     }
>>
>>     When I try to restart the Job from the snapshot, I get the
>>     following exception:
>>     2018-04-17 09:35:23,519 WARN
>>     org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
>>     - Deserialization of serializer errored; replacing with null.
>>     java.io.IOException: Unloadable class for type serializer.
>>     ...
>>     Caused by: java.io.InvalidClassException:
>>     io.relayr.flink.Accumulator; local class incompatible: stream
>>     classdesc serialVersionUID = -3555733236161157838, local class
>>     serialVersionUID = 5291033088112484292
>>
>>     Which is true, Avro tools do generate a new serialization ID for
>>     the bean, I just didn't expect it to be used and expected the
>>     Avro schema to be used instead? Did anyone get this working? What
>>     am I getting wrong?
>>
>>     Best regards,
>>     Petter
>
>
>


Mime
View raw message