flink-user mailing list archives

From Ashish Attarde <ashish.atta...@gmail.com>
Subject Lot of data generated in out file
Date Fri, 06 Apr 2018 18:09:01 GMT
Hi Flink Team,

I am seeing that one of the .out files on my task manager is being filled
with a lot of data, and I am not sure why this is happening. All the data
being dumped into the out file is what the *parsedInput* stream should
ideally be receiving.



Here is the Flink program that is running:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

DataStream<String> rawInput = env.addSource(new FlinkKafkaConsumer010<>(
                                        "event-ft",
                                        new SimpleStringSchema(),
                                        kafkaProps).setStartFromLatest());

DataStream<String> input2 = rawInput
                            .map(new KafkaMsgReads());

DataStream<EventRec> parsedInput = input2
        .flatMap(new Splitter())
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<FTRecord>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(EventRec record) {
                        return record.getmTimeStamp() / TT_SCALE_FACTOR;
                    }
                })
        .rebalance()
        .map(new RawInputCounter());

parsedInput
        .keyBy("mflowHashLSB","mflowHashMSB")
        .window(SlidingEventTimeWindows.of(Time.milliseconds(1000),Time.milliseconds(950)))
        .allowedLateness(Time.seconds(1))
        .apply(new CRWindow());

parsedInput.writeUsingOutputFormat(new DiscardingOutputFormat<>());

env.execute();
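
For reference, the window assignment implied by a 1000 ms window sliding every 950 ms can be sketched in plain Java. This is only an illustration with no Flink dependency: the class and method names are hypothetical, and it assumes non-negative timestamps and windows aligned to time 0 with no offset. It shows that only timestamps in the 50 ms overlap between consecutive windows are assigned to two windows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: lists the [start, end) windows
// that a timestamp falls into for a given window size and slide.
final class SlidingWindowSketch {

    // Assumes non-negative timestamps and windows aligned to time 0.
    static List<long[]> windowsFor(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the latest window that contains the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Same size and slide as in the program above.
        for (long ts : new long[] {500L, 975L}) {
            for (long[] w : windowsFor(ts, 1000L, 950L)) {
                System.out.println(ts + " -> [" + w[0] + ", " + w[1] + ")");
            }
        }
    }
}
```

With these settings a timestamp of 500 lands only in [0, 1000), while 975 lands in both [950, 1950) and [0, 1000).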


Here is the definition of the *CRWindow* class:


public static class CRWindow implements WindowFunction<FTRecord, FTFlow, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<FTRecord> ftRecords,
                      Collector<FTFlow> collector) {
        return;
    }

}


Also, is there any detailed documentation of the windowing mechanism
available? I am interested in using windowing with the ability to push
events from one window to a future window. Similar functionality
exists in Storm for pushing an event to a subsequent window.


Thanks

-Ashish
