storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2296 Kafka spout no dup on leader changes
Date Fri, 03 Feb 2017 23:25:17 GMT
Repository: storm
Updated Branches:
  refs/heads/master 38ee403bf -> 5d8f23026


STORM-2296 Kafka spout no dup on leader changes


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd6e0c5f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd6e0c5f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd6e0c5f

Branch: refs/heads/master
Commit: bd6e0c5fbcebd1f3ac99e4095c3c492c39873455
Parents: 38ee403
Author: Ernestas Vaiciukevicius <e.vaiciukevicius@adform.com>
Authored: Thu Jan 12 16:54:59 2017 +0200
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Sat Feb 4 08:07:37 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/PartitionManager.java    | 125 ++++++++++++-------
 .../org/apache/storm/kafka/ZkCoordinator.java   |  16 ++-
 .../apache/storm/kafka/ZkCoordinatorTest.java   |  42 ++++++-
 3 files changed, 131 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bd6e0c5f/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 8d608d9..e4ce657 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -66,7 +66,29 @@ public class PartitionManager {
     ZkState _state;
     Map _stormConf;
     long numberFailed, numberAcked;
-    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
+
+    public PartitionManager(
+            DynamicPartitionConnections connections,
+            String topologyInstanceId,
+            ZkState state,
+            Map stormConf,
+            SpoutConfig spoutConfig,
+            Partition id)
+    {
+        this(connections, topologyInstanceId, state, stormConf, spoutConfig, id, null);
+    }
+
+    /**
+     * @param previousManager previous partition manager if manager for partition is being recreated
+     */
+    public PartitionManager(
+            DynamicPartitionConnections connections,
+            String topologyInstanceId,
+            ZkState state,
+            Map stormConf,
+            SpoutConfig spoutConfig,
+            Partition id,
+            PartitionManager previousManager) {
         _partition = id;
         _connections = connections;
         _spoutConfig = spoutConfig;
@@ -76,53 +98,64 @@ public class PartitionManager {
         _stormConf = stormConf;
         numberAcked = numberFailed = 0;
 
-        try {
-            _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
-            _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
-        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-            throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
-                                                             FailedMsgRetryManager.class,
-                                                             spoutConfig.failedMsgRetryManagerClass), e);
-        }
+        if (previousManager != null) {
+            _failedMsgRetryManager = previousManager._failedMsgRetryManager;
+            _committedTo = previousManager._committedTo;
+            _emittedToOffset = previousManager._emittedToOffset;
+            _waitingToEmit = previousManager._waitingToEmit;
+            _pending = previousManager._pending;
+            LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}",
+                    _waitingToEmit.size(),
+                    _pending.size());
+        } else {
+            try {
+                _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
+                _failedMsgRetryManager.prepare(spoutConfig, _stormConf);
+            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+                throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
+                        FailedMsgRetryManager.class,
+                        spoutConfig.failedMsgRetryManagerClass), e);
+            }
 
-        String jsonTopologyId = null;
-        Long jsonOffset = null;
-        String path = committedPath();
-        try {
-            Map<Object, Object> json = _state.readJSON(path);
-            LOG.info("Read partition information from: " + path +  "  --> " + json );
-            if (json != null) {
-                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
-                jsonOffset = (Long) json.get("offset");
+            String jsonTopologyId = null;
+            Long jsonOffset = null;
+            String path = committedPath();
+            try {
+                Map<Object, Object> json = _state.readJSON(path);
+                LOG.info("Read partition information from: " + path + "  --> " + json);
+                if (json != null) {
+                    jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
+                    jsonOffset = (Long) json.get("offset");
+                }
+            } catch (Throwable e) {
+                LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
             }
-        } catch (Throwable e) {
-            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
-        }
 
-        String topic = _partition.topic;
-        Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);
+            String topic = _partition.topic;
+            Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);
 
-        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
-            _committedTo = currentOffset;
-            LOG.info("No partition information found, using configuration to determine offset");
-        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
-            _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
-            LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
-        } else {
-            _committedTo = jsonOffset;
-            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
-        }
+            if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
+                _committedTo = currentOffset;
+                LOG.info("No partition information found, using configuration to determine offset");
+            } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
+                _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
+                LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
+            } else {
+                _committedTo = jsonOffset;
+                LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId);
+            }
 
-        if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
-            LOG.info("Last commit offset from zookeeper: " + _committedTo);
-            Long lastCommittedOffset = _committedTo;
-            _committedTo = currentOffset;
-            LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
-                    spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
-        }
+            if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
+                LOG.info("Last commit offset from zookeeper: " + _committedTo);
+                Long lastCommittedOffset = _committedTo;
+                _committedTo = currentOffset;
+                LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
+                        spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime);
+            }
 
