storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/4] storm git commit: STORM-2014: Make KafkaSpout delegate maxRetry check to RetryService
Date Fri, 03 Feb 2017 02:52:13 GMT
Repository: storm
Updated Branches:
  refs/heads/master e8c390059 -> 38ee403bf


STORM-2014: Make KafkaSpout delegate maxRetry check to RetryService


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0aa0e857
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0aa0e857
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0aa0e857

Branch: refs/heads/master
Commit: 0aa0e8579723a76f53fe7e0ae71dfd1722465c0e
Parents: e3b2f96
Author: Stig Rohde Døssing <stigdoessing@gmail.com>
Authored: Tue Aug 2 19:17:02 2016 +0200
Committer: Stig Rohde Døssing <sdo@it-minds.dk>
Committed: Thu Jan 12 21:27:05 2017 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 13 ++++-----
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 23 +--------------
 .../KafkaSpoutRetryExponentialBackoff.java      | 30 ++++++++++++--------
 .../kafka/spout/KafkaSpoutRetryService.java     | 12 ++++++--
 4 files changed, 34 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index d405c4d..60ca0b9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -72,7 +72,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
 
     // Bookkeeping
-    private transient int maxRetries;                                   // Max number of times a tuple is retried
     private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first realized by the spout upon activation
     private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
     private transient Timer commitTimer;                                // timer == null for auto commit mode
@@ -105,7 +104,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         // Spout internals
         this.collector = collector;
-        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
         numUncommittedOffsets = 0;
 
         // Offset management
@@ -381,11 +379,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
             return;
         }
