helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject helix git commit: [HELIX-690] batch message execution should not share same context
Date Thu, 19 Apr 2018 18:09:35 GMT
Repository: helix
Updated Branches:
  refs/heads/master ae8eb5969 -> f99d9477f


[HELIX-690] batch message execution should not share same context


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f99d9477
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f99d9477
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f99d9477

Branch: refs/heads/master
Commit: f99d9477f279613fd7c3229008721b6d3a1699df
Parents: ae8eb59
Author: Harry Zhang <zhan849@usc.edu>
Authored: Mon Apr 16 09:55:43 2018 -0700
Committer: Harry Zhang <zhan849@usc.edu>
Committed: Thu Apr 19 11:06:51 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/NotificationContext.java   | 18 ++++++++
 .../handling/HelixBatchMessageTask.java         |  6 ++-
 .../helix/messaging/handling/HelixTask.java     |  2 +-
 .../messaging/handling/HelixTaskExecutor.java   | 43 ++++++++++++--------
 4 files changed, 50 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/f99d9477/helix-core/src/main/java/org/apache/helix/NotificationContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
index 48a7d07..dd76b60 100644
--- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java
+++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
@@ -74,6 +74,24 @@ public class NotificationContext {
   }
 
   /**
+   * Clone a new Notification context from existing one. Map contents are
+   * not recursively deep copied.
+   *
+   * @return new copy of NotificationContext
+   */
+  public NotificationContext clone() {
+
+    NotificationContext copy = new NotificationContext(_manager);
+    copy.setType(_type);
+    copy.setChangeType(_changeType);
+    copy.setPathChanged(_pathChanged);
+    copy.setEventName(_eventName);
+    copy.setCreationTime(_creationTime);
+    copy._map.putAll(_map);
+    return copy;
+  }
+
+  /**
    * Get the HelixManager associated with this notification
    *
    * @return {@link HelixManager} object

http://git-wip-us.apache.org/repos/asf/helix/blob/f99d9477/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
index 05b875e..86abce7 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixBatchMessageTask.java
@@ -48,7 +48,7 @@ public class HelixBatchMessageTask implements MessageTask {
     HelixTaskResult taskResult = null;
 
     long start = System.currentTimeMillis();
-    LOG.info("taskId:" + getTaskId() + " handling task begin, at: " + start);
+    LOG.info("BatchMsg task {} handling task begin, at: {}", getTaskId(), start);
 
     boolean isSucceed = true;
     try {
@@ -74,7 +74,9 @@ public class HelixBatchMessageTask implements MessageTask {
     }
 
     if (isSucceed) {
-      LOG.info("task: " + getTaskId() + " completed sucessfully");
+      LOG.info("BatchMsg task {} completed successfully", getTaskId());
+    } else {
+      LOG.warn("BatchMsg task {} failed.", getTaskId());
     }
 
     taskResult = new HelixTaskResult();

http://git-wip-us.apache.org/repos/asf/helix/blob/f99d9477/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
index 7225f70..337a933 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTask.java
@@ -84,7 +84,7 @@ public class HelixTask implements MessageTask {
 
     // add a concurrent map to hold currentStateUpdates for sub-messages of a batch-message
     // partitionName -> csUpdate
-    if (_message.getBatchMessageMode() == true) {
+    if (_message.getBatchMessageMode()) {
       _notificationContext.add(MapKey.CURRENT_STATE_UPDATE.toString(),
           new ConcurrentHashMap<String, CurrentStateUpdate>());
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/f99d9477/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index ccd7100..70a30ec 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -35,9 +36,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.codec.binary.StringUtils;
-import org.apache.helix.AccessOption;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.Criteria;
 import org.apache.helix.HelixConstants;
@@ -50,7 +48,6 @@ import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyType;
 import org.apache.helix.api.listeners.MessageListener;
 import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.controller.GenericHelixController;
@@ -62,10 +59,10 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
-import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
 import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
 import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor.ProcessedMessageState;
+import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -74,8 +71,6 @@ import org.apache.helix.util.StatusUpdateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
 public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   /**
    * Put together all registration information about a message handler factory
@@ -769,9 +764,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     HelixDataAccessor accessor = manager.getHelixDataAccessor();
     Builder keyBuilder = accessor.keyBuilder();
 
-    // message handlers created
+    // message handlers and corresponding contexts created
     Map<String, MessageHandler> stateTransitionHandlers = new HashMap<>();
+    Map<String, NotificationContext> stateTransitionContexts = new HashMap<>();
+
     List<MessageHandler> nonStateTransitionHandlers = new ArrayList<>();
+    List<NotificationContext> nonStateTransitionContexts = new ArrayList<>();
 
     // message read
     List<Message> readMsgs = new ArrayList<>();
@@ -858,8 +856,9 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       _monitor.reportReceivedMessage(message);
 
       // create message handlers, if handlers not found, leave its state as NEW
+      NotificationContext msgWorkingContext = changeContext.clone();
       try {
-        MessageHandler createHandler = createMessageHandler(message, changeContext);
+        MessageHandler createHandler = createMessageHandler(message, msgWorkingContext);
         if (createHandler == null) {
           continue;
         }
@@ -868,8 +867,12 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           stateTransitionHandlers
               .put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
                   createHandler);
+          stateTransitionContexts
+              .put(getMessageTarget(message.getResourceName(), message.getPartitionName()),
+                  msgWorkingContext);
         } else {
           nonStateTransitionHandlers.add(createHandler);
+          nonStateTransitionContexts.add(msgWorkingContext);
         }
       } catch (Exception e) {
         LOG.error("Failed to create message handler for " + message.getMsgId(), e);
@@ -885,7 +888,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
         continue;
       }
 
-      markReadMessage(message, changeContext, manager);
+      markReadMessage(message, msgWorkingContext, manager);
       readMsgs.add(message);
 
       // batch creation of all current state meta data
@@ -927,14 +930,22 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     if (readMsgs.size() > 0) {
       updateMessageState(readMsgs, accessor, instanceName);
 
-      for (MessageHandler handler : stateTransitionHandlers.values()) {
-        HelixTask task = new HelixTask(handler._message, changeContext, handler, this);
-        scheduleTask(task);
+      for (Map.Entry<String, MessageHandler> handlerEntry : stateTransitionHandlers.entrySet()) {
+        MessageHandler handler = handlerEntry.getValue();
+        NotificationContext context = stateTransitionContexts.get(handlerEntry.getKey());
+        Message msg = handler._message;
+        scheduleTask(
+            new HelixTask(msg, context, handler, this)
+        );
       }
 
-      for (MessageHandler handler : nonStateTransitionHandlers) {
-        HelixTask task = new HelixTask(handler._message, changeContext, handler, this);
-        scheduleTask(task);
+      for (int i = 0; i < nonStateTransitionHandlers.size(); i++) {
+        MessageHandler handler = nonStateTransitionHandlers.get(i);
+        NotificationContext context = nonStateTransitionContexts.get(i);
+        Message msg = handler._message;
+        scheduleTask(
+            new HelixTask(msg, context, handler, this)
+        );
       }
     }
   }


Mime
View raw message