flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Window Function on AllWindowed Stream - Combining Kafka Topics
Date Fri, 05 May 2017 08:29:08 GMT
What’s the KeySelector you’re using? To me, this indicates that the timestamp field is
somehow changing after the original keying or in transit.

Best,
Aljoscha
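The "Unexpected key group index" error can be illustrated with a small framework-free model. Flink routes each record to a parallel instance by the key group derived from its key, and the window operator later extracts the key again with the same KeySelector before it touches state. If the value the selector reads changes in between, or the selector does not depend on the element at all, the second computation can land in a key group that instance does not own. This is a simplified sketch, not Flink code: the class names (KeyGroupDemo, Event), the max parallelism of 128 and the plain floorMod of hashCode() are hypothetical stand-ins (Flink also scrambles the hash code before taking the modulo). As an aside, the keyBy lambda in the original code further down, (eventTime) -> TopicPojo.getEventTime(), calls getEventTime() on the class rather than on the lambda's argument, so it only compiles if that method is static, in which case the key would not come from the element being keyed.

```java
import java.util.Objects;

// Simplified, framework-free model of the key-group check. ASSUMPTIONS: max parallelism
// of 128 and a plain floorMod of hashCode(); real Flink also scrambles the hash first.
class KeyGroupDemo {
    static final int MAX_PARALLELISM = 128; // hypothetical value

    // Hypothetical mutable record standing in for TopicPojo.
    static class Event {
        long eventTime;
        Event(long eventTime) { this.eventTime = eventTime; }
        long getEventTime() { return eventTime; }
    }

    // Maps a key to a key group in [0, MAX_PARALLELISM).
    static int keyGroupFor(Object key) {
        return Math.floorMod(Objects.hashCode(key), MAX_PARALLELISM);
    }

    // True if the key group seen when routing the record equals the one recomputed
    // when the window operator accesses state, after eventTime is set to newEventTime.
    static boolean keyGroupStable(Event e, long newEventTime) {
        int atRouting = keyGroupFor(e.getEventTime());      // key extracted by keyBy()
        e.eventTime = newEventTime;                         // field changes in between
        int atStateAccess = keyGroupFor(e.getEventTime());  // key extracted again downstream
        return atRouting == atStateAccess;
    }

    public static void main(String[] args) {
        System.out.println(keyGroupStable(new Event(1000L), 1000L)); // unchanged key
        System.out.println(keyGroupStable(new Event(1000L), 1001L)); // mutated key
    }
}
```

Running main prints true and then false: in this model the record maps to key group 104 when it is routed, but to 105 once its eventTime has been changed to 1001.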
> On 4. May 2017, at 22:01, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:
> 
> I tried reordering, and the window function now works. But after processing a few
> records from Topic A and Topic B, the window function throws the error below.
> The keyBy is on the eventTime field.
> 
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
>
> Regards,
> 
> Vijay Raajaa GS 
> 
> 
> On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:
> Thanks for your input; I will try to incorporate it into my implementation.
> 
> Regards,
> Vijay Raajaa G S
> 
> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> The approach could work, but if an event from stream A can go unmatched by an event in
> stream B, you will have lingering state that never goes away. For such cases it might be
> better to write a custom CoProcessFunction, as sketched here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
> 
> The idea is to keep the events from each side in state and emit a result when the event
> from the other side arrives. You also set a cleanup timer, in case no matching event
> arrives, to make sure that the state eventually goes away.
> 
> Best,
> Aljoscha
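The cleanup-timer idea can be modelled without Flink to show the state lifecycle. In Flink itself this would be a CoProcessFunction on the two keyed streams: processElement1/processElement2 keep each side in keyed state and register a timer through the context's TimerService (for example registerEventTimeTimer), and onTimer clears whatever is still buffered. The plain-Java sketch below only mimics that behaviour; PairJoiner, the string concatenation standing in for merging two JSON documents, the use of the event time itself as the key, and the cleanupDelay parameter are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Framework-free model of the "buffer per key, emit on match, clean up on timer" pattern.
class PairJoiner {
    private final long cleanupDelay;                                // hypothetical timeout
    private final Map<Long, String> pendingA = new HashMap<>();     // buffered side A, by key
    private final Map<Long, String> pendingB = new HashMap<>();     // buffered side B, by key
    private final Map<Long, Long> cleanupTimers = new HashMap<>();  // key -> timer timestamp
    final List<String> output = new ArrayList<>();

    PairJoiner(long cleanupDelay) { this.cleanupDelay = cleanupDelay; }

    void onA(long key, String value) { handle(key, value, pendingA, pendingB, true); }
    void onB(long key, String value) { handle(key, value, pendingB, pendingA, false); }

    private void handle(long key, String value, Map<Long, String> mine,
                        Map<Long, String> other, boolean isA) {
        String match = other.remove(key);
        if (match != null) {
            // The other side is already buffered: emit the merged pair, drop the timer.
            output.add(isA ? merge(value, match) : merge(match, value));
            cleanupTimers.remove(key);
        } else {
            // No partner yet: buffer this side and schedule cleanup relative to the key's time.
            mine.put(key, value);
            cleanupTimers.put(key, key + cleanupDelay);
        }
    }

    // Hypothetical stand-in for merging two JSON documents into one.
    private static String merge(String a, String b) { return a + "+" + b; }

    // Called as the watermark advances: fire due timers and discard unmatched state.
    void onWatermark(long watermark) {
        Iterator<Map.Entry<Long, Long>> it = cleanupTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> timer = it.next();
            if (timer.getValue() <= watermark) {
                pendingA.remove(timer.getKey());
                pendingB.remove(timer.getKey());
                it.remove();
            }
        }
    }

    int pendingCount() { return pendingA.size() + pendingB.size(); }

    public static void main(String[] args) {
        PairJoiner joiner = new PairJoiner(100);
        joiner.onA(1000L, "a1");
        joiner.onB(1000L, "b1");    // match: emits "a1+b1"
        joiner.onA(2000L, "a2");    // no partner yet: buffered
        joiner.onWatermark(2100L);  // cleanup timer for key 2000 fires
        System.out.println(joiner.output + " pending=" + joiner.pendingCount()); // [a1+b1] pending=0
    }
}
```

With a cleanupDelay of 100, the unmatched A event for key 2000 stays buffered until the watermark reaches 2100 and is then dropped, so the lingering state is bounded by the timer instead of living forever.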
> 
>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:
>> 
>> Sure. Thanks for the pointer; let me reorder it. Any comments on the approach I followed
>> for merging the topics and creating a single JSON?
>> 
>> Regards,
>> Vijay Raajaa G S
>> 
>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> Hi,
>> An AllWindow operator requires an AllWindowFunction instead of a WindowFunction.
>> In your case, the keyBy() seems to be in the wrong place; to get a keyed window, you
>> have to write something akin to:
>> 
>> inputStream
>>   .keyBy(…)
>>   .window(…)
>>   .apply(…) // or reduce()
>> 
>> In your case, you key the stream and then the keying is “lost” again because you
>> apply a flatMap(). That’s why you have an all-window and not a keyed window.
>> 
>> Best,
>> Aljoscha
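The difference between the two pipelines can be seen with a small model of count windows. countWindowAll(2) keeps one global buffer and fires every two elements regardless of key, while keyBy(...).countWindow(2) keeps one buffer per key; in Flink both purge the window contents after firing. The sketch assumes topics A and B each emit one event per event time and that both A events arrive before the B events; the Event class and the two helper methods are hypothetical illustrations, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Framework-free model contrasting countWindowAll(n) with keyBy(eventTime).countWindow(n).
// Both purge the buffer after firing, as Flink's count windows do.
class CountWindowDemo {
    // Hypothetical event: the topic it came from and its event time (the join key).
    static final class Event {
        final long eventTime;
        final String topic;
        Event(long eventTime, String topic) { this.eventTime = eventTime; this.topic = topic; }
        @Override public String toString() { return topic + "@" + eventTime; }
    }

    // Non-keyed: one global buffer that fires every n elements, whatever their key.
    static List<List<Event>> countWindowAll(List<Event> in, int n) {
        List<List<Event>> fired = new ArrayList<>();
        List<Event> buffer = new ArrayList<>();
        for (Event e : in) {
            buffer.add(e);
            if (buffer.size() == n) {
                fired.add(buffer);
                buffer = new ArrayList<>(); // purge after firing
            }
        }
        return fired; // a partial buffer at the end never fires
    }

    // Keyed: one buffer per eventTime, each firing independently.
    static List<List<Event>> keyedCountWindow(List<Event> in, int n) {
        List<List<Event>> fired = new ArrayList<>();
        Map<Long, List<Event>> buffers = new LinkedHashMap<>();
        for (Event e : in) {
            List<Event> buffer = buffers.computeIfAbsent(e.eventTime, k -> new ArrayList<>());
            buffer.add(e);
            if (buffer.size() == n) {
                fired.add(new ArrayList<>(buffer));
                buffer.clear(); // purge after firing
            }
        }
        return fired; // keys with fewer than n events keep their buffer (lingering state)
    }

    public static void main(String[] args) {
        List<Event> in = Arrays.asList(
            new Event(1, "A"), new Event(2, "A"), new Event(1, "B"), new Event(2, "B"));
        System.out.println(countWindowAll(in, 2));   // [[A@1, A@2], [B@1, B@2]]
        System.out.println(keyedCountWindow(in, 2)); // [[A@1, B@1], [A@2, B@2]]
    }
}
```

With this input the all-window pairs the two A events with each other, while the keyed version pairs A@1 with B@1 and A@2 with B@2. An event whose partner never arrives simply stays in its per-key buffer, which is the lingering state discussed in the thread.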
>> 
>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I am trying to combine two Kafka topics using a single Kafka consumer on a list of
>>> topics, and then convert the JSON strings in the stream to POJOs. Then I join them via
>>> keyBy (on the event-time field), and to merge them into a single fat JSON I was planning
>>> to use a windowed stream and apply a window function to it. The assumption is that
>>> Topic-A and Topic-B can be joined on event time and that only one pair (Topic A (JSON),
>>> Topic B (JSON)) will be present with the same eventTime. Hence I was planning to use a
>>> countWindow(2) after keyBy on eventTime.
>>> 
>>> I have a couple of questions about this:
>>> 
>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>> 2. The window function on the AllWindowedStream doesn't seem to work; any pointers
>>> will be greatly appreciated.
>>> 
>>> Code snippet:
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> logger.info("Flink Stream Window Charger has started");
>>>
>>> Properties properties = new Properties();
>>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>>> properties.setProperty("group.id", "group-0011");
>>> properties.setProperty("auto.offset.reset", "smallest");
>>>
>>> List<String> names = new ArrayList<>();
>>> names.add("Topic-A");
>>> names.add("Topic-B");
>>>
>>> DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>>
>>> DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
>>>
>>> List<String> where = new ArrayList<String>();
>>>
>>> AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>
>>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>>
>>> data_charging.addSink(new SinkFunction<String>() {
>>>     public void invoke(String value) throws Exception {
>>>         // Yet to be implemented - Merge two POJO into one
>>>     }
>>> });
>>>
>>> try {
>>>     env.execute();
>>> } catch (Exception e) {
>>>     return;
>>> }
>>> }
>>>
>>> }
>>> 
>>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @Override
>>>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>>         ObjectMapper mapper = new ObjectMapper();
>>>         out.collect(mapper.writeValueAsString(value));
>>>     }
>>> }
>>> 
>>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>>     @Override
>>>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
>>>             throws Exception {
>>>         int count = 0;
>>>         for (TopicPojo in : arg2) {
>>>             count++;
>>>         }
>>>         // Test result - to be modified
>>>         out.collect("Window: " + window + "count: " + count);
>>>     }
>>> }
>>> 
>>> class Deserializer implements MapFunction<String, TopicPojo> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @Override
>>>     public TopicPojo map(String value) throws IOException {
>>>         ObjectMapper mapper = new ObjectMapper();
>>>         try {
>>>             System.out.println(value);
>>>             return mapper.readValue(value, TopicPojo.class);
>>>         } catch (IOException e) {
>>>             // JsonParseException and JsonMappingException are both IOExceptions;
>>>             // keep the original exception as the cause.
>>>             throw new IOException("Failed to deserialize JSON object.", e);
>>>         }
>>>     }
>>> }
>>> 
>>> 
>>> I am getting the error "The method apply(AllWindowFunction<String,R,GlobalWindow>) in the
>>> type AllWindowedStream<String,GlobalWindow> is not applicable for the arguments
>>> (MyWindowFunction)".
>>> 
>>> Kindly give your input.
>>> 
>>> Regards,
>>> Vijay Raajaa GS 
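In Flink, WindowFunction<IN, OUT, KEY, W> declares apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out), whereas AllWindowFunction<IN, OUT, W> declares apply(W window, Iterable<IN> values, Collector<OUT> out), with no key. The compile error above therefore has two causes: MyWindowFunction has the keyed shape, and its input type is TopicPojo while the stream after flatMap(new Tokenizer()) carries String. The sketch below mirrors only the shape of the all-window variant so it runs without Flink; AllWindowFn and CountingAllWindowFn are hypothetical stand-ins, a java.util.List replaces Flink's Collector, and a String label replaces GlobalWindow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in mirroring the parameter list of Flink's AllWindowFunction:
// no key argument; a List replaces Flink's Collector.
interface AllWindowFn<IN, OUT, W> {
    void apply(W window, Iterable<IN> values, List<OUT> out) throws Exception;
}

// All-window counterpart of MyWindowFunction: the input type is String (the JSON that
// Tokenizer produces), and the window is a plain String label instead of GlobalWindow.
class CountingAllWindowFn implements AllWindowFn<String, String, String> {
    @Override
    public void apply(String window, Iterable<String> values, List<String> out) {
        int count = 0;
        for (String ignored : values) {
            count++;
        }
        out.add("Window: " + window + " count: " + count);
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        new CountingAllWindowFn().apply("GlobalWindow", Arrays.asList("{\"a\":1}", "{\"b\":2}"), out);
        System.out.println(out.get(0)); // Window: GlobalWindow count: 2
    }
}
```

In the real pipeline the same body would go into a class implementing Flink's AllWindowFunction<String, String, GlobalWindow>, or the keyBy() would be moved so that a keyed window and a WindowFunction can be used instead, as suggested in the reply above.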
>>> 
>> 
>> 
> 
> 
> 

