storm-commits mailing list archives

From kabh...@apache.org
Subject [5/7] storm git commit: STORM-2936 Overwrite latest storm-kafka-client 1.x-branch into 1.1.x-branch
Date Wed, 07 Feb 2018 21:05:27 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index e1c6c96..7aa836c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,83 +18,169 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.tuple.Fields;
-
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.Config;
+import org.apache.storm.annotation.InterfaceStability;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
  */
 public class KafkaSpoutConfig<K, V> implements Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class);
     private static final long serialVersionUID = 141902646130682494L;
-    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;            // 200ms
-    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;   // 30s
-    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
-    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    // 10,000,000 records => 80MBs of memory footprint in the worst case
-    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; // 2s
-    public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =  
-            new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
-                    DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+    // 200ms
+    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
+    // 30s
+    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
+    // Retry forever
+    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
+    // 10,000,000 records => 80MBs of memory footprint in the worst case
+    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
+    // 2s
+    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+
+    public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+
+    public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+            DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+
+    public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE;
+
+    public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
+
+    public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60;
+
+
+    // Kafka consumer configuration
+    private final Map<String, Object> kafkaProps;
+    private final Subscription subscription;
+    private final long pollTimeoutMs;
+
+    // Kafka spout configuration
+    private final RecordTranslator<K, V> translator;
+    private final long offsetCommitPeriodMs;
+    private final int maxUncommittedOffsets;
+    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final KafkaSpoutRetryService retryService;
+    private final KafkaTupleListener tupleListener;
+    private final long partitionRefreshPeriodMs;
+    private final boolean emitNullTuples;
+    private final SerializableDeserializer<K> keyDes;
+    private final Class<? extends Deserializer<K>> keyDesClazz;
+    private final SerializableDeserializer<V> valueDes;
+    private final Class<? extends Deserializer<V>> valueDesClazz;
+    private final ProcessingGuarantee processingGuarantee;
+    private final boolean tupleTrackingEnforced;
+    private final int metricsTimeBucketSizeInSecs;
+
     /**
-     * Retry in a tight loop (keep unit tests fasts) do not use in production.
+     * Creates a new KafkaSpoutConfig using a Builder.
+     *
+     * @param builder The Builder to construct the KafkaSpoutConfig from
      */
-    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = 
-    new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
-            DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));
+    public KafkaSpoutConfig(Builder<K, V> builder) {
+        setKafkaPropsForProcessingGuarantee(builder);
+        this.kafkaProps = builder.kafkaProps;
+        this.subscription = builder.subscription;
+        this.translator = builder.translator;
+        this.pollTimeoutMs = builder.pollTimeoutMs;
+        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+        this.retryService = builder.retryService;
+        this.tupleListener = builder.tupleListener;
+        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+        this.emitNullTuples = builder.emitNullTuples;
+        this.keyDes = builder.keyDes;
+        this.keyDesClazz = builder.keyDesClazz;
+        this.valueDes = builder.valueDes;
+        this.valueDesClazz = builder.valueDesClazz;
+        this.processingGuarantee = builder.processingGuarantee;
+        this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
+        this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
+    }
 
     /**
-     * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will
-     * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
-     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
-     * <ul>
-     * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li>
-     * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
-     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
-     * If no offset has been committed, it behaves as EARLIEST.</li>
-     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
-     * If no offset has been committed, it behaves as LATEST.</li>
-     * </ul>
-     * */
-    public static enum FirstPollOffsetStrategy {
+     * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
+     * By default this parameter is set to UNCOMMITTED_EARLIEST. 
+     */
+    public enum FirstPollOffsetStrategy {
+        /**
+         * The kafka spout polls records starting in the first offset of the partition, regardless of previous commits. This setting only
+         * takes effect on topology deployment
+         */
         EARLIEST,
+        /**
+         * The kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This
+         * setting only takes effect on topology deployment
+         */
         LATEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST
+         */
         UNCOMMITTED_EARLIEST,
-        UNCOMMITTED_LATEST }
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
+         */
+        UNCOMMITTED_LATEST;
 
-    public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
-        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
-    }
-    
-    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
-        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
-    }
-    
-    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
-        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
-    }
-    
-    private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
-        // set defaults for properties not specified
-        if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        @Override
+        public String toString() {
+            return "FirstPollOffsetStrategy{" + super.toString() + "}";
         }
-        return kafkaProps;
     }
-    
-    public static class Builder<K,V> {
+
+    /**
+     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
+     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
+     * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
+     * NO_GUARANTEE may be removed in a later release without warning; we're still evaluating whether it makes sense to keep it.
+     */
+    @InterfaceStability.Unstable
+    public enum ProcessingGuarantee {
+        /**
+         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
+         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
+         * interval.
+         */
+        AT_LEAST_ONCE,
+        /**
+         * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
+         * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
+         * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
+         */
+        AT_MOST_ONCE,
+        /**
+         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
+         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
+         * spout to control when commits occur. Commits asynchronously on the defined interval.
+         */
+        NO_GUARANTEE,
+    }
+
+    public static class Builder<K, V> {
+
         private final Map<String, Object> kafkaProps;
-        private Subscription subscription;
+        private final Subscription subscription;
         private final SerializableDeserializer<K> keyDes;
         private final Class<? extends Deserializer<K>> keyDesClazz;
         private final SerializableDeserializer<V> valueDes;
@@ -102,232 +188,362 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private RecordTranslator<K, V> translator;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
-        private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+        private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
         private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
+        private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER;
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
+        private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
+        private boolean tupleTrackingEnforced = false;
+        private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
+
+        public Builder(String bootstrapServers, String... topics) {
+            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+        }
 
-        public Builder(String bootstrapServers, String ... topics) {
-            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new NamedSubscription(topics));
+        public Builder(String bootstrapServers, Collection<String> topics) {
+            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(),
+                new NamedTopicFilter(new HashSet<String>(topics))));
         }
 
-        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
-            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+        public Builder(String bootstrapServers, Pattern topics) {
+            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
         }
-        
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.lang.String...)} instead, and set the deserializer with
+         * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+         * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String... topics) {
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+        }
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.util.Collection) } instead, and set the deserializer with
+         * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+         * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
         public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
-            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics))));
         }
-        
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.util.regex.Pattern) } instead, and set the deserializer with
+         * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+         * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
         public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
-            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
         }
-        
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, org.apache.storm.kafka.spout.Subscription) } instead, and set the
+         * deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+         * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
         public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) {
             this(bootstrapServers, keyDes, null, valDes, null, subscription);
         }
-        
-        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... topics) {
-            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.lang.String...)} instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String... topics) {
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
         }
-        
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.util.Collection) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
         public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
-            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics))));
         }
-        
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.util.regex.Pattern) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
         public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
-            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
         }
-        
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, org.apache.storm.kafka.spout.Subscription) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
         public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) {
             this(bootstrapServers, null, keyDes, null, valDes, subscription);
         }
-        
+
+        /**
+         * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
+         *
+         * @param bootstrapServers The bootstrap servers the consumer will use
+         * @param subscription The subscription defining which topics and partitions each spout instance will read.
+         */
+        public Builder(String bootstrapServers, Subscription subscription) {
+            this(bootstrapServers, null, null, null, null, subscription);
+        }
+
         private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
-                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
-            kafkaProps = new HashMap<>();
+            SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
+
+            this(keyDes, keyDesClazz, valDes, valDesClazz, subscription,
+                    new DefaultRecordTranslator<K, V>(), new HashMap<String, Object>());
+
             if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                 throw new IllegalArgumentException("bootstrap servers cannot be null");
             }
             kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-            this.keyDes = keyDes;
-            this.keyDesClazz = keyDesClazz;
-            this.valueDes = valDes;
-            this.valueDesClazz = valDesClazz;
-            this.subscription = subscription;
-            this.translator = new DefaultRecordTranslator<K,V>();
+
+            setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz);
         }
 
-        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
-                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
-            this.kafkaProps = new HashMap<>(builder.kafkaProps);
-            this.subscription = builder.subscription;
+        /**
+         * This constructor will always be called by one of the methods {@code setKey} or {@code setVal}, which implies
+         * that only one of its SerDe parameters will be non null, for which the corresponding Kafka property will be set
+         */
+        @SuppressWarnings("unchecked")
+        private Builder(final Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+                        SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+
+            this(keyDes, keyDesClazz, valueDes, valueDesClazz, builder.subscription,
+                    (RecordTranslator<K, V>) builder.translator, new HashMap<>(builder.kafkaProps));
+
             this.pollTimeoutMs = builder.pollTimeoutMs;
             this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
             this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
             this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-            //this could result in a lot of class case exceptions at runtime,
-            // but because some translators will work no matter what the generics
-            // are I thought it best not to force someone to reset the translator
-            // when they change the key/value types.
-            this.translator = (RecordTranslator<K, V>) builder.translator;
             this.retryService = builder.retryService;
+
+            setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz);
+        }
+
+        private Builder(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+               SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz,
+               Subscription subscription, RecordTranslator<K, V> translator, Map<String, Object> kafkaProps) {
             this.keyDes = keyDes;
             this.keyDesClazz = keyDesClazz;
             this.valueDes = valueDes;
             this.valueDesClazz = valueDesClazz;
+            this.subscription = subscription;
+            this.translator = translator;
+            this.kafkaProps = kafkaProps;
+        }
+
+        private void setNonNullSerDeKafkaProp(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+            if (keyDesClazz != null) {
+                kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
+            }
+            if (keyDes != null) {
+                kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass());
+            }
+            if (valueDesClazz != null) {
+                kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz);
+            }
+            if (valueDes != null) {
+                kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass());
+            }
         }
 
         /**
-         * Specifying this key deserializer overrides the property key.deserializer. If you have
-         * set a custom RecordTranslator before calling this it may result in class cast
-         * exceptions at runtime.
+         * Specifying this key deserializer overrides the property key.deserializer. If you have set a custom RecordTranslator before
+         * calling this it may result in class cast exceptions at runtime.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
          */
-        public <NK> Builder<NK,V> setKey(SerializableDeserializer<NK> keyDeserializer) {
-            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
+        @Deprecated
+        public <NK> Builder<NK, V> setKey(SerializableDeserializer<NK> keyDeserializer) {
+            return new Builder<>(this, keyDeserializer, null, null, null);
         }
-        
+
         /**
-         * Specify a class that can be instantiated to create a key.deserializer
-         * This is the same as setting key.deserializer, but overrides it. If you have
-         * set a custom RecordTranslator before calling this it may result in class cast
-         * exceptions at runtime.
+         * Specify a class that can be instantiated to create a key.deserializer. This is the same as setting key.deserializer, but overrides
+         * it. If you have set a custom RecordTranslator before calling this it may result in class cast exceptions at runtime.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
          */
+        @Deprecated
         public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
-            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
+            return new Builder<>(this, null, clazz, null, null);
         }
 
         /**
-         * Specifying this value deserializer overrides the property value.deserializer.  If you have
-         * set a custom RecordTranslator before calling this it may result in class cast
-         * exceptions at runtime.
+         * Specifying this value deserializer overrides the property value.deserializer. If you have set a custom RecordTranslator before
+         * calling this it may result in class cast exceptions at runtime.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
          */
-        public <NV> Builder<K,NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
-            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
+        @Deprecated
+        public <NV> Builder<K, NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
+            return new Builder<>(this, null, null, valueDeserializer, null);
         }
-        
+
         /**
-         * Specify a class that can be instantiated to create a value.deserializer
-         * This is the same as setting value.deserializer, but overrides it.  If you have
-         * set a custom RecordTranslator before calling this it may result in class cast
-         * exceptions at runtime.
+         * Specify a class that can be instantiated to create a value.deserializer. This is the same as setting value.deserializer, but
+         * overrides it. If you have set a custom RecordTranslator before calling this it may result in class cast exceptions at runtime.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
          */
-        public <NV> Builder<K,NV> setValue(Class<? extends Deserializer<NV>> clazz) {
-            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
+        @Deprecated
+        public <NV> Builder<K, NV> setValue(Class<? extends Deserializer<NV>> clazz) {
+            return new Builder<>(this, null, null, null, clazz);
         }
-        
+
         /**
-         * Set a Kafka property config
+         * Set a {@link KafkaConsumer} property.
          */
-        public Builder<K,V> setProp(String key, Object value) {
+        public Builder<K, V> setProp(String key, Object value) {
             kafkaProps.put(key, value);
             return this;
         }
-        
+
         /**
-         * Set multiple Kafka property configs
+         * Set multiple {@link KafkaConsumer} properties.
          */
-        public Builder<K,V> setProp(Map<String, Object> props) {
+        public Builder<K, V> setProp(Map<String, Object> props) {
             kafkaProps.putAll(props);
             return this;
         }
-        
+
         /**
-         * Set multiple Kafka property configs
+         * Set multiple {@link KafkaConsumer} properties.
          */
-        public Builder<K,V> setProp(Properties props) {
-            for (String name: props.stringPropertyNames()) {
-                kafkaProps.put(name, props.get(name));
+        public Builder<K, V> setProp(Properties props) {
+            for (Entry<Object, Object> entry : props.entrySet()) {
+                if (entry.getKey() instanceof String) {
+                    kafkaProps.put((String) entry.getKey(), entry.getValue());
+                } else {
+                    throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
+                }
             }
             return this;
         }
-        
+
         /**
          * Set the group.id for the consumers
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#GROUP_ID_CONFIG} instead
          */
-        public Builder<K,V> setGroupId(String id) {
+        @Deprecated
+        public Builder<K, V> setGroupId(String id) {
             return setProp("group.id", id);
         }
-        
+
         /**
          * reset the bootstrap servers for the Consumer
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} instead
          */
-        public Builder<K,V> setBootstrapServers(String servers) {
+        @Deprecated
+        public Builder<K, V> setBootstrapServers(String servers) {
             return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
         }
-        
+
         /**
          * The minimum amount of data the broker should return for a fetch request.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} instead
          */
-        public Builder<K,V> setFetchMinBytes(int bytes) {
+        @Deprecated
+        public Builder<K, V> setFetchMinBytes(int bytes) {
             return setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, bytes);
         }
-        
+
         /**
          * The maximum amount of data per-partition the broker will return.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#MAX_PARTITION_FETCH_BYTES_CONFIG} instead
          */
-        public Builder<K,V> setMaxPartitionFectchBytes(int bytes) {
+        @Deprecated
+        public Builder<K, V> setMaxPartitionFectchBytes(int bytes) {
             return setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, bytes);
         }
-        
+
         /**
          * The maximum number of records a poll will return.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG} instead
          */
-        public Builder<K,V> setMaxPollRecords(int records) {
+        @Deprecated
+        public Builder<K, V> setMaxPollRecords(int records) {
             return setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
         }
-        
+
         //Security Related Configs
-        
         /**
          * Configure the SSL Keystore for mutual authentication
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "ssl.keystore.location" and "ssl.keystore.password" instead
          */
-        public Builder<K,V> setSSLKeystore(String location, String password) {
+        @Deprecated
+        public Builder<K, V> setSSLKeystore(String location, String password) {
             return setProp("ssl.keystore.location", location)
-                    .setProp("ssl.keystore.password", password);
+                .setProp("ssl.keystore.password", password);
         }
-       
+
         /**
          * Configure the SSL Keystore for mutual authentication
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "ssl.keystore.location", "ssl.keystore.password" and "ssl.key.password" instead
          */
-        public Builder<K,V> setSSLKeystore(String location, String password, String keyPassword) {
+        @Deprecated
+        public Builder<K, V> setSSLKeystore(String location, String password, String keyPassword) {
             return setProp("ssl.key.password", keyPassword)
-                    .setSSLKeystore(location, password);
+                .setSSLKeystore(location, password);
         }
-        
+
         /**
          * Configure the SSL Truststore to authenticate with the brokers
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "security.protocol", "ssl.truststore.location" and "ssl.truststore.password" instead
          */
-        public Builder<K,V> setSSLTruststore(String location, String password) {
+        @Deprecated
+        public Builder<K, V> setSSLTruststore(String location, String password) {
             return setSecurityProtocol("SSL")
-                    .setProp("ssl.truststore.location", location)
-                    .setProp("ssl.truststore.password", password);
+                .setProp("ssl.truststore.location", location)
+                .setProp("ssl.truststore.password", password);
         }
-        
+
         /**
-         * Protocol used to communicate with brokers. 
-         * Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+         * Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "security.protocol" instead
          */
+        @Deprecated
         public Builder<K, V> setSecurityProtocol(String protocol) {
             return setProp("security.protocol", protocol);
         }
 
         //Spout Settings
         /**
-         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s
+         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200ms.
+         *
          * @param pollTimeoutMs time in ms
          */
-        public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
+        public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) {
             this.pollTimeoutMs = pollTimeoutMs;
             return this;
         }
 
         /**
          * Specifies the period, in milliseconds, at which the offset commit task is called. Default is 30s.
+         *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or
+         * {@link ProcessingGuarantee#NO_GUARANTEE}.
+         *
          * @param offsetCommitPeriodMs time in ms
          */
-        public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+        public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
             this.offsetCommitPeriodMs = offsetCommitPeriodMs;
             return this;
         }
@@ -336,26 +552,34 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
          * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
          * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
          * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
-         * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1.
+         * This limit is per partition and may in some cases be exceeded,
+         * but each partition cannot exceed this limit by more than maxPollRecords - 1.
+         * 
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
          * @param maxUncommittedOffsets max number of records that can be be pending commit
          */
-        public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+        public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
             this.maxUncommittedOffsets = maxUncommittedOffsets;
             return this;
         }
 
         /**
-         * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
-         * Please refer to to the documentation in {@link FirstPollOffsetStrategy}
+         * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start. Please refer to the
+         * documentation in {@link FirstPollOffsetStrategy}
+         *
          * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
-         * */
+         */
         public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
             this.firstPollOffsetStrategy = firstPollOffsetStrategy;
             return this;
         }
-        
+
         /**
          * Sets the retry service for the spout to use.
+         *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
          * @param retryService the new retry service
          * @return the builder (this).
          */
@@ -367,13 +591,28 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
+        /**
+         * Sets the tuple listener for the spout to use.
+         *
+         * @param tupleListener the tuple listener
+         * @return the builder (this).
+         */
+        public Builder<K, V> setTupleListener(KafkaTupleListener tupleListener) {
+            if (tupleListener == null) {
+                throw new NullPointerException("KafkaTupleListener cannot be null");
+            }
+            this.tupleListener = tupleListener;
+            return this;
+        }
+
         public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) {
             this.translator = translator;
             return this;
         }
-        
+
         /**
          * Configure a translator with tuples to be emitted on the default stream.
+         *
          * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
          * @param fields the names of the fields extracted
          * @return this to be able to chain configuration
@@ -381,9 +620,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
             return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
         }
-        
+
         /**
          * Configure a translator with tuples to be emitted to a given stream.
+         *
          * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
          * @param fields the names of the fields extracted
          * @param stream the stream to emit the tuples on
@@ -392,12 +632,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
             return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
         }
-        
+
         /**
-         * Sets partition refresh period in milliseconds. This is how often kafka will be polled
-         * to check for new topics and/or new partitions.
-         * This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
+         * Sets partition refresh period in milliseconds. This is how often kafka will be polled to check for new topics and/or new
+         * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
          * PatternSubscription rely on kafka to handle this instead.
+         *
          * @param partitionRefreshPeriodMs time in milliseconds
          * @return the builder (this)
          */
@@ -407,8 +647,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly
-         * ack them. By default this parameter is set to false, which means that null tuples are not emitted.
+         * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly ack them. By default
+         * this parameter is set to false, which means that null tuples are not emitted.
+         *
          * @param emitNullTuples sets if null tuples should or not be emitted downstream
          */
         public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) {
@@ -416,50 +657,139 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
-        public KafkaSpoutConfig<K,V> build() {
+        /**
+         * Specifies which processing guarantee the spout should offer. Refer to the documentation for {@link ProcessingGuarantee}.
+         *
+         * @param processingGuarantee The processing guarantee the spout should offer.
+         */
+        public Builder<K, V> setProcessingGuarantee(ProcessingGuarantee processingGuarantee) {
+            this.processingGuarantee = processingGuarantee;
+            return this;
+        }
+
+        /**
+         * Specifies whether the spout should require Storm to track emitted tuples when using a {@link ProcessingGuarantee} other than
+         * {@link ProcessingGuarantee#AT_LEAST_ONCE}. The spout will always track emitted tuples when offering at-least-once guarantees
+         * regardless of this setting. This setting is false by default.
+         *
+         * <p>Enabling tracking can be useful even in cases where reliability is not a concern, because it allows
+         * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and enables some spout metrics (e.g. complete-latency) that would
+         * otherwise be disabled.
+         *
+         * @param tupleTrackingEnforced true if Storm should track emitted tuples, false otherwise
+         */
+        public Builder<K, V> setTupleTrackingEnforced(boolean tupleTrackingEnforced) {
+            this.tupleTrackingEnforced = tupleTrackingEnforced;
+            return this;
+        }
+
+        /**
+         * The time period that metrics data is bucketed into.
+         * @param metricsTimeBucketSizeInSecs time in seconds
+         */
+        public Builder<K, V> setMetricsTimeBucketSizeInSecs(int metricsTimeBucketSizeInSecs) {
+            this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;
+            return this;
+        }
+
+        public KafkaSpoutConfig<K, V> build() {
             return new KafkaSpoutConfig<>(this);
         }
     }
 
-    // Kafka consumer configuration
-    private final Map<String, Object> kafkaProps;
-    private final Subscription subscription;
-    private final SerializableDeserializer<K> keyDes;
-    private final Class<? extends Deserializer<K>> keyDesClazz;
-    private final SerializableDeserializer<V> valueDes;
-    private final Class<? extends Deserializer<V>> valueDesClazz;
-    private final long pollTimeoutMs;
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, String... topics) {
+        return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+    }
 
-    // Kafka spout configuration
-    private final RecordTranslator<K, V> translator;
-    private final long offsetCommitPeriodMs;
-    private final int maxUncommittedOffsets;
-    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private final KafkaSpoutRetryService retryService;
-    private final long partitionRefreshPeriodMs;
-    private final boolean emitNullTuples;
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+        return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+    }
 
-    public KafkaSpoutConfig(Builder<K,V> builder) {
-        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-        this.subscription = builder.subscription;
-        this.translator = builder.translator;
-        this.pollTimeoutMs = builder.pollTimeoutMs;
-        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-        this.retryService = builder.retryService;
-        this.keyDes = builder.keyDes;
-        this.keyDesClazz = builder.keyDesClazz;
-        this.valueDes = builder.valueDes;
-        this.valueDesClazz = builder.valueDesClazz;
-        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-        this.emitNullTuples = builder.emitNullTuples;
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topic pattern to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+        return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
     }
 
+    private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
+        builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return builder;
+    }
+
+    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
+        if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            LOG.warn("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee."
+                + " This will be treated as an error in the next major release.");
+
+            final boolean enableAutoCommit = Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString());
+            if (enableAutoCommit) {
+                builder.processingGuarantee = ProcessingGuarantee.NO_GUARANTEE;
+            } else {
+                builder.processingGuarantee = ProcessingGuarantee.AT_LEAST_ONCE;
+            }
+        }
+        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+            if (autoOffsetResetPolicy == null) {
+                /*
+                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+                 * requests an offset that was deleted.
+                 */
+                LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+                    + " Some messages may be skipped.");
+            }
+        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+            if (autoOffsetResetPolicy != null
+                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+                    + " Some messages may be processed more than once.");
+            }
+        }
+        LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    }
+
+    /**
+     * Gets the properties that will be passed to the KafkaConsumer.
+     *
+     * @return The Kafka properties map
+     */
     public Map<String, Object> getKafkaProps() {
         return kafkaProps;
     }
 
+    /**
+     * @deprecated Please use {@link #getKafkaProps() } and look up the entry for {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
+     */
+    @Deprecated
     public Deserializer<K> getKeyDeserializer() {
         if (keyDesClazz != null) {
             try {
@@ -471,6 +801,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return keyDes;
     }
 
+    /**
+     * @deprecated Please use {@link #getKafkaProps() } and look up the entry for {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+     */
+    @Deprecated
     public Deserializer<V> getValueDeserializer() {
         if (valueDesClazz != null) {
             try {
@@ -481,12 +815,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
         return valueDes;
     }
-    
+
     public Subscription getSubscription() {
         return subscription;
     }
-    
-    public RecordTranslator<K,V> getTranslator() {
+
+    public RecordTranslator<K, V> getTranslator() {
         return translator;
     }
 
@@ -498,9 +832,21 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return offsetCommitPeriodMs;
     }
 
+    /**
+     * @deprecated Use {@link #getProcessingGuarantee()} instead.
+     */
+    @Deprecated
     public boolean isConsumerAutoCommitMode() {
-        return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null     // default is true
-                || Boolean.valueOf((String)kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+        return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
+            || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    }
+    
+    public ProcessingGuarantee getProcessingGuarantee() {
+        return processingGuarantee;
+    }
+
+    public boolean isTupleTrackingEnforced() {
+        return tupleTrackingEnforced;
     }
 
     public String getConsumerGroupId() {
@@ -518,7 +864,11 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public KafkaSpoutRetryService getRetryService() {
         return retryService;
     }
-    
+
+    public KafkaTupleListener getTupleListener() {
+        return tupleListener;
+    }
+
     public long getPartitionRefreshPeriodMs() {
         return partitionRefreshPeriodMs;
     }
@@ -527,19 +877,26 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return emitNullTuples;
     }
 
+    public int getMetricsTimeBucketSizeInSecs() {
+        return metricsTimeBucketSizeInSecs;
+    }
+
     @Override
     public String toString() {
-        return "KafkaSpoutConfig{" +
-                "kafkaProps=" + kafkaProps +
-                ", key=" + getKeyDeserializer() +
-                ", value=" + getValueDeserializer() +
-                ", pollTimeoutMs=" + pollTimeoutMs +
-                ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
-                ", maxUncommittedOffsets=" + maxUncommittedOffsets +
-                ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
-                ", subscription=" + subscription +
-                ", translator=" + translator +
-                ", retryService=" + retryService +
-                '}';
+        return "KafkaSpoutConfig{"
+            + "kafkaProps=" + kafkaProps
+            + ", key=" + getKeyDeserializer()
+            + ", value=" + getValueDeserializer()
+            + ", pollTimeoutMs=" + pollTimeoutMs
+            + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
+            + ", maxUncommittedOffsets=" + maxUncommittedOffsets
+            + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
+            + ", subscription=" + subscription
+            + ", translator=" + translator
+            + ", retryService=" + retryService
+            + ", tupleListener=" + tupleListener
+            + ", processingGuarantee=" + processingGuarantee
+            + ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs
+            + '}';
     }
 }

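The accessors added above (getProcessingGuarantee(), isTupleTrackingEnforced(), getMetricsTimeBucketSizeInSecs()) replace the deprecated isConsumerAutoCommitMode() as the way to reason about commit behaviour. A minimal sketch of how config-consuming code might branch on them; the helper class and method names are hypothetical, and only the two accessors and ProcessingGuarantee.AT_LEAST_ONCE appear in this patch:

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

    final class GuaranteeCheckSketch {
        // Hypothetical helper: should emitted tuples be anchored and tracked for ack/fail?
        static boolean needsTupleTracking(KafkaSpoutConfig<?, ?> config) {
            // Replaces the deprecated isConsumerAutoCommitMode() check: the processing guarantee
            // states the commit intent explicitly, and tuple tracking can additionally be forced
            // on (e.g. for metrics) via isTupleTrackingEnforced().
            return config.getProcessingGuarantee() == ProcessingGuarantee.AT_LEAST_ONCE
                || config.isTupleTrackingEnforced();
        }
    }
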
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 59a25c2..1626fee 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -44,6 +44,12 @@ public class KafkaSpoutMessageId implements Serializable {
         this(topicPart, offset, true);
     }
 
+    /**
+     * Creates a new KafkaSpoutMessageId.
+     * @param topicPart The topic partition this message belongs to
+     * @param offset The offset of this message
+     * @param emitted True iff this message is not being skipped as a null tuple
+     */
     public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) {
         this.topicPart = topicPart;
         this.offset = offset;
@@ -82,22 +88,14 @@ public class KafkaSpoutMessageId implements Serializable {
         this.emitted = emitted;
     }
 
-    public String getMetadata(Thread currThread) {
-        return "{" +
-                "topic-partition=" + topicPart +
-                ", offset=" + offset +
-                ", numFails=" + numFails +
-                ", thread='" + currThread.getName() + "'" +
-                '}';
-    }
-
     @Override
     public String toString() {
-        return "{" +
-                "topic-partition=" + topicPart +
-                ", offset=" + offset +
-                ", numFails=" + numFails +
-                '}';
+        return "{"
+            + "topic-partition=" + topicPart
+            + ", offset=" + offset
+            + ", numFails=" + numFails
+            + ", emitted=" + emitted
+            + '}';
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
new file mode 100644
index 0000000..3f16220
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+
+/**
+ * The KafkaTupleListener handles state changes of a Kafka tuple inside a KafkaSpout.
+ */
+public interface KafkaTupleListener extends Serializable {
+
+
+    /**
+     * Called during the initialization of the Kafka spout.
+     *
+     * @param conf The storm configuration.
+     * @param context The {@link TopologyContext}
+     */
+    void open(Map<String, Object> conf, TopologyContext context);
+
+    /**
+     * Called when the tuple is emitted and auto commit is disabled.
+     * If Kafka auto commit is enabled, the Kafka consumer will periodically commit the offsets (depending on the
+     * commit interval). Storm therefore disables anchoring for tuples when auto commit is enabled, and the spout will
+     * not receive acks or fails for those tuples.
+     *
+     * @param tuple the storm tuple.
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when a tuple is acked.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onAck(KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when Kafka partitions are rebalanced.
+     *
+     * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
+     *                   assigned to the consumer)
+     */
+    void onPartitionsReassigned(Collection<TopicPartition> partitions);
+
+    /**
+     * Called when the Kafka spout sets a record for retry.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onRetry(KafkaSpoutMessageId msgId);
+
+    /**
+     * Called when the maximum number of retries has been reached.
+     *
+     * @param msgId The id of the tuple in the spout.
+     */
+    void onMaxRetryReached(KafkaSpoutMessageId msgId);
+}

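For illustration, a minimal listener built against the interface above; it only logs each callback. The class is hypothetical, the callback signatures are the ones declared in this patch, and registering it is assumed to go through the KafkaSpoutConfig.Builder setter backing the getTupleListener() accessor added earlier:

    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
    import org.apache.storm.kafka.spout.KafkaTupleListener;
    import org.apache.storm.task.TopologyContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical listener that only logs the tuple state changes delivered by the spout.
    public class LoggingKafkaTupleListener implements KafkaTupleListener {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingKafkaTupleListener.class);

        @Override
        public void open(Map<String, Object> conf, TopologyContext context) {
            LOG.info("Tuple listener opened for task {}", context.getThisTaskId());
        }

        @Override
        public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) {
            LOG.debug("Emitted tuple for {}", msgId);
        }

        @Override
        public void onAck(KafkaSpoutMessageId msgId) {
            LOG.debug("Acked {}", msgId);
        }

        @Override
        public void onPartitionsReassigned(Collection<TopicPartition> partitions) {
            LOG.info("Assigned {} partitions after rebalance", partitions.size());
        }

        @Override
        public void onRetry(KafkaSpoutMessageId msgId) {
            LOG.debug("Scheduled retry for {}", msgId);
        }

        @Override
        public void onMaxRetryReached(KafkaSpoutMessageId msgId) {
            LOG.warn("Max retries reached, giving up on {}", msgId);
        }
    }
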
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
index 61b98a8..05cd361 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
@@ -25,13 +25,13 @@ import java.util.Set;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
 import org.apache.storm.task.TopologyContext;
 
 public class ManualPartitionSubscription extends Subscription {
     private static final long serialVersionUID = 5633018073527583826L;
     private final ManualPartitioner partitioner;
     private final TopicFilter partitionFilter;
-    private transient Set<TopicPartition> currentAssignment = null;
     private transient KafkaConsumer<?, ?> consumer = null;
     private transient ConsumerRebalanceListener listener = null;
     private transient TopologyContext context = null;
@@ -54,12 +54,10 @@ public class ManualPartitionSubscription extends Subscription {
         List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer);
         Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
         Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
+        Set<TopicPartition> currentAssignment = consumer.assignment();
         if (!newAssignment.equals(currentAssignment)) {
+            listener.onPartitionsRevoked(currentAssignment);
             consumer.assign(newAssignment);
-            if (currentAssignment != null) {
-                listener.onPartitionsRevoked(currentAssignment);
-            }
-            currentAssignment = newAssignment;
             listener.onPartitionsAssigned(newAssignment);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
index 0abd6c8..f9a6869 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
@@ -15,6 +15,7 @@
  *   See the License for the specific language governing permissions and
  *   limitations under the License.
  */
+
 package org.apache.storm.kafka.spout;
 
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
index 3409184..3bb7152 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
@@ -30,8 +30,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Subscribe to all topics that follow a given list of values
+ * Subscribe to all topics named in a given list.
+ * @deprecated Please use {@link ManualPartitionSubscription} with {@link NamedTopicFilter} instead
  */
+@Deprecated
 public class NamedSubscription extends Subscription {
     private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
     private static final long serialVersionUID = 3438543305215813839L;

http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
index 9a8de0f..dc9f9e3 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
@@ -26,8 +26,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Subscribe to all topics that match a given pattern
+ * Subscribe to all topics that match a given pattern.
+ * @deprecated Please use {@link ManualPartitionSubscription} with {@link PatternTopicFilter} instead
  */
+@Deprecated
 public class PatternSubscription extends Subscription {
     private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class);
     private static final long serialVersionUID = 3438543305215813839L;

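Both deprecation notes above point to ManualPartitionSubscription combined with a topic filter. A hedged sketch of the replacement wiring follows; the RoundRobinManualPartitioner, NamedTopicFilter and PatternTopicFilter constructors are assumed from their class names (none of them appear in this excerpt), and the topic names and pattern are placeholders:

    import java.util.regex.Pattern;
    import org.apache.storm.kafka.spout.ManualPartitionSubscription;
    import org.apache.storm.kafka.spout.NamedTopicFilter;
    import org.apache.storm.kafka.spout.PatternTopicFilter;
    import org.apache.storm.kafka.spout.RoundRobinManualPartitioner;
    import org.apache.storm.kafka.spout.Subscription;

    final class SubscriptionMigrationSketch {
        // Replacement for the deprecated NamedSubscription.
        static Subscription byName() {
            return new ManualPartitionSubscription(
                new RoundRobinManualPartitioner(), new NamedTopicFilter("clicks", "impressions"));
        }

        // Replacement for the deprecated PatternSubscription.
        static Subscription byPattern() {
            return new ManualPartitionSubscription(
                new RoundRobinManualPartitioner(), new PatternTopicFilter(Pattern.compile("events\\..*")));
        }
    }
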
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
index eb76a90..cc37348 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -16,10 +16,14 @@
 package org.apache.storm.kafka.spout;
 
 import java.io.Serializable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Deserializer;
 
 /**
  * @param <T> The type this deserializer deserializes to.
+ * @deprecated Avoid using this class. Use {@link KafkaSpoutConfig.Builder#setProp(java.lang.String, java.lang.Object) } with
+ * {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
  */
-public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable { 
+@Deprecated
+public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable {
 }

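As the note above suggests, deserializers now reach the underlying consumer as plain Kafka properties rather than as SerializableDeserializer instances. A minimal sketch, assuming the existing KafkaSpoutConfig.builder(bootstrapServers, topics) factory and chained setProp(...) calls from the 1.x builder; the broker address and topic name are placeholders:

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;

    final class DeserializerPropsSketch {
        static KafkaSpoutConfig<String, String> config() {
            return KafkaSpoutConfig.builder("broker:9092", "events")
                // Hand the deserializer classes straight to the KafkaConsumer via consumer properties.
                .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .build();
        }
    }
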
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
index eb20f4f..722039d 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
@@ -30,7 +30,10 @@ public abstract class Subscription implements Serializable {
     private static final long serialVersionUID = -216136367240198716L;
 
     /**
-     * Subscribe the KafkaConsumer to the proper topics
+     * Subscribe the KafkaConsumer to the proper topics. Implementations must ensure that a given topic partition is always assigned to the
+     * same spout task. Adding and removing partitions as necessary is fine, but partitions must not move from one task to another. This
+     * constraint is only important for use with the Trident spout.
+     *
      * @param consumer the Consumer to get.
      * @param listener the rebalance listener to include in the subscription
      */

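The expanded Javadoc above pins down the contract: partitions may be added or removed over time, but a given partition must never migrate between spout tasks. A sketch of a ManualPartitioner that honors this by assigning sorted partition index i to task (i mod taskCount); it assumes the ManualPartitioner signature used by ManualPartitionSubscription earlier in this patch (sorted partition list plus TopologyContext in, this task's share out):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.ManualPartitioner;
    import org.apache.storm.task.TopologyContext;

    // Deterministic split: the partition at sorted index i always goes to task (i % taskCount),
    // so with a stable partition list each partition stays pinned to one spout task.
    public class ModuloManualPartitioner implements ManualPartitioner {
        @Override
        public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
            int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
            int myIndex = context.getThisTaskIndex();
            List<TopicPartition> mine = new ArrayList<>();
            for (int i = 0; i < allPartitions.size(); i++) {
                if (i % taskCount == myIndex) {
                    mine.add(allPartitions.get(i));
                }
            }
            return mine;
        }
    }
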
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java
new file mode 100644
index 0000000..b7fd1a6
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Object representing metadata committed to Kafka.
+ */
+public class CommitMetadata {
+    private final String topologyId;
+    private final int taskId;
+    private final String threadName;
+
+    /** Creates the commit metadata for a single spout task. */
+    @JsonCreator
+    public CommitMetadata(@JsonProperty("topologyId") String topologyId,
+                          @JsonProperty("taskId") int taskId,
+                          @JsonProperty("threadName") String threadName) {
+
+        this.topologyId = topologyId;
+        this.taskId = taskId;
+        this.threadName = threadName;
+    }
+
+    public String getTopologyId() {
+        return topologyId;
+    }
+
+    public int getTaskId() {
+        return taskId;
+    }
+
+    public String getThreadName() {
+        return threadName;
+    }
+
+    @Override
+    public String toString() {
+        return "CommitMetadata{"
+            + "topologyId='" + topologyId + '\''
+            + ", taskId=" + taskId
+            + ", threadName='" + threadName + '\''
+            + '}';
+    }
+}

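Since CommitMetadataManager below serializes this object with Jackson, the string stored alongside each committed offset is simply its JSON form. A small round-trip sketch with made-up values:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.storm.kafka.spout.internal.CommitMetadata;

    final class CommitMetadataJsonSketch {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            CommitMetadata metadata = new CommitMetadata("mytopology-1-1518037527", 7, "Thread-12");
            // Yields JSON along the lines of:
            // {"topologyId":"mytopology-1-1518037527","taskId":7,"threadName":"Thread-12"}
            String json = mapper.writeValueAsString(metadata);
            CommitMetadata readBack = mapper.readValue(json, CommitMetadata.class);
            System.out.println(json + " -> " + readBack);
        }
    }
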
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
new file mode 100644
index 0000000..a63619c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommitMetadataManager.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Generates and reads commit metadata.
+ */
+public final class CommitMetadataManager {
+
+    private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
+    private static final Logger LOG = LoggerFactory.getLogger(CommitMetadataManager.class);
+    // Metadata information to commit to Kafka. It is unique per spout instance.
+    private final String commitMetadata;
+    private final ProcessingGuarantee processingGuarantee;
+    private final TopologyContext context;
+
+    /**
+     * Create a manager with the given context.
+     */
+    public CommitMetadataManager(TopologyContext context, ProcessingGuarantee processingGuarantee) {
+        this.context = context;
+        try {
+            commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
+                context.getStormId(), context.getThisTaskId(), Thread.currentThread().getName()));
+            this.processingGuarantee = processingGuarantee;
+        } catch (JsonProcessingException e) {
+            LOG.error("Failed to create Kafka commit metadata due to JSON serialization error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Checks if {@link OffsetAndMetadata} was committed by a {@link KafkaSpout} instance in this topology.
+     *
+     * @param tp The topic partition the commit metadata belongs to.
+     * @param committedOffset {@link OffsetAndMetadata} info committed to Kafka
+     * @param offsetManagers The offset managers.
+     * @return true if this topology committed this {@link OffsetAndMetadata}, false otherwise
+     */
+    public boolean isOffsetCommittedByThisTopology(TopicPartition tp, OffsetAndMetadata committedOffset,
+        Map<TopicPartition, OffsetManager> offsetManagers) {
+        try {
+            if (processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE
+                && offsetManagers.containsKey(tp)
+                && offsetManagers.get(tp).hasCommitted()) {
+                return true;
+            }
+
+            final CommitMetadata committedMetadata = JSON_MAPPER.readValue(committedOffset.metadata(), CommitMetadata.class);
+            return committedMetadata.getTopologyId().equals(context.getStormId());
+        } catch (IOException e) {
+            LOG.warn("Failed to deserialize expected commit metadata [{}]."
+                + " This error is expected to occur once per partition, if the last commit to each partition"
+                + " was by an earlier version of the KafkaSpout, or by a process other than the KafkaSpout. "
+                + "Defaulting to behavior compatible with earlier version", committedOffset);
+            LOG.trace("", e);
+            return false;
+        }
+    }
+
+    public String getCommitMetadata() {
+        return commitMetadata;
+    }
+
+}

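For context, a hedged sketch of how a caller in the spout might use the manager: build it once per task, attach getCommitMetadata() to every commit, and consult isOffsetCommittedByThisTopology(...) before trusting a previously committed offset. The helper class and its wiring are hypothetical; only CommitMetadataManager, OffsetManager and ProcessingGuarantee come from this patch:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
    import org.apache.storm.kafka.spout.internal.OffsetManager;
    import org.apache.storm.task.TopologyContext;

    final class CommitMetadataUsageSketch {
        static boolean canResumeFromCommitted(TopologyContext context, KafkaConsumer<?, ?> consumer,
                TopicPartition tp, Map<TopicPartition, OffsetManager> offsetManagers) {
            CommitMetadataManager metadataManager =
                new CommitMetadataManager(context, ProcessingGuarantee.AT_LEAST_ONCE);
            OffsetAndMetadata committed = consumer.committed(tp);
            // Resume from the committed offset only if this topology wrote it; otherwise the spout
            // would fall back to its configured first poll offset strategy.
            return committed != null
                && metadataManager.isOffsetCommittedByThisTopology(tp, committed, offsetManagers);
        }

        static OffsetAndMetadata commitFor(CommitMetadataManager metadataManager, long offset) {
            // The JSON metadata string is what isOffsetCommittedByThisTopology() later inspects.
            return new OffsetAndMetadata(offset, metadataManager.getCommitMetadata());
        }
    }
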
http://git-wip-us.apache.org/repos/asf/storm/blob/324bc95b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
index 7900388..ec2fbac 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -22,8 +22,7 @@ public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K
 
     @Override
     public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
-                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
     }
     
 }

