flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Konstantin Knauf <konstantin.kn...@tngtech.com>
Subject Re: Custom TimestampExtractor and FlinkKafkaConsumer082
Date Mon, 16 Nov 2015 09:39:23 GMT
Hi Aljoscha,

thanks for your answer. Yes I am using the same TimestampExtractor-Class.

The timestamps look good to me. Here an example.

{"time": 1447666537260, ...} And parsed: 2015-11-16T10:35:37.260+01:00

The order now is

stream
.map(dummyMapper)
.assignTimestamps(...)
.timeWindow(...)

Is there a way to print out the assigned timestamps after
stream.assignTimestamps(...)?

Cheers,

Konstantin


On 16.11.2015 10:31, Aljoscha Krettek wrote:
> Hi,
> are you also using the timestamp extractor when you are using env.fromCollection().
> 
> Could you maybe insert a dummy mapper after the Kafka source that just prints the element
and forwards it? To see if the elements come with a good timestamp from Kafka.
> 
> Cheers,
> Aljoscha
>> On 15 Nov 2015, at 22:55, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:
>>
>> Hi everyone,
>>
>> I have the following issue with Flink (0.10) and Kafka.
>>
>> I am using a very simple TimestampExtractor like [1], which just
>> extracts a millis timestamp from a POJO. In my streaming job, I read in
>> these POJOs from Kafka using the FlinkKafkaConsumer082 like this:
>>
>> stream = env.addSource(new FlinkKafkaConsumer082<
>> (parameterTool.getRequired("topic"),
>>                new AvroPojoDeserializationSchema(),
>> parameterTool.getProperties()))
>>
>> I have timestampEnabled() and the TimeCharacteristics are EventTime,
>> AutoWatermarkIntervall is 500.
>>
>> The problem is, when I do something like:
>>
>> stream.assignTimestamps(new PojoTimestampExtractor(6000))
>> .timeWindowAll(Time.of(1, TimeUnit.SECONDS)
>> .sum(..)
>> .print()
>>
>> env.execute();
>>
>> the windows never get triggered.
>>
>> If I use ProcessingTime it works.
>> If I use env.fromCollection(...) instead of the KafkaSource it works
>> with EventTime, too.
>>
>> Any ideas what I could be doing wrong are highly appreciated.
>>
>> Cheers,
>>
>> Konstantin
>>
>> [1]:
>>
>> public class PojoTimestampExtractor implements TimestampExtractor<Pojo> {
>>
>>    final private long maxDelay;
>>
>>    public  PojoTimestampExtractor(long maxDelay) {
>>        this.maxDelay = maxDelay;
>>    }
>>
>>    @Override
>>    public long extractTimestamp(Pojo fightEvent, long l) {
>>        return pojo.getTime();
>>    }
>>
>>    @Override
>>    public long extractWatermark(Pojo pojo, long l) {
>>        return pojo.getTime() - maxDelay;
>>    }
>>
>>    @Override
>>    public long getCurrentWatermark() {
>>        return Long.MIN_VALUE;
>>    }
> 
> 

-- 
Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
Sitz: Unterföhring * Amtsgericht München * HRB 135082

Mime
View raw message