flink-user mailing list archives

From "G.S.Vijay Raajaa" <gsvijayraa...@gmail.com>
Subject Re: Window Function on AllWindowed Stream - Combining Kafka Topics
Date Fri, 05 May 2017 08:32:37 GMT
I tried the timestamp field as a String as well as a Date object, and I get
the same error in both cases.

Please find the POJO file:

import java.util.Date;
import java.util.List;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import org.apache.commons.lang.builder.ToStringBuilder;


@JsonPropertyOrder({
    "data",
    "label",
    "eventTime"
})
public class TopicPojo {

    @JsonProperty("data")
    private List<List<Double>> data = null;

    @JsonProperty("label")
    private List<String> label = null;

    // static: this single field is shared by every TopicPojo instance in the JVM
    @JsonProperty("eventTime")
    private static Date eventTime;

    /**
     * No args constructor for use in serialization
     */
    public TopicPojo() {
    }

    /**
     * @param data
     * @param label
     * @param eventTime
     */
    public TopicPojo(List<List<Double>> data, List<String> label, Date eventTime) {
        super();
        this.data = data;
        this.label = label;
        this.eventTime = eventTime;
    }

    @JsonProperty("data")
    public List<List<Double>> getData() {
        return data;
    }

    @JsonProperty("data")
    public void setData(List<List<Double>> data) {
        this.data = data;
    }

    @JsonProperty("label")
    public List<String> getLabel() {
        return label;
    }

    @JsonProperty("label")
    public void setLabel(List<String> label) {
        this.label = label;
    }

    // static getter: returns the shared field, not a per-record value
    @JsonProperty("eventTime")
    public static Date getEventTime() {
        return eventTime;
    }

    @JsonProperty("eventTime")
    public void setEventTime(Date eventTime) {
        this.eventTime = eventTime;
    }

    @Override
    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}


The code above uses a Date object for eventTime; I tried it as a String as
well.
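One detail in the POJO stands out: `eventTime` and `getEventTime()` are declared `static`, so there is a single `eventTime` for the whole JVM rather than one per record. The plain-Java sketch below (no Flink or Jackson; the class names `SharedTimePojo`, `PerRecordTimePojo` and `StaticFieldDemo` are hypothetical) shows what that means: once a second record has been deserialized, reading the key of the first record returns the second record's time. That would be consistent with the key "changing in transit", although the sketch does not by itself prove this is the cause here.

```java
import java.util.Date;

// Hypothetical cut-down TopicPojo as posted: eventTime is static,
// so every instance reads and writes the same single field.
class SharedTimePojo {
    private static Date eventTime;
    public Date getEventTime() { return eventTime; }
    public void setEventTime(Date eventTime) { SharedTimePojo.eventTime = eventTime; }
}

// The same POJO with eventTime as an ordinary instance field.
class PerRecordTimePojo {
    private Date eventTime;
    public Date getEventTime() { return eventTime; }
    public void setEventTime(Date eventTime) { this.eventTime = eventTime; }
}

class StaticFieldDemo {
    // Sets record a's time, then sets a second record b's time (as the next
    // deserialized record would), and finally re-reads a's key, as a
    // downstream operator would.
    static long keyOfFirstRecord(boolean sharedField) {
        if (sharedField) {
            SharedTimePojo a = new SharedTimePojo();
            a.setEventTime(new Date(1000L));
            SharedTimePojo b = new SharedTimePojo();
            b.setEventTime(new Date(2000L)); // overwrites the field a also uses
            return a.getEventTime().getTime();
        }
        PerRecordTimePojo a = new PerRecordTimePojo();
        a.setEventTime(new Date(1000L));
        PerRecordTimePojo b = new PerRecordTimePojo();
        b.setEventTime(new Date(2000L));     // touches only b's own field
        return a.getEventTime().getTime();
    }

    public static void main(String[] args) {
        System.out.println("static field:   " + keyOfFirstRecord(true));  // 2000
        System.out.println("instance field: " + keyOfFirstRecord(false)); // 1000
    }
}
```

Assuming this is the cause, the fix would be to drop `static` from both the field and `getEventTime()`, and to have the key selector read the record it is given (the lambda parameter) instead of calling `TopicPojo.getEventTime()`.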

Regards,

Vijay Raajaa G S

On Fri, May 5, 2017 at 1:59 PM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> What’s the KeySelector you’re using? To me, this indicates that the
> timestamp field is somehow changing after the original keying or in transit.
>
> Best.
> Aljoscha
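To see why a key that changes between routing and processing can fail with "Unexpected key group index" (rather than only producing wrong results), here is a simplified model of key-group routing in plain Java. It is an assumption-laden sketch, not Flink's code: `KeyGroupDemo` is a hypothetical name, the max parallelism of 128 and parallelism of 4 are assumed values, and Flink additionally applies a murmur hash to `hashCode()`. The idea it models is that each key maps to a key group, each parallel subtask owns a contiguous range of key groups, and the keyed state on a subtask only accepts keys from its own range.

```java
// Simplified model of key-group routing, for illustration only.
// MAX_PARALLELISM and PARALLELISM are assumed values; real Flink also applies
// a murmur hash to key.hashCode() before taking the modulo.
class KeyGroupDemo {
    static final int MAX_PARALLELISM = 128;
    static final int PARALLELISM = 4;

    // Key -> key group.
    static int keyGroupFor(Object key) {
        return Math.floorMod(key.hashCode(), MAX_PARALLELISM);
    }

    // Key group -> owning subtask; each subtask owns a contiguous range of groups.
    static int subtaskFor(int keyGroup) {
        return keyGroup * PARALLELISM / MAX_PARALLELISM;
    }

    // The upstream partitioner routes by keyAtRouting; the window operator on the
    // receiving subtask re-extracts keyAtOperator. If the two keys fall into key
    // groups owned by different subtasks, the receiving subtask does not own the
    // key group it is asked to write state for.
    static boolean staysOnSameSubtask(Object keyAtRouting, Object keyAtOperator) {
        return subtaskFor(keyGroupFor(keyAtRouting)) == subtaskFor(keyGroupFor(keyAtOperator));
    }
}
```

With these assumed values, a record keyed at 1000 ms is sent to subtask 3, but if its key reads 2000 ms by the time the window operator looks at it, that key's group (80) belongs to subtask 2.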
>
> On 4. May 2017, at 22:01, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com>
> wrote:
>
> I tried reordering, and the window function works fine. But after
> processing a few records from Topic A and Topic B, the window function
> throws the error below. The keyBy is on the eventTime field.
>
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> Regards,
>
> Vijay Raajaa GS
>
> On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com>
> wrote:
>
>> Thanks for your input; I will try to incorporate it into my implementation.
>>
>> Regards,
>> Vijay Raajaa G S
>>
>> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> The approach could work, but if it can happen that an event from stream
>>> A is not matched by an event in stream B you will have lingering state that
>>> never goes away. For such cases it might be better to write a custom
>>> CoProcessFunction as sketched here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html.
>>>
>>> The idea is to keep events from each side in state and emit a result
>>> when you get the event from the other side. You also set a cleanup timer in
>>> case no other event arrives to make sure that state eventually goes away.
>>>
>>> Best,
>>> Aljoscha
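The pattern Aljoscha describes (buffer each side per key, emit when the other side arrives, and register a cleanup timer) can be sketched without Flink as follows. This is a plain-Java simulation under stated assumptions, not the Flink API: `PairJoiner`, `onA`, `onB`, `advanceTime` and `merge` are hypothetical names, time is passed in explicitly, and `merge` only stands in for building the combined JSON. In a Flink job, `onA`/`onB` would roughly correspond to `processElement1`/`processElement2` of a `CoProcessFunction` on two connected keyed streams, the maps to keyed state, and `advanceTime` to timers firing `onTimer`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink) of a per-key two-sided join with cleanup timers.
class PairJoiner {
    // One buffered event plus the time at which its cleanup timer fires.
    private static final class Pending {
        final String json;
        final long deadline;
        Pending(String json, long deadline) { this.json = json; this.deadline = deadline; }
    }

    private final long cleanupDelay;
    private final Map<Long, Pending> pendingA = new HashMap<>();      // "state" for Topic-A
    private final Map<Long, Pending> pendingB = new HashMap<>();      // "state" for Topic-B
    private final TreeMap<Long, List<Long>> timers = new TreeMap<>(); // fire time -> keys
    final List<String> output = new ArrayList<>();

    PairJoiner(long cleanupDelay) { this.cleanupDelay = cleanupDelay; }

    // Counterpart of processElement1: an event from Topic-A for the given key.
    void onA(long key, String json, long now) { handle(key, json, now, pendingA, pendingB, true); }

    // Counterpart of processElement2: an event from Topic-B for the given key.
    void onB(long key, String json, long now) { handle(key, json, now, pendingB, pendingA, false); }

    private void handle(long key, String json, long now,
                        Map<Long, Pending> mine, Map<Long, Pending> other, boolean fromA) {
        Pending match = other.remove(key);
        if (match != null) {
            // The other side is already here: emit the merged result, keep no state.
            output.add(fromA ? merge(json, match.json) : merge(match.json, json));
        } else {
            // Buffer this side and register a cleanup timer in case no partner arrives.
            long deadline = now + cleanupDelay;
            mine.put(key, new Pending(json, deadline));
            timers.computeIfAbsent(deadline, t -> new ArrayList<>()).add(key);
        }
    }

    // Counterpart of onTimer: fire every timer due at or before `now`.
    void advanceTime(long now) {
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            Map.Entry<Long, List<Long>> fired = timers.pollFirstEntry();
            for (long key : fired.getValue()) {
                dropIfExpired(pendingA, key, fired.getKey());
                dropIfExpired(pendingB, key, fired.getKey());
            }
        }
    }

    // Only clear state that this particular timer was registered for.
    private static void dropIfExpired(Map<Long, Pending> state, long key, long firedAt) {
        Pending p = state.get(key);
        if (p != null && p.deadline == firedAt) {
            state.remove(key);
        }
    }

    int pendingCount() { return pendingA.size() + pendingB.size(); }

    // Hypothetical merge into one "fat" JSON string.
    private static String merge(String a, String b) {
        return "{\"a\":" + a + ",\"b\":" + b + "}";
    }
}
```

Storing the deadline with each buffered event lets a stale timer skip state that was re-created for the same key after an earlier match.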
>>>
>>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com>
>>> wrote:
>>>
>>> Sure, thanks for the pointer; let me reorder it. Any comments on the
>>> approach I followed for merging the topics and creating a single JSON?
>>>
>>> Regards,
>>> Vijay Raajaa G S
>>>
>>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljoscha@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> An AllWindow operator requires an AllWindowFunction instead of a
>>>> WindowFunction. In your case, the keyBy() seems to be in the wrong place.
>>>> To get a keyed window you have to write something akin to:
>>>>
>>>> inputStream
>>>>   .keyBy(…)
>>>>   .window(…)
>>>>   .apply(…) // or reduce()
>>>>
>>>> In your case, you key the stream and then the keying is “lost” again
>>>> because you apply a flatMap(). That’s why you have an all-window and not a
>>>> keyed window.
>>>>
>>>> Best,
>>>> Aljoscha
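The difference between an all-window and a keyed window can be simulated in a few lines of plain Java. This is a sketch, not Flink code; `CountWindowDemo` and the `"<topic>@<eventTime>"` event format are hypothetical. `allWindow` mimics `countWindowAll(2)`, with one buffer for the whole stream, while `keyedWindow` mimics `keyBy(...).countWindow(2)`, with one buffer per key that is cleared after it fires.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (no Flink) of count windows of size 2 over a merged
// Topic-A/Topic-B stream. Each event is "<topic>@<eventTime>".
class CountWindowDemo {
    static String eventTimeOf(String event) {
        return event.substring(event.indexOf('@') + 1);
    }

    // Like countWindowAll(2): a single buffer for the whole stream, so it fires
    // on every second element, whatever their eventTime.
    static List<List<String>> allWindow(List<String> events) {
        List<List<String>> fired = new ArrayList<>();
        List<String> buffer = new ArrayList<>();
        for (String e : events) {
            buffer.add(e);
            if (buffer.size() == 2) {
                fired.add(buffer);
                buffer = new ArrayList<>();
            }
        }
        return fired;
    }

    // Like keyBy(eventTime).countWindow(2): one buffer per key, fired and
    // cleared once that key has seen two elements.
    static List<List<String>> keyedWindow(List<String> events) {
        List<List<String>> fired = new ArrayList<>();
        Map<String, List<String>> buffers = new HashMap<>();
        for (String e : events) {
            String key = eventTimeOf(e);
            List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
            buffer.add(e);
            if (buffer.size() == 2) {
                fired.add(buffer);
                buffers.remove(key);
            }
        }
        return fired;
    }
}
```

If the Topic-A records happen to arrive before the Topic-B ones, for example `A@t1, A@t2, B@t1, B@t2`, the all-window pairs `A@t1` with `A@t2`, whereas the keyed window pairs each A record with the B record that shares its eventTime.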
>>>>
>>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraajaa@gmail.com>
>>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am trying to combine two Kafka topics using a single Kafka consumer
>>>> on a list of topics, and then convert the JSON strings in the stream to
>>>> POJOs. Then I join them via keyBy (on the event time field), and to merge
>>>> them into a single fat JSON I was planning to use a window stream and apply
>>>> a window function to it. The assumption is that Topic-A and Topic-B can be
>>>> joined on event time and only one pair (Topic-A JSON, Topic-B JSON) will be
>>>> present with the same eventTime. Hence I was planning to use a
>>>> countWindow(2) after keyBy on eventTime.
>>>>
>>>> I have a couple of questions:
>>>>
>>>> 1. Is this approach fine for merging topics and creating a single JSON?
>>>> 2. The window function on the AllWindowedStream doesn't seem to work.
>>>> Any pointers will be greatly appreciated.
>>>>
>>>> Code snippet:
>>>>
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> logger.info("Flink Stream Window Charger has started");
>>>>
>>>> Properties properties = new Properties();
>>>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>>>> properties.setProperty("group.id", "group-0011");
>>>> properties.setProperty("auto.offset.reset", "smallest");
>>>>
>>>> // A single consumer subscribed to both topics
>>>> List<String> names = new ArrayList<>();
>>>> names.add("Topic-A");
>>>> names.add("Topic-B");
>>>>
>>>> DataStream<String> stream = env.addSource(
>>>>     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>>>
>>>> // JSON string -> POJO, then key by eventTime
>>>> DataStream<TopicPojo> pojo = stream.map(new Deserializer())
>>>>     .keyBy((eventTime) -> TopicPojo.getEventTime());
>>>>
>>>> List<String> where = new ArrayList<String>();
>>>>
>>>> // POJO -> JSON string, then a non-keyed count window of 2
>>>> AllWindowedStream<String, GlobalWindow> data_window =
>>>>     pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>>
>>>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>>>
>>>> data_charging.addSink(new SinkFunction<String>() {
>>>>     public void invoke(String value) throws Exception {
>>>>         // Yet to be implemented - merge two POJOs into one
>>>>     }
>>>> });
>>>>
>>>> try {
>>>>     env.execute();
>>>> } catch (Exception e) {
>>>>     // Report job failures instead of silently returning
>>>>     e.printStackTrace();
>>>> }
>>>>
>>>> }
>>>>
>>>> }
>>>>
>>>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>>>
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     @Override
>>>>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>>>         // Serialize the POJO back into a JSON string
>>>>         ObjectMapper mapper = new ObjectMapper();
>>>>         out.collect(mapper.writeValueAsString(value));
>>>>     }
>>>> }
>>>>
>>>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>>>
>>>>     @Override
>>>>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2,
>>>>             Collector<String> out) throws Exception {
>>>>         int count = 0;
>>>>         for (TopicPojo in : arg2) {
>>>>             count++;
>>>>         }
>>>>         // Test result - to be modified
>>>>         out.collect("Window: " + window + ", count: " + count);
>>>>     }
>>>> }
>>>>
>>>> class Deserializer implements MapFunction<String, TopicPojo> {
>>>>
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     @Override
>>>>     public TopicPojo map(String value) throws IOException {
>>>>         ObjectMapper mapper = new ObjectMapper();
>>>>         System.out.println(value);
>>>>         try {
>>>>             return mapper.readValue(value, TopicPojo.class);
>>>>         } catch (IOException e) {
>>>>             // JsonParseException and JsonMappingException are both IOExceptions;
>>>>             // keep the original exception as the cause
>>>>             throw new IOException("Failed to deserialize JSON object.", e);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> I am getting the following error: The method
>>>> apply(AllWindowFunction<String,R,GlobalWindow>) in the type
>>>> AllWindowedStream<String,GlobalWindow> is not applicable for the
>>>> arguments (MyWindowFunction).
>>>>
>>>> Kindly give your input.
>>>>
>>>> Regards,
>>>> Vijay Raajaa GS
>>>>
>>>>
>>>>
>>>
>>>
>>
>
>
