helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From h...@apache.org
Subject [helix] branch master updated: Prevent parallel controller pipelines run causing two master replicas (#1066)
Date Wed, 07 Oct 2020 02:44:00 GMT
This is an automated email from the ASF dual-hosted git repository.

hzlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new fa8f4b6  Prevent parallel controller pipelines run causing two master replicas (#1066)
fa8f4b6 is described below

commit fa8f4b676fc8cd81a80b2f8d8aa97d2cf5c5087f
Author: Huizhi Lu <ihuizhi.lu@gmail.com>
AuthorDate: Tue Oct 6 19:43:50 2020 -0700

    Prevent parallel controller pipelines run causing two master replicas (#1066)
    
    After a controller leader switch, the pipelines of both the old Helix controller leader and the new leader can run in parallel.
    Different assignment decisions are then sent to different participants, so a single partition can end up with two masters.
    
    This commit addresses the issue by blocking a non-leader controller from sending messages to ZK.
---
 .../main/java/org/apache/helix/HelixManager.java   |  12 ++
 .../main/java/org/apache/helix/HelixProperty.java  |   4 +-
 .../helix/controller/GenericHelixController.java   |  36 ++++-
 .../helix/controller/stages/AttributeName.java     |   4 +
 .../helix/controller/stages/ClusterEvent.java      |   3 +
 .../controller/stages/MessageDispatchStage.java    |  16 +++
 .../controller/stages/MessageGenerationPhase.java  |   8 +-
 .../apache/helix/manager/zk/ZKHelixManager.java    |  17 ++-
 .../main/java/org/apache/helix/model/Message.java  |  50 +++----
 .../java/org/apache/helix/common/ZkTestBase.java   |  20 +--
 .../controller/stages/DummyClusterManager.java     |  15 ++-
 .../controller/stages/TestCustomizedViewStage.java |   6 +-
 .../controller/stages/TestExternalViewStage.java   |   2 +-
 .../stages/TestMessageThrottleStage.java           |   4 +-
 .../controller/stages/TestRebalancePipeline.java   | 145 +++++++++++++++++----
 .../zookeeper/datamodel/SessionAwareZNRecord.java  |  64 +++++++++
 .../apache/helix/zookeeper/zkclient/ZkClient.java  | 102 ++++++++-------
 .../zookeeper/impl/client/TestRawZkClient.java     |  40 +++++-
 18 files changed, 421 insertions(+), 127 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index f403b7b..06a4ec5 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -20,6 +20,7 @@ package org.apache.helix;
  */
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.api.listeners.ClusterConfigChangeListener;
@@ -420,6 +421,17 @@ public interface HelixManager {
   Long getSessionStartTime();
 
   /**
+   * Checks whether the cluster manager is leader and returns the session ID associated to the
+   * connection of cluster data store, if and only if it is leader.
+   *
+   * @return {@code Optional<String>} session ID is present inside the {@code Optional} object
+   * if the cluster manager is leader. Otherwise, returns an empty {@code Optional} object.
+   */
+  default Optional<String> getSessionIdIfLead() {
+    throw new UnsupportedOperationException("Not implemented");
+  }
+
+  /**
    * Check if the cluster manager is the leader
    * @return true if this is a controller and a leader of the cluster
    */
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index a631f40..52780ed 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.zookeeper.datamodel.SessionAwareZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecordDelta;
 import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
@@ -172,7 +173,8 @@ public class HelixProperty {
    * @param id
    */
   public HelixProperty(ZNRecord record, String id) {
-    _record = new ZNRecord(record, id);
+    _record = (record instanceof SessionAwareZNRecord) ? new SessionAwareZNRecord(record, id)
+        : new ZNRecord(record, id);
     _stat = new Stat(_record.getVersion(), _record.getCreationTime(), _record.getModifiedTime(),
         _record.getEphemeralOwner());
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index b2ab6ae..06e0fa1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -312,6 +312,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     changeContext.setType(NotificationContext.Type.CALLBACK);
     String uid = UUID.randomUUID().toString().substring(0, 8);
     ClusterEvent event = new ClusterEvent(_clusterName, eventType, uid);
+    event.addAttribute(AttributeName.EVENT_SESSION.name(),
+        changeContext.getManager().getSessionIdIfLead());
     event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
     event.addAttribute(AttributeName.changeContext.name(), changeContext);
     event.addAttribute(AttributeName.eventData.name(), new ArrayList<>());
@@ -703,10 +705,26 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
         _rebalancerRef.getRebalancer(manager));
 
-    if (!manager.isLeader()) {
-      logger.error("Cluster manager: " + manager.getInstanceName() + " is not leader for " + manager
-          .getClusterName() + ". Pipeline will not be invoked");
-      return;
+    Optional<String> eventSessionId = Optional.empty();
+    // We should expect only events in tests don't have it.
+    // All events generated by controller in prod should have EVENT_SESSION.
+    // This is to avoid tests failed or adding EVENT_SESSION to all ClusterEvent in tests.
+    // TODO: may add EVENT_SESSION to tests' cluster events that need it
+    if (!event.containsAttribute(AttributeName.EVENT_SESSION.name())) {
+      logger.info("Event {} does not have event session attribute", event.getEventId());
+    } else {
+      eventSessionId = event.getAttribute(AttributeName.EVENT_SESSION.name());
+      String managerSessionId = manager.getSessionId();
+
+      // If manager session changes, no need to run pipeline for the stale event.
+      if (!eventSessionId.isPresent() || !eventSessionId.get().equals(managerSessionId)) {
+        logger.warn(
+            "Controller pipeline is not invoked because event session doesn't match cluster " +
+                "manager session. Event type: {}, id: {}, session: {}, actual manager session: "
+                + "{}, instance: {}, cluster: {}", event.getEventType(), event.getEventId(),
+            eventSessionId.orElse("NOT_PRESENT"), managerSessionId, manager.getInstanceName(), manager.getClusterName());
+        return;
+      }
     }
 
     _helixManager = manager;
@@ -765,9 +783,9 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     }
     event.addAttribute(AttributeName.ControllerDataProvider.name(), dataProvider);
 
-    logger.info(String.format("START: Invoking %s controller pipeline for cluster %s event: %s  %s",
-        manager.getClusterName(), dataProvider.getPipelineName(), event.getEventType(),
-        event.getEventId()));
+    logger.info("START: Invoking {} controller pipeline for cluster: {}. Event type: {}, ID: {}. "
+            + "Event session ID: {}", manager.getClusterName(), dataProvider.getPipelineName(),
+        event.getEventType(), event.getEventId(), eventSessionId.orElse("NOT_PRESENT"));
 
     long startTime = System.currentTimeMillis();
     boolean rebalanceFail = false;
@@ -1124,6 +1142,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
     String uid = UUID.randomUUID().toString().substring(0, 8);
     ClusterEvent event = new ClusterEvent(_clusterName, eventType,
         String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name()));
+    event.addAttribute(AttributeName.EVENT_SESSION.name(),
+        changeContext.getManager().getSessionIdIfLead());
     event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
     event.addAttribute(AttributeName.changeContext.name(), changeContext);
     event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
@@ -1368,6 +1388,8 @@ public class GenericHelixController implements IdealStateChangeListener, LiveIns
         String uid = UUID.randomUUID().toString().substring(0, 8);
         ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.Resume,
             String.format("%s_%s", uid, Pipeline.Type.DEFAULT.name()));
+        event.addAttribute(AttributeName.EVENT_SESSION.name(),
+            changeContext.getManager().getSessionIdIfLead());
         event.addAttribute(AttributeName.changeContext.name(), changeContext);
         event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
         event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), _asyncFIFOWorkerPool);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
index 9a0bbb6..9093166 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/AttributeName.java
@@ -41,6 +41,10 @@ public enum AttributeName {
   LastRebalanceFinishTimeStamp,
   ControllerDataProvider,
   STATEFUL_REBALANCER,
+
+  /** This is the cluster manager's session id when event is received. */
+  EVENT_SESSION,
+
   // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
   TO_BE_PURGED_WORKFLOWS,
   // This attribute should only be used in TaskGarbageCollectionStage, misuse could cause race conditions.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
index 4e27fa6..83b9675 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java
@@ -90,6 +90,9 @@ public class ClusterEvent {
     return _eventId;
   }
 
+  public boolean containsAttribute(String attrName) {
+    return _eventAttributeMap.containsKey(attrName);
+  }
 
   @SuppressWarnings("unchecked")
   public <T extends Object> T getAttribute(String attrName) {
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageDispatchStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageDispatchStage.java
index 74230c7..fab07e4 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageDispatchStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageDispatchStage.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -78,7 +79,22 @@ public abstract class MessageDispatchStage extends AbstractBaseStage {
         batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveInstanceMap,
             manager.getProperties());
 
+    // Only expect tests' events don't have EVENT_SESSION, while all events in prod should have it.
+    if (!event.containsAttribute(AttributeName.EVENT_SESSION.name())) {
+      logger.info("Event {} does not have event session attribute", event.getEventId());
+    } else {
+      // An early check for expected leader session. If the sessions don't match, it means the
+      // controller's session changes, then messages should not be sent and pipeline should stop.
+      Optional<String> expectedSession = event.getAttribute(AttributeName.EVENT_SESSION.name());
+      if (!expectedSession.isPresent() || !expectedSession.get().equals(manager.getSessionId())) {
+        throw new StageException(String.format(
+            "Event session doesn't match controller %s session! Expected session: %s, actual: %s",
+            manager.getInstanceName(), expectedSession.orElse("NOT_PRESENT"), manager.getSessionId()));
+      }
+    }
+
     List<Message> messagesSent = sendMessages(dataAccessor, outputMessages);
+
     // TODO: Need also count messages from task rebalancer
     if (!(cache instanceof WorkflowControllerDataProvider)) {
       ClusterStatusMonitor clusterStatusMonitor =
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 955fc9c..8774812 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -400,6 +400,7 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
       String partitionName, String instanceName, String currentState, String nextState,
       String sessionId, String stateModelDefName) {
     String uuid = UUID.randomUUID().toString();
+    String managerSessionId = manager.getSessionId();
     Message message = new Message(MessageType.STATE_TRANSITION, uuid);
     message.setSrcName(manager.getInstanceName());
     message.setTgtName(instanceName);
@@ -409,7 +410,8 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
     message.setFromState(currentState);
     message.setToState(nextState);
     message.setTgtSessionId(sessionId);
-    message.setSrcSessionId(manager.getSessionId());
+    message.setSrcSessionId(managerSessionId);
+    message.setExpectedSessionId(managerSessionId);
     message.setStateModelDef(stateModelDefName);
     message.setStateModelFactoryName(resource.getStateModelFactoryname());
     message.setBucketSize(resource.getBucketSize());
@@ -436,6 +438,7 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
               + ", nextState: " + (nextState == null ? "N/A" : nextState));
 
       String uuid = UUID.randomUUID().toString();
+      String managerSessionId = manager.getSessionId();
       Message message = new Message(MessageType.STATE_TRANSITION_CANCELLATION, uuid);
       message.setSrcName(manager.getInstanceName());
       message.setTgtName(instanceName);
@@ -445,7 +448,8 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage {
       message.setFromState(fromState);
       message.setToState(toState);
       message.setTgtSessionId(sessionId);
-      message.setSrcSessionId(manager.getSessionId());
+      message.setSrcSessionId(managerSessionId);
+      message.setExpectedSessionId(managerSessionId);
       message.setStateModelDef(stateModelDefName);
       message.setStateModelFactoryName(resource.getStateModelFactoryname());
       message.setBucketSize(resource.getBucketSize());
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index e943985..05e1e26 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.Timer;
 import java.util.concurrent.TimeUnit;
@@ -974,6 +975,11 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
 
   @Override
   public boolean isLeader() {
+    return getSessionIdIfLead().isPresent();
+  }
+
+  @Override
+  public Optional<String> getSessionIdIfLead() {
     String warnLogPrefix = String
         .format("Instance %s is not leader of cluster %s due to", _instanceName, _clusterName);
     if (_instanceType != InstanceType.CONTROLLER
@@ -981,12 +987,12 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
       LOG.warn(String
           .format("%s instance type %s does not match to CONTROLLER/CONTROLLER_PARTICIPANT",
               warnLogPrefix, _instanceType.name()));
-      return false;
+      return Optional.empty();
     }
 
     if (!isConnected()) {
       LOG.warn(String.format("%s HelixManager is not connected", warnLogPrefix));
-      return false;
+      return Optional.empty();
     }
 
     try {
@@ -996,7 +1002,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
         String sessionId = leader.getEphemeralOwner();
         if (leaderName != null && leaderName.equals(_instanceName) && sessionId
             .equals(_sessionId)) {
-          return true;
+          // Ensure the same leader session is set and returned. If we get _session from helix
+          // manager, _session might change after the check. This guarantees the session is
+          // leader's session we checked.
+          return Optional.of(sessionId);
         }
         LOG.warn(String
             .format("%s current session %s does not match leader session %s", warnLogPrefix,
@@ -1007,7 +1016,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     } catch (Exception e) {
       LOG.warn(String.format("%s exception happen when session check", warnLogPrefix), e);
     }
-    return false;
+    return Optional.empty();
   }
 
   @Override
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 6f3175d..c81a494 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -39,6 +39,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.util.HelixUtil;
+import org.apache.helix.zookeeper.datamodel.SessionAwareZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
@@ -136,8 +137,6 @@ public class Message extends HelixProperty {
     }
   };
 
-  // AtomicInteger _groupMsgCountDown = new AtomicInteger(1);
-
   /**
    * Instantiate a message
    * @param type the message category
@@ -153,7 +152,7 @@ public class Message extends HelixProperty {
    * @param msgId unique message identifier
    */
   public Message(String type, String msgId) {
-    super(new ZNRecord(msgId));
+    super(new SessionAwareZNRecord(msgId), msgId);
     _record.setSimpleField(Attributes.MSG_TYPE.toString(), type);
     setMsgId(msgId);
     setMsgState(MessageState.NEW);
@@ -165,7 +164,7 @@ public class Message extends HelixProperty {
    * @param record a ZNRecord corresponding to a message
    */
   public Message(ZNRecord record) {
-    super(record);
+    super(new SessionAwareZNRecord(record, record.getId()));
     if (getMsgState() == null) {
       setMsgState(MessageState.NEW);
     }
@@ -175,42 +174,45 @@ public class Message extends HelixProperty {
   }
 
   /**
-   * Set the time that the message was created
-   * @param timestamp a UNIX timestamp (in ms)
-   */
-  public void setCreateTimeStamp(long timestamp) {
-    _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), timestamp);
-  }
-
-  /**
-   * Set the time that the message was expected to be completed
-   * @param timestamp a UNIX timestamp (in ms)
-   */
-  public void setCompletionDueTimeStamp(long timestamp) {
-    _record.setLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(), timestamp);
-  }
-
-  /**
    * Instantiate a message with a new id
    * @param record a ZNRecord corresponding to a message
    * @param id unique message identifier
    */
   public Message(ZNRecord record, String id) {
-    super(new ZNRecord(record, id));
+    super(new SessionAwareZNRecord(record, id));
     setMsgId(id);
   }
 
   /**
+   * @deprecated Not being used.
+   *
    * Instantiate a message with a new id
    * @param message message to be copied
    * @param id unique message identifier
    */
+  @Deprecated
   public Message(Message message, String id) {
-    super(new ZNRecord(message.getRecord(), id));
+    super(new SessionAwareZNRecord(message.getRecord(), id));
     setMsgId(id);
   }
 
   /**
+   * Set the time that the message was created
+   * @param timestamp a UNIX timestamp
+   */
+  public void setCreateTimeStamp(long timestamp) {
+    _record.setLongField(Attributes.CREATE_TIMESTAMP.toString(), timestamp);
+  }
+
+  /**
+   * Set the time that the message was expected to be completed
+   * @param timestamp a UNIX timestamp (in ms)
+   */
+  public void setCompletionDueTimeStamp(long timestamp) {
+    _record.setLongField(Attributes.COMPLETION_DUE_TIMESTAMP.name(), timestamp);
+  }
+
+  /**
    * Set a subtype of the message
    * @param subType name of the subtype
    */
@@ -274,6 +276,10 @@ public class Message extends HelixProperty {
     _record.setSimpleField(Attributes.SRC_SESSION_ID.toString(), srcSessionId);
   }
 
+  public void setExpectedSessionId(String expectedSessionId) {
+    ((SessionAwareZNRecord) _record).setExpectedSessionId(expectedSessionId);
+  }
+
   /**
    * Get the session identifier of the node that executes the message
    * @return session identifier
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index 0d9c6d2..67fee96 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -44,8 +44,6 @@ import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.TestHelper;
-import org.apache.helix.zookeeper.impl.client.ZkClient;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.Pipeline;
@@ -60,8 +58,6 @@ import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
-import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.ConfigScope;
@@ -78,6 +74,10 @@ import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
 import org.apache.helix.tools.ClusterStateVerifier;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
 import org.apache.helix.zookeeper.zkclient.ZkConnection;
 import org.apache.helix.zookeeper.zkclient.ZkServer;
@@ -788,14 +788,18 @@ public class ZkTestBase {
     }
   }
 
-  protected void runPipeline(ClusterEvent event, Pipeline pipeline) {
+  protected void runPipeline(ClusterEvent event, Pipeline pipeline, boolean shouldThrowException)
+      throws Exception {
     try {
       pipeline.handle(event);
       pipeline.finish();
     } catch (Exception e) {
-      LOG.error(
-          "Exception while executing pipeline:" + pipeline + ". Will not continue to next pipeline",
-          e);
+      if (shouldThrowException) {
+        throw e;
+      } else {
+        LOG.error("Exception while executing pipeline: {}. Will not continue to next pipeline",
+            pipeline, e);
+      }
     }
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index f56ddf9..5d23a32 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -58,11 +58,20 @@ public class DummyClusterManager implements HelixManager {
   HelixDataAccessor _accessor;
   String _clusterName;
   String _sessionId;
+  private String _instanceName;
 
   public DummyClusterManager(String clusterName, HelixDataAccessor accessor) {
     _clusterName = clusterName;
     _accessor = accessor;
     _sessionId = "session_" + clusterName;
+    _instanceName = "DummyInstance_" + clusterName;
+  }
+
+  public DummyClusterManager(String clusterName, HelixDataAccessor accessor, String sessionId) {
+    _clusterName = clusterName;
+    _accessor = accessor;
+    _sessionId = sessionId;
+    _instanceName = "DummyInstance_" + clusterName;
   }
 
   @Override
@@ -198,8 +207,7 @@ public class DummyClusterManager implements HelixManager {
 
   @Override
   public String getInstanceName() {
-    // TODO Auto-generated method stub
-    return null;
+    return _instanceName;
   }
 
   @Override
@@ -375,4 +383,7 @@ public class DummyClusterManager implements HelixManager {
     return 0L;
   }
 
+  protected void setSessionId(String sessionId) {
+    _sessionId = sessionId;
+  }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
index 1e46a99..44bd623 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java
@@ -87,7 +87,7 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
         new CustomizedViewAggregationStage();
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new ReadClusterDataStage());
-    runPipeline(event, dataRefresh);
+    runPipeline(event, dataRefresh, false);
     runStage(event, new ResourceComputationStage());
     runStage(event, new CustomizedStateComputationStage());
     runStage(event, customizedViewComputeStage);
@@ -153,7 +153,7 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
 
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new ReadClusterDataStage());
-    runPipeline(event, dataRefresh);
+    runPipeline(event, dataRefresh, false);
     runStage(event, new ResourceComputationStage());
     runStage(event, new CustomizedStateComputationStage());
     runStage(event, new CustomizedViewAggregationStage());
@@ -212,7 +212,7 @@ public class TestCustomizedViewStage extends ZkUnitTestBase {
 
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new ReadClusterDataStage());
-    runPipeline(event, dataRefresh);
+    runPipeline(event, dataRefresh, false);
     runStage(event, new ResourceComputationStage());
     runStage(event, new CustomizedStateComputationStage());
     runStage(event, new CustomizedViewAggregationStage());
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestExternalViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestExternalViewStage.java
index 5f80e1f..0c84d8d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestExternalViewStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestExternalViewStage.java
@@ -62,7 +62,7 @@ public class TestExternalViewStage extends ZkUnitTestBase {
     ExternalViewComputeStage externalViewComputeStage = new ExternalViewComputeStage();
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new ReadClusterDataStage());
-    runPipeline(event, dataRefresh);
+    runPipeline(event, dataRefresh, false);
     runStage(event, new ResourceComputationStage());
     runStage(event, new CurrentStateComputationStage());
     runStage(event, externalViewComputeStage);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
index 8d26e38..e7b9d34 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMessageThrottleStage.java
@@ -84,7 +84,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
 
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new ReadClusterDataStage());
-    runPipeline(event, dataRefresh);
+    runPipeline(event, dataRefresh, false);
 
     try {
       runStage(event, throttleStage);
@@ -258,7 +258,7 @@ public class TestMessageThrottleStage extends ZkUnitTestBase {
 
     Pipeline dataRefresh = new Pipeline();
     dataRefresh.addStage(new ReadClusterDataStage());
-    runPipeline(event, dataRefresh);
+    runPipeline(event, dataRefresh, false);
     runStage(event, new ResourceComputationStage());
     MessageOutput msgSelectOutput = new MessageOutput();
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index fe32293..84ee28d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -35,6 +36,7 @@ import org.apache.helix.TestHelper;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.controller.stages.resource.ResourceMessageDispatchStage;
 import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -61,7 +63,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
     refreshClusterConfig(clusterName, accessor);
-    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+    HelixManager manager =
+        new DummyClusterManager(clusterName, accessor, Long.toHexString(_gZkClient.getSessionId()));
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
     ResourceControllerDataProvider dataCache = new ResourceControllerDataProvider();
@@ -104,8 +107,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "OFFLINE");
 
-    runPipeline(event, dataRefresh);
-    runPipeline(event, rebalancePipeline);
+    runPipeline(event, dataRefresh, false);
+    runPipeline(event, rebalancePipeline, false);
     MessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     List<Message> messages =
         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
@@ -121,7 +124,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0",
         liveInstances.get(0).getEphemeralOwner(), "SLAVE");
 
-    runPipeline(event, dataRefresh);
+    runPipeline(event, dataRefresh, false);
     refreshClusterConfig(clusterName, accessor);
 
     Pipeline computationPipeline = new Pipeline();
@@ -136,14 +139,14 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     messagePipeline.addStage(new MessageThrottleStage());
     messagePipeline.addStage(new ResourceMessageDispatchStage());
 
-    runPipeline(event, computationPipeline);
+    runPipeline(event, computationPipeline, false);
 
     Map<String, Map<String, Message>> staleMessages = dataCache.getStaleMessages();
     Assert.assertEquals(staleMessages.size(), 1);
     Assert.assertTrue(staleMessages.containsKey("localhost_0"));
     Assert.assertTrue(staleMessages.get("localhost_0").containsKey(message.getMsgId()));
 
-    runPipeline(event, messagePipeline);
+    runPipeline(event, messagePipeline, false);
 
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
@@ -152,7 +155,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     Assert.assertTrue(messages.get(0).getFromState().equalsIgnoreCase("SLAVE"));
     Assert.assertTrue(messages.get(0).getToState().equalsIgnoreCase("MASTER"));
 
-    runPipeline(event, dataRefresh);
+    runPipeline(event, dataRefresh, false);
 
     // Verify the stale message should be deleted
     Assert.assertTrue(TestHelper.verify(() -> {
@@ -292,13 +295,14 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
   }
 
   @Test
-  public void testChangeIdealStateWithPendingMsg() {
+  public void testChangeIdealStateWithPendingMsg() throws Exception {
     String clusterName = "CLUSTER_" + _className + "_pending";
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
-    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+    HelixManager manager =
+        new DummyClusterManager(clusterName, accessor, Long.toHexString(_gZkClient.getSessionId()));
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
 
@@ -339,8 +343,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "OFFLINE");
 
-    runPipeline(event, dataRefresh);
-    runPipeline(event, rebalancePipeline);
+    runPipeline(event, dataRefresh, false);
+    runPipeline(event, rebalancePipeline, false);
     MessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     List<Message> messages =
         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
@@ -357,10 +361,10 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     List<IdealState> idealStates = accessor.getChildValues(accessor.keyBuilder().idealStates(), true);
     cache.setIdealStates(idealStates);
 
-    runPipeline(event, dataRefresh);
+    runPipeline(event, dataRefresh, false);
     cache = event.getAttribute(AttributeName.ControllerDataProvider.name());
     cache.setClusterConfig(new ClusterConfig(clusterName));
-    runPipeline(event, rebalancePipeline);
+    runPipeline(event, rebalancePipeline, false);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0,
@@ -371,8 +375,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     Builder keyBuilder = accessor.keyBuilder();
     List<String> msgIds = accessor.getChildNames(keyBuilder.messages("localhost_0"));
     accessor.removeProperty(keyBuilder.message("localhost_0", msgIds.get(0)));
-    runPipeline(event, dataRefresh);
-    runPipeline(event, rebalancePipeline);
+    runPipeline(event, dataRefresh, false);
+    runPipeline(event, rebalancePipeline, false);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 1,
@@ -388,14 +392,15 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
   }
 
   @Test
-  public void testMasterXfer() {
+  public void testMasterXfer() throws Exception {
     String clusterName = "CLUSTER_" + _className + "_xfer";
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
-    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+    HelixManager manager =
+        new DummyClusterManager(clusterName, accessor, Long.toHexString(_gZkClient.getSessionId()));
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
@@ -435,8 +440,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", liveInstances.get(0).getEphemeralOwner(),
         "SLAVE");
 
-    runPipeline(event, dataRefresh);
-    runPipeline(event, rebalancePipeline);
+    runPipeline(event, dataRefresh, false);
+    runPipeline(event, rebalancePipeline, false);
     MessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     List<Message> messages =
         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
@@ -454,8 +459,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setCurrentState(clusterName, "localhost_0", resourceName, resourceName + "_0", "session_0",
         "SLAVE");
 
-    runPipeline(event, dataRefresh);
-    runPipeline(event, rebalancePipeline);
+    runPipeline(event, dataRefresh, false);
+    runPipeline(event, rebalancePipeline, false);
     msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     messages = msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
     Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message: SLAVE-MASTER for node0");
@@ -466,14 +471,15 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
   }
 
   @Test
-  public void testNoDuplicatedMaster() {
+  public void testNoDuplicatedMaster() throws Exception {
     String clusterName = "CLUSTER_" + _className + "_no_duplicated_master";
 
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
         new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
-    HelixManager manager = new DummyClusterManager(clusterName, accessor);
+    HelixManager manager =
+        new DummyClusterManager(clusterName, accessor, Long.toHexString(_gZkClient.getSessionId()));
     ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
     event.addAttribute(AttributeName.helixmanager.name(), manager);
     event.addAttribute(AttributeName.ControllerDataProvider.name(),
@@ -518,8 +524,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     setCurrentState(clusterName, "localhost_1", resourceName, resourceName + "_0", liveInstances.get(1).getEphemeralOwner(),
         "MASTER");
 
-    runPipeline(event, dataRefresh);
-    runPipeline(event, rebalancePipeline);
+    runPipeline(event, dataRefresh, false);
+    runPipeline(event, rebalancePipeline, false);
     MessageOutput msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
     List<Message> messages =
         msgSelOutput.getMessages(resourceName, new Partition(resourceName + "_0"));
@@ -535,6 +541,95 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
   }
 
+  /*
+   * Tests that if controller loses leadership while pipeline is still running, messages are not
+   * sent to participants. Sending no message on leadership loss prevents double masters issue.
+   * This test simulates that controller leader's zk session changes after ReadClusterDataStage
+   * and so state transition messages are not sent out and stage exception is thrown to terminate
+   * the pipeline.
+   */
+  @Test
+  public void testNoMessageSentOnControllerLeadershipLoss() throws Exception {
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = _className + "_" + methodName;
+
+    final String resourceName = "testResource_" + methodName;
+    final String partitionName = resourceName + "_0";
+    String[] resourceGroups = new String[] {
+        resourceName
+    };
+
+    // ideal state: localhost_0 is MASTER
+    // replica=1 means 1 master
+    setupIdealState(clusterName, new int[] {
+        0
+    }, resourceGroups, 1, 1);
+    List<LiveInstance> liveInstances = setupLiveInstances(clusterName, new int[] {
+        0
+    });
+    setupStateModel(clusterName);
+
+    HelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    DummyClusterManager manager =
+        new DummyClusterManager(clusterName, accessor, Long.toHexString(_gZkClient.getSessionId()));
+    ClusterEvent event = new ClusterEvent(clusterName, ClusterEventType.OnDemandRebalance);
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+    event.addAttribute(AttributeName.ControllerDataProvider.name(),
+        new ResourceControllerDataProvider());
+    event.addAttribute(AttributeName.EVENT_SESSION.name(), Optional.of(manager.getSessionId()));
+    refreshClusterConfig(clusterName, accessor);
+
+    // cluster data cache refresh pipeline
+    Pipeline dataRefresh = new Pipeline();
+    dataRefresh.addStage(new ReadClusterDataStage());
+
+    // rebalance pipeline
+    Pipeline rebalancePipeline = new Pipeline();
+    rebalancePipeline.addStage(new ResourceComputationStage());
+    rebalancePipeline.addStage(new CurrentStateComputationStage());
+    rebalancePipeline.addStage(new BestPossibleStateCalcStage());
+    rebalancePipeline.addStage(new IntermediateStateCalcStage());
+    rebalancePipeline.addStage(new ResourceMessageGenerationPhase());
+    rebalancePipeline.addStage(new MessageSelectionStage());
+    rebalancePipeline.addStage(new MessageThrottleStage());
+    rebalancePipeline.addStage(new ResourceMessageDispatchStage());
+
+    // set node0 currentState to SLAVE while the ideal state expects node0 to be MASTER
+    // Helix will try to switch the state of the instance, so it should transit S->M
+    setCurrentState(clusterName, "localhost_0", resourceName, partitionName,
+        liveInstances.get(0).getEphemeralOwner(), "SLAVE");
+
+    runPipeline(event, dataRefresh, false);
+
+    // After data refresh, controller loses leadership and its session id changes.
+    manager.setSessionId(manager.getSessionId() + "_new");
+
+    try {
+      // Because leader loses leadership, StageException should be thrown and message is not sent.
+      runPipeline(event, rebalancePipeline, true);
+      Assert.fail("StageException should be thrown because controller leader session changed.");
+    } catch (StageException e) {
+      Assert.assertTrue(
+          e.getMessage().matches("Event session doesn't match controller .* Expected session: .*"));
+    }
+
+    // Verify the ST message not being sent out is the expected one to
+    // transit the replica SLAVE->MASTER
+    MessageOutput msgThrottleOutput = event.getAttribute(AttributeName.MESSAGES_THROTTLE.name());
+    List<Message> messages =
+        msgThrottleOutput.getMessages(resourceName, new Partition(partitionName));
+    Assert.assertEquals(messages.size(), 1,
+        "Should output 1 message: SLAVE->MASTER for localhost_0");
+    Message message = messages.get(0);
+    Assert.assertEquals(message.getFromState(), "SLAVE");
+    Assert.assertEquals(message.getToState(), "MASTER");
+    Assert.assertEquals(message.getTgtName(), "localhost_0");
+
+    deleteLiveInstances(clusterName);
+    deleteCluster(clusterName);
+  }
+
   protected void setCurrentState(String clusterName, String instance, String resourceGroupName,
       String resourceKey, String sessionId, String state) {
     setCurrentState(clusterName, instance, resourceGroupName, resourceKey, sessionId, state, false);
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/SessionAwareZNRecord.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/SessionAwareZNRecord.java
new file mode 100644
index 0000000..e79c319
--- /dev/null
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/SessionAwareZNRecord.java
@@ -0,0 +1,64 @@
+package org.apache.helix.zookeeper.datamodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Represents a session-aware ZNRecord: a ZNRecord that should be written to zk only by
+ * the expected zk session. When the ZNRecord is being written to zk, if the actual
+ * zk session id doesn't match the expected zk session id set in the {@code SessionAwareZNRecord},
+ * writing to zk will fail. It is supposed to be used within Helix only.
+ * <p>
+ * If a record does not need to be written only by the expected zk session,
+ * using a plain {@link ZNRecord} is recommended.
+ */
+public class SessionAwareZNRecord extends ZNRecord {
+  @JsonIgnore
+  private String expectedSessionId;
+
+  public SessionAwareZNRecord(String id) {
+    super(id);
+  }
+
+  public SessionAwareZNRecord(ZNRecord record, String id) {
+    super(record, id);
+  }
+
+  /**
+   * Gets expected zk session id.
+   *
+   * @return id of the expected zk session that is supposed to write the ZNRecord.
+   */
+  @JsonIgnore
+  public String getExpectedSessionId() {
+    return expectedSessionId;
+  }
+
+  /**
+   * Sets expected zk session id that is supposed to write the ZNRecord.
+   *
+   * @param sessionId id of the expected zk session that is supposed to write the ZNRecord
+   */
+  @JsonIgnore
+  public void setExpectedSessionId(String sessionId) {
+    expectedSessionId = sessionId;
+  }
+}
diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
index 2f26f8b..d7b0b3f 100644
--- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
+++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkClient.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.JMException;
 
 import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
+import org.apache.helix.zookeeper.datamodel.SessionAwareZNRecord;
 import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.exception.ZkClientException;
@@ -719,41 +720,8 @@ public class ZkClient implements Watcher {
       final byte[] dataBytes = dataObject == null ? null : serialize(dataObject, path);
       checkDataSizeLimit(dataBytes);
 
-      final String actualPath = retryUntilConnected(() -> {
-        ZooKeeper zooKeeper = ((ZkConnection) getConnection()).getZookeeper();
-
-        /*
-         * 1. If operation is session aware, we have to check whether or not the
-         * passed-in(expected) session id matches actual session's id.
-         * If not, ephemeral node creation is failed. This validation is
-         * critical to guarantee the ephemeral node created by the expected ZK session.
-         *
-         * 2. Otherwise, the operation is NOT session aware.
-         * In this case, we will use the actual zookeeper session to create the node.
-         */
-        if (isSessionAwareOperation(expectedSessionId, mode)) {
-          acquireEventLock();
-          try {
-            final String actualSessionId = Long.toHexString(zooKeeper.getSessionId());
-            if (!actualSessionId.equals(expectedSessionId)) {
-              throw new ZkSessionMismatchedException(
-                  "Failed to create ephemeral node! There is a session id mismatch. Expected: "
-                      + expectedSessionId + ". Actual: " + actualSessionId);
-            }
-
-            /*
-             * Cache the zookeeper reference and make sure later zooKeeper.create() is being run
-             * under this zookeeper connection. This is to avoid locking zooKeeper.create() which
-             * may cause potential performance issue.
-             */
-            zooKeeper = ((ZkConnection) getConnection()).getZookeeper();
-          } finally {
-            getEventLock().unlock();
-          }
-        }
-
-        return zooKeeper.create(path, dataBytes, acl, mode);
-      });
+      final String actualPath = retryUntilConnected(
+          () -> getExpectedZookeeper(expectedSessionId).create(path, dataBytes, acl, mode));
 
       record(path, dataBytes, startT, ZkClientMonitor.AccessType.WRITE);
       return actualPath;
@@ -1945,18 +1913,20 @@ public class ZkClient implements Watcher {
           new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
       return;
     }
-    doAsyncCreate(path, data, mode, startT, cb);
+    doAsyncCreate(path, data, mode, startT, cb, parseExpectedSessionId(datat));
   }
 
   private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode,
-      final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb) {
+      final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb,
+      final String expectedSessionId) {
     retryUntilConnected(() -> {
-      ((ZkConnection) getConnection()).getZookeeper()
+      getExpectedZookeeper(expectedSessionId)
           .create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb,
               new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) {
                 @Override
                 protected void doRetry() {
-                  doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb);
+                  doAsyncCreate(path, data, mode, System.currentTimeMillis(), cb,
+                      expectedSessionId);
                 }
               });
       return null;
@@ -1975,18 +1945,19 @@ public class ZkClient implements Watcher {
           new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null);
       return;
     }
-    doAsyncSetData(path, data, version, startT, cb);
+    doAsyncSetData(path, data, version, startT, cb, parseExpectedSessionId(datat));
   }
 
   private void doAsyncSetData(final String path, byte[] data, final int version, final long startT,
-      final ZkAsyncCallbacks.SetDataCallbackHandler cb) {
+      final ZkAsyncCallbacks.SetDataCallbackHandler cb, final String expectedSessionId) {
     retryUntilConnected(() -> {
-      ((ZkConnection) getConnection()).getZookeeper().setData(path, data, version, cb,
+      getExpectedZookeeper(expectedSessionId).setData(path, data, version, cb,
           new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT,
               data == null ? 0 : data.length, false) {
             @Override
             protected void doRetry() {
-              doAsyncSetData(path, data, version, System.currentTimeMillis(), cb);
+              doAsyncSetData(path, data, version, System.currentTimeMillis(), cb,
+                  expectedSessionId);
             }
           });
       return null;
@@ -2362,13 +2333,46 @@ public class ZkClient implements Watcher {
   }
 
   /*
-   * Session aware operation needs below requirements:
-   * 1. the session id is NOT null or empty
-   * 2. create mode is EPHEMERAL or EPHEMERAL_SEQUENTIAL
+   * Gets the zookeeper instance that ensures its session ID matches the expected session ID.
+   * It is used for write operations that require the znode to be written by the expected session.
    */
-  private boolean isSessionAwareOperation(String expectedSessionId, CreateMode mode) {
-    return expectedSessionId != null && !expectedSessionId.isEmpty() && (
-        mode == CreateMode.EPHEMERAL || mode == CreateMode.EPHEMERAL_SEQUENTIAL);
+  private ZooKeeper getExpectedZookeeper(final String expectedSessionId) {
+    /*
+     * Cache the zookeeper reference so that the caller's subsequent zk operation is run
+     * under this same zookeeper connection. This is to avoid zk session change after expected
+     * session check.
+     */
+    ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper();
+
+    /*
+     * The operation is NOT session aware, we will use the actual zookeeper session without
+     * checking expected session.
+     */
+    if (expectedSessionId == null || expectedSessionId.isEmpty()) {
+      return zk;
+    }
+
+    /*
+     * If operation is session aware (expectedSession is valid),
+     * we have to check whether or not the passed-in(expected) session id
+     * matches actual session's id.
+     * If not, we should not return a zk object for the zk operation.
+     */
+    final String actualSessionId = Long.toHexString(zk.getSessionId());
+    if (!actualSessionId.equals(expectedSessionId)) {
+      throw new ZkSessionMismatchedException(
+          "Failed to get expected zookeeper instance! There is a session id mismatch. Expected: "
+              + expectedSessionId + ". Actual: " + actualSessionId);
+    }
+
+    return zk;
+  }
+
+  private String parseExpectedSessionId(Object data) {
+    if (!(data instanceof SessionAwareZNRecord)) {
+      return null;
+    }
+    return ((SessionAwareZNRecord) data).getExpectedSessionId();
   }
 
   // operations to update monitor's counters
diff --git a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
index 535d760..a227f36 100644
--- a/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
+++ b/zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client/TestRawZkClient.java
@@ -38,6 +38,7 @@ import javax.management.ObjectName;
 import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
 import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys;
+import org.apache.helix.zookeeper.datamodel.SessionAwareZNRecord;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.exception.ZkClientException;
@@ -148,7 +149,6 @@ public class TestRawZkClient extends ZkTestBase {
     }
   }
 
-
   /*
    * Tests state changes subscription for I0Itec's IZkStateListener.
    * This is a test for backward compatibility.
@@ -863,9 +863,47 @@ public class TestRawZkClient extends ZkTestBase {
       }
       zkClient.delete("/tmp/async");
       zkClient.delete("/tmp/asyncOversize");
+      zkClient.close();
     }
   }
 
+  @Test
+  public void testAsyncCreateByExpectedSession() throws Exception {
+    ZkClient zkClient = new ZkClient(ZkTestBase.ZK_ADDR);
+    zkClient.setZkSerializer(new ZNRecordSerializer());
+    String sessionId = Long.toHexString(zkClient.getSessionId());
+    String path = "/" + TestHelper.getTestClassName() + "_" + TestHelper.getTestMethodName();
+    SessionAwareZNRecord record = new SessionAwareZNRecord("test");
+
+    // Set a dummy session id string to be mismatched with the real session id in ZkClient.
+    record.setExpectedSessionId("ExpectedSession");
+    ZkAsyncCallbacks.CreateCallbackHandler createCallback =
+        new ZkAsyncCallbacks.CreateCallbackHandler();
+
+    try {
+      zkClient.asyncCreate(path, record, CreateMode.PERSISTENT, createCallback);
+      createCallback.waitForSuccess();
+      Assert.fail("Invalid session should not create znode");
+    } catch (ZkSessionMismatchedException expected) {
+      Assert.assertEquals(expected.getMessage(),
+          "Failed to get expected zookeeper instance! There is a session id mismatch. Expected: "
+              + "ExpectedSession. Actual: " + sessionId);
+    }
+
+    Assert.assertFalse(zkClient.exists(path));
+
+    // A valid session should be able to create the znode.
+    record.setExpectedSessionId(sessionId);
+    zkClient.asyncCreate(path, record, CreateMode.PERSISTENT, createCallback);
+    createCallback.waitForSuccess();
+
+    Assert.assertEquals(createCallback.getRc(), 0);
+    Assert.assertTrue(zkClient.exists(path));
+
+    TestHelper.verify(() -> zkClient.delete(path), TestHelper.WAIT_DURATION);
+    zkClient.close();
+  }
+
   /*
    * Tests getChildren() when there are an excessive number of children and connection loss happens,
    * the operation should terminate and exit retry loop.


Mime
View raw message