storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sati...@apache.org
Subject [1/3] storm git commit: STORM-2026 Inconsistency between (SpoutExecutor, BoltExecutor) and (spout-transfer-fn, bolt-transfer-fn)
Date Wed, 10 Aug 2016 11:45:41 GMT
Repository: storm
Updated Branches:
  refs/heads/master d43a91c70 -> 3b1ab3d8a


STORM-2026 Inconsistency between (SpoutExecutor, BoltExecutor) and (spout-transfer-fn, bolt-transfer-fn)

* fix Executor, SpoutExecutor, BoltExecutor to not calling init() while creating Executor
itself
* call init() for the first time in async loop


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/aa8e94c4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/aa8e94c4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/aa8e94c4

Branch: refs/heads/master
Commit: aa8e94c43dab44d1ef8f85adb0f7662894efb2f0
Parents: 28563ec
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Mon Aug 8 18:02:07 2016 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Mon Aug 8 18:02:07 2016 +0900

----------------------------------------------------------------------
 .../src/jvm/org/apache/storm/executor/Executor.java      |  4 +---
 .../jvm/org/apache/storm/executor/bolt/BoltExecutor.java | 11 ++++++-----
 .../org/apache/storm/executor/spout/SpoutExecutor.java   |  9 +++++----
 3 files changed, 12 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/aa8e94c4/storm-core/src/jvm/org/apache/storm/executor/Executor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/Executor.java b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
index 614d44d..e9041f2 100644
--- a/storm-core/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-core/src/jvm/org/apache/storm/executor/Executor.java
@@ -204,8 +204,8 @@ public abstract class Executor implements Callable, EventHandler<Object>
{
                 throw Utils.wrapInRuntime(ex);
             }
         }
-        executor.init(idToTask);
 
+        executor.idToTask = idToTask;
         return executor;
     }
 
@@ -243,8 +243,6 @@ public abstract class Executor implements Callable, EventHandler<Object>
{
 
     public abstract void tupleActionFn(int taskId, TupleImpl tuple) throws Exception;
 
-    public abstract void init(Map<Integer, Task> idToTask);
-
     @SuppressWarnings("unchecked")
     @Override
     public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {

http://git-wip-us.apache.org/repos/asf/storm/blob/aa8e94c4/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index 2fceb28..cb925bd 100644
--- a/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -53,9 +53,11 @@ public class BoltExecutor extends Executor {
         this.executeSampler = ConfigUtils.mkStatsSampler(stormConf);
     }
 
-    @Override
     public void init(Map<Integer, Task> idToTask) {
-        this.idToTask = idToTask;
+        while (!stormActive.get()) {
+            Utils.sleep(100);
+        }
+
         LOG.info("Preparing bolt {}:{}", componentId, idToTask.keySet());
         for (Map.Entry<Integer, Task> entry : idToTask.entrySet()) {
             Task taskData = entry.getValue();
@@ -88,9 +90,8 @@ public class BoltExecutor extends Executor {
 
     @Override
     public Callable<Object> call() throws Exception {
-        while (!stormActive.get()) {
-            Utils.sleep(100);
-        }
+        init(idToTask);
+
         return new Callable<Object>() {
             @Override
             public Object call() throws Exception {

http://git-wip-us.apache.org/repos/asf/storm/blob/aa8e94c4/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index a2ee650..ba1c830 100644
--- a/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -79,8 +79,11 @@ public class SpoutExecutor extends Executor {
         this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
     }
 
-    @Override
     public void init(final Map<Integer, Task> idToTask) {
+        while (!stormActive.get()) {
+            Utils.sleep(100);
+        }
+
         LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
         this.idToTask = idToTask;
         this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING),
0) * idToTask.size();
@@ -126,9 +129,7 @@ public class SpoutExecutor extends Executor {
 
     @Override
     public Callable<Object> call() throws Exception {
-        while (!stormActive.get()) {
-            Utils.sleep(100);
-        }
+        init(idToTask);
 
         return new Callable<Object>() {
             @Override


Mime
View raw message