flink-user mailing list archives

From Petter Arvidsson <petter.arvids...@relayr.io>
Subject Re: Managing state migrations with Flink and Avro
Date Mon, 23 Apr 2018 12:21:46 GMT
Hi Timo,

Thanks for your response. We are using the filesystem backend backed by S3.

We were looking for a good long-term solution with Avro, so manually
changing the serial version id is probably not the right way to proceed for
us. I think we will wait for Flink 1.6 before trying to properly implement
state migrations in this case.

Regards,
Petter

On Fri, Apr 20, 2018 at 11:20 AM, Timo Walther <twalthr@apache.org> wrote:

> Hi Petter,
>
> which state backend are you using in your case? I think there is no quick
> solution for your problem because a proper schema evolution story is on the
> roadmap for Flink 1.6.
>
> Would it work to change the serial version id of the generated Avro class
> as a temporary workaround?
>
> Regards,
> Timo
>
>
> On 18.04.18 at 14:21, Timo Walther wrote:
>
> Thank you. Maybe we already identified the issue (see
> https://issues.apache.org/jira/browse/FLINK-9202). I will use your code
> to verify it.
>
> Regards,
> Timo
>
>
> On 18.04.18 at 14:07, Petter Arvidsson wrote:
>
> Hi Timo,
>
> Please find the generated class (for the second schema) attached.
>
> Regards,
> Petter
>
> On Wed, Apr 18, 2018 at 11:32 AM, Timo Walther <twalthr@apache.org> wrote:
>
>> Hi Petter,
>>
>> could you share the source code of the class that Avro generates out of
>> this schema?
>>
>> Thank you.
>>
>> Regards,
>> Timo
>>
>> On 18.04.18 at 11:00, Petter Arvidsson wrote:
>>
>> Hello everyone,
>>
>> I am trying to figure out how to set up Flink with Avro for state
>> management (especially the content of snapshots) to enable state migrations
>> (e.g. adding a nullable field to a class). So far, in version 1.4.2, I
>> tried to explicitly provide an instance of "new
>> AvroTypeInfo(Accumulator.getClass())" where Accumulator is a very simple
>> Avro-generated SpecificRecordBase with the following schema:
>>
>> {"namespace": "io.relayr.flink",
>>  "type": "record",
>>  "name": "Accumulator",
>>  "fields": [
>>      {"name": "accumulator", "type": "int"}
>>  ]
>> }
>>
>> This successfully saves the state to the snapshot. When I then try to
>> load the snapshot with an updated schema (adding the nullable field) it
>> fails. Schema looks like this:
>>
>> {"namespace": "io.relayr.flink",
>>  "type": "record",
>>  "name": "Accumulator",
>>  "fields": [
>>      {"name": "accumulator", "type": "int"},
>>      {"name": "newStuff", "type": ["int", "null"]}
>>  ]
>> }
>>
>> When I try to restart the Job from the snapshot, I get the following
>> exception:
>> 2018-04-17 09:35:23,519 WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
>> java.io.IOException: Unloadable class for type serializer.
>> ...
>> Caused by: java.io.InvalidClassException: io.relayr.flink.Accumulator;
>> local class incompatible: stream classdesc serialVersionUID =
>> -3555733236161157838, local class serialVersionUID = 5291033088112484292
>>
>> Which is true: Avro tools do generate a new serialVersionUID for the
>> bean. I just didn't expect it to be used; I expected the Avro schema to be
>> used instead. Did anyone get this working? What am I getting wrong?
>>
>> Best regards,
>> Petter
>>
>>
>>
>
>
>
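
The InvalidClassException quoted in the thread can be reproduced with plain Java serialization, independent of Flink and Avro. The following is a minimal sketch under stated assumptions: the nested Accumulator class is a hand-written, hypothetical stand-in for the Avro-generated one, and the "old snapshot" is imitated by overwriting the serialVersionUID inside a freshly serialized stream rather than by compiling two versions of the class. The two UID values are the ones from the log above. It only illustrates why Java deserialization rejects the stream before any Avro schema resolution could take place; it does not show how Flink itself stores serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

public class SerialUidMismatchDemo {

    // UID recorded in the snapshot, taken from the exception in the thread.
    static final long OLD_UID = -3555733236161157838L;

    // Hypothetical stand-in for the class regenerated from the second schema.
    static class Accumulator implements Serializable {
        // UID of the local class, taken from the exception in the thread.
        private static final long serialVersionUID = 5291033088112484292L;
        int accumulator = 42;
        Integer newStuff;
    }

    // Serializes a value with standard Java serialization.
    static byte[] serialize(Object value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Returns a copy of the stream whose first class descriptor carries the
    // given UID, imitating bytes written by an older version of the class.
    // Layout: 4-byte stream header, TC_OBJECT, TC_CLASSDESC, 2-byte name
    // length, class name (ASCII assumed), then the 8-byte serialVersionUID.
    static byte[] withStreamUid(byte[] stream, Class<?> type, long uid) {
        int offset = 4 + 1 + 1 + 2 + type.getName().length();
        byte[] copy = stream.clone();
        ByteBuffer.wrap(copy).putLong(offset, uid); // big-endian, like the stream
        return copy;
    }

    // Tries to deserialize; returns null on success or the message of the
    // InvalidClassException if the class descriptor is rejected.
    static String restoreError(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            ois.readObject();
            return null;
        } catch (InvalidClassException e) {
            return e.getMessage();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] current = serialize(new Accumulator());
        byte[] old = withStreamUid(current, Accumulator.class, OLD_UID);

        System.out.println("matching UID:   " + restoreError(current));
        System.out.println("mismatched UID: " + restoreError(old));
    }
}
```

Under these assumptions, the mismatched stream is rejected purely because the two UIDs differ, which is why pinning the UID of the generated class to the old value was suggested as a temporary workaround. Matching UIDs only get past this check; they do not by themselves make the field layouts compatible.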
