helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject helix git commit: [HELIX-709] Prepare controller stages for async execution
Date Fri, 29 Jun 2018 00:38:36 GMT
Repository: helix
Updated Branches:
  refs/heads/master 4a99bc43c -> d22adbf97


[HELIX-709] Prepare controller stages for async execution


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/d22adbf9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/d22adbf9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/d22adbf9

Branch: refs/heads/master
Commit: d22adbf9760316118dd8e6eda5aba4219e399a60
Parents: 4a99bc4
Author: Harry Zhang <hrzhang@linkedin.com>
Authored: Thu Jun 28 14:25:21 2018 -0700
Committer: Harry Zhang <hrzhang@linkedin.com>
Committed: Thu Jun 28 15:27:29 2018 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  4 +
 .../pipeline/AbstractAsyncBaseStage.java        | 77 ++++++++++++++++++++
 .../controller/pipeline/AbstractBaseStage.java  | 12 +--
 .../controller/pipeline/AsyncWorkerType.java    | 32 ++++++++
 .../helix/controller/pipeline/Pipeline.java     |  4 +
 .../controller/stages/AsyncWorkerType.java      | 32 --------
 .../helix/controller/stages/AttributeName.java  |  3 +-
 .../stages/PersistAssignmentStage.java          | 28 ++-----
 .../stages/TargetExteralViewCalcStage.java      | 46 +++---------
 .../common/ZkIntegrationTestBase.java           | 10 ++-
 10 files changed, 147 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 0e0817d..7603975 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -41,6 +41,7 @@ import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
 import org.apache.helix.api.listeners.*;
 import org.apache.helix.common.ClusterEventBlockingQueue;
 import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.pipeline.Pipeline;
 import org.apache.helix.controller.pipeline.PipelineRegistry;
 import org.apache.helix.controller.stages.*;
@@ -198,6 +199,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
     event.addAttribute(AttributeName.changeContext.name(), changeContext);
     event.addAttribute(AttributeName.eventData.name(), new ArrayList<>());
+    event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
 
     _taskEventQueue.put(event);
     _eventQueue.put(event);
