flink-user mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: Managing state migrations with Flink and Avro
Date Fri, 20 Apr 2018 09:20:33 GMT
Hi Petter,

Which state backend are you using in your case? I think there is no 
quick solution for your problem, because a proper schema evolution 
story is on the roadmap for Flink 1.6.

Would it work to change the serial version id of the generated Avro 
class as a temporary workaround?
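
That workaround could look like the following sketch. The class name 
and the hard-coded UID are taken from the exception in this thread; 
this illustrates the mechanism only and is not the actual 
Avro-generated code:

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Minimal stand-in for the Avro-generated Accumulator class.
// The workaround: hard-code the serialVersionUID that the old snapshot
// was written with (the "stream classdesc" value from the exception),
// so Java serialization treats the evolved class as compatible.
class Accumulator implements Serializable {
    // Assumption: this UID matches the snapshot being restored from.
    private static final long serialVersionUID = -3555733236161157838L;

    int accumulator;
    Integer newStuff; // the newly added nullable field
}

public class UidCheck {
    public static void main(String[] args) {
        // ObjectStreamClass reports the UID the JVM will actually use;
        // with the explicit declaration above it no longer changes when
        // fields are added.
        long uid = ObjectStreamClass.lookup(Accumulator.class)
                .getSerialVersionUID();
        System.out.println(uid); // prints -3555733236161157838
    }
}
```

Note that this only papers over the Java-serialization check; it does 
not make the snapshot bytes themselves evolve.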

Regards,
Timo


On 18.04.18 at 14:21, Timo Walther wrote:
> Thank you. Maybe we already identified the issue (see 
> https://issues.apache.org/jira/browse/FLINK-9202). I will use your 
> code to verify it.
>
> Regards,
> Timo
>
>
> On 18.04.18 at 14:07, Petter Arvidsson wrote:
>> Hi Timo,
>>
>> Please find the generated class (for the second schema) attached.
>>
>> Regards,
>> Petter
>>
>> On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther <twalthr@apache.org 
>> <mailto:twalthr@apache.org>> wrote:
>>
>>     Hi Petter,
>>
>>     could you share the source code of the class that Avro generates
>>     out of this schema?
>>
>>     Thank you.
>>
>>     Regards,
>>     Timo
>>
>>     On 18.04.18 at 11:00, Petter Arvidsson wrote:
>>>     Hello everyone,
>>>
>>>     I am trying to figure out how to set up Flink with Avro for
>>>     state management (especially the content of snapshots) to enable
>>>     state migrations (e.g. adding a nullable field to a class). So
>>>     far, in version 1.4.2, I tried to explicitly provide an instance
>>>     of "new AvroTypeInfo(Accumulator.getClass())", where Accumulator
>>>     is a very simple Avro-generated SpecificRecordBase with the
>>>     following schema:
>>>
>>>     {"namespace": "io.relayr.flink",
>>>      "type": "record",
>>>      "name": "Accumulator",
>>>      "fields": [
>>>          {"name": "accumulator", "type": "int"}
>>>      ]
>>>     }
>>>
>>>     This successfully saves the state to the snapshot. When I then
>>>     try to load the snapshot with an updated schema (adding the
>>>     nullable field) it fails. Schema looks like this:
>>>
>>>     {"namespace": "io.relayr.flink",
>>>      "type": "record",
>>>      "name": "Accumulator",
>>>      "fields": [
>>>          {"name": "accumulator", "type": "int"},
>>>          {"name": "newStuff", "type": ["int", "null"]}
>>>      ]
>>>     }
>>>
>>>     When I try to restart the Job from the snapshot, I get the
>>>     following exception:
>>>     2018-04-17 09:35:23,519 WARN
>>>     org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil
>>>     - Deserialization of serializer errored; replacing with null.
>>>     java.io.IOException: Unloadable class for type serializer.
>>>     ...
>>>     Caused by: java.io.InvalidClassException:
>>>     io.relayr.flink.Accumulator; local class incompatible: stream
>>>     classdesc serialVersionUID = -3555733236161157838, local class
>>>     serialVersionUID = 5291033088112484292
>>>
>>>     Which is true: the Avro tools do generate a new
>>>     serialVersionUID for the bean. I just didn't expect it to be
>>>     used; I expected the Avro schema to be used for compatibility
>>>     instead. Did anyone get this working? What am I getting wrong?
>>>
>>>     Best regards,
>>>     Petter