storm-commits mailing list archives

From s...@apache.org
Subject [2/3] storm git commit: STORM-2913: Add metadata to at-most-once and at-least-once commits
Date Mon, 05 Feb 2018 21:58:09 GMT
STORM-2913: Add metadata to at-most-once and at-least-once commits


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/03915f0a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/03915f0a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/03915f0a

Branch: refs/heads/1.x-branch
Commit: 03915f0aefa49c3115e9b66ec3f98b0914dc9433
Parents: b56c30d
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Sat Jan 27 15:15:45 2018 +0100
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Mon Feb 5 22:55:54 2018 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 87 ++++++-------------
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  4 +-
 .../spout/internal/CommitMetadataManager.java   | 91 ++++++++++++++++++++
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  8 --
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 75 +++++++++-------
 5 files changed, 166 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
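
For readers skimming the patch: the change makes every offset commit carry a small JSON metadata string identifying the committing topology, so that a redeployed topology can tell its own commits apart from commits made by other topologies or by older spout versions. The standalone sketch below is illustrative only and is not part of the patch; the nested CommitMetadata POJO is a simplified stand-in for org.apache.storm.kafka.spout.internal.CommitMetadata, and the topology id literal is made up.

    // Illustrative sketch: the shape of the commit metadata and how it rides along on a commit.
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitMetadataSketch {

        // Simplified stand-in for the CommitMetadata POJO serialized by CommitMetadataManager.
        static class CommitMetadata {
            public String topologyId;  // context.getStormId()
            public int taskId;         // context.getThisTaskId()
            public String threadName;  // Thread.currentThread().getName()

            CommitMetadata(String topologyId, int taskId, String threadName) {
                this.topologyId = topologyId;
                this.taskId = taskId;
                this.threadName = threadName;
            }
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // The metadata string is unique per spout task and names the committing topology.
            String metadataJson = mapper.writeValueAsString(
                new CommitMetadata("myTopology-1-1517867889", 3, Thread.currentThread().getName()));

            // With this patch, at-most-once and no-guarantee commits also carry the metadata,
            // instead of going through the metadata-less commitSync()/commitAsync() overloads.
            TopicPartition tp = new TopicPartition("topic", 0);
            long fetchPosition = 42L; // in the spout this comes from kafkaConsumer.position(tp)
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
                Collections.singletonMap(tp, new OffsetAndMetadata(fetchPosition, metadataJson));

            System.out.println(offsetsToCommit);
        }
    }

Because the metadata travels inside the OffsetAndMetadata itself, it is stored next to the offset in Kafka's __consumer_offsets topic and needs no extra storage on the Storm side.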


http://git-wip-us.apache.org/repos/asf/storm/blob/03915f0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 27940d0..4464e68 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -23,12 +23,8 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import com.google.common.annotations.VisibleForTesting;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -55,7 +51,7 @@ import org.apache.kafka.common.errors.RetriableException;
 
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
-import org.apache.storm.kafka.spout.internal.CommitMetadata;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -75,7 +71,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     //Initial delay for the commit and subscription refresh timers
     public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
 
     // Storm
     protected SpoutOutputCollector collector;
@@ -107,8 +102,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // Triggers when a subscription should be refreshed
     private transient Timer refreshSubscriptionTimer;
     private transient TopologyContext context;
-    // Metadata information to commit to Kafka. It is unique per spout per topology.
-    private transient String commitMetadata;
+    private transient CommitMetadataManager commitMetadataManager;
     private transient KafkaOffsetMetric kafkaOffsetMetric;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
@@ -145,7 +139,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
         waitingToEmit = new HashMap<>();
-        setCommitMetadata(context);
+        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
 
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {
@@ -160,7 +154,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         kafkaOffsetMetric = new KafkaOffsetMetric(new Supplier() {
             @Override
             public Object get() {
-                return offsetManagers;
+                return Collections.unmodifiableMap(offsetManagers);
             }
         }, new Supplier() {
             @Override
@@ -181,16 +175,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return true;
     }
 
-    private void setCommitMetadata(TopologyContext context) {
-        try {
-            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
-                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
-        } catch (JsonProcessingException e) {
-            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
-            throw new RuntimeException(e);
-        }
-    }
-
     private boolean isAtLeastOnceProcessing() {
         return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
     }
@@ -228,8 +212,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 retryService.retainAll(partitions);
 
                 /*
-                 * Emitted messages for partitions that are no longer assigned to this spout can't
-                 * be acked and should not be retried, hence remove them from emitted collection.
+                 * Emitted messages for partitions that are no longer assigned to this spout can't be acked and should not be retried, hence
+                 * remove them from emitted collection.
                  */
                 Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
                 while (msgIdIterator.hasNext()) {
@@ -265,7 +249,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
             if (committedOffset != null) {
                 // offset was previously committed for this consumer group and topic-partition, either by this or another topology.
-                if (isOffsetCommittedByThisTopology(newTp, committedOffset)) {
+                if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                     // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
                     kafkaConsumer.seek(newTp, committedOffset.offset());
                 } else {
@@ -293,31 +277,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
-    /**
-     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology. This info is used to decide if
-     * {@link FirstPollOffsetStrategy} should be applied
-     *
-     * @param tp topic-partition
-     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
-     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
-     */
-    private boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset) {
-        try {
-            if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).hasCommitted()) {
-                return true;
-            }
-
-            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
-            return committedMetadata.getTopologyId().equals(context.getStormId());
-        } catch (IOException e) {
-            LOG.warn("Failed to deserialize [{}]. Error likely occurred because the last commit "
-                + "for this topic-partition was done using an earlier version of Storm. "
-                + "Defaulting to behavior compatible with earlier version", committedOffset);
-            LOG.trace("", e);
-            return false;
-        }
-    }
-
     // ======== Next Tuple =======
     @Override
     public void nextTuple() {
@@ -328,9 +287,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
             if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
                 if (isAtLeastOnceProcessing()) {
-                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
                 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
-                    commitFetchedOffsetsAsync(kafkaConsumer.assignment());
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                        createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                    kafkaConsumer.commitAsync(offsetsToCommit, null);
+                    LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
                 }
             }
 
@@ -424,7 +386,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 numPolledRecords);
             if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                 //Commit polled records immediately to ensure delivery is at-most-once.
-                kafkaConsumer.commitSync();
+                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                kafkaConsumer.commitSync(offsetsToCommit);
+                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
             }
             return consumerRecords;
         } finally {
@@ -497,11 +462,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-            if (committedOffset != null && isOffsetCommittedByThisTopology(tp, committedOffset)
-                && committedOffset.offset() > record.offset()) {
+            if (isAtLeastOnceProcessing()
+                && committedOffset != null 
+                && committedOffset.offset() > record.offset()
+                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
                 // Ensures that after a topology with this id is started, the consumer fetch
                 // position never falls behind the committed offset (STORM-2844)
-                throw new IllegalStateException("Attempting to emit a message that has already been committed.");
+                throw new IllegalStateException("Attempting to emit a message that has already been committed."
+                    + " This should never occur when using the at-least-once processing guarantee.");
             }
 
             final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
@@ -547,13 +515,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
     }
 
-    private void commitFetchedOffsetsAsync(Set<TopicPartition> assignedPartitions) {
+    private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
         Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
         for (TopicPartition tp : assignedPartitions) {
-            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
         }
-        kafkaConsumer.commitAsync(offsetsToCommit, null);
-        LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+        return offsetsToCommit;
     }
     
     private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
@@ -567,7 +534,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
         for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
-            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadata);
+            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
             }
@@ -778,6 +745,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @VisibleForTesting
     KafkaOffsetMetric getKafkaOffsetMetric() {
-        return  kafkaOffsetMetric;
+        return kafkaOffsetMetric;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/03915f0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index cbe52a7..7aa836c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -758,7 +758,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                  * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
                  * requests an offset that was deleted.
                  */
-                LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
                     ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
                 builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
             } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
@@ -772,7 +772,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                     + " Some messages may be processed more than once.");
             }
         }
-        LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit",
+        LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
             ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/03915f0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
new file mode 100644
index 0000000..a63619c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public final class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context.
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+            this.processingGuarantee = processingGuarantee;
+        } catch (JsonProcessingException e) {
+            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+     *
+     * @param tp The topic partition the commit metadata belongs to.
+     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+     * @param offsetManagers The offset managers.
+     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+     */
+    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
+        Map<TopicPartition, OffsetManager> offsetManagers) {
+        try {
+            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+                && offsetManagers.containsKey(tp)
+                && offsetManagers.get(tp).hasCommitted()) {
+                return true;
+            }
+
+            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+            return committedMetadata.getTopologyId().equals(context.getStormId());
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize expected commit metadata [{}]."
+                + " This error is expected to occur once per partition, if the last commit to each partition"
+                + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. "
+                + "Defaulting to behavior compatible with earlier version", committedOffset);
+            LOG.trace("", e);
+            return false;
+        }
+    }
+
+    public String getCommitMetadata() {
+        return commitMetadata;
+    }
+
+}
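
As a companion to the new class above, here is a minimal, self-contained sketch of the read side: on (re)deployment the spout parses the metadata stored with the committed offset and compares the recorded topology id against the current one to decide whether FirstPollOffsetStrategy should apply. The JSON field names and id strings below are assumptions for illustration only, not taken from the patch.

    // Illustrative sketch: deciding whether a committed offset belongs to the current topology.
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class MetadataReadSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            // What kafkaConsumer.committed(tp).metadata() could look like after this patch (assumed field names).
            String committedMetadata =
                "{\"topologyId\":\"myTopology-1-1517867889\",\"taskId\":3,\"threadName\":\"Thread-42\"}";
            String currentTopologyId = "myTopology-2-1517954289"; // context.getStormId() after a redeploy

            JsonNode parsed = mapper.readTree(committedMetadata);
            boolean committedByThisTopology =
                parsed.get("topologyId").asText().equals(currentTopologyId);

            // false here, so the spout would apply its FirstPollOffsetStrategy instead of blindly
            // resuming from the committed offset; a parse failure is treated the same way, for
            // compatibility with commits made before this change.
            System.out.println("Committed by this topology: " + committedByThisTopology);
        }
    }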

http://git-wip-us.apache.org/repos/asf/storm/blob/03915f0a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 91a7be6..90e906b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -238,12 +238,4 @@ public class KafkaSpoutConfigTest {
 
         assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
     }
-    
-    @Test
-    public void testThrowsIfEnableAutoCommitIsSet() {
-        expectedException.expect(IllegalStateException.class);
-        KafkaSpoutConfig.builder("localhost:1234", "topic")
-            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
-            .build();
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/03915f0a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index a20f706..082cc58 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -21,9 +21,9 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.inOrder;
@@ -36,13 +36,16 @@ import static org.mockito.Mockito.when;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -51,6 +54,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
 import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.runners.MockitoJUnitRunner;
@@ -60,7 +64,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
 
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
-    
+
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
@@ -85,11 +89,18 @@ public class KafkaSpoutMessagingGuaranteeTest {
 
         spout.nextTuple();
 
+        when(consumerMock.position(partition)).thenReturn(1L);
+
         //The spout should have emitted the tuple, and must have committed it before emit
         InOrder inOrder = inOrder(consumerMock, collectorMock);
         inOrder.verify(consumerMock).poll(anyLong());
-        inOrder.verify(consumerMock).commitSync();
+        inOrder.verify(consumerMock).commitSync(commitCapture.capture());
         inOrder.verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList());
+
+        CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE);
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+        assertThat(committedOffsets.get(partition).offset(), is(0L));
+        assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
     }
 
     private void doTestModeDisregardsMaxUncommittedOffsets(KafkaSpoutConfig<String, String> spoutConfig) {
@@ -172,70 +183,76 @@ public class KafkaSpoutMessagingGuaranteeTest {
         doTestModeCannotReplayTuples(spoutConfig);
     }
 
-    private void doTestModeDoesNotCommitAckedTuples(KafkaSpoutConfig<String, String> spoutConfig) {
+    @Test
+    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
+            .setTupleTrackingEnforced(true)
+            .build();
         try (SimulatedTime time = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
 
             when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
                 SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
 
             spout.nextTuple();
+            reset(consumerMock);
 
             ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
             assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
 
             spout.ack(msgIdCaptor.getValue());
-            
+
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
-            
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition, List<ConsumerRecord<String, String>>>emptyMap()));
+
             spout.nextTuple();
-            
-            verify(consumerMock, never()).commitSync(any(Map.class));
+
+            verify(consumerMock, never()).commitSync(argThat(new ArgumentMatcher<Map<TopicPartition, OffsetAndMetadata>>() {
+                @Override
+                public boolean matches(Object arg) {
+                    Map<TopicPartition, OffsetAndMetadata> castArg = (Map<TopicPartition, OffsetAndMetadata>) arg;
+                    return !castArg.containsKey(partition);
+                }
+            }));
         }
     }
 
     @Test
-    public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
-            .setTupleTrackingEnforced(true)
-            .build();
-        doTestModeDoesNotCommitAckedTuples(spoutConfig);
-    }
-    
-    @Test
     public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
         //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();
-        
+
         try (SimulatedTime time = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
 
             when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
                 SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
 
             spout.nextTuple();
-            
+
             when(consumerMock.position(partition)).thenReturn(1L);
 
             ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
             assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
-            
+
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
-            
+
             spout.nextTuple();
-            
+
             verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class));
-            
-            Map<TopicPartition, OffsetAndMetadata> commit = commitCapture.getValue();
-            assertThat(commit.containsKey(partition), is(true));
-            assertThat(commit.get(partition).offset(), is(1L));
+
+            CommitMetadataManager metadataManager = new CommitMetadataManager(contextMock, KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE);
+            Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitCapture.getValue();
+            assertThat(committedOffsets.get(partition).offset(), is(1L));
+            assertThat(committedOffsets.get(partition).metadata(), is(metadataManager.getCommitMetadata()));
         }
     }
 

