storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [07/14] storm git commit: STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: copied external/storm-kafka-client from 1.x-branch
Date Thu, 08 Feb 2018 09:05:08 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index c72e70b..4464e68 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -18,308 +18,523 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
+import java.util.Map.Entry;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import com.google.common.base.Supplier;
+import org.apache.commons.lang.Validate;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.RetriableException;
+
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaSpout<K, V> extends BaseRichSpout {
+
+    private static final long serialVersionUID = 4151921085047987154L;
+    //Initial delay for the commit and subscription refresh timers
+    public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
 
     // Storm
     protected SpoutOutputCollector collector;
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private final KafkaConsumerFactory kafkaConsumerFactory;
+    private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
     private transient KafkaConsumer<K, V> kafkaConsumer;
-    private transient boolean consumerAutoCommitMode;
-
 
     // Bookkeeping
-    private transient int maxRetries;                                   // Max number of times a tuple is retried
-    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first realized by the spout upon activation
-    private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
-    private transient Timer commitTimer;                                // timer == null for auto commit mode
-    private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
+    // Strategy to determine the fetch offset of the first realized by the spout upon activation
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+    // Class that has the logic to handle tuple failure.
+    private transient KafkaSpoutRetryService retryService;
+    // Handles tuple events (emit, ack etc.)
+    private transient KafkaTupleListener tupleListener;
+    // timer == null only if the processing guarantee is at-most-once
+    private transient Timer commitTimer;
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    private KafkaSpoutStreams kafkaSpoutStreams;                        // Object that wraps all the logic to declare output fields and emit tuples
-    private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;      // Object that contains the logic to build tuples for each ConsumerRecord
-
-    private transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
-    private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
-    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
-    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
-
+    // Tuples that were successfully acked/emitted. These tuples will be committed periodically when the commit timer expires,
+    // or after a consumer rebalance, or during close/deactivate. Always empty if processing guarantee is none or at-most-once.
+    private transient Map<TopicPartition, OffsetManager> offsetManagers;
+    // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed.
+    // Always empty if processing guarantee is none or at-most-once
+    private transient Set<KafkaSpoutMessageId> emitted;
+    // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+    private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;
+    // Triggers when a subscription should be refreshed
+    private transient Timer refreshSubscriptionTimer;
+    private transient TopologyContext context;
+    private transient CommitMetadataManager commitMetadataManager;
+    private transient KafkaOffsetMetric kafkaOffsetMetric;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<K, V>());
     }
-    
-    //This constructor is here for testing
+
+    @VisibleForTesting
     KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
-        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
-        this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
         this.kafkaConsumerFactory = kafkaConsumerFactory;
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
     }
 
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        initialized = false;
+        this.context = context;
 
         // Spout internals
         this.collector = collector;
-        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
-        numUncommittedOffsets = 0;
 
         // Offset management
         firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
-        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
 
         // Retries management
         retryService = kafkaSpoutConfig.getRetryService();
 
-        // Tuples builder delegate
-        tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+        tupleListener = kafkaSpoutConfig.getTupleListener();
 
-        if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
-            commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+        if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+            // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
+            commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
+        refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
 
-        acked = new HashMap<>();
+        offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
-        waitingToEmit = Collections.emptyListIterator();
+        waitingToEmit = new HashMap<>();
+        commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
+
+        tupleListener.open(conf, context);
+        if (canRegisterMetrics()) {
+            registerMetric();
+        }
 
         LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
     }
 
-    // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
+    private void registerMetric() {
+        LOG.info("Registering Spout Metrics");
+        kafkaOffsetMetric = new KafkaOffsetMetric(new Supplier() {
+            @Override
+            public Object get() {
+                return Collections.unmodifiableMap(offsetManagers);
+            }
+        }, new Supplier() {
+            @Override
+            public Object get() {
+                return kafkaConsumer;
+            }
+        });
+        context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
+    }
 
+    private boolean canRegisterMetrics() {
+        try {
+            KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class);
+        } catch (NoSuchMethodException e) {
+            LOG.warn("Minimum required kafka-clients library version to enable metrics is 0.10.1.0. Disabling spout metrics.");
+            return false;
+        }
+        return true;
+    }
+
+    private boolean isAtLeastOnceProcessing() {
+        return kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE;
+    }
+
+    // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
     private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+
+        private Collection<TopicPartition> previousAssignment = new HashSet<>();
+
         @Override
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            previousAssignment = partitions;
+
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
-            if (!consumerAutoCommitMode && initialized) {
-                initialized = false;
-                commitOffsetsForAckedTuples();
+                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+
+            if (isAtLeastOnceProcessing()) {
+                commitOffsetsForAckedTuples(new HashSet<>(partitions));
             }
         }
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
+                context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
             initialize(partitions);
