flink-issues mailing list archives

From tzulitai <...@git.apache.org>
Subject [GitHub] flink pull request #4928: [FLINK-7732][kafka-consumer] Do not commit to kafk...
Date Wed, 01 Nov 2017 10:31:02 GMT
Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4928#discussion_r148226195
  
    --- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java ---
    @@ -242,10 +243,25 @@ public void addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) thr
     	 * @param commitCallback The callback that the user should trigger when a commit request completes or fails.
     	 * @throws Exception This method forwards exceptions.
     	 */
    -	public abstract void commitInternalOffsetsToKafka(
    +	public final void commitInternalOffsetsToKafka(
    +			Map<KafkaTopicPartition, Long> offsets,
    +			@Nonnull KafkaCommitCallback commitCallback) throws Exception {
    +		// Ignore sentinels. They might appear here if snapshot has started before actual offsets values
    +		// replaced sentinels
    +		doCommitInternalOffsetsToKafka(filerOutSentinels(offsets), commitCallback);
    +	}
    +
    +	protected abstract void doCommitInternalOffsetsToKafka(
     			Map<KafkaTopicPartition, Long> offsets,
     			@Nonnull KafkaCommitCallback commitCallback) throws Exception;
     
    +	private Map<KafkaTopicPartition, Long> filerOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
    --- End diff --
    
    typo: `filterOutSentinels`, missing `t`.
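
For readers without the full diff: the body of the helper is cut off above, so here is a minimal sketch of what a filter like this might look like under the corrected name. It is not the code from the PR. The class name `SentinelFilterSketch`, the `String` keys standing in for `KafkaTopicPartition`, and the rule "any negative offset is a sentinel" are all assumptions made for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the implementation from the pull request.
class SentinelFilterSketch {

    // Assumption: sentinels are placeholder offsets that are never valid
    // committed positions. This sketch treats any negative value as one.
    static boolean isSentinel(long offset) {
        return offset < 0;
    }

    // Returns a new map containing only the entries whose offsets are real
    // values; the input map is left unmodified.
    static Map<String, Long> filterOutSentinels(Map<String, Long> offsets) {
        Map<String, Long> filtered = new HashMap<>();
        for (Map.Entry<String, Long> entry : offsets.entrySet()) {
            if (!isSentinel(entry.getValue())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("topic-0", 42L);  // real offset, kept
        offsets.put("topic-1", -1L);  // placeholder, dropped
        System.out.println(filterOutSentinels(offsets));
    }
}
```

Returning a copy rather than mutating the argument keeps the caller's snapshot state untouched, which seems to match the intent of wrapping the filter inside the new final `commitInternalOffsetsToKafka` method.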

