storm-commits mailing list archives

From kabh...@apache.org
Subject [2/4] storm git commit: Remove outdated failed offsets
Date Sat, 18 Jul 2015 01:11:08 GMT
Remove outdated failed offsets

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a6557a56
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a6557a56
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a6557a56

Branch: refs/heads/0.9.x-branch
Commit: a6557a565429b2e12fc0659c6484fc50f90b5cd5
Parents: 9221dba
Author: Xin Wang <best.wangxin@163.com>
Authored: Thu Jul 16 10:19:41 2015 +0800
Committer: Xin Wang <best.wangxin@163.com>
Committed: Thu Jul 16 10:19:41 2015 +0800

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/PartitionManager.java   | 25 ++++++++++++++++++++
 1 file changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a6557a56/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index d24a49e..001c5df 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -164,6 +164,31 @@ public class PartitionManager {
             _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig);
             LOG.warn("Using new offset: {}", _emittedToOffset);
             // fetch failed, so don't update the metrics
+
+            // Fix [STORM-643]: remove outdated failed offsets
+            if (had_failed) {
+                // For EarliestTime it is better to discard all failed offsets
+                // that are earlier than the actual EarliestTime offset, since
+                // those messages are gone anyway; this also saves the broker
+                // API calls that would otherwise retry them.
+
+                // In case of LatestTime it is debatable whether we should still
+                // try to reach the failed offsets (they may still be available).
+                // But by moving to LatestTime we are already discarding messages
+                // in the Kafka queue; since the spout is configured that way,
+                // assume the user accepts losing information and cares about the
+                // newest messages first, so discard the failed offsets as well.
+
+                SortedSet<Long> omitted = failed.headSet(_emittedToOffset);
+
+                // Keep the tail, since a SortedSet maintains its elements in
+                // ascending order. tailSet() returns a range-restricted view of
+                // the original set, to which out-of-range offsets could not be
+                // added later, so we copy the tail into a new TreeSet instead.
+                failed = new TreeSet<Long>(failed.tailSet(_emittedToOffset));
+                LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
+            }
+
             return;
         }
         long end = System.nanoTime();
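
The comment about copying the tailSet() view is the subtle part of this patch.
Below is a minimal standalone sketch of the TreeSet semantics it relies on;
the offset values and the class name are made up for illustration, and in the
spout the failed set is populated from failed tuples rather than by hand.

import java.util.SortedSet;
import java.util.TreeSet;

public class FailedOffsetPruningDemo {
    public static void main(String[] args) {
        // Hypothetical failed offsets.
        TreeSet<Long> failed = new TreeSet<Long>();
        failed.add(10L);
        failed.add(20L);
        failed.add(30L);

        long emittedToOffset = 25L; // stand-in for _emittedToOffset

        // headSet(25) is the strictly-smaller prefix: these offsets fall
        // behind the new fetch position and are the ones being dropped.
        SortedSet<Long> omitted = failed.headSet(emittedToOffset);
        System.out.println("omitted: " + omitted); // omitted: [10, 20]

        // tailSet(25) is only a *view* with a lower bound of 25; adding a
        // smaller offset to the view throws IllegalArgumentException.
        SortedSet<Long> tailView = failed.tailSet(emittedToOffset);
        try {
            tailView.add(5L);
        } catch (IllegalArgumentException e) {
            System.out.println("view rejects out-of-range add");
        }

        // Copying the view into a fresh TreeSet drops the range restriction,
        // so offsets that fail later can be recorded at any position.
        failed = new TreeSet<Long>(tailView);
        failed.add(5L); // now fine
        System.out.println("failed: " + failed); // failed: [5, 30]
    }
}

This mirrors the patched code path: headSet() collects the offsets that can no
longer be fetched (for the log message), and the copy keeps the failed set
writable across its whole range for subsequent failures.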