-        if (msgId.numFails() < maxRetries) {
-            emitted.remove(msgId);
-            msgId.incrementNumFails();
-            retryService.schedule(msgId);
-        } else { // limit to max number of retries
+        emitted.remove(msgId);
+        msgId.incrementNumFails();
+        if (!retryService.schedule(msgId)) {
             LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
             ack(msgId);
         }
@@ -523,6 +519,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
 
         /**
+         * An offset is only committed when all records with lower offset have
+         * been acked. This guarantees that all offsets smaller than the
+         * committedOffset have been delivered.
          * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
          */
         public OffsetAndMetadata findNextCommitOffset() {

http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 8aa525b..2818c9a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -74,7 +74,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     // Kafka spout configuration
     private final long offsetCommitPeriodMs;
-    private final int maxRetries;
     private final int maxUncommittedOffsets;
     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final KafkaSpoutStreams kafkaSpoutStreams;
@@ -87,7 +86,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.valueDeserializer = builder.valueDeserializer;
         this.pollTimeoutMs = builder.pollTimeoutMs;
         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.maxRetries = builder.maxRetries;
         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
         this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
@@ -109,7 +107,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private SerializableDeserializer<V> valueDeserializer;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
-        private int maxRetries = DEFAULT_MAX_RETRIES;
         private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
         private final KafkaSpoutStreams kafkaSpoutStreams;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
@@ -194,20 +191,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             this.offsetCommitPeriodMs = offsetCommitPeriodMs;
             return this;
         }
-
-        /**
-         * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that
-         * no new records are committed until the previous polled records have been acked. This guarantees at once delivery of
-         * all the previously polled records.
-         * By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the previous
-         * polled records in favor of processing more records.
-         * @param maxRetries max number of retrials
-         */
-        public Builder<K,V> setMaxRetries(int maxRetries) {
-            this.maxRetries = maxRetries;
-            return this;
-        }
-
+        
         /**
          * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
          * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
@@ -283,10 +267,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                 null;
     }
 
-    public int getMaxTupleRetries() {
-        return maxRetries;
-    }
-
     public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
         return firstPollOffsetStrategy;
     }
@@ -315,7 +295,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
                 ", valueDeserializer=" + valueDeserializer +
                 ", pollTimeoutMs=" + pollTimeoutMs +
                 ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
-                ", maxRetries=" + maxRetries +
                 ", maxUncommittedOffsets=" + maxUncommittedOffsets +
                 ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
                 ", kafkaSpoutStreams=" + kafkaSpoutStreams +

http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index f59367d..2c8d7e4 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -40,25 +40,26 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
     private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
 
-    private TimeInterval initialDelay;
-    private TimeInterval delayPeriod;
-    private TimeInterval maxDelay;
-    private int maxRetries;
+    private final TimeInterval initialDelay;
+    private final TimeInterval delayPeriod;
+    private final TimeInterval maxDelay;
+    private final int maxRetries;
 
-    private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
-    private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
+    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+    private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
 
     /**
      * Comparator ordering by timestamp 
      */
     private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+        @Override
         public int compare(RetrySchedule entry1, RetrySchedule entry2) {
             return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
         }
     }
 
     private class RetrySchedule {
-        private KafkaSpoutMessageId msgId;
+        private final KafkaSpoutMessageId msgId;
         private long nextRetryTimeNanos;
 
         public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
@@ -94,9 +95,9 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     }
 
     public static class TimeInterval implements Serializable {
-        private long lengthNanos;
-        private long length;
-        private TimeUnit timeUnit;
+        private final long lengthNanos;
+        private final long length;
+        private final TimeUnit timeUnit;
 
         /**
          * @param length length of the time interval in the units specified by {@link TimeUnit}
@@ -144,7 +145,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     /**
      * The time stamp of the next retry is scheduled according to the exponential backoff formula ( geometric progression):
      * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
-     * nextRetry = Min(nextRetry, currentTime + maxDelay)
+     * nextRetry = Min(nextRetry, currentTime + maxDelay).
+     * 
+     * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous
+     * polled records in favor of processing more records.
      *
      * @param initialDelay      initial delay of the first retry
      * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
@@ -239,9 +243,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     }
 
     @Override
-    public void schedule(KafkaSpoutMessageId msgId) {
+    public boolean schedule(KafkaSpoutMessageId msgId) {
         if (msgId.numFails() > maxRetries) {
             LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+            return false;
         } else {
             if (toRetryMsgs.contains(msgId)) {
                 for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
@@ -257,6 +262,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
             toRetryMsgs.add(msgId);
             LOG.debug("Scheduled. {}", retrySchedule);
             LOG.trace("Current state {}", retrySchedules);
+            return true;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0aa0e857/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 5aab167..bf17a5a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -29,14 +29,18 @@ import java.util.Set;
  */
 public interface KafkaSpoutRetryService extends Serializable {
     /**
-     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or
+     * updates retry time if it has already been scheduled. It may also indicate
+     * that the message should not be retried, in which case the message will not be scheduled.
      * @param msgId message to schedule for retrial
+     * @return true if the message will be retried, false otherwise
      */
-    void schedule(KafkaSpoutMessageId msgId);
+    boolean schedule(KafkaSpoutMessageId msgId);
 
     /**
      * Removes a message from the list of messages scheduled for retrial
      * @param msgId message to remove from retrial
+     * @return true if the message was scheduled for retrial, false otherwise
      */
     boolean remove(KafkaSpoutMessageId msgId);
 
@@ -56,8 +60,9 @@ public interface KafkaSpoutRetryService extends Serializable {
     Set<TopicPartition> retriableTopicPartitions();
 
     /**
-     * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried,
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
      * i.e is scheduled and has retry time that is less than current time.
+     * @param msgId message to check for readiness
      * @return true if message is ready to be retried, false otherwise
      */
     boolean isReady(KafkaSpoutMessageId msgId);
@@ -65,6 +70,7 @@ public interface KafkaSpoutRetryService extends Serializable {
     /**
      * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
      * The message may or may not be ready to be retried yet.
+     * @param msgId message to check for scheduling status
      * @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
      * Returns false is this message is not scheduled for retrial
      */


Mime
View raw message