flink-issues mailing list archives

From "olivier sohn (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-7569) '1 record' delay for counted windowed streams
Date Fri, 01 Sep 2017 15:26:02 GMT

     [ https://issues.apache.org/jira/browse/FLINK-7569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

olivier sohn updated FLINK-7569:
--------------------------------
    Description: 
In the example below (described at the end of the class documentation), Flink waits for the
first element of the next window to arrive before "computing" the current window.
I guess this is needed for time-based windows, but this window is count-based, so it should
be possible to specialize the behaviour so that the window is processed as soon as it holds
the required number of elements, instead of waiting for the first element of the next window.

Could this behaviour be implemented?
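To make the request concrete, here is a minimal plain-Java model of the expected behaviour. It is not Flink code, and the class `EagerCountWindowSum` and its methods are hypothetical: a size-n count window emits its sum the moment its n-th element arrives. With size 2 it reproduces the transformation from the class documentation (1 1 2 1 3 2 1 into 2 3 5), and for the test inputs 1 2 3 4 it emits both 3 and 7 without needing a fifth element.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-Java model (not Flink's implementation) of a count window
 * of size n that fires as soon as its n-th element arrives.
 */
public class EagerCountWindowSum {
    private final int size;
    private int count = 0;
    private int sum = 0;

    public EagerCountWindowSum(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    /** Adds one element; returns the window sum if this element completed the window, else null. */
    public Integer add(int value) {
        sum += value;
        count++;
        if (count == size) {
            int result = sum;
            // reset state so the next element opens a fresh window
            count = 0;
            sum = 0;
            return result;
        }
        return null;
    }

    /** Feeds all inputs through one window operator and collects every emitted sum. */
    public static List<Integer> run(int size, int... inputs) {
        EagerCountWindowSum window = new EagerCountWindowSum(size);
        List<Integer> out = new ArrayList<>();
        for (int v : inputs) {
            Integer fired = window.add(v);
            if (fired != null) {
                out.add(fired);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 1, 1, 2, 1, 3, 2, 1)); // [2, 3, 5]
        System.out.println(run(2, 1, 2, 3, 4));          // [3, 7]
    }
}
```

The trailing `1` in the first run stays buffered because its window is not yet full, which is the expected outcome for a count window.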

{code:java}

/**
 * Example of how to read with a Kafka consumer and write the size-2 windowed sum of records
 * using a Kafka producer
 *
 *   For example, it transforms
 *
 *   1 1 2 1 3 2 1
 *
 *   into
 *
 *   2 3 5
 *
 * Note that the Kafka source and sink expect the following parameters to be set:
 *  - "bootstrap.servers" (comma-separated list of Kafka brokers)
 *  - "zookeeper.connect" (comma-separated list of ZooKeeper servers)
 *
 * The Kafka source additionally expects the following parameters to be set:
 *  - "topicIn" the name of the topic to read data from.
 *  - "group.id" the id of the consumer group
 *
 * The Kafka sink additionally expects the following parameter to be set:
 *  - "topicOut" the name of the topic to write data to.
 *
 * You can pass these required parameters using
 * "--bootstrap.servers host:port,host1:port1
 *  --zookeeper.connect host:port
 *  --topicIn testTopicIn
 *  --topicOut testTopicOut"
 *
 * This is a valid input example:
 * 		--topicIn testTopicIn
 * 		--topicOut testTopicOut
 * 		--bootstrap.servers 172.22.12.3:49092
 * 		--zookeeper.connect 172.22.12.3:22181/dev
 * 		--group.id myGroup
 *
 *
 * If a LeaderNotAvailableException is raised, it usually means the topic does not exist.
 *
 * It can be created using the command:
 * > kafka-topics --create
 *                --zookeeper 172.22.12.3:22181/dev
 *                --partitions 2
 *                --replication-factor 1
 *                --topic testTopicOut
 *
 * To test that the example works:
 * ------------------------------
 *
 * Listen to the results:
 * > kafka-console-consumer --bootstrap-server 172.22.12.3:49092
 *                          --whitelist testTopicOut
 *
 * Run this class's main method.
 *
 * Create inputs:
 * > echo 1 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 * > echo 2 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 * > echo 3 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 * > echo 4 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 * > echo 5 | kafka-console-producer --broker-list 172.22.12.3:49092,172.22.12.3:49093,172.22.12.3:49094
 *                                   --topic testTopicIn
 *
 * In the console that listens to the results, you should see:
 * > 3
 * > 7
 *
 * Note that the last input, '5', is necessary in this case: it seems Flink waits for the first
 * element of the next window to be received before it finishes computing the current window.
 */
public class ReadFromKafkaWriteWindowedSumIntoKafka {

	public static void main(String[] args) throws Exception {
		// create flink execution environment
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// parse user parameters
		ParameterTool parameterTool = ParameterTool.fromArgs(args);

		// add a kafka source which reads from 'topicIn'
		DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer082<>(
				parameterTool.getRequired("topicIn"), new SimpleStringSchema(), parameterTool.getProperties()));

		messageStream

				// group stream elements 2 by 2
				.window(new Count(2))

				// sum the windowed stream
				.foldWindow(0, new FoldFunction<String, Integer>() {
					@Override
					public Integer fold(Integer accumulator, String value) throws Exception {
						return accumulator + Integer.parseInt(value);
					}
				})

				// convert DiscretizedStream to DataStream
				.flatten()

				// convert stream data from Integer to String
				.map(new MapFunction<Integer, String>() {
					@Override
					public String map(Integer value) throws Exception {
						return Integer.toString(value);
					}
				})

				// add a kafka sink which writes into 'topicOut'
				.addSink(new KafkaSink<>(
						parameterTool.getRequired("bootstrap.servers"),
						parameterTool.getRequired("topicOut"),
						new WriteIntoKafka.SimpleStringSchema()));

		env.execute();
	}
}
{code}
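For comparison, here is a toy plain-Java model of the behaviour observed above. Again, this is not Flink's actual windowing code, and `LazyCountWindowSum` is hypothetical: a full window is only emitted when the first element of the next window arrives. Fed the inputs 1 2 3 4 from the test procedure, it emits only 3; the extra input 5 is what releases 7, matching the reported '1 record' delay.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model (not Flink's implementation) of the reported behaviour:
 * a full count window is only emitted when the next window's first element arrives.
 */
public class LazyCountWindowSum {
    private final int size;
    private int count = 0;
    private int sum = 0;

    public LazyCountWindowSum(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    /** Returns the previous window's sum if this element opens a new window after a full one, else null. */
    public Integer add(int value) {
        Integer emitted = null;
        if (count == size) {
            // the previous window was already full but is only released now
            emitted = sum;
            count = 0;
            sum = 0;
        }
        sum += value;
        count++;
        return emitted;
    }

    /** Feeds all inputs through one window operator and collects every emitted sum. */
    public static List<Integer> run(int size, int... inputs) {
        LazyCountWindowSum window = new LazyCountWindowSum(size);
        List<Integer> out = new ArrayList<>();
        for (int v : inputs) {
            Integer fired = window.add(v);
            if (fired != null) {
                out.add(fired);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 1, 2, 3, 4));    // [3]
        System.out.println(run(2, 1, 2, 3, 4, 5)); // [3, 7]
    }
}
```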


> '1 record' delay for counted windowed streams
> ---------------------------------------------
>
>                 Key: FLINK-7569
>                 URL: https://issues.apache.org/jira/browse/FLINK-7569
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.2
>         Environment: osx
>            Reporter: olivier sohn
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