+            tupleListener.onPartitionsReassigned(partitions);
         }
 
         private void initialize(Collection<TopicPartition> partitions) {
-            if (!consumerAutoCommitMode) {
-                acked.keySet().retainAll(partitions);   // remove from acked all partitions that are no longer assigned to this spout
-            }
-
-            retryService.retainAll(partitions);
-            
-            //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
-            Set<TopicPartition> partitionsSet = new HashSet(partitions);
-            Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
-            while (msgIdIterator.hasNext()) {
-                KafkaSpoutMessageId msgId = msgIdIterator.next();
-                if (!partitionsSet.contains(msgId.getTopicPartition())) {
-                    msgIdIterator.remove();
+            if (isAtLeastOnceProcessing()) {
+                // remove offsetManagers for all partitions that are no longer assigned to this spout
+                offsetManagers.keySet().retainAll(partitions);
+                retryService.retainAll(partitions);
+
+                /*
+                 * Emitted messages for partitions that are no longer assigned to this spout can't be acked and should not be retried, hence
+                 * remove them from emitted collection.
+                 */
+                Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
+                while (msgIdIterator.hasNext()) {
+                    KafkaSpoutMessageId msgId = msgIdIterator.next();
+                    if (!partitions.contains(msgId.getTopicPartition())) {
+                        msgIdIterator.remove();
+                    }
                 }
             }
-
-            for (TopicPartition tp : partitions) {
-                final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
-                final long fetchOffset = doSeek(tp, committedOffset);
-                setAcked(tp, fetchOffset);
+            waitingToEmit.keySet().retainAll(partitions);
+
+            Set<TopicPartition> newPartitions = new HashSet<>(partitions);
+            newPartitions.removeAll(previousAssignment);
+            for (TopicPartition newTp : newPartitions) {
+                final OffsetAndMetadata committedOffset = kafkaConsumer.committed(newTp);
+                final long fetchOffset = doSeek(newTp, committedOffset);
+                LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
+                    fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
+                // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
+                if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) {
+                    offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset));
+                }
             }
-            initialized = true;
             LOG.info("Initialization complete");
         }
 
         /**
-         * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+         * Sets the cursor to the location dictated by the first poll strategy and returns the fetch offset.
          */
-        private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
-            long fetchOffset;
-            if (committedOffset != null) {             // offset was committed for this TopicPartition
-                if (firstPollOffsetStrategy.equals(EARLIEST)) {
-                    kafkaConsumer.seekToBeginning(tp);
-                    fetchOffset = kafkaConsumer.position(tp);
-                } else if (firstPollOffsetStrategy.equals(LATEST)) {
-                    kafkaConsumer.seekToEnd(tp);
-                    fetchOffset = kafkaConsumer.position(tp);
+        private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) {
+            LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]",
+                newTp, firstPollOffsetStrategy, committedOffset);
+
+            if (committedOffset != null) {
+                // offset was previously committed for this consumer group and topic-partition, either by this or another topology.
+                if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
+                    // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
+                    kafkaConsumer.seek(newTp, committedOffset.offset());
                 } else {
-                    // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
-                    fetchOffset = committedOffset.offset() + 1;
-                    kafkaConsumer.seek(tp, fetchOffset);
+                    // offset was not committed by this topology, therefore FirstPollOffsetStrategy applies
+                    // (only when the topology is first deployed).
+                    if (firstPollOffsetStrategy.equals(EARLIEST)) {
+                        kafkaConsumer.seekToBeginning(Collections.singleton(newTp));
+                    } else if (firstPollOffsetStrategy.equals(LATEST)) {
+                        kafkaConsumer.seekToEnd(Collections.singleton(newTp));
+                    } else {
+                        // Resume polling at the last committed offset, i.e. the first offset that is not marked as processed.
+                        kafkaConsumer.seek(newTp, committedOffset.offset());
+                    }
                 }
-            } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
+            } else {
+                // no offset commits have ever been done for this consumer group and topic-partition,
+                // so start at the beginning or end depending on FirstPollOffsetStrategy
                 if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
-                    kafkaConsumer.seekToBeginning(tp);
+                    kafkaConsumer.seekToBeginning(Collections.singleton(newTp));
                 } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
-                    kafkaConsumer.seekToEnd(tp);
+                    kafkaConsumer.seekToEnd(Collections.singleton(newTp));
                 }
-                fetchOffset = kafkaConsumer.position(tp);
             }
-            return fetchOffset;
-        }
-    }
-
-    private void setAcked(TopicPartition tp, long fetchOffset) {
-        // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
-        if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
-            acked.put(tp, new OffsetEntry(tp, fetchOffset));
+            return kafkaConsumer.position(newTp);
         }
     }
 
     // ======== Next Tuple =======
