flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Need help on Streaming API | Flink | GlobalWindow and Customized Trigger
Date Wed, 24 May 2017 09:38:03 GMT
Hi,

I think the problem is that the Trigger only uses processing time to determine when to trigger.
If the job shuts down (which happens when the sources shut down and the whole pipeline is
flushed) pending processing-time triggers are not fired.

You can use the fact that when sources shut down they emit a Long.MAX_VALUE watermark to signal
that there won’t be any data in the future. For this, you have to enable event time (you
can do this via env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)) and then register an event-time timer
in your Trigger for Long.MAX_VALUE. This will call your onEventTime() method and allow the
Trigger to fire before shutting down.
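A minimal sketch of such a Trigger, assuming a GlobalWindow and a periodic processing-time firing interval (the class name and the 60-second interval are illustrative, not taken from the attached files):

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class FlushingProcessingTimeTrigger extends Trigger<Object, GlobalWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   GlobalWindow window, TriggerContext ctx) throws Exception {
        // Register the usual processing-time timer for periodic firing
        // (60 seconds here is just an example interval).
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + 60_000L);
        // Also register an event-time timer at Long.MAX_VALUE; it fires when
        // the sources shut down and emit the final Long.MAX_VALUE watermark,
        // so the window contents pending at shutdown are not lost.
        ctx.registerEventTimeTimer(Long.MAX_VALUE);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                          TriggerContext ctx) throws Exception {
        // Periodic firing while the stream is running.
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window,
                                     TriggerContext ctx) throws Exception {
        // Fires on the final watermark: flush whatever is left in the window.
        return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(Long.MAX_VALUE);
    }
}
```

Note that this only works if the job runs with the event-time characteristic enabled, otherwise no watermarks flow through the pipeline and onEventTime() is never called.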

Best,
Aljoscha

> On 24. May 2017, at 07:08, Samim Ahmed <samim1216@gmail.com> wrote:
> 
> Hello Aljoscha,
> 
> Sorry to ask again, but is there any update on my issue?
> Did you get a chance to look at the trigger files? I am blocked, so please have a look when you have time.
> Waiting for your answer, and again sorry for this mail.
> 
> Regards,
> Samim 
> 
> On Tue, May 23, 2017 at 1:00 AM, Samim Ahmed <samim1216@gmail.com <mailto:samim1216@gmail.com>> wrote:
> Hello Aljoscha,
> 
> Thanks for having a look at this issue.
> 
> I have copied the main code showing the flow of execution below; the trigger code is attached to this mail.
> In main class:
> 
> AggregationProfile vtrAggProfile = new VtrAggregationProfile();
> 1. decode the input file
> 2. Filter Events
> 	
> 		DataStream<EventBean> vtrFilteredStream = vtrDecodedEventsDs.filter(vtrAggProfile).setParallelism(vtrParserParallelism);
> 
> 3. Correlate VTR Records 
> 		
> 		DataStream<ISession> vtrSessionStream=vtrFilteredStream
> 		.keyBy(vtrAggProfile)
> 		.window(GlobalWindows.create())
> 		.trigger(vtrAggProfile)  <=== this trigger causes the problem for the last few minutes of data.
> 		.apply(vtrAggProfile).setParallelism(maxParallelism);
> 
> Attached file names :
> 1. VerAggregationProfile.java
> 2. AggregationProfile.java
> 
> 
> Please let me know if you need any other information. Thanks in advance.
> 
> 
> //Regards,
> Samim
> 
> On Mon, May 22, 2017 at 6:30 PM, Aljoscha Krettek <aljoscha@apache.org <mailto:aljoscha@apache.org>> wrote:
> Hi,
> 
> If you could show us your custom Trigger, we might be able to figure out what’s going on.
> 
> Best,
> Aljoscha
> 
>> On 22. May 2017, at 09:06, Samim Ahmed <samim1216@gmail.com <mailto:samim1216@gmail.com>> wrote:
>> 
>> Hello All,
>> 
>> Hope you are doing well..
>>  
>> My name is Samim, and I am working on a POC (proof of concept) for a project. In this project
we are using Apache Flink to process stream data, find the required patterns, and finally
write those patterns to a DB.
>>  
>> To implement this we have used a global window with a customized trigger.
>> While testing we observed that the output is as expected, but we lose the last few
minutes of data when the input stream ends.
>>  
>> For example, if the data streaming started at 1pm and ended at 5pm on the same day,
we found the output data for 4:55pm to 5pm was missing. We also observed that when the
input data stream finishes, the entire process stops immediately and the last few
minutes of data remain inside the window.
>>  
>> I am new to the Flink framework, so we need your help to overcome this issue of missing
data for the last few minutes. Is there any API available to solve this problem, or is it a Flink
limitation?
>>  
>> It would be great if you could share your views, and do let me know if you need any further
information.
>>  
>> I am waiting for your input. Thanks in advance.
>>  
>> Thanks,
>> Samim.
> 
> 
> 
> 
> -- 
> Regards,
> Samim Ahmed 
> Mumbai
> 09004259232
> 
> 
> 
> 
> -- 
> Regards,
> Samim Ahmed 
> Mumbai
> 09004259232

