flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [flink] yanghua commented on a change in pull request #7470: [FLINK-11283] Accessing the key when processing connected keyed stream
Date Fri, 12 Apr 2019 03:26:38 GMT
yanghua commented on a change in pull request #7470: [FLINK-11283] Accessing the key when processing
connected keyed stream
URL: https://github.com/apache/flink/pull/7470#discussion_r274748632
 
 

 ##########
 File path: flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
 ##########
 @@ -591,6 +638,114 @@ public void processElement2(Integer value, Context ctx, Collector<Integer>
out)
 		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
 	}
 
+	/**
+	 * Test keyed KeyedCoProcessFunction side output with multiple consumers.
+	 */
+	@Test
+	public void testRealKeyedCoProcessFunctionSideOutputWithMultipleConsumers() throws Exception
{
+		final OutputTag<String> sideOutputTag1 = new OutputTag<String>("side1"){};
+		final OutputTag<String> sideOutputTag2 = new OutputTag<String>("side2"){};
+
+		TestListResultSink<String> sideOutputResultSink1 = new TestListResultSink<>();
+		TestListResultSink<String> sideOutputResultSink2 = new TestListResultSink<>();
+		TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+
+		StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+		see.setParallelism(3);
+
+		DataStream<Integer> ds1 = see.fromCollection(elements);
+		DataStream<Integer> ds2 = see.fromCollection(elements);
+
+		SingleOutputStreamOperator<Integer> passThroughtStream = ds1
+			.keyBy(i -> i)
+			.connect(ds2.keyBy(i -> i))
+			.process(new KeyedCoProcessFunction<Integer, Integer, Integer, Integer>() {
+				@Override
+				public void processElement1(Integer value, Context ctx, Collector<Integer> out)
+					throws Exception {
+					if (value < 4) {
+						out.collect(value);
+						ctx.output(sideOutputTag1, "sideout1-" + ctx.getCurrentKey() + "-" + String.valueOf(value));
+					}
+				}
+
+				@Override
+				public void processElement2(Integer value, Context ctx, Collector<Integer> out)
+					throws Exception {
+					if (value >= 4) {
+						out.collect(value);
+						ctx.output(sideOutputTag2, "sideout2-" + ctx.getCurrentKey() + "-" + String.valueOf(value));
+					}
+				}
+			});
+
+		passThroughtStream.getSideOutput(sideOutputTag1).addSink(sideOutputResultSink1);
+		passThroughtStream.getSideOutput(sideOutputTag2).addSink(sideOutputResultSink2);
+		passThroughtStream.addSink(resultSink);
+		see.execute();
+
+		assertEquals(Arrays.asList("sideout1-1-1", "sideout1-2-2", "sideout1-3-3"), sideOutputResultSink1.getSortedResult());
+		assertEquals(Arrays.asList("sideout2-4-4", "sideout2-5-5"), sideOutputResultSink2.getSortedResult());
+		assertEquals(Arrays.asList(1, 2, 3, 4, 5), resultSink.getSortedResult());
+	}
+
+	/**
+	 * Test keyed KeyedCoProcessFunction side output with multiple consumers.
+	 */
+	@Test
+	public void testRealKeyedCoProcessFunctionSideOutputWithMultipleConsumersAndDifferentTypes()
throws Exception {
 
 Review comment:
   Um... yes, I was testing a tuple input type, to check whether `keyBy` and `connect`
can be invoked on it.
   OK, let me remove this test.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message