flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Custom TimestampExtractor and FlinkKafkaConsumer082
Date Mon, 16 Nov 2015 12:37:01 GMT
Hi,
yes, at your data-rate emitting a watermark for every element should not be a problem. It
could become a problem with higher data-rates since the system can get overwhelmed if every
element also generates a watermark. In that case I would suggest storing the lastest element-timestamp
in an internal field and only emitting in getCurrentWatermark(), since then, then the watermark
interval can be tunes using the auto-watermark interval setting.

But that should not be the cause of the problem that you currently have. Would you maybe be
willing to send me some (mock) example data and the code so that I can reproduce the problem
and have a look at it? to aljoscha at apache.org.

Cheers,
Aljoscha
> On 16 Nov 2015, at 13:05, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
> 
> Hi Aljoscha,
> 
> ok, now I at least understand, why it works with fromElements(...). For
> the rest I am not so sure.
> 
>> What this means in your case is that the watermark can only advance if
> a new element arrives, because only then is the watermark updated.
> 
> But new elements arrive all the time, about 50/s, or do you mean
> something else?
> 
> getCurrentWatermark returning Long.MIN_VALUE still seems to be an ok
> choice, if i understand the semantics correctly. It just affects
> watermarking in the absence of events, right?
> 
> Cheers,
> 
> Konstantin
> 
> 
> On 16.11.2015 12:31, Aljoscha Krettek wrote:
>> Hi,
>> it could be what Gyula mentioned. Let me first go a bit into how the TimestampExtractor
works internally.
>> 
>> First, the timestamp extractor internally keeps the value of the last emitted watermark.
Then, the semantics of the TimestampExtractor are as follows :
>> - the result of extractTimestamp is taken and it replaces the internal timestamp
of the element
>> - if the result of extractWatermark is larger than the last watermark the new value
is emitted as a watermark and the value is stored
>> - getCurrentWatermark is called on the specified auto-watermark interval, if the
returned value is larger than the last watermark it is emitted and stored as last watermark
>> 
>> What this means in your case is that the watermark can only advance if a new element
arrives, because only then is the watermark updated.
>> 
>> The reason why you see results if you use fromElements is that the window-operator
also emits all the windows that it currently has buffered if the program closes. This happens
in the case of fromElements because only a finite number of elements is emitted, after which
the source closes, thereby finishing the whole program.
>> 
>> Cheers,
>> Aljoscha
>>> On 16 Nov 2015, at 10:42, Gyula Fóra <gyula.fora@gmail.com> wrote:
>>> 
>>> Could this part of the extractor be the problem Aljoscha?
>>> 
>>> @Override
>>>    public long getCurrentWatermark() {
>>>        return Long.MIN_VALUE;
>>>    }
>>> 
>>> Gyula
>>> 
>>> Konstantin Knauf <konstantin.knauf@tngtech.com> ezt írta (időpont: 2015.
nov. 16., H, 10:39):
>>> Hi Aljoscha,
>>> 
>>> thanks for your answer. Yes I am using the same TimestampExtractor-Class.
>>> 
>>> The timestamps look good to me. Here an example.
>>> 
>>> {"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00
>>> 
>>> The order now is
>>> 
>>> stream
>>> .map(dummyMapper)
>>> .assignTimestamps(...)
>>> .timeWindow(...)
>>> 
>>> Is there a way to print out the assigned timestamps after
>>> stream.assignTimestamps(...)?
>>> 
>>> Cheers,
>>> 
>>> Konstantin
>>> 
>>> 
>>> On 16.11.2015 10:31, Aljoscha Krettek wrote:
>>>> Hi,
>>>> are you also using the timestamp extractor when you are using env.fromCollection().
>>>> 
>>>> Could you maybe insert a dummy mapper after the Kafka source that just prints
the element and forwards it? To see if the elements come with a good timestamp from Kafka.
>>>> 
>>>> Cheers,
>>>> Aljoscha
>>>>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.knauf@tngtech.com>
wrote:
>>>>> 
>>>>> Hi everyone,
>>>>> 
>>>>> I have the following issue with Flink (0.10) and Kafka.
>>>>> 
>>>>> I am using a very simple TimestampExtractor like [1], which just
>>>>> extracts a millis timestamp from a POJO. In my streaming job, I read
in
>>>>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>>>> 
>>>>> stream = env.addSource(new FlinkKafkaConsumer082<
>>>>> (parameterTool.getRequired("topic"),
>>>>>               new AvroPojoDeserializationSchema(),
>>>>> parameterTool.getProperties()))
>>>>> 
>>>>> I have timestampEnabled() and the TimeCharacteristics are EventTime,
>>>>> AutoWatermarkIntervall is 500.
>>>>> 
>>>>> The problem is, when I do something like:
>>>>> 
>>>>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>>>>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS)
>>>>> .sum(..)
>>>>> .print()
>>>>> 
>>>>> env.execute();
>>>>> 
>>>>> the windows never get triggered.
>>>>> 
>>>>> If I use ProcessingTime it works.
>>>>> If I use env.fromCollection(...) instead of the KafkaSource it works
>>>>> with EventTime, too.
>>>>> 
>>>>> Any ideas what I could be doing wrong are highly appreciated.
>>>>> 
>>>>> Cheers,
>>>>> 
>>>>> Konstantin
>>>>> 
>>>>> [1]:
>>>>> 
>>>>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo>
{
>>>>> 
>>>>>   final private long maxDelay;
>>>>> 
>>>>>   public  PojoTimestampExtractor(long maxDelay) {
>>>>>       this.maxDelay = maxDelay;
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public long extractTimestamp(Pojo fightEvent, long l) {
>>>>>       return pojo.getTime();
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public long extractWatermark(Pojo pojo, long l) {
>>>>>       return pojo.getTime() - maxDelay;
>>>>>   }
>>>>> 
>>>>>   @Override
>>>>>   public long getCurrentWatermark() {
>>>>>       return Long.MIN_VALUE;
>>>>>   }
>>>> 
>>>> 
>>> 
>>> --
>>> Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
>>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>> 
>> 
> 
> -- 
> Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082


Mime
View raw message