storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2506: Print mapping between Task ID and Kafka Partitions
Date Wed, 21 Jun 2017 23:51:36 GMT
Repository: storm
Updated Branches:
  refs/heads/master 34a7f6fee -> 659b59d03


STORM-2506: Print mapping between Task ID and Kafka Partitions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3ab11ea2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3ab11ea2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3ab11ea2

Branch: refs/heads/master
Commit: 3ab11ea2945175713bf759024376b5072d94f0c8
Parents: 4461a1d
Author: Srishty Agrawal <sagrawal@groupon.com>
Authored: Wed Mar 29 12:35:57 2017 -0700
Committer: Srishty Agrawal <sagrawal@groupon.com>
Committed: Tue Jun 20 11:06:09 2017 -0700

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  4 ++--
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |  4 ++--
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  | 17 ++++++++-------
 .../apache/storm/kafka/StaticCoordinator.java   |  5 +++--
 .../org/apache/storm/kafka/ZkCoordinator.java   | 22 ++++++++++++--------
 .../org/apache/storm/kafka/KafkaUtilsTest.java  |  8 +++----
 .../apache/storm/kafka/ZkCoordinatorTest.java   |  2 +-
 .../src/jvm/org/apache/storm/daemon/Task.java   |  2 +-
 8 files changed, 35 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3ab11ea2/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 09795ed..eaf3995 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -148,8 +148,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
+                    context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
 
             initialize(partitions);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/3ab11ea2/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
index ead6057..79cde56 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -82,11 +82,11 @@ public class KafkaSpout extends BaseRichSpout {
         if (_spoutConfig.hosts instanceof StaticHosts) {
             _coordinator = new StaticCoordinator(_connections, conf,
                     _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, topologyInstanceId);
+                    totalTasks, context.getThisTaskId(), topologyInstanceId);
         } else {
             _coordinator = new ZkCoordinator(_connections, conf,
                     _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, topologyInstanceId);
+                    totalTasks, context.getThisTaskId(), topologyInstanceId);
         }
 
         context.registerMetric("kafkaOffset", new IMetric() {

http://git-wip-us.apache.org/repos/asf/storm/blob/3ab11ea2/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
index 604f1f3..76bb896 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -258,7 +258,8 @@ public class KafkaUtils {
     }
 
 
-    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons, int totalTasks, int taskIndex) {
+    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitons,
+            int totalTasks, int taskIndex, int taskId) {
         Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
         List<Partition> taskPartitions = new ArrayList<Partition>();
         List<Partition> partitions = new ArrayList<Partition>();
@@ -273,20 +274,20 @@ public class KafkaUtils {
             Partition taskPartition = partitions.get(i);
             taskPartitions.add(taskPartition);
         }
-        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
+        logPartitionMapping(totalTasks, taskIndex, taskPartitions, taskId);
         return taskPartitions;
     }
 
-    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
-        String taskPrefix = taskId(taskIndex, totalTasks);
+    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions, int taskId) {
+        String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId);
         if (taskPartitions.isEmpty()) {
-            LOG.warn(taskPrefix + "no partitions assigned");
+            LOG.warn(taskPrefix + " no partitions assigned");
         } else {
-            LOG.info(taskPrefix + "assigned " + taskPartitions);
+            LOG.info(taskPrefix + " assigned " + taskPartitions);
         }
     }
 
-    public static String taskId(int taskIndex, int totalTasks) {
-        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
+    public static String taskPrefix(int taskIndex, int totalTasks, int taskId) {
+        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3ab11ea2/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
index 46cb7d9..cd23ca6 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
@@ -26,11 +26,12 @@ public class StaticCoordinator implements PartitionCoordinator {
     Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
     List<PartitionManager> _allManagers = new ArrayList<>();
 
-    public StaticCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
+    public StaticCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig config, ZkState state,
+            int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
         StaticHosts hosts = (StaticHosts) config.hosts;
         List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
         partitions.add(hosts.getPartitionInformation());
-        List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex);
+        List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId);
         for (Partition myPartition : myPartitions) {
             _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, topoConf, config, myPartition));
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/3ab11ea2/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
index e814157..136dc51 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
@@ -23,7 +23,7 @@ import org.apache.storm.kafka.trident.GlobalPartitionInformation;
 
 import java.util.*;
 