-        LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo);
-        _emittedToOffset = _committedTo;
+            LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo);
+            _emittedToOffset = _committedTo;
+        }
 
         _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
         _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
@@ -160,7 +193,7 @@ public class PartitionManager {
             } else {
                 tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic);
             }
-            
+
             if ((tups != null) && tups.iterator().hasNext()) {
                if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
                     for (List<Object> tup : tups) {
@@ -201,7 +234,7 @@ public class PartitionManager {
         } catch (TopicOffsetOutOfRangeException e) {
            offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
             // fetch failed, so don't update the fetch metrics
-            
+
             //fix bug [STORM-643] : remove outdated failed offsets
             if (!processingNewTuples) {
                 // For the case of EarliestTime it would be better to discard
@@ -214,7 +247,7 @@ public class PartitionManager {
                 if (null != omitted) {
                     _lostMessageCount.incrBy(omitted.size());
                 }
-                
+
                LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
             }
 
@@ -223,7 +256,7 @@ public class PartitionManager {
                 _emittedToOffset = offset;
                 LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
             }
-            
+
             return;
         }
         long millis = System.currentTimeMillis() - start;

http://git-wip-us.apache.org/repos/asf/storm/blob/bd6e0c5f/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index 98bf8a0..14be584 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -88,14 +88,24 @@ public class ZkCoordinator implements PartitionCoordinator {
 
             LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
 
+            Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
             for (Partition id : deletedPartitions) {
-                PartitionManager man = _managers.remove(id);
-                man.close();
+                deletedManagers.put(id.partition, _managers.remove(id));
+            }
+            for (PartitionManager manager : deletedManagers.values()) {
+                if (manager != null) manager.close();
             }
             LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
 
             for (Partition id : newPartitions) {
-                PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
+                PartitionManager man = new PartitionManager(
+                        _connections,
+                        _topologyInstanceId,
+                        _state,
+                        _stormConf,
+                        _spoutConfig,
+                        id,
+                        deletedManagers.get(id.partition));
                 _managers.put(id, man);
             }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/bd6e0c5f/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 364da33..b23d5bc 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -28,8 +28,7 @@ import org.mockito.MockitoAnnotations;
 
 import java.util.*;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.when;
@@ -106,7 +105,7 @@ public class ZkCoordinatorTest {
         waitForRefresh();
        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
         List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
-        assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size());
+        assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size());
         Iterator<List<PartitionManager>> iterator = partitionManagersAfterRefresh.iterator();
        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
             List<PartitionManager> partitionManagersAfter = iterator.next();
@@ -114,6 +113,43 @@ public class ZkCoordinatorTest {
         }
     }
 
+    @Test
+    public void testPartitionManagerRecreate() throws Exception {
+        final int totalTasks = 2;
+        int partitionsPerTask = 2;
+        List<ZkCoordinator> coordinatorList = buildCoordinators(totalTasks / partitionsPerTask);
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092)));
+        List<List<PartitionManager>> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList);
+        waitForRefresh();
+        when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093)));
+        List<List<PartitionManager>> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList);
+        assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size());
+
+        HashMap<Integer, PartitionManager> managersAfterRefresh = new HashMap<Integer, PartitionManager>();
+        for (List<PartitionManager> partitionManagersAfter : partitionManagersAfterRefresh) {
+            for (PartitionManager manager : partitionManagersAfter) {
+                assertFalse("Multiple PartitionManagers for same partition", managersAfterRefresh.containsKey(manager.getPartition().partition));
+                managersAfterRefresh.put(manager.getPartition().partition, manager);
+            }
+        }
+
+        for (List<PartitionManager> partitionManagersBefore : partitionManagersBeforeRefresh) {
+            for (PartitionManager manager : partitionManagersBefore) {
+                assertStateIsTheSame(manager, managersAfterRefresh.get(manager.getPartition().partition));
+            }
+        }
+    }
+
+    private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) {
+        // check if state was actually moved from old PartitionManager
+        assertNotNull(managerBefore);
+        assertNotNull(managerAfter);
+        assertNotSame(managerBefore, managerAfter);
+        assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit);
+        assertSame(managerBefore._emittedToOffset, managerAfter._emittedToOffset);
+        assertSame(managerBefore._committedTo, managerAfter._committedTo);
+    }
+
    private void assertPartitionsAreDifferent(List<PartitionManager> partitionManagersBefore, List<PartitionManager> partitionManagersAfter, int partitionsPerTask) {
         assertEquals(partitionsPerTask, partitionManagersBefore.size());
         assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size());


Mime
View raw message