@@ -421,6 +423,7 @@ public class GenericHelixController implements IdealStateChangeListener,
     long startTime = System.currentTimeMillis();
     boolean rebalanceFail = false;
     for (Pipeline pipeline : pipelines) {
+      event.addAttribute(AttributeName.PipelineType.name(), pipeline.getPipelineType());
       try {
         pipeline.handle(event);
         pipeline.finish();
@@ -887,6 +890,7 @@ public class GenericHelixController implements IdealStateChangeListener,
         event.addAttribute(AttributeName.changeContext.name(), changeContext);
         event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
         event.addAttribute(AttributeName.eventData.name(), signal);
+        event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
         _eventQueue.put(event);
         _taskEventQueue.put(event.clone());
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java
new file mode 100644
index 0000000..f305665
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractAsyncBaseStage.java
@@ -0,0 +1,77 @@
+package org.apache.helix.controller.pipeline;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.common.DedupEventProcessor;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractAsyncBaseStage extends AbstractBaseStage {
+  private static final Logger logger = LoggerFactory.getLogger(AbstractAsyncBaseStage.class);
+
+  @Override
+  public void process(final ClusterEvent event) throws Exception {
+    String pipelineType = event.getAttribute(AttributeName.PipelineType.name());
+    final String taskType = getAsyncTaskDedupType(pipelineType);
+    DedupEventProcessor<String, Runnable> worker =
+        getAsyncWorkerFromClusterEvent(event, getAsyncWorkerType());
+    if (worker == null) {
+      throw new StageException("No async worker found for " + taskType);
+    }
+
+    worker.queueEvent(taskType, new Runnable() {
+      @Override
+      public void run() {
+        long startTimestamp = System.currentTimeMillis();
+        logger.info("START AsyncProcess: {}", taskType);
+        try {
+          execute(event);
+        } catch (Exception e) {
+          logger.error("Failed to process {} asynchronously", taskType, e);
+        }
+        long endTimestamp = System.currentTimeMillis();
+        logger.info("END AsyncProcess: {}, took {} ms", taskType, endTimestamp - startTimestamp);
+      }
+    });
+    logger.info("Submitted asynchronous {} task to worker", taskType);
+  }
+
+  /**
+   * Stage that implements AbstractAsyncBaseStage should implement this method
+   * to get it's worker
+   * @return AsyncWorkerType
+   */
+  public abstract AsyncWorkerType getAsyncWorkerType();
+
+  /**
+   * Implements stages main logic
+   *
+   * @param event ClusterEvent
+   * @throws Exception exception
+   */
+  public abstract void execute(final ClusterEvent event) throws Exception;
+
+  private String getAsyncTaskDedupType(String pipelineType) {
+    return String
+        .format("%s::%s", pipelineType, getClass().getSimpleName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
index d12833f..324ed02 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import org.apache.helix.common.DedupEventProcessor;
-import org.apache.helix.controller.stages.AsyncWorkerType;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 
@@ -68,19 +67,14 @@ public class AbstractBaseStage implements Stage {
   }
 
   protected DedupEventProcessor<String, Runnable> getAsyncWorkerFromClusterEvent(ClusterEvent event,
-      AsyncWorkerType worker) {
+      AsyncWorkerType workerType) {
     Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool =
         event.getAttribute(AttributeName.AsyncFIFOWorkerPool.name());
     if (workerPool != null) {
-      if (workerPool.containsKey(worker)) {
-        return workerPool.get(worker);
+      if (workerPool.containsKey(workerType)) {
+        return workerPool.get(workerType);
       }
     }
     return null;
   }
-
-  protected String getAsyncTaskDedupType(boolean isTaskPipeline) {
-    return String
-        .format("%s::%s", isTaskPipeline ? "TASK" : "RESOURCE", getClass().getSimpleName());
-  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
new file mode 100644
index 0000000..62e324c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/AsyncWorkerType.java
@@ -0,0 +1,32 @@
+package org.apache.helix.controller.pipeline;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * There are bunch of stages, i.e. TargetExternalViewCalc, PersistAssignment, etc., that have
+ * the choice to submit its tasks to corresponding workers to do the job asynchronously.
+ *
+ * This class contains Async worker enums that corresponding stages can use
+ */
+
+public enum AsyncWorkerType {
+  TargetExternalViewCalcWorker,
+  PersistAssignmentWorker
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
index ac483f4..2946129 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/pipeline/Pipeline.java
@@ -48,6 +48,10 @@ public class Pipeline {
     stage.init(context);
   }
 
+  public String getPipelineType() {
+    return _pipelineType;
+  }
+
   public void handle(ClusterEvent event) throws Exception {
     if (_stages == null) {
       return;

http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java
deleted file mode 100644
index 995705f..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AsyncWorkerType.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.apache.helix.controller.stages;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * There are bunch of stages, i.e. TargetExternalViewCalc, PersistAssignment, etc., that have
- * the choice to submit its tasks to corresponding workers to do the job asynchronously.
- *
- * This class contains Async worker enums that corresponding stages can use
- */
-
-public enum AsyncWorkerType {
-  TargetExternalViewCalcWorker,
-  PersistAssignmentWorker
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index b98dc9e..56bbb44 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -36,5 +36,6 @@ public enum AttributeName {
   changeContext,
   instanceName,
   eventData,
-  AsyncFIFOWorkerPool
+  AsyncFIFOWorkerPool,
+  PipelineType
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
index 7463f24..ca83445 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/PersistAssignmentStage.java
@@ -32,7 +32,9 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.common.PartitionStateMap;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
@@ -45,32 +47,16 @@ import org.slf4j.LoggerFactory;
 /**
  * Persist the ResourceAssignment of each resource that went through rebalancing
  */
-public class PersistAssignmentStage extends AbstractBaseStage {
+public class PersistAssignmentStage extends AbstractAsyncBaseStage {
   private static final Logger LOG = LoggerFactory.getLogger(PersistAssignmentStage.class);
 
   @Override
-  public void process(final ClusterEvent event) throws Exception {
-    DedupEventProcessor<String, Runnable> asyncWorker =
-        getAsyncWorkerFromClusterEvent(event, AsyncWorkerType.PersistAssignmentWorker);
-    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
-
-    if (asyncWorker != null) {
-      LOG.info("Sending PersistAssignmentStage task for cluster {}, {} pipeline to worker",
-          cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE");
-      asyncWorker.queueEvent(getAsyncTaskDedupType(cache.isTaskCache()), new Runnable() {
-        @Override
-        public void run() {
-          doPersistAssignment(event);
-        }
-      });
-    } else {
-      LOG.info("Starting PersistAssignmentStage synchronously for cluster {}, {} pipeline",
-          cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE");
-      doPersistAssignment(event);
-    }
+  public AsyncWorkerType getAsyncWorkerType() {
+    return AsyncWorkerType.PersistAssignmentWorker;
   }
 
-  private void doPersistAssignment(final ClusterEvent event) {
+  @Override
+  public void execute(final ClusterEvent event) throws Exception {
     ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
     ClusterConfig clusterConfig = cache.getClusterConfig();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java
index 95b3988..aa2a8e9 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TargetExteralViewCalcStage.java
@@ -27,9 +27,9 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.controller.common.PartitionStateMap;
-import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.Partition;
@@ -39,42 +39,23 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
 
-public class TargetExteralViewCalcStage extends AbstractBaseStage {
+public class TargetExteralViewCalcStage extends AbstractAsyncBaseStage {
   private static final Logger LOG = LoggerFactory.getLogger(TargetExteralViewCalcStage.class);
 
   @Override
-  public void process(final ClusterEvent event) throws Exception {
-    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
-    DedupEventProcessor<String, Runnable> tevWorker =
-        getAsyncWorkerFromClusterEvent(event, AsyncWorkerType.TargetExternalViewCalcWorker);
+  public AsyncWorkerType getAsyncWorkerType() {
+    return AsyncWorkerType.TargetExternalViewCalcWorker;
+  }
 
+  @Override
+  public void execute(final ClusterEvent event) throws Exception {
+    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
     ClusterConfig clusterConfig = cache.getClusterConfig();
-
     if (cache.isTaskCache() || !clusterConfig.isTargetExternalViewEnabled()) {
       return;
     }
 
-    if (tevWorker == null) {
-      LOG.info("Generating target external view synchronously for cluster {}, {} pipeline",
-          cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE");
-      doCalculateTargetExternalView(event);
-    } else {
-      // We have an async worker so update external view asynchronously
-      LOG.info("Sending target external view generating task for cluster {}, {} pipeline to worker",
-          cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE");
-      tevWorker.queueEvent(getAsyncTaskDedupType(cache.isTaskCache()), new Runnable() {
-        @Override
-        public void run() {
-          doCalculateTargetExternalView(event);
-        }
-      });
-    }
-
-  }
-
-  private void doCalculateTargetExternalView(final ClusterEvent event) {
     HelixManager helixManager = event.getAttribute(AttributeName.helixmanager.name());
-    ClusterDataCache cache = event.getAttribute(AttributeName.ClusterDataCache.name());
     HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
 
     BestPossibleStateOutput bestPossibleAssignments =
@@ -84,10 +65,6 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage {
         event.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
     Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.name());
 
-    long startTimeStamp = System.currentTimeMillis();
-    LOG.info("START: computing target external view for cluster {}, {} pipeline",
-        cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE");
-
     if (!accessor.getBaseDataAccessor()
         .exists(accessor.keyBuilder().targetExternalViews().getPath(), AccessOption.PERSISTENT)) {
       accessor.getBaseDataAccessor()
@@ -143,11 +120,6 @@ public class TargetExteralViewCalcStage extends AbstractBaseStage {
     }
     // TODO (HELIX-964): remove TEV when idealstate is removed
     accessor.setChildren(keys, targetExternalViews);
-
-    long endTimeStamp = System.currentTimeMillis();
-    LOG.info("END: computing target external view for cluster {}, {} pipeline. Took: {} ms",
-        cache.getClusterName(), cache.isTaskCache() ? "TASK" : "RESOURCE",
-        endTimeStamp - startTimeStamp);
   }
 
   private Map<String, Map<String, String>> convertToMapFields(

http://git-wip-us.apache.org/repos/asf/helix/blob/d22adbf9/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
index f1c59e1..2dca16b 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java
@@ -32,6 +32,7 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.Stage;
 import org.apache.helix.controller.pipeline.StageContext;
 import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
@@ -304,7 +305,14 @@ public class ZkIntegrationTestBase {
     StageContext context = new StageContext();
     stage.init(context);
     stage.preProcess();
-    stage.process(event);
+
+    // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in
+    // execute() function call
+    if (stage instanceof AbstractAsyncBaseStage) {
+      ((AbstractAsyncBaseStage) stage).execute(event);
+    } else {
+      stage.process(event);
+    }
     stage.postProcess();
   }
 }


Mime
View raw message