-import static org.apache.storm.kafka.KafkaUtils.taskId;
+import static org.apache.storm.kafka.KafkaUtils.taskPrefix;
 
 public class ZkCoordinator implements PartitionCoordinator {
     private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
@@ -31,6 +31,7 @@ public class ZkCoordinator implements PartitionCoordinator {
     SpoutConfig _spoutConfig;
     int _taskIndex;
     int _totalTasks;
+    int _taskId;
     String _topologyInstanceId;
     Map<Partition, PartitionManager> _managers = new HashMap();
     List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
@@ -41,15 +42,18 @@ public class ZkCoordinator implements PartitionCoordinator {
     ZkState _state;
     Map _topoConf;
 
-    public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
-        this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(topoConf, spoutConfig));
+    public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state,
+            int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
+        this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, buildReader(topoConf, spoutConfig));
     }
 
-    public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
+    public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state,
+            int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) {
         _spoutConfig = spoutConfig;
         _connections = connections;
         _taskIndex = taskIndex;
         _totalTasks = totalTasks;
+        _taskId = taskId;
         _topologyInstanceId = topologyInstanceId;
         _topoConf = topoConf;
         _state = state;
@@ -75,9 +79,9 @@ public class ZkCoordinator implements PartitionCoordinator {
     @Override
     public void refresh() {
         try {
-            LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
+            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections");
             List<GlobalPartitionInformation> brokerInfo = _reader.getBrokerInfo();
-            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
+            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId);
 
             Set<Partition> curr = _managers.keySet();
             Set<Partition> newPartitions = new HashSet<Partition>(mine);
@@ -86,7 +90,7 @@ public class ZkCoordinator implements PartitionCoordinator {
             Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
             deletedPartitions.removeAll(mine);
 
-            LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
+            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString());
 
             Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
             for (Partition id : deletedPartitions) {
@@ -95,7 +99,7 @@ public class ZkCoordinator implements PartitionCoordinator {
             for (PartitionManager manager : deletedManagers.values()) {
                 if (manager != null) manager.close();
             }
-            LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
+            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
 
             for (Partition id : newPartitions) {
                 PartitionManager man = new PartitionManager(
@@ -113,7 +117,7 @@ public class ZkCoordinator implements PartitionCoordinator {
             throw new RuntimeException(e);
         }
         _cachedList = new ArrayList<PartitionManager>(_managers.values());
-        LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
+        LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/3ab11ea2/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
index 7fd1d41..b575bc9 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java
@@ -271,7 +271,7 @@ public class KafkaUtilsTest {
         partitions.add(globalPartitionInformation);
         int numTasks = numPartitions / partitionsPerTask;
         for (int i = 0 ; i < numTasks ; i++) {
-            assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size());
+            assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size());
         }
     }
 
@@ -281,8 +281,8 @@ public class KafkaUtilsTest {
         List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
         partitions.add(globalPartitionInformation);
         int numTasks = 2;
-        assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size());
-        assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size());
+        assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0, 0).size());
+        assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1, 1).size());
     }
 
     @Test (expected = IllegalArgumentException.class )
@@ -290,6 +290,6 @@ public class KafkaUtilsTest {
         GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC);
         List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
         partitions.add(globalPartitionInformation);
-        KafkaUtils.calculatePartitionsForTask(partitions, 1, 1);
+        KafkaUtils.calculatePartitionsForTask(partitions, 1, 1, 1);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3ab11ea2/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
index 9778d15..8d8746f 100644
--- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
+++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java
@@ -174,7 +174,7 @@ public class ZkCoordinatorTest {
     private List<ZkCoordinator> buildCoordinators(int totalTasks) {
         List<ZkCoordinator> coordinatorList = new ArrayList<ZkCoordinator>();
         for (int i = 0; i < totalTasks; i++) {
-            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, totalTasks, "test-id", reader);
+            ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, totalTasks, i, "test-id", reader);
             coordinatorList.add(coordinator);
         }
         return coordinatorList;

http://git-wip-us.apache.org/repos/asf/storm/blob/3ab11ea2/storm-client/src/jvm/org/apache/storm/daemon/Task.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 5c28085..b79a259 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -124,7 +124,7 @@ public class Task {
 
     public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
         if (debug) {
-            LOG.info("Emitting: {} {} {}", componentId, stream, values);
+            LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
         }
 
         List<Integer> outTasks = new ArrayList<>();


Mime
View raw message