flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provide Kafka headers
Date Mon, 27 Aug 2018 17:57:30 GMT
alexeyt820 commented on a change in pull request #6615: [FLINK-8354] [flink-connectors] Add ability to access and provide Kafka headers
URL: https://github.com/apache/flink/pull/6615#discussion_r213060361
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java
 ##########
 @@ -350,4 +363,201 @@ public boolean isEndOfStream(Long nextElement) {
 		}
 	}
 
+	/**
+	 * Kafka 0.11 specific test, ensuring Kafka Headers are properly written to and read from Kafka.
+	 */
+	@Test(timeout = 60000)
+	public void testHeaders() throws Exception {
+		final String topic = "headers-topic";
+		final long testSequenceLength = 127L;
+		createTestTopic(topic, 3, 1);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		DataStream<Long> testSequence = env.addSource(new SourceFunction<Long>() {
+			private static final long serialVersionUID = 1L;
+			boolean running = true;
+
+			@Override
+			public void run(SourceContext<Long> ctx) throws Exception {
+				long i = 0;
+				while (running) {
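+					// emit element i with an event timestamp of i * 2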
+					ctx.collectWithTimestamp(i, i * 2);
+					if (i++ == testSequenceLength) {
+						running = false;
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				running = false;
+			}
+		});
+
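+		// Produce the test sequence; the serialization schema below attaches headers derived from each element.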
+		FlinkKafkaProducer011<Long> producer = new FlinkKafkaProducer011<>(topic,
+			new TestHeadersKeyedSerializationSchema(topic), standardProps, Optional.empty());
+		testSequence.addSink(producer).setParallelism(3);
+		env.execute("Produce some data");
+
+		// Now let's consume data and check that headers deserialized correctly
+		env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+		FlinkKafkaConsumer011<TestHeadersElement> kafkaSource = new FlinkKafkaConsumer011<>(topic,
+			new TestHeadersKeyedDeserializationSchema(testSequenceLength), standardProps);
+
+		env.addSource(kafkaSource).addSink(new TestHeadersElementValid());
+		env.execute("Consume again");
+
+		deleteTestTopic(topic);
+	}
+
+	/**
+	 * Element consisting of key, value and headers, where headers are represented as a list of tuples: (key, list of Bytes).
+	 */
+	public static class TestHeadersElement extends Tuple3<Long, Byte, List<Tuple2<String, List<Byte>>>> {
+
+	}
+
+	/**
+	 * Generate "headers" for the given element.
+	 * @param element - sequence element
+	 * @return headers
+	 */
+	private static Iterable<Map.Entry<String, byte[]>> headersFor(Long element) {
+		final long x = element;
+		return Arrays.asList(
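+			// low 32 bits as two "low" headers, high 32 bits as two "high" headers, each a big-endian 16-bit part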
+			new AbstractMap.SimpleImmutableEntry<>("low", new byte[]{
+				(byte) ((x >>> 8) & 0xFF),
+				(byte) ((x) & 0xFF)
+			}),
+			new AbstractMap.SimpleImmutableEntry<>("low", new byte[]{
+				(byte) ((x >>> 24) & 0xFF),
+				(byte) ((x >>> 16) & 0xFF)
+			}),
+			new AbstractMap.SimpleImmutableEntry<>("high", new byte[]{
+				(byte) ((x >>> 40) & 0xFF),
+				(byte) ((x >>> 32) & 0xFF)
+			}),
+			new AbstractMap.SimpleImmutableEntry<>("high", new byte[]{
+				(byte) ((x >>> 56) & 0xFF),
+				(byte) ((x >>> 48) & 0xFF)
+			})
+		);
+	}
+
+	/**
+	 * Convert headers into a list-of-tuples representation, which is more convenient to use in
+	 * assert expressions because tuples implement equals.
+	 * @param headers - headers
+	 * @return list of tuples (String, list of Bytes)
+	 */
+	private static List<Tuple2<String, List<Byte>>> headersAsList(Iterable<Map.Entry<String, byte[]>> headers) {
+		List<Tuple2<String, List<Byte>>> r = new ArrayList<>();
+		for (Map.Entry<String, byte[]> entry: headers) {
+			final Tuple2<String, List<Byte>> t = new Tuple2<>();
+			t.f0 = entry.getKey();
+			t.f1 = new ArrayList<>(entry.getValue().length);
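+			// box each byte so that List#equals compares header values element-wise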
+			for (byte b: entry.getValue()) {
+				t.f1.add(b);
+			}
+			r.add(t);
+		}
+		return r;
+	}
+
+	/**
+	 * Sink consuming TestHeadersElement; for each element it regenerates the headers from the
+	 * message value and validates that they equal the headers in the element, which were
+	 * read from Kafka.
+	 */
+	private static class TestHeadersElementValid implements SinkFunction<TestHeadersElement> {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public void invoke(TestHeadersElement value, Context context) throws Exception {
+			// calculate Headers from message
+			final Iterable<Map.Entry<String, byte[]>> headers = headersFor(value.f0);
+			final List<Tuple2<String, List<Byte>>> expected = headersAsList(headers);
+			assertEquals(expected, value.f2);
+		}
+	}
+
+	/**
+	 * Serialization schema which serializes the given element as the value and the lowest element
+	 * byte as the key. The low 32-bit half of the element is stored as two "low" headers, each
+	 * holding a big-endian 16-bit part; likewise, the high 32-bit half is stored as two "high"
+	 * headers.
+	 */
+	private static class TestHeadersKeyedSerializationSchema implements KeyedSerializationSchema<Long> {
+		private final String topic;
+
+		TestHeadersKeyedSerializationSchema(String topic) {
+			this.topic = Objects.requireNonNull(topic);
+		}
+
+		@Override
+		public byte[] serializeKey(Long element) {
+			return new byte[] { element.byteValue() };
+		}
+
+		@Override
+		public byte[] serializeValue(Long data) {
+			return data == null ? null : Longs.toByteArray(data);
+		}
+
+		@Override
+		public String getTargetTopic(Long element) {
+			return topic;
+		}
+
+		@Override
+		public Iterable<Map.Entry<String, byte[]>> headers(Long element) {
+			return headersFor(element);
+		}
+	}
+
+	/**
+	 * Deserialization schema for TestHeadersElement elements.
+	 */
+	private static class TestHeadersKeyedDeserializationSchema implements KeyedDeserializationSchema<TestHeadersElement> {
+		private final long count;
+
+		TestHeadersKeyedDeserializationSchema(long count) {
+			this.count = count;
+		}
+
+		@Override
+		public TypeInformation<TestHeadersElement> getProducedType() {
+			return TypeInformation.of(TestHeadersElement.class);
+		}
+
+		@Override
+		public TestHeadersElement deserialize(byte[] messageKey, byte[] message, String topic,
+			int partition, long offset) throws IOException {
+			final TestHeadersElement element = new TestHeadersElement();
+			element.f0 = Longs.fromByteArray(message);
+			element.f1 = messageKey[0];
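+			// this deserialize overload receives no headers, so the headers list starts out empty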
+			element.f2 = new ArrayList<>(0);
+			return element;
+		}
+
+		@Override
+		public boolean isEndOfStream(TestHeadersElement nextElement) {
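+			// stop reading once the last element of the test sequence (testSequenceLength) arrives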
+			return nextElement.f0 >= count;
+		}
+
+		@Override
+		public TestHeadersElement deserialize(byte[] messageKey, byte[] message, String topic,
+			int partition, long offset, Iterable<Map.Entry<String, byte[]>> headers) throws IOException {
 
 Review comment:
   Thank you. Will do
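
For reference, a minimal standalone sketch (an illustration only, not part of this change; the class name is invented) of how the four headers written by headersFor round-trip back to the original element value:

	public class HeaderLayoutSketch {
		public static void main(String[] args) {
			long x = 0x1122334455667788L;
			// big-endian 16-bit chunks, mirroring headersFor above
			byte[] low0  = { (byte) (x >>> 8),  (byte) x };          // bits 15..0
			byte[] low1  = { (byte) (x >>> 24), (byte) (x >>> 16) }; // bits 31..16
			byte[] high0 = { (byte) (x >>> 40), (byte) (x >>> 32) }; // bits 47..32
			byte[] high1 = { (byte) (x >>> 56), (byte) (x >>> 48) }; // bits 63..48
			// reassemble the chunks and verify the round trip
			long rebuilt = ((long) (high1[0] & 0xFF) << 56) | ((long) (high1[1] & 0xFF) << 48)
				| ((long) (high0[0] & 0xFF) << 40) | ((long) (high0[1] & 0xFF) << 32)
				| ((long) (low1[0] & 0xFF) << 24) | ((long) (low1[1] & 0xFF) << 16)
				| ((long) (low0[0] & 0xFF) << 8) | ((long) (low0[1] & 0xFF));
			System.out.println(rebuilt == x); // prints true
		}
	}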

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
