storm-commits mailing list archives

From s...@apache.org
Subject [1/3] storm git commit: STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit
Date Mon, 05 Feb 2018 21:58:08 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 80cc88112 -> 74ca79596


STORM-2914: Implement ProcessingGuarantee.NONE in the spout instead of using enable.auto.commit


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b56c30db
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b56c30db
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b56c30db

Branch: refs/heads/1.x-branch
Commit: b56c30db45337242b194f1b0217d168d6ee2b733
Parents: 80cc881
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Sat Jan 27 19:22:07 2018 +0100
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Mon Feb 5 22:55:51 2018 +0100

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |  42 +-----
 .../apache/storm/kafka/spout/KafkaSpout.java    |  57 ++++----
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 132 ++++++++++---------
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  17 ++-
 .../spout/KafkaSpoutMessagingGuaranteeTest.java |  55 ++++++--
 .../storm/annotation/InterfaceStability.java    |  54 ++++++++
 6 files changed, 221 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b56c30db/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index 0a12910..2992dd6 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -152,18 +152,15 @@ of Java generics.  The deserializers can be specified via the consumer propertie
 There are a few key configs to pay attention to.
 
 `setFirstPollOffsetStrategy` allows you to set where to start consuming data from.  This is used both in case of failure recovery and starting the spout
-for the first time. Allowed values include
+for the first time. The allowed values are listed in the [FirstPollOffsetStrategy javadocs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html).
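For illustration (this snippet is not part of the commit), a strategy is chosen on the `KafkaSpoutConfig.Builder`; the bootstrap server and topic name below are placeholders:

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

// Start from the last committed offset, falling back to the earliest
// available offset when nothing has been committed yet (the default).
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .build();
```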
 
- * `EARLIEST` means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits
- * `LATEST` means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits
- * `UNCOMMITTED_EARLIEST` (DEFAULT) means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `EARLIEST`.
- * `UNCOMMITTED_LATEST` means that the kafka spout polls records from the last committed offset, if any. If no offset has been committed, it behaves as `LATEST`.
+`setProcessingGuarantee` lets you configure what processing guarantees the spout will provide. This affects how soon consumed offsets can be committed, and the frequency of commits. See the [ProcessingGuarantee javadoc](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.html) for details.
 
 `setRecordTranslator` allows you to modify how the spout converts a Kafka Consumer Record into a Tuple, and which stream that tuple will be published into.
 By default the "topic", "partition", "offset", "key", and "value" will be emitted to the "default" stream.  If you want to output entries to different streams based on the topic, storm provides `ByTopicRecordTranslator`.  See below for more examples on how to use these.
 
-`setProp` and `setProps` can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the [Kafka website](http://kafka.apache.org/documentation.html#consumerconfigs).
+`setProp` and `setProps` can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the [Kafka website](http://kafka.apache.org/documentation.html#consumerconfigs). Note that KafkaConsumer autocommit is unsupported. The KafkaSpoutConfig constructor will throw an exception if the "enable.auto.commit" property is set, and the consumer used by the spout will always have that property set to false. You can configure similar behavior to autocommit through the `setProcessingGuarantee` method on the KafkaSpoutConfig builder.
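As an illustrative sketch (not part of the diff; server and topic are placeholders), the autocommit replacement described above looks like this on the builder:

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

// Instead of setting "enable.auto.commit" on the consumer (which now fails),
// pick a processing guarantee on the builder. NO_GUARANTEE approximates the
// old autocommit behavior, but lets the spout control when commits happen.
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")
    .setProcessingGuarantee(ProcessingGuarantee.NO_GUARANTEE)
    .build();
```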
 
 ### Usage Examples
 
@@ -324,7 +321,7 @@ When selecting a kafka client version, you should ensure -
 
 # Kafka Spout Performance Tuning
 
-The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217).
+The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [setOffsetCommitPeriodMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setOffsetCommitPeriodMs-long-) and [setMaxUncommittedOffsets](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setMaxUncommittedOffsets-int-) methods.
 
 * "offset.commit.period.ms" controls how often the spout commits to Kafka
 * "max.uncommitted.offsets" controls how many offsets can be pending commit before another poll can take place
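A minimal sketch of setting these two knobs (the values, server, and topic are illustrative, not from the commit; tune them for your cluster and data rates):

```java
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Commit acked offsets every 10 seconds, and allow at most 250 offsets
// to be pending commit before the spout stops polling new records.
KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")
    .setOffsetCommitPeriodMs(10_000)
    .setMaxUncommittedOffsets(250)
    .build();
```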
@@ -334,7 +331,7 @@ The [Kafka consumer config] (http://kafka.apache.org/documentation.html#consumer
 
 * “fetch.min.bytes”
 * “fetch.max.wait.ms”
-* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs](https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
+* [Kafka Consumer](http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [setPollTimeoutMs](javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setPollTimeoutMs-long-) method.
 <br/>
 
 Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning.
@@ -348,35 +345,6 @@ Currently the Kafka spout has the following default values, which have been
 * max.uncommitted.offsets = 10000000
 <br/>
 
-# Processing Guarantees
-
-The `KafkaSpoutConfig.ProcessingGuarantee` enum parameter controls when an offset is committed to Kafka. This is
-conceptually equivalent to marking the tuple with the `ConsumerRecord` for that offset as being successfully processed
-because the tuple won't get re-emitted in case of failure or time out.
-
-For the AT_LEAST_ONCE and AT_MOST_ONCE processing guarantees the spout controls when the commit happens.
-When the guarantee is NONE Kafka controls when the commit happens.
-
-* AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
-     and acked. If a tuple fails or times out it will be re-emitted. A tuple can be processed more than once if for instance
-     the ack gets lost.
-
-* AT_MOST_ONCE - Offsets will be committed to Kafka right after being polled but before being emitted to the downstream
-     components of the topology. Offsets are processed at most once because tuples that fail or timeout won't be retried.
-
-* NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
-     "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
-     it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
-     This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.
-
-To set the processing guarantee use the `KafkaSpoutConfig.Builder.setProcessingGuarantee` method as follows:
-
-```java
-KafkaSpoutConfig<String, String> kafkaConf = KafkaSpoutConfig
-  .builder(String bootstrapServers, String ... topics)
-  .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
-```
-
 # Tuple Tracking
 
 By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track

http://git-wip-us.apache.org/repos/asf/storm/blob/b56c30db/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 3618820..27940d0 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -54,6 +54,7 @@ import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RetriableException;
 
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
 import org.apache.storm.kafka.spout.internal.CommitMetadata;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
@@ -91,7 +92,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient KafkaSpoutRetryService retryService;
     // Handles tuple events (emit, ack etc.)
     private transient KafkaTupleListener tupleListener;
-    // timer == null if processing guarantee is none or at-most-once
+    // timer == null only if the processing guarantee is at-most-once
     private transient Timer commitTimer;
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
@@ -135,8 +136,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         tupleListener = kafkaSpoutConfig.getTupleListener();
 
-        if (isAtLeastOnceProcessing()) {
-            // Only used if the spout should commit an offset to Kafka only after the corresponding tuple has been acked.
+        if (kafkaSpoutConfig.getProcessingGuarantee() != KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
+            // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
             commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
         refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
@@ -321,24 +322,28 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void nextTuple() {
         try {
-                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
-                    kafkaSpoutConfig.getSubscription().refreshAssignment();
-                }
+            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+                kafkaSpoutConfig.getSubscription().refreshAssignment();
+            }
 
-                if (shouldCommit()) {
+            if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
+                if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples(kafkaConsumer.assignment());
+                } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
+                    commitFetchedOffsetsAsync(kafkaConsumer.assignment());
                 }
+            }
 
-                PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
-                if (pollablePartitionsInfo.shouldPoll()) {
-                    try {
-                        setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
-                    } catch (RetriableException e) {
-                        LOG.error("Failed to poll from kafka.", e);
-                    }
+            PollablePartitionsInfo pollablePartitionsInfo = getPollablePartitionsInfo();
+            if (pollablePartitionsInfo.shouldPoll()) {
+                try {
+                    setWaitingToEmit(pollKafkaBroker(pollablePartitionsInfo));
+                } catch (RetriableException e) {
+                    LOG.error("Failed to poll from kafka.", e);
                 }
+            }
 
-                emitIfWaitingNotEmitted();
+            emitIfWaitingNotEmitted();
         } catch (InterruptException e) {
             throwKafkaConsumerInterruptedException();
         }
@@ -350,10 +355,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         throw new RuntimeException(new InterruptedException("Kafka consumer was interrupted"));
     }
 
-    private boolean shouldCommit() {
-        return isAtLeastOnceProcessing() && commitTimer.isExpiredResetOnTrue(); // timer != null for non auto commit mode
-    }
-
     private PollablePartitionsInfo getPollablePartitionsInfo() {
         if (isWaitingToEmit()) {
             LOG.debug("Not polling. Tuples waiting to be emitted.");
@@ -546,6 +547,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return tuple != null || kafkaSpoutConfig.isEmitNullTuples();
     }
 
+    private void commitFetchedOffsetsAsync(Set<TopicPartition> assignedPartitions) {
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+        for (TopicPartition tp : assignedPartitions) {
+            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp)));
+        }
+        kafkaConsumer.commitAsync(offsetsToCommit, null);
+        LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
+    }
+
     private void commitOffsetsForAckedTuples(Set<TopicPartition> assignedPartitions) {
         // Find offsets that are ready to be committed for every assigned topic partition
         final Map<TopicPartition, OffsetManager> assignedOffsetManagers = new HashMap<>();
@@ -576,11 +586,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 long committedOffset = tpOffset.getValue().offset();
                 if (position < committedOffset) {
                     /*
-                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed,
-                     * lots of (more than max.poll.records) later messages were acked, and the failed message then gets acked.
-                     * The consumer may only be part way through "catching up" to where it was when it went back to retry the failed tuple.
-                     * Skip the consumer forward to the committed offset and drop the current waiting to emit list,
-                     * since it'll likely contain committed offsets.
+                     * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
+                     * than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
+                     * part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
+                     * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
                      */
                     LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
                         position, committedOffset);
                         position, committedOffset);

http://git-wip-us.apache.org/repos/asf/storm/blob/b56c30db/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 094ae03..cbe52a7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.Config;
+import org.apache.storm.annotation.InterfaceStability;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
@@ -97,7 +98,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * @param builder The Builder to construct the KafkaSpoutConfig from
      */
     public KafkaSpoutConfig(Builder<K, V> builder) {
-        setAutoCommitMode(builder);
+        setKafkaPropsForProcessingGuarantee(builder);
         this.kafkaProps = builder.kafkaProps;
         this.subscription = builder.subscription;
         this.translator = builder.translator;
@@ -120,23 +121,26 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     /**
     * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
-     * By default this parameter is set to UNCOMMITTED_EARLIEST. If the strategy is set to:
-     * <br/>
-     * <ul>
-     * <li>EARLIEST - the kafka spout polls records starting in the first offset of the partition, regardless
-     * of previous commits. This setting only takes effect on topology deployment.</li>
-     * <li>LATEST - the kafka spout polls records with offsets greater than the last offset in the partition,
-     * regardless of previous commits. This setting only takes effect on topology deployment.</li>
-     * <li>UNCOMMITTED_EARLIEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
-     * committed it behaves as EARLIEST.</li>
-     * <li>UNCOMMITTED_LATEST - the kafka spout polls records from the last committed offset, if any. If no offset has been
-     * committed it behaves as LATEST.</li>
-     * </ul>
+     * By default this parameter is set to UNCOMMITTED_EARLIEST.
      */
     public enum FirstPollOffsetStrategy {
+        /**
+         * The kafka spout polls records starting in the first offset of the partition, regardless of previous commits. This setting only
+         * takes effect on topology deployment
+         */
         EARLIEST,
+        /**
+         * The kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This
+         * setting only takes effect on topology deployment
+         */
         LATEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST
+         */
         UNCOMMITTED_EARLIEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
+         */
         UNCOMMITTED_LATEST;
 
         @Override
@@ -147,28 +151,30 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     /**
     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
-     * i.e. when the offset is committed to Kafka. For AT_LEAST_ONCE and AT_MOST_ONCE the spout controls when
-     * the commit happens. When the guarantee is NONE Kafka controls when the commit happens.
-     *
-     * <ul>
-     * <li>AT_LEAST_ONCE - an offset is ready to commit only after the corresponding tuple has been processed (at-least-once)
-     * and acked. If a tuple fails or times-out it will be re-emitted. A tuple can be processed more than once if for instance
-     * the ack gets lost.</li>
-     * <br/>
-     * <li>AT_MOST_ONCE - every offset will be committed to Kafka right after being polled but before being emitted
-     * to the downstream components of the topology. It guarantees that the offset is processed at-most-once because it
-     * won't retry tuples that fail or timeout after the commit to Kafka has been done.</li>
-     * <br/>
-     * <li>NONE - the polled offsets are committed to Kafka periodically as controlled by the Kafka properties
-     * "enable.auto.commit" and "auto.commit.interval.ms". Because the spout does not control when the commit happens
-     * it cannot give any message processing guarantees, i.e. a message may be processed 0, 1 or more times.
-     * This option requires "enable.auto.commit=true". If "enable.auto.commit=false" an exception will be thrown.</li>
-     * </ul>
+     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
+     * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs()}, if the mode commits on an interval.
+     * NO_GUARANTEE may be removed in a later release without warning; we're still evaluating whether it makes sense to keep it.
      */
+    @InterfaceStability.Unstable
     public enum ProcessingGuarantee {
+        /**
+         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
+         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
+         * interval.
+         */
         AT_LEAST_ONCE,
+        /**
+         * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
+         * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
+         * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
+         */
         AT_MOST_ONCE,
-        NONE,
+        /**
+         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
+         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
+         * spout to control when commits occur. Commits asynchronously on the defined interval.
+         */
+        NO_GUARANTEE,
     }
 
     public static class Builder<K, V> {
@@ -532,7 +538,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
         * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
          *
-         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or
+         * {@link ProcessingGuarantee#NO_GUARANTEE}.
          *
          * @param offsetCommitPeriodMs time in ms
          */
@@ -729,46 +736,45 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return builder;
     }
 
-    private static void setAutoCommitMode(Builder<?, ?> builder) {
+    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
         if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            LOG.warn("Do not set " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " manually."
-                + " Instead use KafkaSpoutConfig.Builder.setProcessingGuarantee."
-                + " This will be treated as an error in the next major release."
-                + " For now the spout will be configured to behave like it would have in pre-1.2.0 releases.");
+            LOG.warn("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee."
+                + " This will be treated as an error in the next major release.");
 
             final boolean enableAutoCommit = Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString());
-            if(enableAutoCommit) {
-                builder.processingGuarantee = ProcessingGuarantee.NONE;
+            if (enableAutoCommit) {
+                builder.processingGuarantee = ProcessingGuarantee.NO_GUARANTEE;
             } else {
                 builder.processingGuarantee = ProcessingGuarantee.AT_LEAST_ONCE;
             }
         }
-        if (builder.processingGuarantee == ProcessingGuarantee.NONE) {
-            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
-        } else {
-            String autoOffsetResetPolicy = (String)builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-            if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
-                if (autoOffsetResetPolicy == null) {
-                    /*
-                    If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
-                    for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
-                    error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
-                    requests an offset that was deleted.
-                     */
-                    builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-                } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
-                    LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
-                        + " Some messages may be skipped.");
-                }
-            } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
-                if (autoOffsetResetPolicy != null
-                    && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
-                    LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
-                        + " Some messages may be processed more than once.");
-                }
+        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+            if (autoOffsetResetPolicy == null) {
+                /*
+                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+                 * requests an offset that was deleted.
+                 */
+                LOG.info("Setting consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+                LOG.warn("Cannot guarantee at-least-once processing with auto.offset.reset.policy other than 'earliest' or 'none'."
+                    + " Some messages may be skipped.");
+            }
+        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+            if (autoOffsetResetPolicy != null
+                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+                LOG.warn("Cannot guarantee at-most-once processing with auto.offset.reset.policy other than 'latest' or 'none'."
+                    + " Some messages may be processed more than once.");
             }
-            builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         }
+        LOG.info("Setting consumer property '{}' to 'false', because the spout does not support auto-commit",
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/b56c30db/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index b701be9..91a7be6 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -32,10 +32,15 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.hamcrest.CoreMatchers;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 public class KafkaSpoutConfigTest {
 
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+    
     @Test
     public void testBasic() {
         KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
@@ -94,7 +99,7 @@ public class KafkaSpoutConfigTest {
             .build();
         
         assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee",
-            conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NONE));
+            conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE));
     }
     
     @Test
@@ -114,7 +119,7 @@ public class KafkaSpoutConfigTest {
             .build();
         
         assertThat("When setting enable auto commit to true explicitly the spout should use the 'none' processing guarantee",
-            conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NONE));
+            conf.getProcessingGuarantee(), is(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE));
     }
     
     @Test
@@ -233,4 +238,12 @@ public class KafkaSpoutConfigTest {
 
         assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
     }
+    
+    @Test
+    public void testThrowsIfEnableAutoCommitIsSet() {
+        expectedException.expect(IllegalStateException.class);
+        KafkaSpoutConfig.builder("localhost:1234", "topic")
+            .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
+            .build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/b56c30db/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index ae7612a..a20f706 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -17,6 +17,7 @@
 package org.apache.storm.kafka.spout;
 
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
+import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.junit.Assert.assertThat;
@@ -24,6 +25,7 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -37,6 +39,8 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.spout.SpoutOutputCollector;
@@ -45,11 +49,18 @@ import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InOrder;
+import org.mockito.runners.MockitoJUnitRunner;
 
+@RunWith(MockitoJUnitRunner.class)
 public class KafkaSpoutMessagingGuaranteeTest {
 
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+    
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
@@ -108,10 +119,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
 
     @Test
-    public void testAnyTimesModeDisregardsMaxUncommittedOffsets() throws Exception {
+    public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception {
         //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .build();
         doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
     }
@@ -152,10 +163,10 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
 
     @Test
-    public void testAnyTimesModeCannotReplayTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not replay tuples in any-times mode
+    public void testNoGuaranteeModeCannotReplayTuples() throws Exception {
+        //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();
         doTestModeCannotReplayTuples(spoutConfig);
@@ -176,7 +187,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
 
             spout.ack(msgIdCaptor.getValue());
             
-            Time.advanceTime(spoutConfig.getOffsetsCommitPeriodMs());
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
             
             spout.nextTuple();
             
@@ -195,13 +206,37 @@ public class KafkaSpoutMessagingGuaranteeTest {
     }
     
     @Test
-    public void testAnyTimesModeDoesNotCommitAckedTuples() throws Exception {
-        //When tuple tracking is enabled, the spout must not commit acked tuples in any-times
mode because committing is managed by the consumer
+    public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
+        //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked
         KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
-            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NONE)
+            .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();
-        doTestModeDoesNotCommitAckedTuples(spoutConfig);
+        
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+            when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(partition,
+                SpoutWithMockedConsumerSetupHelper.<String, String>createRecords(partition, 0, 1))));
+
+            spout.nextTuple();
+            
+            when(consumerMock.position(partition)).thenReturn(1L);
+
+            ArgumentCaptor<KafkaSpoutMessageId> msgIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            verify(collectorMock).emit(eq(SingleTopicKafkaSpoutConfiguration.STREAM), anyList(), msgIdCaptor.capture());
+            assertThat("Should have captured a message id", msgIdCaptor.getValue(), not(nullValue()));
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + spoutConfig.getOffsetsCommitPeriodMs());
+            
+            spout.nextTuple();
+            
+            verify(consumerMock).commitAsync(commitCapture.capture(), isNull(OffsetCommitCallback.class));
+            
+            Map<TopicPartition, OffsetAndMetadata> commit = commitCapture.getValue();
+            assertThat(commit.containsKey(partition), is(true));
+            assertThat(commit.get(partition).offset(), is(1L));
+        }
     }
 
 }
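
For readers following the rename: the NO_GUARANTEE mode exercised by these tests is selected through the spout config builder. A minimal sketch, assuming the storm-kafka-client builder API shown in this diff (the broker address and topic name are placeholders):

```java
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

// Sketch only: placeholder broker/topic. With NO_GUARANTEE the spout itself
// commits polled offsets on the commit timer, regardless of acks, instead of
// delegating to the consumer's enable.auto.commit.
KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
    .builder("localhost:9092", "my-topic")
    .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
    .build();
KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);
```

Note that, per the KafkaSpoutConfigTest change above, setting ENABLE_AUTO_COMMIT_CONFIG explicitly on the builder now fails with an IllegalStateException at build().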

http://git-wip-us.apache.org/repos/asf/storm/blob/b56c30db/storm-core/src/jvm/org/apache/storm/annotation/InterfaceStability.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/annotation/InterfaceStability.java b/storm-core/src/jvm/org/apache/storm/annotation/InterfaceStability.java
new file mode 100644
index 0000000..d05ae75
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/annotation/InterfaceStability.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.annotation;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+/**
+ * Annotation to inform users of how much to rely on a particular package,
+ * class or method not changing over time.
+ */
+@InterfaceStability.Evolving
+public class InterfaceStability {
+    /**
+     * Can evolve while retaining compatibility across minor release boundaries;
+     * can break compatibility only at major releases (i.e. at m.0).
+     */
+    @Documented
+    @Retention(RetentionPolicy.RUNTIME)
+    public @interface Stable {};
+
+    /**
+     * Evolving, but can break compatibility at minor releases (i.e. m.x).
+     */
+    @Documented
+    @Retention(RetentionPolicy.RUNTIME)
+    public @interface Evolving {};
+
+    /**
+     * No guarantee is provided as to reliability or stability across any
+     * level of release granularity.
+     */
+    @Documented
+    @Retention(RetentionPolicy.RUNTIME)
+    public @interface Unstable {};
+}
+
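
The annotations above use RUNTIME retention so that tooling can inspect stability markers reflectively. A self-contained sketch of the same pattern (StabilityDemo, Unstable, and ExperimentalApi are hypothetical names for illustration, not part of this commit):

```java
import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class StabilityDemo {

    // Marker annotation mirroring the InterfaceStability pattern:
    // RUNTIME retention keeps it visible to reflection.
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    public @interface Unstable {}

    // Hypothetical API class tagged as unstable.
    @Unstable
    public static class ExperimentalApi {}

    public static void main(String[] args) {
        // Reflection sees the marker because of RUNTIME retention.
        System.out.println(ExperimentalApi.class.isAnnotationPresent(Unstable.class));
    }
}
```

Running `main` prints `true`; with RetentionPolicy.SOURCE or CLASS the marker would be invisible at runtime and the check would print `false`.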

