storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/2] storm git commit: STORM-3047: Ensure Trident emitter refreshPartitions is only called with partitions assigned to the emitter
Date Mon, 09 Jul 2018 07:54:00 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 83e5788c0 -> 8a16fec0d


STORM-3047: Ensure Trident emitter refreshPartitions is only called with partitions assigned to the emitter


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f66a6d05
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f66a6d05
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f66a6d05

Branch: refs/heads/1.x-branch
Commit: f66a6d057fef1134e30b185055c6a61809bf356a
Parents: 43c19d6
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Sat Apr 28 14:31:11 2018 +0200
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Tue May 1 20:52:28 2018 +0200

----------------------------------------------------------------------
 .../eventhubs/trident/OpaqueTridentEventHubEmitter.java  | 11 +++++------
 .../kafka/spout/trident/KafkaTridentSpoutEmitter.java    |  2 +-
 .../apache/storm/kafka/trident/TridentKafkaEmitter.java  | 11 +++++------
 .../trident/spout/IOpaquePartitionedTridentSpout.java    |  2 +-
 .../spout/OpaquePartitionedTridentSpoutExecutor.java     | 10 ++++++----
 5 files changed, 18 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f66a6d05/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
index 20375a2..35fb37a 100755
--- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
+++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/trident/OpaqueTridentEventHubEmitter.java
@@ -63,12 +63,11 @@ public class OpaqueTridentEventHubEmitter implements IOpaquePartitionedTridentSp
   }
 
   @Override
-  public List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo) {
-    final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo);
-    final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
-    if (orderedPartitions != null) {
-      for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
-        taskPartitions.add(orderedPartitions.get(i));
+  public List<Partition> getPartitionsForTask(int taskId, int numTasks, List<Partition> allPartitionInfoSorted) {
+    final List<Partition> taskPartitions = new ArrayList<>(allPartitionInfoSorted == null ? 0 : allPartitionInfoSorted.size());
+    if (allPartitionInfoSorted != null) {
+      for (int i = taskId; i < allPartitionInfoSorted.size(); i += numTasks) {
+        taskPartitions.add(allPartitionInfoSorted.get(i));
       }
     }
     return taskPartitions;

http://git-wip-us.apache.org/repos/asf/storm/blob/f66a6d05/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 3b4aa4b..e60bef3 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -241,7 +241,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
 
     @Override
     public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
-        List<Map<String, Object>> allPartitionInfo) {
+        List<KafkaTridentSpoutTopicPartition> allPartitionInfo) {
         final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
         LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
         final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);

http://git-wip-us.apache.org/repos/asf/storm/blob/f66a6d05/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 2a407ca..8370028 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -248,12 +248,11 @@ public class TridentKafkaEmitter {
             }
 
             @Override
-            public List<Partition> getPartitionsForTask(int taskId, int numTasks, List<GlobalPartitionInformation> allPartitionInfo) {
-                final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo);
-                final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
-                if (orderedPartitions != null) {
-                    for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
-                        taskPartitions.add(orderedPartitions.get(i));
+            public List<Partition> getPartitionsForTask(int taskId, int numTasks, List<Partition> allPartitionInfoSorted) {
+                final List<Partition> taskPartitions = new ArrayList<>(allPartitionInfoSorted == null ? 0 : allPartitionInfoSorted.size());
+                if (allPartitionInfoSorted != null) {
+                    for (int i = taskId; i < allPartitionInfoSorted.size(); i += numTasks) {
+                        taskPartitions.add(allPartitionInfoSorted.get(i));
                     }
                 }
                 return taskPartitions;

http://git-wip-us.apache.org/repos/asf/storm/blob/f66a6d05/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index 062500f..574ac7a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -82,7 +82,7 @@ public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends IS
         /**
          * @return The list of partitions that are to be processed by the task with id {@code taskId}
          */
-        List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo);
+        List<Partition> getPartitionsForTask(int taskId, int numTasks, List<Partition> allPartitionInfoSorted);
 
         void close();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/f66a6d05/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index a6eeff0..a2e133a 100644
--- a/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -111,13 +111,13 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
 
             if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                 _partitionStates.clear();
-                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
+                final List<ISpoutPartition> sortedPartitions = _emitter.getOrderedPartitions(coordinatorMeta);
+                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, sortedPartitions);
                 for (ISpoutPartition partition : taskPartitions) {
                     _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
                 }
+                _emitter.refreshPartitions(taskPartitions);
 
-                // refresh all partitions for backwards compatibility with old spout
-                _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
                 _savedCoordinatorMeta = coordinatorMeta;
                 _changedMeta = true;
             }
@@ -137,7 +137,9 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
                 EmitterPartitionState s = e.getValue();
                 s.rotatingState.removeState(tx.getTransactionId());
                 Object lastMeta = prevCached.get(id);
-                if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
+                if(lastMeta==null) {
+                    lastMeta = s.rotatingState.getLastState();
+                }
                 Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
                 metas.put(id, meta);
             }


Mime
View raw message