-
     @Override
     public void nextTuple() {
-        if (initialized) {
-            if (commit()) {
-                commitOffsetsForAckedTuples();
+        try {
+            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+                kafkaSpoutConfig.getSubscription().refreshAssignment();
             }
 
-            if (poll()) {
-                setWaitingToEmit(pollKafkaBroker());
+            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
+                if (isAtLeastOnceProcessing()) {
+                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+                } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                        createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                    kafkaConsumer.commitAsync(offsetsToCommit, null);
+                    LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+                }
             }
 
-            if (waitingToEmit()) {
-                emit();
+            PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
+            if (pollablePartitionsInfo.shouldPoll()) {
+                try {
+                    setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
+                } catch (RetriableException e) {
+                    LOG.error("Failed to poll from kafka.", e);
+                }
             }
-        } else {
-            LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+
+            emitIfWaitingNotEmitted();
+        } catch (InterruptException e) {
+            throwKafkaConsumerInterruptedException();
         }
     }
 
-    private boolean commit() {
-        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+    private void throwKafkaConsumerInterruptedException() {
+        //Kafka throws their own type of exception when interrupted.
+        //Throw a new Java InterruptedException to ensure Storm can recognize the exception as a reaction to an interrupt.
+        throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
     }
 
-    private boolean poll() {
-        final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
-        final boolean poll = !waitingToEmit() && numUncommittedOffsets < maxUncommittedOffsets;
+    private PollablePartitionsInfo getPollablePartitionsInfo() {
+        if (isWaitingToEmit()) {
+            LOG.debug("Not polling. Tuples waiting to be emitted.");
+            return new PollablePartitionsInfo(Collections.<TopicPartition>emptySet(), Collections.<TopicPartition, Long>emptyMap());
+        }
 
-        if (!poll) {
-            if (waitingToEmit()) {
-                LOG.debug("Not polling. Tuples waiting to be emitted. [{}] uncommitted offsets across all topic partitions", numUncommittedOffsets);
-            }
+        Set<TopicPartition> assignment = kafkaConsumer.assignment();
+        if (!isAtLeastOnceProcessing()) {
+            return new PollablePartitionsInfo(assignment, Collections.<TopicPartition, Long>emptyMap());
+        }
 
-            if (numUncommittedOffsets >= maxUncommittedOffsets) {
-                LOG.debug("Not polling. [{}] uncommitted offsets across all topic partitions has reached the threshold of [{}]", numUncommittedOffsets, maxUncommittedOffsets);
+        Map<TopicPartition, Long> earliestRetriableOffsets = retryService.earliestRetriableOffsets();
+        Set<TopicPartition> pollablePartitions = new HashSet<>();
+        final int maxUncommittedOffsets = kafkaSpoutConfig.getMaxUncommittedOffsets();
+        for (TopicPartition tp : assignment) {
+            OffsetManager offsetManager = offsetManagers.get(tp);
+            int numUncommittedOffsets = offsetManager.getNumUncommittedOffsets();
+            if (numUncommittedOffsets < maxUncommittedOffsets) {
+                //Allow poll if the partition is not at the maxUncommittedOffsets limit
+                pollablePartitions.add(tp);
+            } else {
+                long offsetAtLimit = offsetManager.getNthUncommittedOffsetAfterCommittedOffset(maxUncommittedOffsets);
+                Long earliestRetriableOffset = earliestRetriableOffsets.get(tp);
+                if (earliestRetriableOffset != null && earliestRetriableOffset <= offsetAtLimit) {
+                    //Allow poll if there are retriable tuples within the maxUncommittedOffsets limit
+                    pollablePartitions.add(tp);
+                } else {
+                    LOG.debug("Not polling on partition [{}]. It has [{}] uncommitted offsets, which exceeds the limit of [{}]. ", tp,
+                        numUncommittedOffsets, maxUncommittedOffsets);
+                }
             }
         }
-        return poll;
+        return new PollablePartitionsInfo(pollablePartitions, earliestRetriableOffsets);
     }
 
-    private boolean waitingToEmit() {
-        return waitingToEmit != null && waitingToEmit.hasNext();
+    private boolean isWaitingToEmit() {
+        for (List<ConsumerRecord<K, V>> value : waitingToEmit.values()) {
+            if (!value.isEmpty()) {
+                return true;
+            }
+        }
+        return false;
     }
 
-    public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
-        List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>();
+    private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
         for (TopicPartition tp : consumerRecords.partitions()) {
-            waitingToEmitList.addAll(consumerRecords.records(tp));
+            waitingToEmit.put(tp, new ArrayList<>(consumerRecords.records(tp)));
         }
-        waitingToEmit = waitingToEmitList.iterator();
     }
 
     // ======== poll =========
-    private ConsumerRecords<K, V> pollKafkaBroker() {
-        doSeekRetriableTopicPartitions();
-
-        final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
-        final int numPolledRecords = consumerRecords.count();
-        LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
-        return consumerRecords;
+    private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
+        doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
+        Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
+        Iterator<TopicPartition> pausedIter = pausedPartitions.iterator();
+        while (pausedIter.hasNext()) {
+            if (pollablePartitionsInfo.pollablePartitions.contains(pausedIter.next())) {
+                pausedIter.remove();
+            }
+        }
+        try {
+            kafkaConsumer.pause(pausedPartitions);
+            final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+            ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
+            final int numPolledRecords = consumerRecords.count();
+            LOG.debug("Polled [{}] records from Kafka",
+                numPolledRecords);
+            if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+                //Commit polled records immediately to ensure delivery is at-most-once.
+                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
+                    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
+                kafkaConsumer.commitSync(offsetsToCommit);
+                LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+            }
+            return consumerRecords;
+        } finally {
+            kafkaConsumer.resume(pausedPartitions);
+        }
     }
 
-    private void doSeekRetriableTopicPartitions() {
-        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
+    private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
+        for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
+            //Seek directly to the earliest retriable message for each retriable topic partition
+            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
+        }
+    }
 
-        for (TopicPartition rtp : retriableTopicPartitions) {
-            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
-            if (offsetAndMeta != null) {
-                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
-            } else {
-                kafkaConsumer.seekToEnd(rtp);    // Seek to last committed offset
+    /**
+     * Acks scheduled-for-retry offsets that can no longer be fetched from Kafka.
+     * If the poll that was seeked to the earliest retriable offset returns records
+     * whose first offset is larger than requested, the missing records were
+     * compacted away and are acked here so they stop blocking commit progression.
+     *
+     * @param earliestRetriableOffsets earliest retriable offset per topic partition, used as the seek target
+     * @param consumerRecords the records returned by the poll that followed the seek
+     */
+    private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
+        ConsumerRecords<K, V> consumerRecords) {
+        for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
+            TopicPartition tp = entry.getKey();
+            List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
+            if (!records.isEmpty()) {
+                ConsumerRecord<K, V> record = records.get(0);
+                long seekOffset = entry.getValue();
+                long earliestReceivedOffset = record.offset();
+                if (seekOffset < earliestReceivedOffset) {
+                    //Since we asked for tuples starting at seekOffset, some retriable records must have been compacted away.
+                    //Ack up to the first offset received if the record is not already acked or currently in the topology
+                    for (long i = seekOffset; i < earliestReceivedOffset; i++) {
+                        KafkaSpoutMessageId msgId = retryService.getMessageId(new ConsumerRecord<>(tp.topic(), tp.partition(), i, null, null));
+                        if (!offsetManagers.get(tp).contains(msgId) && !emitted.contains(msgId)) {
+                            LOG.debug("Record at offset [{}] appears to have been compacted away from topic [{}], marking as acked", i, tp);
+                            //Remove from the retry schedule before acking, then mark emitted so ack() processes it.
+                            retryService.remove(msgId);
+                            emitted.add(msgId);
+                            ack(msgId);
+                        }
+                    }
+                }
            }
        }
    }
 
     // ======== emit  =========
-    private void emit() {
-        emitTupleIfNotEmitted(waitingToEmit.next());
-        waitingToEmit.remove();
+    /**
+     * Emits at most one tuple from the waiting-to-emit records. Records that turn out
+     * to be already acked or already in flight are consumed (removed) without emitting;
+     * the loop stops as soon as one tuple is actually emitted. Partitions whose lists
+     * are drained without an emit are removed from the waiting-to-emit map.
+     */
+    private void emitIfWaitingNotEmitted() {
+        Iterator<List<ConsumerRecord<K, V>>> waitingToEmitIter = waitingToEmit.values().iterator();
+        outerLoop:
+        while (waitingToEmitIter.hasNext()) {
+            List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmitIter.next();
+            while (!waitingToEmitForTp.isEmpty()) {
+                final boolean emittedTuple = emitOrRetryTuple(waitingToEmitForTp.remove(0));
+                if (emittedTuple) {
+                    // One tuple emitted for this call; leave the remaining records for the next cycle.
+                    break outerLoop;
+                }
+            }
+            // This partition's list was drained without emitting; drop the empty entry.
+            waitingToEmitIter.remove();
+        }
     }
 
-    // emits one tuple per record
-    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
+    /**
+     * Creates a tuple from the kafka record and emits it if it was never emitted or it is ready to be retried.
+     *
+     * @param record to be emitted
+     * @return true if tuple was emitted. False if tuple has been acked or has been emitted and is pending ack or fail
+     */
+    private boolean emitOrRetryTuple(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
-        final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
 
-        if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
+        if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
-        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
+        } else if (emitted.contains(msgId)) {   // has been emitted and it is pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
         } else {
-            boolean isScheduled = retryService.isScheduled(msgId);
-            if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
-                final List<Object> tuple = tuplesBuilder.buildTuple(record);
-                kafkaSpoutStreams.emit(collector, tuple, msgId);
-                emitted.add(msgId);
-                numUncommittedOffsets++;
-                if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
-                    retryService.remove(msgId);
+            final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
+            if (isAtLeastOnceProcessing()
+                && committedOffset != null 
+                && committedOffset.offset() > record.offset()
+                && commitMetadataManager.isOffsetCommittedByThisTopology(tp, committedOffset, Collections.unmodifiableMap(offsetManagers))) {
+                // Ensures that after a topology with this id is started, the consumer fetch
+                // position never falls behind the committed offset (STORM-2844)
+                throw new IllegalStateException("Attempting to emit a message that has already been committed."
+                    + " This should never occur when using the at-least-once processing guarantee.");
+            }
+
+            // Translate the record into a tuple; the translator may return null to filter the record out.
+            final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
+            if (isEmitTuple(tuple)) {
+                final boolean isScheduled = retryService.isScheduled(msgId);
+                // not scheduled <=> never failed (i.e. never emitted), or scheduled and ready to be retried
+                if (!isScheduled || retryService.isReady(msgId)) {
+                    final String stream = tuple instanceof KafkaTuple ? ((KafkaTuple) tuple).getStream() : Utils.DEFAULT_STREAM_ID;
+
+                    if (!isAtLeastOnceProcessing()) {
+                        // Not at-least-once: emit anchored with the msgId only if tuple tracking is enforced.
+                        if (kafkaSpoutConfig.isTupleTrackingEnforced()) {
+                            collector.emit(stream, tuple, msgId);
+                            LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
+                        } else {
+                            collector.emit(stream, tuple);
+                            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+                        }
+                    } else {
+                        // At-least-once: register the offset as in flight before emitting.
+                        emitted.add(msgId);
+                        offsetManagers.get(tp).addToEmitMsgs(msgId.offset());
+                        if (isScheduled) {  // Was scheduled for retry and re-emitted, so remove from schedule.
+                            retryService.remove(msgId);
+                        }
+                        collector.emit(stream, tuple, msgId);
+                        tupleListener.onEmit(tuple, msgId);
+                        LOG.trace("Emitted tuple [{}] for record [{}] with msgId [{}]", tuple, record, msgId);
+                    }
+                    return true;
                 }
-                LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+            } else {
+                // Null tuple filtered out by the translator: ack directly so the offset can still be committed.
+                LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
+                msgId.setEmitted(false);
+                ack(msgId);
             }
         }
+        return false;
+    }
+
+    /**
+     * Decides whether a translated tuple should be emitted.
+     *
+     * @param tuple the tuple produced by the record translator; may be null
+     * @return true if the tuple is non-null, or if the spout is configured to emit null tuples
+     */
+    private boolean isEmitTuple(List<Object> tuple) {
+        return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
+    }
+
+    /**
+     * Builds the offsets-to-commit map from the consumer's current fetch position for
+     * each assigned partition, tagging every entry with this topology's commit metadata.
+     *
+     * @param assignedPartitions partitions currently assigned to this consumer
+     * @return map of partition to its current position plus commit metadata
+     */
+    private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+        for (TopicPartition tp : assignedPartitions) {
+            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
+        }
+        return offsetsToCommit;
     }
+    
+    private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
+        // Find offsets that are ready to be committed for every assigned topic partition
+        final Map<TopicPartition, OffsetManager> assignedOffsetManagers = new HashMap<>();
+        for (Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
+            if (assignedPartitions.contains(entry.getKey())) {
+                assignedOffsetManagers.put(entry.getKey(), entry.getValue());
+            }
+        }
 
-    private void commitOffsetsForAckedTuples() {
-        // Find offsets that are ready to be committed for every topic partition
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
-            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
+        for (Map.Entry<TopicPartition, OffsetManager> tpOffset : assignedOffsetManagers.entrySet()) {
+            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset(commitMetadataManager.getCommitMetadata());
             if (nextCommitOffset != null) {
                 nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
             }
@@ -331,9 +546,37 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
             // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
             // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
-            for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
-                final OffsetEntry offsetEntry = tpOffset.getValue();
-                offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
+            for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
+                //Update the OffsetManager for each committed partition, and update numUncommittedOffsets
+                final TopicPartition tp = tpOffset.getKey();
+                long position = kafkaConsumer.position(tp);
+                long committedOffset = tpOffset.getValue().offset();
+                if (position < committedOffset) {
+                    /*
+                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
+                     * than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
+                     * part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
+                     * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
+                     */
+                    LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
+                        position, committedOffset);
+                    kafkaConsumer.seek(tp, committedOffset);
+                    List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
+                    if (waitingToEmitForTp != null) {
+                        //Discard the pending records that are already committed
+                        List<ConsumerRecord<K, V>> filteredRecords = new ArrayList<>();
+                        for (ConsumerRecord<K, V> record : waitingToEmitForTp) {
+                            if (record.offset() >= committedOffset) {
+                                filteredRecords.add(record);
+                            }
+                        }
+                        waitingToEmit.put(tp, filteredRecords);
+                    }
+                }
+
+                final OffsetManager offsetManager = assignedOffsetManagers.get(tp);
+                offsetManager.commit(tpOffset.getValue());
+                LOG.debug("[{}] uncommitted offsets for partition [{}] after commit", offsetManager.getNumUncommittedOffsets(), tp);
             }
         } else {
             LOG.trace("No offsets to commit. {}", this);
@@ -341,78 +584,101 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     // ======== Ack =======
-
+    /**
+     * Storm ack callback. A no-op unless the spout runs with at-least-once semantics,
+     * in which case the acked message is handed to its partition's OffsetManager so
+     * the offset can be committed on a later commit cycle. Acks for messages no longer
+     * tracked (e.g. after partition reassignment) or for null tuples are logged only.
+     */
     @Override
     public void ack(Object messageId) {
-        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-        if(!emitted.contains(msgId)) {
-            LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+        if (!isAtLeastOnceProcessing()) {
             return;
         }
-        
-        if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
-            acked.get(msgId.getTopicPartition()).add(msgId);
+
+        // Only need to keep track of acked tuples if commits to Kafka are controlled by
+        // tuple acks, which happens only for at-least-once processing semantics
+        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+        if (!emitted.contains(msgId)) {
+            if (msgId.isEmitted()) {
+                LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
+                    + "came from a topic-partition that this consumer group instance is no longer tracking "
+                    + "due to rebalance/partition reassignment. No action taken.", msgId);
+            } else {
+                LOG.debug("Received direct ack for message [{}], associated with null tuple", msgId);
+            }
+        } else {
+            // A message must never be simultaneously in flight and scheduled for retry.
+            Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
+                + " This should never occur barring errors in the RetryService implementation or the spout code.");
+            offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
+            emitted.remove(msgId);
         }
-        emitted.remove(msgId);
+        tupleListener.onAck(msgId);
     }
 
     // ======== Fail =======
-
+    /**
+     * Storm fail callback. A no-op unless the spout runs with at-least-once semantics.
+     * Increments the failure count and either schedules the message for retry or, when
+     * the retry service refuses (max retries reached), acks it so commits can progress.
+     */
     @Override
     public void fail(Object messageId) {
+        if (!isAtLeastOnceProcessing()) {
+            return;
+        }
+        // Only need to keep track of failed tuples if commits to Kafka are controlled by
+        // tuple acks, which happens only for at-least-once processing semantics
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-        if(!emitted.contains(msgId)) {
-            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+        if (!emitted.contains(msgId)) {
+            LOG.debug("Received fail for tuple this spout is no longer tracking."
+                + " Partitions may have been reassigned. Ignoring message [{}]", msgId);
             return;
         }
-        if (msgId.numFails() < maxRetries) {
-            emitted.remove(msgId);
-            msgId.incrementNumFails();
-            retryService.schedule(msgId);
-        } else { // limit to max number of retries
+        // A message must never be simultaneously in flight and scheduled for retry.
+        Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
+            + " This should never occur barring errors in the RetryService implementation or the spout code.");
+
+        msgId.incrementNumFails();
+
+        if (!retryService.schedule(msgId)) {
             LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
+            // this tuple should be removed from emitted only inside the ack() method. This is to ensure
+            // that the OffsetManager for that TopicPartition is updated and allows commit progression
+            tupleListener.onMaxRetryReached(msgId);
             ack(msgId);
+        } else {
+            tupleListener.onRetry(msgId);
+            emitted.remove(msgId);
         }
     }
 
     // ======== Activate / Deactivate / Close / Declare Outputs =======
-
+    /**
+     * Spout activation: creates and subscribes the Kafka consumer, delegating any
+     * consumer InterruptException to throwKafkaConsumerInterruptedException().
+     */
     @Override
     public void activate() {
-        subscribeKafkaConsumer();
+        try {
+            subscribeKafkaConsumer();
+        } catch (InterruptException e) {
+            throwKafkaConsumerInterruptedException();
+        }
     }
 
+    // Creates a fresh consumer from the factory and delegates topic subscription to the
+    // configured Subscription, passing the spout's rebalance listener and topology context.
     private void subscribeKafkaConsumer() {
         kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
 
-        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
-            final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
-            kafkaConsumer.subscribe(topics, new KafkaSpoutConsumerRebalanceListener());
-            LOG.info("Kafka consumer subscribed topics {}", topics);
-        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
-            final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern();
-            kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener());
-            LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
-        }
-        // Initial poll to get the consumer registration process going.
-        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
-        kafkaConsumer.poll(0);
+        kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
     }
 
