flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Kafka consumer to sync topics by event time?
Date Mon, 16 Apr 2018 12:14:43 GMT
Fully agree, Juho!

Do you want to contribute the docs fix?
If yes, we should update FLINK-5479 to make sure that the warning is
removed once the bug is fixed.

Thanks, Fabian

2018-04-12 9:32 GMT+02:00 Juho Autio <juho.autio@rovio.com>:

> Looks like the bug https://issues.apache.org/jira/browse/FLINK-5479
> entirely prevents this feature from being used if there are any idle
> partitions. It would be nice to mention in the documentation that this
> currently requires all subscribed partitions to have a constant stream of
> data with growing timestamps. When the watermark stalls on an idle
> partition, it blocks everything.
>
> Link to current documentation:
> https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> On Mon, Dec 4, 2017 at 4:29 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> You are right, offsets cannot be used for tracking processing progress. I
>> think setting Kafka offsets with respect to some progress notion other than
>> "has been consumed" would be highly application specific and hard to
>> generalize.
>> As you said, there might be a window (such as a session window) that is
>> open much longer than all other windows and which would hold back the
>> offset. Other applications might not use the built-in windows at all but
>> custom ProcessFunctions.
>>
>> Have you considered tracking progress using watermarks?
>>
>> 2017-12-04 14:42 GMT+01:00 Juho Autio <juho.autio@rovio.com>:
>>
>>> Thank you, Fabian. Really clear explanation. That indeed matches my
>>> observation (data is not dropped from either the small or the big topic,
>>> but the offsets are already advancing on the Kafka side before those
>>> offsets have been triggered from a window operator).
>>>
>>> This means that it's a bit harder to meaningfully monitor the job's
>>> progress solely based on kafka consumer offsets. Is there a reason why
>>> Flink couldn't instead commit the offsets after they have been triggered
>>> from downstream windows? I could imagine that this might pose a problem if
>>> there are any windows that remain open for a very long time, but in general
>>> it would be useful IMHO. Or Flink could even commit both (read vs.
>>> triggered) offsets to kafka for monitoring purposes.
>>>
>>> On Mon, Dec 4, 2017 at 3:30 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> the partitions of both topics are independently consumed, i.e., at
>>>> their own speed without coordination. With the configuration that Gordon
>>>> linked, watermarks are generated per partition.
>>>> Each source task maintains the latest (and highest) watermark per
>>>> partition and propagates the smallest watermark. The same mechanism is
>>>> applied for watermarks across tasks (this is what Kien referred to).
>>>>
>>>> In the case that you are describing, the partitions of the smaller
>>>> topic are consumed faster (hence their offsets catch up sooner), but
>>>> watermarks are emitted "at the speed" of the bigger topic.
>>>> Therefore, the timestamps of records from the smaller topic can be far
>>>> ahead of the watermark.
>>>> In principle, that does not pose a problem. Stateful operators (such as
>>>> windows) remember the "early" records and process them when they receive
>>>> a watermark that passes the timestamps of the early records.
>>>>
>>>> Regarding your question "Are they committed to Kafka before their
>>>> watermark has passed on Flink's side?":
>>>> The offsets of the smaller topic might be checkpointed when all
>>>> partitions have been read to the "end" and the bigger topic is still
>>>> catching up.
>>>> The watermarks are moving at the speed of the bigger topic, but all
>>>> "early" events of the smaller topic are stored in stateful operators and
>>>> are checkpointed as well.
>>>>
>>>> So, you lose neither early nor late data.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> 2017-12-01 13:43 GMT+01:00 Juho Autio <juho.autio@rovio.com>:
>>>>
>>>>> Thanks for the answers. I still don't understand why I can see the
>>>>> offsets being quickly committed to Kafka for the "small topic". Are they
>>>>> committed to Kafka before their watermark has passed on Flink's side?
>>>>> That would be quite confusing. Indeed, when Flink handles the
>>>>> state/offsets internally, the consumer offsets are committed to Kafka
>>>>> just for reference.
>>>>>
>>>>> Otherwise, what you're saying sounds very good to me. The
>>>>> documentation just doesn't explicitly say anything about how it works
>>>>> across topics.
>>>>>
>>>>> On Kien's answer: "When you join multiple stream with different
>>>>> watermarks", note that I'm not joining any topics myself. I get them
>>>>> as a single stream from the Flink Kafka consumer based on the list of
>>>>> topics that I asked for.
>>>>>
>>>>> Thanks,
>>>>> Juho
>>>>>
>>>>> On Wed, Nov 22, 2017 at 2:57 PM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> The FlinkKafkaConsumer can handle watermark advancement with
>>>>>> per-Kafka-partition awareness (across partitions of different topics).
>>>>>> You can see an example of how to do that here [1].
>>>>>>
>>>>>> Basically, this generates watermarks within the Kafka consumer
>>>>>> individually for each Kafka partition, and the per-partition
>>>>>> watermarks are aggregated and emitted from the consumer in the same
>>>>>> way that watermarks are aggregated on a stream shuffle: a watermark
>>>>>> is emitted from the consumer only when the low watermark advances
>>>>>> across all partitions.
>>>>>>
>>>>>> Therefore, this helps avoid the problem that you described, in which
>>>>>> a "big_topic" has subscribed partitions that lag behind others. In
>>>>>> this case, when the above feature is used, the event time advances
>>>>>> along with the lagging "big_topic" partitions, so messages are not
>>>>>> recognized as late and discarded.
>>>>>>
>>>>>> Cheers,
>>>>>> Gordon
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>
>>>>>
>>>
>>
>
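For readers of this thread: the aggregation rule described above (each source task keeps the highest watermark per partition and propagates the smallest one) can be sketched in a few lines of plain Java. This is only an illustration, not Flink's implementation; the class name PartitionWatermarkTracker and its methods are hypothetical. It also shows why an idle partition stalls everything, as in FLINK-5479: a partition that never reports a watermark keeps the minimum at its initial value.

```java
import java.util.Arrays;

/** Hypothetical sketch: highest watermark per partition, smallest one emitted. */
final class PartitionWatermarkTracker {
    private final long[] perPartition;

    PartitionWatermarkTracker(int partitions) {
        perPartition = new long[partitions];
        // No partition has reported a watermark yet.
        Arrays.fill(perPartition, Long.MIN_VALUE);
    }

    /** Records a watermark for one partition; it never moves backwards. */
    void observe(int partition, long watermark) {
        perPartition[partition] = Math.max(perPartition[partition], watermark);
    }

    /** The watermark the source may emit: the minimum over all partitions. */
    long emitted() {
        long min = Long.MAX_VALUE;
        for (long w : perPartition) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        PartitionWatermarkTracker tracker = new PartitionWatermarkTracker(2);
        tracker.observe(0, 5_000L); // fast partition of the small topic
        tracker.observe(1, 1_000L); // lagging partition of the big topic
        System.out.println(tracker.emitted()); // prints 1000

        // Partition 1 is idle and never reports, so the minimum cannot advance.
        PartitionWatermarkTracker stalled = new PartitionWatermarkTracker(2);
        stalled.observe(0, 9_000L);
        System.out.println(stalled.emitted() == Long.MIN_VALUE); // prints true
    }
}
```

In Flink itself, the per-partition behaviour is enabled by assigning the timestamp and watermark extractor on the Kafka consumer rather than on the resulting stream, as described on the documentation page Gordon linked.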

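The point that stateful operators remember "early" records and process them once a watermark passes their timestamps can be sketched the same way. Again, this is a simplified illustration in plain Java under stated assumptions, not Flink's window operator: the EarlyRecordBuffer class is hypothetical, and it releases individual records instead of firing windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical sketch: holds records until the watermark reaches their timestamp. */
final class EarlyRecordBuffer {
    private final TreeMap<Long, List<String>> byTimestamp = new TreeMap<>();

    /** Buffers a record under its event timestamp. */
    void add(long timestamp, String record) {
        byTimestamp.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(record);
    }

    /** Removes and returns, in timestamp order, all records with timestamp <= watermark. */
    List<String> onWatermark(long watermark) {
        NavigableMap<Long, List<String>> due = byTimestamp.headMap(watermark, true);
        List<String> ready = new ArrayList<>();
        for (List<String> records : due.values()) {
            ready.addAll(records);
        }
        due.clear(); // the view is backed by the buffer, so this drops the released records
        return ready;
    }

    /** Number of distinct timestamps still buffered. */
    int pendingTimestamps() {
        return byTimestamp.size();
    }

    public static void main(String[] args) {
        EarlyRecordBuffer buffer = new EarlyRecordBuffer();
        buffer.add(5_000L, "small-topic event"); // far ahead of the watermark
        buffer.add(900L, "big-topic event");

        System.out.println(buffer.onWatermark(1_000L)); // prints [big-topic event]
        System.out.println(buffer.onWatermark(6_000L)); // prints [small-topic event]
    }
}
```

Because the buffered records live in operator state, they are included in checkpoints, which is why the offsets of the small topic can be committed early without losing data.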