storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [1/6] storm git commit: Kafka spout consumes from the latest offset when the ZK offset is bigger than the latest offset
Date Wed, 08 Feb 2017 20:51:18 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 9e06883b9 -> 2a7e6dc05


Kafka spout consumes from the latest offset when the ZK offset is bigger than the latest offset


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/032f1f33
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/032f1f33
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/032f1f33

Branch: refs/heads/1.x-branch
Commit: 032f1f33af93c99c27c53a6fb0cbe83f8d5fdb8b
Parents: 4154490
Author: chenyuzhao <chenyuzhao@meituan.com>
Authored: Wed Jan 4 10:57:01 2017 +0800
Committer: chenyuzhao <chenyuzhao@meituan.com>
Committed: Wed Jan 4 10:57:01 2017 +0800

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/kafka/PartitionManager.java      | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/032f1f33/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 79e7c3d..faca7a2 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -199,7 +199,12 @@ public class PartitionManager {
         try {
             msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
         } catch (TopicOffsetOutOfRangeException e) {
-            offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
+            long partitionLatestOffset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.LatestTime());
+            if (partitionLatestOffset < offset) {
+                offset = partitionLatestOffset;
+            }else{
+                offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
+            }
             // fetch failed, so don't update the fetch metrics
             
             //fix bug [STORM-643] : remove outdated failed offsets


Mime
View raw message