flink-user mailing list archives

From Arvid Heise <arvid.he...@gmail.com>
Subject Re: Managed State Custom Serializer with Avro
Date Tue, 20 Feb 2018 10:40:27 GMT
Hi guys,

I was just about to write about this topic myself.

Tzu-Li's Flink Forward talk also gave me the impression that just by
using the AvroSerializer, we get some kind of state evolution for free.
https://www.slideshare.net/FlinkForward/flink-forward-berlin-2017-tzuli-gordon-tai-managing-state-in-apache-flink

However, I discovered two issues on 1.3.2:

1. The AvroSerializer does not use separate read and write schemas.
The snapshot stores type information instead of the more plausible
schema information.
However, the actual type should not matter as long as a compatible
type is used for state restoration.
I have rewritten the AvroSerializer to store the schema in the
snapshot config and to actually use it as the read schema when
initializing the DatumReader.
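
To illustrate why the schema has to travel with the snapshot, here is a
toy sketch in plain Java. It is not Avro or Flink code: the class
SchemaResolutionSketch, its methods, and the field names are made up for
this example. It only mirrors the general idea that positionally encoded
data must be decoded with the schema it was written with and can then be
mapped by field name onto a newer schema that supplies defaults.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of schema resolution; hypothetical names, not Avro or Flink API. */
final class SchemaResolutionSketch {

    /** A "schema" is an ordered map of field name to default value. */
    static Map<String, Object> schema(Object... nameDefaultPairs) {
        Map<String, Object> fields = new LinkedHashMap<>();
        for (int i = 0; i < nameDefaultPairs.length; i += 2) {
            fields.put((String) nameDefaultPairs[i], nameDefaultPairs[i + 1]);
        }
        return fields;
    }

    /** Positional encoding: only values are stored, no field names. */
    static List<Object> write(Map<String, Object> writerSchema, Map<String, Object> record) {
        List<Object> encoded = new ArrayList<>();
        for (String field : writerSchema.keySet()) {
            encoded.add(record.get(field));
        }
        return encoded;
    }

    /** Decode with the schema stored in the snapshot, then map onto the new schema. */
    static Map<String, Object> read(List<Object> encoded,
                                    Map<String, Object> snapshotSchema,
                                    Map<String, Object> currentSchema) {
        // Step 1: the stored schema tells us which value belongs to which field.
        Map<String, Object> asWritten = new LinkedHashMap<>();
        int position = 0;
        for (String field : snapshotSchema.keySet()) {
            asWritten.put(field, encoded.get(position++));
        }
        // Step 2: match fields by name; fields missing in the old data get defaults.
        Map<String, Object> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : currentSchema.entrySet()) {
            String name = field.getKey();
            resolved.put(name, asWritten.containsKey(name) ? asWritten.get(name) : field.getValue());
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Object> schemaV1 = schema("id", 0L, "name", "");
        Map<String, Object> schemaV2 = schema("id", 0L, "name", "", "country", "unknown");

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("id", 42L);
        record.put("name", "alice");

        List<Object> snapshotBytes = write(schemaV1, record);
        // Restoring works because schemaV1 was kept next to the data.
        System.out.println(read(snapshotBytes, schemaV1, schemaV2));
    }
}
```

If only type information is kept, the restoring side has no way to know
that the data was laid out according to schemaV1, which is the gap
described in point 1.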

2. During integration tests, it turned out that the current
implementation of the StateDescriptor always returns a copy of the
serializer from #getSerializer. So #ensureCompatibility is invoked
on a different serializer instance than the one whose #deserialize
method is actually called. Although my AvroSerializer sets the correct
read schema, the schema is never used, because it is set on the wrong
instance.
I propose to make sure that #ensureCompatibility is invoked on the
original serializer in the state descriptor. Otherwise all adjustments
to the serializer are lost.
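
The effect in point 2 can be reproduced with a small stand-alone model.
This is only a sketch in plain Java: ToySerializer, ToyDescriptor and
getOriginalSerializer are hypothetical names for illustration and do not
correspond to Flink's actual classes or signatures.

```java
/** Toy model of the serializer-copy problem; hypothetical names, not Flink API. */
final class SerializerCopySketch {

    static final class ToySerializer {
        String readSchema = "current";

        /** Stands in for a compatibility check that reconfigures this instance. */
        void ensureCompatibility(String snapshotSchema) {
            this.readSchema = snapshotSchema;
        }

        /** Copies the configuration as it is at the time of the call. */
        ToySerializer duplicate() {
            ToySerializer copy = new ToySerializer();
            copy.readSchema = this.readSchema;
            return copy;
        }
    }

    static final class ToyDescriptor {
        private final ToySerializer original = new ToySerializer();

        /** Every call hands out a fresh copy of the original. */
        ToySerializer getSerializer() {
            return original.duplicate();
        }

        /** Hypothetical accessor used to model the proposed fix. */
        ToySerializer getOriginalSerializer() {
            return original;
        }
    }

    public static void main(String[] args) {
        ToyDescriptor descriptor = new ToyDescriptor();

        // Observed behavior: the check configures copy A, reads use copy B.
        descriptor.getSerializer().ensureCompatibility("schema1");
        System.out.println(descriptor.getSerializer().readSchema); // prints "current"

        // Proposed behavior: configure the original, so later copies inherit it.
        descriptor.getOriginalSerializer().ensureCompatibility("schema1");
        System.out.println(descriptor.getSerializer().readSchema); // prints "schema1"
    }
}
```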

I can provide tests and patches if needed.

One related question:

If I take an incremental snapshot with the RocksDB backend and keyed
state, is the snapshot config attached to every key? In other words,
would the following work:
* Write (key1, value1) and (key2, value2) with schema1. Do cancel with snapshot.
* Read (key1, value1) with schema1->schema2 and write (key1, value1)
back with schema2. Do cancel with snapshot.
<Now we have two different schemas in the snapshots>
* Read (key1, value1) with schema2 and read (key2, value2) with
schema1->schema2.

Thanks for any feedback

Arvid

On Mon, Feb 19, 2018 at 7:17 PM, Niels Denissen <nielsdenissen@gmail.com> wrote:
> Hi Till,
>
> Thanks for the quick reply, I'm using 1.3.2 atm.
>
> Cheers,
> Niels
>
> On Feb 19, 2018 19:10, "Till Rohrmann" <trohrmann@apache.org> wrote:
>>
>> Hi Niels,
>>
>> which version of Flink are you using? Currently, Flink does not support
>> upgrading the TypeSerializer itself, if I'm not mistaken. As you've
>> described, it will try to use the old serializer stored in the checkpoint
>> stream to restore state.
>>
>> I've pulled Gordon into the conversation who can tell you a little bit
>> more about the current capability and limitations of state evolution.
>>
>> Cheers,
>> Till
>>
>> On Mon, Feb 19, 2018 at 4:14 PM, Niels <[hidden email]> wrote:
>>>
>>> Hi all,
>>>
>>> I'm currently trying to use Avro in order to evolve our data present in
>>> Flink's Managed State. I've extended the TypeSerializer class successfully
>>> for this purpose, but still have issues using Schema Evolution.
>>>
>>> *The problem:*
>>> When we try to read data (deserialize from savepoint) with a new serializer
>>> and a new schema, Flink seems to use the old schema of the old serializer
>>> (written to the savepoint). This results in an old GenericRecord that
>>> doesn't adhere to the new Avro schema.
>>>
>>> *What seems to happen to me is the following* (Say we evolve from dataV1
>>> to dataV2):
>>> - State containing dataV1 is serialized with avro schema V1 to a
>>> check/savepoint. Along with the data, the serializer itself is written.
>>> - Upon restore, the old serializer is retrieved from the data (therefore
>>> needs to be on the classpath). Data is restored using this old serializer.
>>> The new serializer provided is only used for writes.
>>>
>>> If this is indeed the case it explains our aforementioned problem. If you
>>> have any pointers as to whether this is true and what a possible solution
>>> would be that would be very much appreciated!
>>>
>>> Thanks!
>>> Niels