storm-commits mailing list archives

From kabh...@apache.org
Subject [1/4] storm git commit: STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit
Date Mon, 05 Feb 2018 02:58:00 GMT
Repository: storm
Updated Branches:
  refs/heads/master 7fbe7a278 -> b5d70e17d


STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eff32a32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eff32a32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eff32a32

Branch: refs/heads/master
Commit: eff32a32bb724c37f4eaca6a363665c55b17813c
Parents: 7fbe7a2
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Sat Jan 27 19:22:07 2018 +0100
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Sun Feb 4 19:41:46 2018 +0100

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  47 ++-----
 .../apache/storm/kafka/spout/KafkaSpout.java    |  40 +++---
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 131 ++++++++++---------
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  13 ++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java |  54 ++++++--
 5 files changed, 160 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
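For readers skimming the change: the user-visible effect is that autocommit is no longer configured through the KafkaConsumer; when the guarantee is relaxed, the spout itself commits the polled offsets. A minimal usage sketch follows (broker address, topic name, and commit period are placeholders, not part of this commit):

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

// Replaces the old ProcessingGuarantee.NONE + enable.auto.commit=true combination.
KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")                    // placeholder bootstrap servers and topic
    .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE) // the spout commits polled offsets itself
    .setOffsetCommitPeriodMs(10_000)                          // how often those commits happen
    .build();
```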


http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 0354a0d..a6f7ac1 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -1,3 +1,8 @@
+---
+title: Storm Kafka Integration (0.10.x+)
+layout: documentation
+documentation: true
+---
 # Storm Apache Kafka integration using the kafka-client jar
 This includes the new Apache Kafka consumer API.
 
@@ -147,18 +152,15 @@ of Java generics.  The deserializers can be specified via the consumer propertie
 There are a few key configs to pay attention to.
 
 `setFirstPollOffsetStrategy` allows you to set where to start consuming data from.  This is used both in case of failure recovery and starting the spout
-for the first time. Allowed values include
+for the first time. The allowed values are listed in the [FirstPollOffsetStrategy javadocs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html).
 
- * `EARLIEST` means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits
- * `LATEST` means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits
- * `UNCOMMITTED_EARLIEST` (DEFAULT) means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `EARLIEST`.
- * `UNCOMMITTED_LATEST` means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `LATEST`.
+`setProcessingGuarantee` lets you configure what processing guarantees the spout will provide. This affects how soon consumed offsets can be committed, and the frequency of commits. See the [ProcessingGuarantee javadoc](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.html) for details.
 
 `setRecordTranslator` allows you to modify how the spout converts a Kafka Consumer Record into a Tuple, and which stream that tuple will be published into.
 By default the "topic", "partition", "offset", "key", and "value" will be emitted to the "default" stream.  If you want to output entries to different
 streams based on the topic, storm provides `ByTopicRecordTranslator`.  See below for more examples on how to use these.
 
-`setProp` and `setProps` can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the [Kafka website](http://kafka.apache.org/documentation.html#consumerconfigs).
+`setProp` and `setProps` can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the [Kafka website](http://kafka.apache.org/documentation.html#consumerconfigs). Note that KafkaConsumer autocommit is unsupported. The KafkaSpoutConfig constructor will throw an exception if the "enable.auto.commit" property is set, and the consumer used by the spout will always have that property set to false. You can configure similar behavior to autocommit through the `setProcessingGuarantee` method on the KafkaSpoutConfig builder.
 
 ### Usage Examples
 
@@ -274,7 +276,7 @@ When selecting a kafka client version, you should ensure -
 
 # Kafka Spout Performance Tuning
 
-The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217).
+The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [setOffsetCommitPeriodMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setOffsetCommitPeriodMs-long-) and [setMaxUncommittedOffsets](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setMaxUncommittedOffsets-int-) methods.
 
 * "offset.commit.period.ms" controls how often the spout commits to Kafka
 * "max.uncommitted.offsets" controls how many offsets can be pending commit before another
poll can take place
@@ -284,7 +286,7 @@ The [Kafka consumer config] (http://kafka.apache.org/documentation.html#consumer
 
 * “fetch.min.bytes”
 * “fetch.max.wait.ms”
-* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
+* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [setPollTimeoutMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setPollTimeoutMs-long-) method.
 <br/>
 
 Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning.
@@ -298,35 +300,6 @@ Currently the Kafka spout has has the following default values, which have been
 * max.uncommitted.offsets = 10000000
 <br/>
 
-# Processing Guarantees
-
-The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when an offset is committed to Kafka. This is
-conceptually equivalent to marking the tuple with the `ConsumerRecord` for that offset as being successfully processed
-because the tuple won't get re-emitted in case of failure or time out.
-
-For the AT_LEAST_ONCE and AT_MOST_ONCE processing guarantees the spout controls when the commit happens.
-When the guarantee is NONE Kafka controls when the commit happens.
-
-* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
-     and acked. If a tuple fails or times out it will be re-emitted. A tuple can be processed more than once if for instance
-     the ack gets lost.
-
-* AT_MOST_ONCE - Offsets will be committed to Kafka right after being polled but before being emitted to the downstream
-     components of the topology. Offsets are processed at most once because tuples that fail or timeout won't be retried.
-
-* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
-     "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
-     it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
-     This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
-
-To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
-
-```java
-KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
-  .builder(String bootstrapServers, String ... topics)
-  .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
-```
-
 # Tuple Tracking
 
 By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track

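To make the tuning section above concrete, here is a hedged sketch of how the builder methods it links to fit together; the values are illustrative only, not recommendations, and the broker/topic are placeholders:

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

KafkaSpoutConfig<String, String> tunedConfig = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")     // placeholder broker and topic
    .setOffsetCommitPeriodMs(15_000)           // "offset.commit.period.ms"
    .setMaxUncommittedOffsets(1_000_000)       // "max.uncommitted.offsets"
    .setPollTimeoutMs(200)                     // KafkaConsumer poll timeout
    .setProp("fetch.min.bytes", 1)             // plain KafkaConsumer properties go through setProp
    .build();
```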
http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index c52309e..84e7851 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -39,7 +39,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang.Validate;
@@ -52,6 +51,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
 import org.apache.storm.kafka.spout.internal.CommitMetadata;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
@@ -89,7 +89,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient KafkaSpoutRetryService retryService;
     // Handles tuple events (emit, ack etc.)
     private transient KafkaTupleListener tupleListener;
-    // timer == null if processing guarantee is none or at-most-once
+    // timer == null only if the processing guarantee is at-most-once
     private transient Timer commitTimer;
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
@@ -133,8 +133,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         tupleListener = kafkaSpoutConfig.getTupleListener();
 
-        if (isAtLeastOnceProcessing()) {
-            // Only used if the spout should commit an offset to Kafka only after the corresponding tuple has been acked.
+        if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+            // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
             commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
         refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
@@ -307,8 +307,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 kafkaSpoutConfig.getSubscription().refreshAssignment();
             }
 
-            if (shouldCommit()) {
-                commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
+                if (isAtLeastOnceProcessing()) {
+                    commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+                } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
+                    commitFetchedOffsetsAsync(kafkaConsumer.assignment());
+                }
             }
 
             PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
@@ -332,10 +336,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
     }
 
-    private boolean shouldCommit() {
-        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
-    }
-
     private PollablePartitionsInfo getPollablePartitionsInfo() {
         if (isWaitingToEmit()) {
             LOG.debug("Not polling. Tuples waiting to be emitted.");
@@ -519,6 +519,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
     }
 
+    private void commitFetchedOffsetsAsync(Set<TopicPartition> assignedPartitions) {
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+        for (TopicPartition tp : assignedPartitions) {
+            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+        }
+        kafkaConsumer.commitAsync(offsetsToCommit, null);
+        LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+    }
+    
     private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
         // Find offsets that are ready to be committed for every assigned topic partition
         final Map<TopicPartition, OffsetManager> assignedOffsetManagers = offsetManagers.entrySet().stream()
@@ -546,11 +555,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 long committedOffset = tpOffset.getValue().offset();
                 if (position < committedOffset) {
                     /*
-                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed,
-                     * lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked.
-                     * The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple.
-                     * Skip the consumer forward to the committed offset and drop the current waiting to emit list,
-                     * since it'll likely contain committed offsets.
+                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
+                     * than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
+                     * part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
+                     * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
                      */
                     LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
                         position, committedOffset);
@@ -732,6 +740,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @VisibleForTesting
     KafkaOffsetMetric getKafkaOffsetMetric() {
-        return  kafkaOffsetMetric;
+        return kafkaOffsetMetric;
     }
 }

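The new `commitFetchedOffsetsAsync` helper above commits the consumer's current fetch positions rather than acked offsets. For illustration only, here is a standalone sketch of the same pattern written against the plain KafkaConsumer API; the class and method names are hypothetical, not part of this commit:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class PositionCommitSketch {
    // Commit whatever the consumer has already fetched, without waiting for tuple acks.
    static void commitCurrentPositions(KafkaConsumer<?, ?> consumer, Set<TopicPartition> assigned) {
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        for (TopicPartition tp : assigned) {
            toCommit.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        }
        // Asynchronous, so a crash before the commit completes can lead to duplicates or drops,
        // which is why this path is tied to ProcessingGuarantee.NO_GUARANTEE in the spout.
        consumer.commitAsync(toCommit, null);
    }
}
```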
http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index a063790..c2305cb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.Config;
+import org.apache.storm.annotation.InterfaceStability;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
 import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
@@ -96,7 +97,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * @param builder The Builder to construct the KafkaSpoutConfig from
      */
     public KafkaSpoutConfig(Builder<K, V> builder) {
-        setAutoCommitMode(builder);
+        setKafkaPropsForProcessingGuarantee(builder);
         this.kafkaProps = builder.kafkaProps;
         this.subscription = builder.subscription;
         this.translator = builder.translator;
@@ -115,23 +116,26 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     /**
     * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
-     * By default this parameter is set to UNCOMMITTED_EARLIEST. If the strategy is set to:
-     * <br/>
-     * <ul>
-     * <li>EARLIEST - the kafka spout polls records starting in the first offset of the partition, regardless
-     * of previous commits. This setting only takes effect on topology deployment.</li>
-     * <li>LATEST - the kafka spout polls records with offsets greater than the last offset in the partition,
-     * regardless of previous commits. This setting only takes effect on topology deployment.</li>
-     * <li>UNCOMMITTED_EARLIEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
-     * committed it behaves as EARLIEST.</li>
-     * <li>UNCOMMITTED_LATEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
-     * committed it behaves as LATEST.</li>
-     * </ul>
+     * By default this parameter is set to UNCOMMITTED_EARLIEST. 
      */
     public enum FirstPollOffsetStrategy {
+        /**
+         * The kafka spout polls records starting in the first offset of the partition, regardless of previous commits. This setting only
+         * takes effect on topology deployment
+         */
         EARLIEST,
+        /**
+         * The kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This
+         * setting only takes effect on topology deployment
+         */
         LATEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST
+         */
         UNCOMMITTED_EARLIEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
+         */
         UNCOMMITTED_LATEST;
 
         @Override
@@ -142,28 +146,30 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     /**
     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
-     * i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE the spout controls when
-     * the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
-     *
-     * <ul>
-     * <li>AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
-     * and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
-     * the ack gets lost.</li>
-     * <br/>
-     * <li>AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
-     * to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
-     * won't retry tuples that fail or timeout after the commit to Kafka has been done.</li>
-     * <br/>
-     * <li>NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
-     * "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
-     * it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
-     * This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.</li>
-     * </ul>
+     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
+     * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
+     * NO_GUARANTEE may be removed in a later release without warning, we're still evaluating whether it makes sense to keep.
      */
+    @InterfaceStability.Unstable
     public enum ProcessingGuarantee {
+        /**
+         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
+         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
+         * interval.
+         */
         AT_LEAST_ONCE,
+        /**
+         * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
+         * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
+         * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
+         */
         AT_MOST_ONCE,
-        NONE,
+        /**
+         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
+         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
+         * spout to control when commits occur. Commits asynchronously on the defined interval.
+         */
+        NO_GUARANTEE,
     }
 
     public static class Builder<K, V> {
@@ -213,7 +219,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Set a {@link KafkaConsumer} property.
+         * Set a {@link KafkaConsumer} property. 
          */
         public Builder<K, V> setProp(String key, Object value) {
             kafkaProps.put(key, value);
@@ -221,7 +227,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Set multiple {@link KafkaConsumer} properties.
+         * Set multiple {@link KafkaConsumer} properties. 
          */
         public Builder<K, V> setProp(Map<String, Object> props) {
             kafkaProps.putAll(props);
@@ -229,7 +235,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Set multiple {@link KafkaConsumer} properties.
+         * Set multiple {@link KafkaConsumer} properties. 
          */
         public Builder<K, V> setProp(Properties props) {
             props.forEach((key, value) -> {
@@ -256,7 +262,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
         * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
         *
-         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or
+         * {@link ProcessingGuarantee#NO_GUARANTEE}.
          *
          * @param offsetCommitPeriodMs time in ms
          */
@@ -453,37 +460,37 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return builder;
     }
 
-    private static void setAutoCommitMode(Builder<?, ?> builder) {
+    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
         if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            throw new IllegalArgumentException("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee");
+            throw new IllegalStateException("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee");
         }
-        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-        } else {
-            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
-                if (autoOffsetResetPolicy == null) {
-                    /*
-                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
-                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
-                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
-                    requests an offset that was deleted.
-                     */
-                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
-                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
-                        + " Some messages may be skipped.");
-                }
-            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
-                if (autoOffsetResetPolicy != null
-                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
-                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
-                        + " Some messages may be processed more than once.");
-                }
+        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+            if (autoOffsetResetPolicy == null) {
+                /*
+                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+                 * requests an offset that was deleted.
+                 */
+                LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+                    + " Some messages may be skipped.");
+            }
+        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+            if (autoOffsetResetPolicy != null
+                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+                    + " Some messages may be processed more than once.");
             }
-            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         }
+        LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit",
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     }
 
     /**

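As the new `setKafkaPropsForProcessingGuarantee` shows, setting `enable.auto.commit` on the builder now fails fast at build time. A small sketch of what calling code would observe (broker and topic are placeholders):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

try {
    KafkaSpoutConfig.builder("localhost:9092", "my-topic")       // placeholder broker and topic
        .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) // no longer supported
        .build();                                                // the constructor validates the props
} catch (IllegalStateException expected) {
    // Expected after this change; use setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE) instead.
}
```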
http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index 9885cf6..17e0700 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -28,10 +28,15 @@ import java.util.HashMap;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 public class KafkaSpoutConfigTest {
 
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    
     @Test
     public void testBasic() {
         KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
@@ -85,4 +90,12 @@ public class KafkaSpoutConfigTest {
 
         assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
     }
+    
+    @Test
+    public void testThrowsIfEnableAutoCommitIsSet() {
+        expectedException.expect(IllegalStateException.class);
+        KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+            .build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/eff32a32/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index 353ee36..12391c8 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -17,6 +17,7 @@
 package org.apache.storm.kafka.spout;
 
 import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
@@ -24,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -37,6 +39,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.subscription.Subscription;
@@ -46,11 +49,18 @@ import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InOrder;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class KafkaSpoutMessagingGuaranteeTest {
 
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+    
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
@@ -109,10 +119,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
 
     @Test
-    public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception {
+    public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception {
         //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .build();
         doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
     }
@@ -153,10 +163,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
 
     @Test
-    public void testAnyTimesModeCannotReplayTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not replay tuples in any-times mode
+    public void testNoGuaranteeModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();
         doTestModeCannotReplayTuples(spoutConfig);
@@ -177,7 +187,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
 
             spout.ack(msgIdCaptor.getValue());
             
-            Time.advanceTime(spoutConfig.getOffsetsCommitPeriodMs());
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
             
             spout.nextTuple();
             
@@ -196,13 +206,37 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
     
     @Test
-    public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not commit acked tuples in any-times mode because committing is managed by the consumer
+    public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
+        //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();
-        doTestModeDoesNotCommitAckedTuples(spoutConfig);
+        
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock,partition);
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
+
+            spout.nextTuple();
+            
+            when(consumerMock.position(partition)).thenReturn(1L);
+
+            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+            assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+            
+            spout.nextTuple();
+            
+            verify(consumerMock).commitAsync(commitCapture.capture(), isNull());
+            
+            Map<TopicPartition, OffsetAndMetadata> commit = commitCapture.getValue();
+            assertThat(commit.containsKey(partition), is(true));
+            assertThat(commit.get(partition).offset(), is(1L));
+        }
     }
 
 }

