flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4723) Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
Date Mon, 10 Oct 2016 10:26:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15561915#comment-15561915 ]

ASF GitHub Bot commented on FLINK-4723:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2580#discussion_r82576069
  
    --- Diff: flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java ---
    @@ -208,6 +207,235 @@ public void runFailOnNoBrokerTest() throws Exception {
     			}
     		}
     	}
    +
    +	/**
    +	 * Ensures that the committed offsets to Kafka are the offsets of "the next record to process"
    +	 */
    +	public void runCommitOffsetsToKafka() throws Exception {
    +		// 3 partitions with 50 records each (0-49, so the expected commit offset of each partition should be 50)
    +		final int parallelism = 3;
    +		final int recordsInEachPartition = 50;
    +
    +		final String topicName = writeSequence("testCommitOffsetsToKafkaTopic", recordsInEachPartition, parallelism, 1);
    +
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
    +		env.getConfig().disableSysoutLogging();
    +		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
    +		env.setParallelism(parallelism);
    +		env.enableCheckpointing(200);
    +
    +		DataStream<String> stream = env.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), standardProps));
    +		stream.addSink(new DiscardingSink<String>());
    +
    +		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    +		final Thread runner = new Thread("runner") {
    +			@Override
    +			public void run() {
    +				try {
    +					env.execute();
    +				}
    +				catch (Throwable t) {
    +					if (!(t.getCause() instanceof JobCancellationException)) {
    +						errorRef.set(t);
    +					}
    +				}
    +			}
    +		};
    +		runner.start();
    +
    +		final Long l50 = 50L; // the final committed offset in Kafka should be 50
    +		final long deadline = 30000 + System.currentTimeMillis();
    +
    +		KafkaTestEnvironment.KafkaOffsetHandler kafkaOffsetHandler = kafkaServer.createOffsetHandler(standardProps);
    +
    +		do {
    +			Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
    +			Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
    +			Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
    +
    +			if (l50.equals(o1) && l50.equals(o2) && l50.equals(o3)) {
    +				break;
    +			}
    +
    +			Thread.sleep(100);
    +		}
    +		while (System.currentTimeMillis() < deadline);
    +
    +		// cancel the job
    +		JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
    +
    +		final Throwable t = errorRef.get();
    +		if (t != null) {
    +			throw new RuntimeException("Job failed with an exception", t);
    +		}
    +
    +		// final check to see if offsets are correctly in Kafka
    +		Long o1 = kafkaOffsetHandler.getCommittedOffset(topicName, 0);
    +		Long o2 = kafkaOffsetHandler.getCommittedOffset(topicName, 1);
    +		Long o3 = kafkaOffsetHandler.getCommittedOffset(topicName, 2);
    +		Assert.assertEquals(Long.valueOf(50L), o1);
    +		Assert.assertEquals(Long.valueOf(50L), o2);
    +		Assert.assertEquals(Long.valueOf(50L), o3);
    +
    +		kafkaOffsetHandler.close();
    +		deleteTestTopic(topicName);
    +	}
    +
    +	/**
    +	 * This test first writes a total of 200 records to a test topic, reads the first 100 so that some offsets are
    --- End diff --
    
    300, 150


> Unify behaviour of committed offsets to Kafka / ZK for Kafka 0.8 and 0.9 consumer
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-4723
>                 URL: https://issues.apache.org/jira/browse/FLINK-4723
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.2.0, 1.1.3
>
>
> The proper "behaviour" of the offsets committed back to Kafka / ZK should be "the next offset that consumers should read" (in Kafka terms, the 'position').
> This is already fixed for the 0.9 consumer by FLINK-4618, by incrementing the offsets committed back to Kafka by the 0.9 consumer by 1, so that the internal {{KafkaConsumer}} picks up the correct start position when committed offsets are present. This fix was required because the start position from committed offsets was implicitly determined with Kafka 0.9 APIs.
> However, since the 0.8 consumer handles offset committing and start positions using Flink's own {{ZookeeperOffsetHandler}} and not Kafka's high-level APIs, the 0.8 consumer did not require a fix.
> I propose to still unify the behaviour of committed offsets across 0.8 and 0.9 to the definition above.
> Otherwise, if a user first uses the 0.8 consumer to read data, leaving Flink-committed offsets in ZK, and then uses a high-level 0.8 Kafka consumer to read the same topic in a non-Flink application, the first record will be a duplicate (because, as described above, Kafka high-level consumers expect the committed offset to be "the next record to process" and not "the last processed record").
> This requires the committed ZK offsets in 0.8 to also be incremented by 1, and changing how Flink's internal offsets are initialized in accordance with the acquired ZK offsets.
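> The off-by-one convention described above can be illustrated with a minimal sketch (this is a hypothetical standalone class, not Flink's actual connector code; the names {{OffsetSemantics}} and {{offsetToCommit}} are invented for illustration). The value committed back to Kafka / ZK is the last processed offset plus 1, so a partition whose records 0-49 have all been processed commits offset 50, matching the assertions in the test above:

```java
// Hypothetical sketch illustrating the committed-offset convention from the
// issue description: the committed value is the offset of the *next* record
// to process ("position"), i.e. lastProcessedOffset + 1.
public class OffsetSemantics {

	/** Returns the offset to commit for a partition whose last processed offset is given. */
	static long offsetToCommit(long lastProcessedOffset) {
		return lastProcessedOffset + 1;
	}

	public static void main(String[] args) {
		// A partition with records 0..49: after processing all of them,
		// the committed offset should be 50.
		long lastProcessed = 49L;
		System.out.println(offsetToCommit(lastProcessed)); // prints 50
	}
}
```

> Under this convention, a consumer restarting from the committed offset resumes at the first unprocessed record, so no record is read twice.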



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