+    /**
+     * Spout deactivation: shuts down the consumer (committing acked offsets when
+     * at-least-once), delegating InterruptException to throwKafkaConsumerInterruptedException().
+     */
     @Override
     public void deactivate() {
-        shutdown();
+        try {
+            shutdown();
+        } catch (InterruptException e) {
+            throwKafkaConsumerInterruptedException();
+        }
     }
 
+    /**
+     * Spout close: same shutdown path as deactivate(), with the consumer's
+     * InterruptException delegated to throwKafkaConsumerInterruptedException().
+     */
     @Override
     public void close() {
-        shutdown();
+        try {
+            shutdown();
+        } catch (InterruptException e) {
+            throwKafkaConsumerInterruptedException();
+        }
     }
 
     private void shutdown() {
         try {
-            if (!consumerAutoCommitMode) {
-                commitOffsetsForAckedTuples();
+            if (isAtLeastOnceProcessing()) {
+                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
             }
         } finally {
             //remove resources
@@ -422,216 +688,63 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
+    // Declares one output stream, with its fields, for every stream the record translator targets.
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        kafkaSpoutStreams.declareOutputFields(declarer);
+        RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+        for (String stream : translator.streams()) {
+            declarer.declareStream(stream, translator.getFieldsFor(stream));
+        }
     }
 
+    // Debug-friendly summary of per-partition offset managers and in-flight message ids.
     @Override
     public String toString() {
-        return "KafkaSpout{" +
-                "acked=" + acked +
-                ", emitted=" + emitted +
-                "}";
+        return "KafkaSpout{"
+            + "offsetManagers =" + offsetManagers
+            + ", emitted=" + emitted
+            + "}";
     }
 
+    // Exposes spout settings (topics, group id, bootstrap servers, security protocol)
+    // in the component configuration under "config."-prefixed keys.
     @Override
-    public Map<String, Object> getComponentConfiguration () {
+    public Map<String, Object> getComponentConfiguration() {
         Map<String, Object> configuration = super.getComponentConfiguration();
         if (configuration == null) {
             configuration = new HashMap<>();
         }
         String configKeyPrefix = "config.";
 
-        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
-            configuration.put(configKeyPrefix + "topics", getNamedTopics());
-        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
-            configuration.put(configKeyPrefix + "topics", getWildCardTopics());
-        }
+        configuration.put(configKeyPrefix + "topics", getTopicsString());
 
         configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
         configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
+        configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
         return configuration;
     }
 
-    private String getNamedTopics() {
-        StringBuilder topics = new StringBuilder();
-        for (String topic: kafkaSpoutConfig.getSubscribedTopics()) {
-            topics.append(topic).append(",");
-        }
-        return topics.toString();
+    // Human-readable description of the subscribed topics, delegated to the Subscription.
+    private String getTopicsString() {
+        return kafkaSpoutConfig.getSubscription().getTopicsString();
+    }
 
-    private String getWildCardTopics() {
-        return kafkaSpoutConfig.getTopicWildcardPattern().toString();
-    }
+    private static class PollablePartitionsInfo {
 
-    // ======= Offsets Commit Management ==========
+        private final Set<TopicPartition> pollablePartitions;
+        //The subset of earliest retriable offsets that are on pollable partitions
+        private final Map<TopicPartition, Long> pollableEarliestRetriableOffsets;
 
-    private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
-        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
-            return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1;
-        }
-    }
-
-    /**
-     * This class is not thread safe
-     */
-    private class OffsetEntry {
-        private final TopicPartition tp;
-        private final long initialFetchOffset;  /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
-                                                 * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
-        private long committedOffset;     // last offset committed to Kafka. Initially it is set to fetchOffset - 1
-        private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);     // acked messages sorted by ascending order of offset
-
-        public OffsetEntry(TopicPartition tp, long initialFetchOffset) {
-            this.tp = tp;
-            this.initialFetchOffset = initialFetchOffset;
-            this.committedOffset = initialFetchOffset - 1;
-            LOG.debug("Instantiated {}", this);
-        }
-
-        public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
-            ackedMsgs.add(msgId);
-        }
-
-        /**
-         * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
-         */
-        public OffsetAndMetadata findNextCommitOffset() {
-            boolean found = false;
-            long currOffset;
-            long nextCommitOffset = committedOffset;
-            KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
-
-            for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
-                if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) {            // found the next offset to commit
-                    found = true;
-                    nextCommitMsg = currAckedMsg;
-                    nextCommitOffset = currOffset;
-                } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not continuous to the offsets listed to go in the next commit, so stop search
-                    LOG.debug("topic-partition [{}] has non-continuous offset [{}]. It will be processed in a subsequent batch.", tp, currOffset);
-                    break;
-                } else {
-                    LOG.debug("topic-partition [{}] has unexpected offset [{}].", tp, currOffset);
-                    break;
-                }
-            }
-
-            OffsetAndMetadata nextCommitOffsetAndMetadata = null;
-            if (found) {
-                nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
-                LOG.debug("topic-partition [{}] has offsets [{}-{}] ready to be committed",tp, committedOffset + 1, nextCommitOffsetAndMetadata.offset());
-            } else {
-                LOG.debug("topic-partition [{}] has NO offsets ready to be committed", tp);
-            }
-            LOG.trace("{}", this);
-            return nextCommitOffsetAndMetadata;
-        }
-
-        /**
-         * Marks an offset has committed. This method has side effects - it sets the internal state in such a way that future
-         * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any.
-         *
-         * @param committedOffset offset to be marked as committed
-         */
-        public void commit(OffsetAndMetadata committedOffset) {
-            long numCommittedOffsets = 0;
-            if (committedOffset != null) {
-                final long oldCommittedOffset = this.committedOffset;
-                numCommittedOffsets = committedOffset.offset() - this.committedOffset;
-                this.committedOffset = committedOffset.offset();
-                for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
-                    if (iterator.next().offset() <= committedOffset.offset()) {
-                        iterator.remove();
-                    } else {
-                        break;
-                    }
+        public PollablePartitionsInfo(Set<TopicPartition> pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
+            this.pollablePartitions = pollablePartitions;
+            this.pollableEarliestRetriableOffsets = new HashMap<>();
+            for (TopicPartition tp : earliestRetriableOffsets.keySet()) {
+                if (this.pollablePartitions.contains(tp)) {
+                    this.pollableEarliestRetriableOffsets.put(tp, earliestRetriableOffsets.get(tp));
                 }
-                numUncommittedOffsets-= numCommittedOffsets;
-                LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
-                        oldCommittedOffset + 1, this.committedOffset, numCommittedOffsets, tp, numUncommittedOffsets);
-            } else {
-                LOG.debug("Committed [{}] offsets for topic-partition [{}]. [{}] uncommitted offsets across all topic partitions",
-                        numCommittedOffsets, tp, numUncommittedOffsets);
             }
-            LOG.trace("{}", this);
-        }
-
-        public boolean isEmpty() {
-            return ackedMsgs.isEmpty();
         }
 
-        public boolean contains(ConsumerRecord record) {
-            return contains(new KafkaSpoutMessageId(record));
-        }
-
-        public boolean contains(KafkaSpoutMessageId msgId) {
-            return ackedMsgs.contains(msgId);
-        }
-
-        @Override
-        public String toString() {
-            return "OffsetEntry{" +
-                    "topic-partition=" + tp +
-                    ", fetchOffset=" + initialFetchOffset +
-                    ", committedOffset=" + committedOffset +
-                    ", ackedMsgs=" + ackedMsgs +
-                    '}';
+        public boolean shouldPoll() {
+            return !this.pollablePartitions.isEmpty();
         }
     }
 
-    // =========== Timer ===========
-
-    private class Timer {
-        private final long delay;
-        private final long period;
-        private final TimeUnit timeUnit;
-        private final long periodNanos;
-        private long start;
-
-        /**
-         * Creates a class that mimics a single threaded timer that expires periodically. If a call to {@link
-         * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns
-         * true. Each time the method returns true the counter is reset. The timer starts with the specified time delay.
-         *
-         * @param delay    the initial delay before the timer starts
-         * @param period   the period between calls {@link #isExpiredResetOnTrue()}
-         * @param timeUnit the time unit of delay and period
-         */
-        public Timer(long delay, long period, TimeUnit timeUnit) {
-            this.delay = delay;
-            this.period = period;
-            this.timeUnit = timeUnit;
-
-            periodNanos = timeUnit.toNanos(period);
-            start = System.nanoTime() + timeUnit.toNanos(delay);
-        }
-
-        public long period() {
-            return period;
-        }
-
-        public long delay() {
-            return delay;
-        }
-
-        public TimeUnit getTimeUnit() {
-            return timeUnit;
-        }
-
-        /**
-         * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
-         * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
-         * (re-initiated) and a new cycle will start.
-         *
-         * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
-         * otherwise.
-         */
-        public boolean isExpiredResetOnTrue() {
-            final boolean expired = System.nanoTime() - start > periodNanos;
-            if (expired) {
-                start = System.nanoTime();
-            }
-            return expired;
-        }
+    // Exposes the spout's Kafka offset metric for unit tests only.
+    @VisibleForTesting
+    KafkaOffsetMetric getKafkaOffsetMetric() {
+        return kafkaOffsetMetric;
     }
 }


Mime
View raw message