helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jiajunw...@apache.org
Subject [helix] branch master updated: Add new interface IZkStateListener to provide session aware handleNewSession for ZkHelixManager (#644)
Date Mon, 16 Dec 2019 23:21:17 GMT
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 660ae7c  Add new interface IZkStateListener to provide session aware handleNewSession
for ZkHelixManager (#644)
660ae7c is described below

commit 660ae7c40463a9b4de1af7b0721c575e0c4337a3
Author: Huizhi L <ihuizhi.lu@gmail.com>
AuthorDate: Mon Dec 16 15:21:10 2019 -0800

    Add new interface IZkStateListener to provide session aware handleNewSession for ZkHelixManager
(#644)
    
    I0Itec IZkStateListener doesn't have an API handleNewSession(sessionId) to handle session
aware operation,
    which is needed to fix session race condition for creating ephemeral node in ZkClient.
    So this new IZkStateListener interface is introduced to provide session aware handleNewSession
method for ZkHelixManager.
    
    Changelist:
    - Introduce new IZkStateListener to helix. The new IZkStateListener adds new method
    handleNewSession(String sessionId), and removes the old method handleNewSession().
    - Add default implementations I0ItecIZkStateListenerHelixImpl in IZkStateListener
    and IZkStateListenerI0ItecImpl in ZkClient for backward compatibility.
    - Add session id to ZkEvent as a private field to help debug ZkEvent.
    - Add unit tests to test subscribe/unsubscribe state changes.
---
 .../apache/helix/manager/zk/ZKHelixManager.java    |  43 +++++-
 .../apache/helix/manager/zk/ZkCallbackCache.java   |  24 +++-
 .../helix/manager/zk/client/HelixZkClient.java     | 105 +++++++++++++-
 .../manager/zk/zookeeper/IZkStateListener.java     |  61 ++++++++
 .../helix/manager/zk/zookeeper/ZkClient.java       |  95 ++++++++++++-
 .../helix/manager/zk/zookeeper/ZkEventThread.java  |  26 +++-
 .../test/java/org/apache/helix/ZkTestHelper.java   |  18 +--
 .../java/org/apache/helix/common/ZkTestBase.java   |   6 +-
 .../helix/manager/zk/TestHandleNewSession.java     |   4 +-
 .../apache/helix/manager/zk/TestRawZkClient.java   | 153 ++++++++++++++++++---
 .../helix/manager/zk/TestZkClusterManager.java     |   2 +-
 .../apache/helix/manager/zk/TestZkFlapping.java    |   4 +-
 12 files changed, 486 insertions(+), 55 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 4058719..df2dccc 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
 import javax.management.JMException;
 
 import com.google.common.collect.Sets;
-import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ClusterMessagingService;
@@ -71,6 +70,7 @@ import org.apache.helix.healthcheck.ParticipantHealthReportTask;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.manager.zk.client.SharedZkClientFactory;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
 import org.apache.helix.messaging.DefaultMessagingService;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
@@ -697,6 +697,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener
{
     int retryCount = 0;
     while (retryCount < 3) {
       try {
+        // TODO: synchronize this block and wait for the new non-zero session ID updated.
         _zkclient.waitUntilConnected(_connectionInitTimeout, TimeUnit.MILLISECONDS);
         handleStateChanged(KeeperState.SyncConnected);
         handleNewSession();
@@ -1097,12 +1098,46 @@ public class ZKHelixManager implements HelixManager, IZkStateListener
{
     }
   }
 
-  @Override
+  /**
+   * Called after the zookeeper session has expired and a new session has been created. This
method
+   * may cause session race condition when creating ephemeral nodes. Internally, this method
calls
+   * {@link #handleNewSession(String)} with a null value as the sessionId parameter, which
results
+   * in later creating the ephemeral node in the session of the latest zk connection.
+   * But please note that the session of the latest zk connection might not be the expected
session.
+   * This is the session race condition issue.
+   *
+   * To avoid the race condition issue, please use {@link #handleNewSession(String)}.
+   *
+   * @deprecated
+   * This method is deprecated, because it may cause session race condition when creating
ephemeral
+   * nodes. It is kept for backward compatibility in case a user class extends this class.
+   *
+   * Please use {@link #handleNewSession(String)} instead, which takes care of race condition.
+   *
+   * @throws Exception If any error occurs.
+   */
+  @Deprecated
   public void handleNewSession() throws Exception {
-    LOG.info(
-        "Handle new session, instance: " + _instanceName + ", type: " + _instanceType);
+    handleNewSession(null);
+  }
+
+  @Override
+  public void handleNewSession(final String sessionId) throws Exception {
+    /*
+     * TODO: after removing I0ItecIZkStateListenerHelixImpl, null session should be checked
and discarded.
+     * Null session is still a special case here, which is treated as non-session aware operation.
+     * This special case could still potentially cause race condition, so null session should
NOT
+     * be acceptable, once I0ItecIZkStateListenerHelixImpl is removed. Currently this special
case
+     * is kept for backward compatibility.
+     */
+
+    // Wait until we get a non-zero session id. Otherwise, getSessionId() might be null.
     waitUntilConnected();
 
+    // TODO: filter out stale sessions here.
+    LOG.info("Handle new session, instance: {}, type: {}, session id: {}.", _instanceName,
+        _instanceType, sessionId == null ? "None" : sessionId);
+
     /**
      * stop all timer tasks, reset all handlers, make sure cleanup completed for previous
session
      * disconnect if fail to cleanup
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
index 7197542..49037eb 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCallbackCache.java
@@ -27,11 +27,11 @@ import java.util.concurrent.CopyOnWriteArraySet;
 
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheEventThread.ZkCacheEvent;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
 import org.apache.helix.store.HelixPropertyListener;
 import org.apache.helix.store.zk.ZNode;
 import org.apache.helix.util.HelixUtil;
@@ -41,8 +41,8 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ZkCallbackCache<T> extends Cache<T> implements IZkChildListener,
IZkDataListener,
-    IZkStateListener {
+
+public class ZkCallbackCache<T> extends Cache<T> implements IZkChildListener,
IZkDataListener, IZkStateListener {
   private static Logger LOG = LoggerFactory.getLogger(ZkCallbackCache.class);
 
   final BaseDataAccessor<T> _accessor;
@@ -219,12 +219,28 @@ public class ZkCallbackCache<T> extends Cache<T> implements
IZkChildListener, IZ
 
   }
 
-  @Override
+  /**
+   * Handle new session without a session id passed in.
+   *
+   * @deprecated
+   * This is deprecated. It is kept for backward compatibility.
+   * Please use {@link #handleNewSession(String)}.
+   *
+   * @throws Exception
+   */
+  @Deprecated
   public void handleNewSession() throws Exception {
     // TODO Auto-generated method stub
 
   }
 
+  @Override
+  public void handleNewSession(final String sessionId) throws Exception {
+    // TODO Auto-generated method stub
+    // Calls the old method for backward compatibility.
+    handleNewSession();
+  }
+
   public void subscribe(String path, HelixPropertyListener listener) {
     synchronized (_listener) {
       Set<HelixPropertyListener> listeners = _listener.get(path);
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
index 65e0027..d735874 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/client/HelixZkClient.java
@@ -6,15 +6,16 @@ import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.serialize.SerializableSerializer;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.manager.zk.BasicZkSerializer;
 import org.apache.helix.manager.zk.PathBasedZkSerializer;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 
@@ -35,9 +36,47 @@ public interface HelixZkClient {
 
   void unsubscribeDataChanges(String path, IZkDataListener listener);
 
-  void subscribeStateChanges(final IZkStateListener listener);
+  /*
+   * This is for backwards compatibility.
+   *
+   * TODO: remove below default implementation when getting rid of I0Itec in the new zk client.
+   */
+  default void subscribeStateChanges(final IZkStateListener listener) {
+    subscribeStateChanges(new I0ItecIZkStateListenerHelixImpl(listener));
+  }
 
-  void unsubscribeStateChanges(IZkStateListener listener);
+  /*
+   * This is for backwards compatibility.
+   *
+   * TODO: remove below default implementation when getting rid of I0Itec in the new zk client.
+   */
+  default void unsubscribeStateChanges(IZkStateListener listener) {
+    unsubscribeStateChanges(new I0ItecIZkStateListenerHelixImpl(listener));
+  }
+
+  /**
+   * Subscribes state changes for a {@link org.I0Itec.zkclient.IZkStateListener} listener.
+   *
+   * @deprecated
+   * This is deprecated. It is kept for backwards compatibility. Please use
+   * {@link #subscribeStateChanges(org.apache.helix.manager.zk.zookeeper.IZkStateListener)}.
+   *
+   * @param listener {@link org.I0Itec.zkclient.IZkStateListener} listener
+   */
+  @Deprecated
+  void subscribeStateChanges(final org.I0Itec.zkclient.IZkStateListener listener);
+
+  /**
+   * Unsubscribes state changes for a {@link org.I0Itec.zkclient.IZkStateListener} listener.
+   *
+   * @deprecated
+   * This is deprecated. It is kept for backwards compatibility. Please use
+   * {@link #unsubscribeStateChanges(org.apache.helix.manager.zk.zookeeper.IZkStateListener)}.
+   *
+   * @param listener {@link org.I0Itec.zkclient.IZkStateListener} listener
+   */
+  @Deprecated
+  void unsubscribeStateChanges(org.I0Itec.zkclient.IZkStateListener listener);
 
   void unsubscribeAll();
 
@@ -149,6 +188,66 @@ public interface HelixZkClient {
   PathBasedZkSerializer getZkSerializer();
 
   /**
+   * A class that wraps a default implementation of
+   * {@link org.apache.helix.manager.zk.zookeeper.IZkStateListener}, which means this listener
+   * runs the methods of {@link org.apache.helix.manager.zk.zookeeper.IZkStateListener}.
+   * This is for backward compatibility and to avoid breaking the original implementation
of
+   * {@link org.I0Itec.zkclient.IZkStateListener}.
+   */
+  class I0ItecIZkStateListenerHelixImpl implements org.I0Itec.zkclient.IZkStateListener {
+    private IZkStateListener _listener;
+
+    I0ItecIZkStateListenerHelixImpl(IZkStateListener listener) {
+      _listener = listener;
+    }
+
+    @Override
+    public void handleStateChanged(KeeperState keeperState) throws Exception {
+      _listener.handleStateChanged(keeperState);
+    }
+
+    @Override
+    public void handleNewSession() throws Exception {
+      /*
+       * org.apache.helix.manager.zk.zookeeper.IZkStateListener does not have handleNewSession(),
+       * so null is passed into handleNewSession(sessionId).
+       */
+      _listener.handleNewSession(null);
+    }
+
+    @Override
+    public void handleSessionEstablishmentError(Throwable error) throws Exception {
+      _listener.handleSessionEstablishmentError(error);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof I0ItecIZkStateListenerHelixImpl)) {
+        return false;
+      }
+      if (_listener == null) {
+        return false;
+      }
+
+      I0ItecIZkStateListenerHelixImpl defaultListener = (I0ItecIZkStateListenerHelixImpl)
obj;
+
+      return _listener.equals(defaultListener._listener);
+    }
+
+    @Override
+    public int hashCode() {
+      /*
+       * The original listener's hashcode helps find the wrapped listener with the same original
+       * listener. This is helpful in unsubscribeStateChanges(listener).
+       */
+      return _listener.hashCode();
+    }
+  }
+
+  /**
    * Configuration for creating a new ZkConnection.
    */
   class ZkConnectionConfig {
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/IZkStateListener.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/IZkStateListener.java
new file mode 100644
index 0000000..aa1fdd6
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/IZkStateListener.java
@@ -0,0 +1,61 @@
+package org.apache.helix.manager.zk.zookeeper;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+
+public interface IZkStateListener {
+
+  /**
+   * Called when the zookeeper connection state has changed.
+   *
+   * @param state the new zookeeper state.
+   * @throws Exception if any error occurs.
+   */
+  void handleStateChanged(KeeperState state) throws Exception;
+
+  /**
+   * Called after the zookeeper session has expired and a new session has been created. The
new
+   * session id has to be passed in as the parameter.
+   * And you would have to re-create any ephemeral nodes here. This is a session aware operation.
+   * The ephemeral nodes have to be created within the expected session id, which means passed-in
+   * session id has to be checked with current zookeeper's session id. If the passed-in session
id
+   * does not match current zookeeper's session id, ephemeral nodes should not be created.
+   * Otherwise, session race condition may occur and the newly created ephemeral nodes may
not be in
+   * the expected session.
+   *
+   * @param sessionId the new session's id. The ephemeral nodes are expected to be created
in this
+   *                  session. If this session id is expired, ephemeral nodes should not
be created.
+   * @throws Exception if any error occurs.
+   */
+  void handleNewSession(final String sessionId) throws Exception;
+
+  /**
+   * Called when a session cannot be re-established. This should be used to implement connection
+   * failure handling e.g. retry to connect or pass the error up
+   *
+   * @param error
+   *            The error that prevents a session from being established
+   * @throws Exception
+   *             On any error.
+   */
+  void handleSessionEstablishmentError(final Throwable error) throws Exception;
+}
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 1ad7fac..e035d14 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -29,7 +29,6 @@ import org.I0Itec.zkclient.ExceptionUtil;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkConnection;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkLock;
 import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.I0Itec.zkclient.exception.ZkException;
@@ -279,12 +278,40 @@ public class ZkClient implements Watcher {
     }
   }
 
+  /**
+   * Subscribes state changes for a {@link org.I0Itec.zkclient.IZkStateListener} listener.
+   *
+   * @deprecated
+   * This is deprecated. It is kept for backwards compatibility. Please use
+   * {@link #subscribeStateChanges(IZkStateListener)}.
+   *
+   * @param listener {@link org.I0Itec.zkclient.IZkStateListener} listener
+   */
+  @Deprecated
+  public void subscribeStateChanges(final org.I0Itec.zkclient.IZkStateListener listener)
{
+    subscribeStateChanges(new IZkStateListenerI0ItecImpl(listener));
+  }
+
   public void unsubscribeStateChanges(IZkStateListener stateListener) {
     synchronized (_stateListener) {
       _stateListener.remove(stateListener);
     }
   }
 
+  /**
+   * Unsubscribes state changes for a {@link org.I0Itec.zkclient.IZkStateListener} listener.
+   *
+   * @deprecated
+   * This is deprecated. It is kept for backwards compatibility. Please use
+   * {@link #unsubscribeStateChanges(IZkStateListener)}.
+   *
+   * @param stateListener {@link org.I0Itec.zkclient.IZkStateListener} listener
+   */
+  @Deprecated
+  public void unsubscribeStateChanges(org.I0Itec.zkclient.IZkStateListener stateListener)
{
+    unsubscribeStateChanges(new IZkStateListenerI0ItecImpl(stateListener));
+  }
+
   public void unsubscribeAll() {
     synchronized (_childListener) {
       _childListener.clear();
@@ -864,15 +891,17 @@ public class ZkClient implements Watcher {
 
         @Override
         public void run() throws Exception {
-          stateListener.handleNewSession();
+          stateListener.handleNewSession(null);
         }
       });
     }
   }
 
   protected void fireStateChangedEvent(final KeeperState state) {
+    final String sessionId = Long.toHexString(getSessionId());
     for (final IZkStateListener stateListener : _stateListener) {
-      _eventThread.send(new ZkEvent("State changed to " + state + " sent to " + stateListener)
{
+      final String description = "State changed to " + state + " sent to " + stateListener;
+      _eventThread.send(new ZkEvent(description, sessionId) {
 
         @Override
         public void run() throws Exception {
@@ -1754,4 +1783,64 @@ public class ZkClient implements Watcher {
       }
     }
   }
+
+  /**
+   * Creates a {@link org.apache.helix.manager.zk.zookeeper.IZkStateListener} that wraps
a default
+   * implementation of {@link org.I0Itec.zkclient.IZkStateListener}, which means the returned
+   * listener runs the methods of {@link org.I0Itec.zkclient.IZkStateListener}.
+   * This is for backward compatibility with {@link org.I0Itec.zkclient.IZkStateListener}.
+   */
+  private static class IZkStateListenerI0ItecImpl implements IZkStateListener {
+    private org.I0Itec.zkclient.IZkStateListener _listener;
+
+    IZkStateListenerI0ItecImpl(org.I0Itec.zkclient.IZkStateListener listener) {
+      _listener = listener;
+    }
+
+    @Override
+    public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception
{
+      _listener.handleStateChanged(keeperState);
+    }
+
+    @Override
+    public void handleNewSession(final String sessionId) throws Exception {
+      /*
+       * org.I0Itec.zkclient.IZkStateListener does not have handleNewSession(sessionId),
+       * so just call handleNewSession() by default.
+       */
+      _listener.handleNewSession();
+    }
+
+    @Override
+    public void handleSessionEstablishmentError(Throwable error) throws Exception {
+      _listener.handleSessionEstablishmentError(error);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+      if (!(obj instanceof IZkStateListenerI0ItecImpl)) {
+        return false;
+      }
+      if (_listener == null) {
+        return false;
+      }
+
+      IZkStateListenerI0ItecImpl defaultListener = (IZkStateListenerI0ItecImpl) obj;
+
+      return _listener.equals(defaultListener._listener);
+    }
+
+    @Override
+    public int hashCode() {
+      /*
+       * The original listener's hashcode helps find the wrapped listener with the same original
+       * listener. This is helpful in unsubscribeStateChanges(listener) when finding the
listener
+       * to remove.
+       */
+      return _listener.hashCode();
+    }
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
index 8572191..f720216 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkEventThread.java
@@ -38,16 +38,36 @@ public class ZkEventThread extends Thread {
 
   public static abstract class ZkEvent {
 
-    private String _description;
+    private final String _description;
+    private final String _sessionId;
 
     public ZkEvent(String description) {
+      this(description, null);
+    }
+
+    ZkEvent(String description, String sessionId) {
       _description = description;
+      _sessionId = sessionId;
     }
 
     public abstract void run() throws Exception;
 
-    @Override public String toString() {
-      return "ZkEvent[" + _description + "]";
+    /**
+     * Returns a string representation of the zk event.
+     * Ex. ZkEvent[description: new session event sent to listener; session: 1001754ac3b0007]
+     *
+     * @return String representation of the zk event.
+     */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("ZkEvent[description: ").append(_description);
+      if (_sessionId != null) {
+        sb.append("; session: ").append(_sessionId);
+      }
+      sb.append("]");
+
+      return sb.toString();
     }
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
index 2e5409a..8a5b7fe 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkTestHelper.java
@@ -40,12 +40,12 @@ import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
 import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.apache.helix.model.ExternalView;
 import org.apache.zookeeper.WatchedEvent;
@@ -89,7 +89,7 @@ public class ZkTestHelper {
   }
 
   /**
-   * Expire current zk session and wait for {@link IZkStateListener#handleNewSession()} invoked
+   * Expire current zk session and wait for {@link IZkStateListener#handleNewSession(String)}
invoked
    * @param client
    * @throws Exception
    */
@@ -103,14 +103,11 @@ public class ZkTestHelper {
       }
 
       @Override
-      public void handleNewSession() throws Exception {
+      public void handleNewSession(final String sessionId) throws Exception {
         // make sure zkclient is connected again
         zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
 
-        ZkConnection connection = ((ZkConnection) zkClient.getConnection());
-        ZooKeeper curZookeeper = connection.getZookeeper();
-
-        LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+        LOG.info("handleNewSession. sessionId: {}.", sessionId);
       }
 
       @Override
@@ -158,14 +155,11 @@ public class ZkTestHelper {
       }
 
       @Override
-      public void handleNewSession() throws Exception {
+      public void handleNewSession(final String sessionId) throws Exception {
         // make sure zkclient is connected again
         zkClient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.SECONDS);
 
-        ZkConnection connection = ((ZkConnection) zkClient.getConnection());
-        ZooKeeper curZookeeper = connection.getZookeeper();
-
-        LOG.info("handleNewSession. sessionId: " + Long.toHexString(curZookeeper.getSessionId()));
+        LOG.info("handleNewSession. sessionId: {}.", sessionId);
         waitNewSession.countDown();
       }
 
diff --git a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
index d9a470a..50c36ee 100644
--- a/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/common/ZkTestBase.java
@@ -33,7 +33,6 @@ import java.util.logging.Level;
 import javax.management.MBeanServerConnection;
 import javax.management.ObjectName;
 
-import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.ConfigAccessor;
@@ -63,6 +62,7 @@ import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
 import org.apache.helix.manager.zk.client.HelixZkClient;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
 import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -542,8 +542,8 @@ public class ZkTestBase {
       }
 
       @Override
-      public void handleNewSession() throws Exception {
-        LOG.info("In Old connection, new session");
+      public void handleNewSession(final String sessionId) throws Exception {
+        LOG.info("In Old connection, new session: {}.", sessionId);
       }
 
       @Override
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
index da7dcc9..ef98be9 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestHandleNewSession.java
@@ -181,9 +181,9 @@ public class TestHandleNewSession extends ZkTestBase {
     }
 
     @Override
-    public void handleNewSession() throws Exception {
+    public void handleNewSession(final String sessionId) throws Exception {
       newSessionHandlingCount.acquire();
-      super.handleNewSession();
+      super.handleNewSession(sessionId);
     }
 
     void proceedNewSessionHandling() {
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index fb74e54..156f59f 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -20,6 +20,8 @@ package org.apache.helix.manager.zk;
  */
 
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -30,10 +32,11 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.TestHelper;
+import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
 import org.apache.helix.manager.zk.zookeeper.ZkConnection;
 import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
 import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
@@ -91,29 +94,141 @@ public class TestRawZkClient extends ZkUnitTestBase {
     AssertJUnit.assertNotSame(stat, newStat);
   }
 
-  @Test()
-  void testSessionExpire() throws Exception {
-    IZkStateListener listener = new IZkStateListener() {
+  /*
+   * Tests subscribing state changes for helix's IZkStateListener.
+   */
+  @Test
+  public void testSubscribeStateChanges() {
+    int numListeners = _zkClient.numberOfListeners();
+    List<IZkStateListener> listeners = new ArrayList<>();
+
+    // Subscribe multiple listeners to test that listener's hashcode works as expected.
+    // Each listener is subscribed and unsubscribed successfully.
+    for (int i = 0; i < 3; i++) {
+      IZkStateListener listener = new IZkStateListener() {
+        @Override
+        public void handleStateChanged(KeeperState state) {
+          System.out.println("Handle new state: " + state);
+        }
 
-      @Override
-      public void handleStateChanged(KeeperState state) {
-        System.out.println("In Old connection New state " + state);
-      }
+        @Override
+        public void handleNewSession(final String sessionId) {
+          System.out.println("Handle new session: " + sessionId);
+        }
 
-      @Override
-      public void handleNewSession() {
-        System.out.println("In Old connection New session");
-      }
+        @Override
+        public void handleSessionEstablishmentError(Throwable error) {
+          System.out.println("Handle session establishment error: " + error);
+        }
+      };
 
-      @Override
-      public void handleSessionEstablishmentError(Throwable var1) {
-      }
-    };
+      _zkClient.subscribeStateChanges(listener);
+      Assert.assertEquals(_zkClient.numberOfListeners(), ++numListeners);
+      listeners.add(listener);
+    }
+
+    for (IZkStateListener listener : listeners) {
+      _zkClient.unsubscribeStateChanges(listener);
+      Assert.assertEquals(_zkClient.numberOfListeners(), --numListeners);
+    }
+  }
+
+  /*
+   * Tests session expiry for the helix's IZkStateListener.
+   */
+  @Test
+  void testSessionExpiry() throws Exception {
+    long lastSessionId = _zkClient.getSessionId();
+
+    // Test multiple times to make sure each time the new session id is increasing.
+    for (int i = 0; i < 3; i++) {
+      ZkTestHelper.expireSession(_zkClient);
+      long newSessionId = _zkClient.getSessionId();
+      Assert.assertTrue(newSessionId > lastSessionId,
+          "New session id should be greater than expired session id.");
+      lastSessionId = newSessionId;
+    }
+  }
+
+  /*
+   * Tests state changes subscription for I0Itec's IZkStateListener.
+   * This is a test for backward compatibility.
+   *
+   * TODO: remove this test when getting rid of I0Itec.
+   */
+  @Test
+  public void testSubscribeStateChangesForI0ItecIZkStateListener() {
+    int numListeners = _zkClient.numberOfListeners();
+    List<org.I0Itec.zkclient.IZkStateListener> listeners = new ArrayList<>();
+
+    // Subscribe multiple listeners to test that listener's hashcode works as expected.
+    // Each listener is subscribed and unsubscribed successfully.
+    for (int i = 0; i < 3; i++) {
+      org.I0Itec.zkclient.IZkStateListener listener = new org.I0Itec.zkclient.IZkStateListener()
{
+        @Override
+        public void handleStateChanged(KeeperState state) {
+          System.out.println("Handle new state: " + state);
+        }
+
+        @Override
+        public void handleNewSession() {
+          System.out.println("Handle new session: ");
+        }
+
+        @Override
+        public void handleSessionEstablishmentError(Throwable error) {
+          System.out.println("Handle session establishment error: " + error);
+        }
+      };
+
+      _zkClient.subscribeStateChanges(listener);
+      Assert.assertEquals(_zkClient.numberOfListeners(), ++numListeners);
+
+      // Try to subscribe the listener again but number of listeners should not change because
the
+      // listener already exists.
+      _zkClient.subscribeStateChanges(listener);
+      Assert.assertEquals(_zkClient.numberOfListeners(), numListeners);
+
+      listeners.add(listener);
+    }
+
+    for (org.I0Itec.zkclient.IZkStateListener listener : listeners) {
+      _zkClient.unsubscribeStateChanges(listener);
+      Assert.assertEquals(_zkClient.numberOfListeners(), --numListeners);
+    }
+  }
+
+  /*
+   * Tests session expiry for I0Itec's IZkStateListener.
+   * This is a test for backward compatibility.
+   *
+   * TODO: remove this test when getting rid of I0Itec.
+   */
+  @Test
+  public void testSessionExpiryForI0IItecZkStateListener() throws Exception {
+    org.I0Itec.zkclient.IZkStateListener listener =
+        new org.I0Itec.zkclient.IZkStateListener() {
+
+          @Override
+          public void handleStateChanged(KeeperState state) {
+            System.out.println("In Old connection New state " + state);
+          }
+
+          @Override
+          public void handleNewSession() {
+            System.out.println("In Old connection New session");
+          }
+
+          @Override
+          public void handleSessionEstablishmentError(Throwable var1) {
+          }
+        };
 
     _zkClient.subscribeStateChanges(listener);
     ZkConnection connection = ((ZkConnection) _zkClient.getConnection());
     ZooKeeper zookeeper = connection.getZookeeper();
-    System.out.println("old sessionId= " + zookeeper.getSessionId());
+    long oldSessionId = zookeeper.getSessionId();
+    System.out.println("old sessionId= " + oldSessionId);
     Watcher watcher = event -> System.out.println("In New connection In process event:"
+ event);
     ZooKeeper newZookeeper = new ZooKeeper(connection.getServers(), zookeeper.getSessionTimeout(),
         watcher, zookeeper.getSessionId(), zookeeper.getSessionPasswd());
@@ -124,7 +239,9 @@ public class TestRawZkClient extends ZkUnitTestBase {
     Thread.sleep(10000);
     connection = ((ZkConnection) _zkClient.getConnection());
     zookeeper = connection.getZookeeper();
-    System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
+    long newSessionId = zookeeper.getSessionId();
+    System.out.println("After session expiry sessionId= " + newSessionId);
+    _zkClient.unsubscribeStateChanges(listener);
   }
 
   @Test
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
index 30043ef..0169c41 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkClusterManager.java
@@ -106,7 +106,7 @@ public class TestZkClusterManager extends ZkUnitTestBase {
     controller.getMessagingService();
     controller.getClusterManagmentTool();
 
-    controller.handleNewSession();
+    controller.handleNewSession(controller.getSessionId());
     controller.disconnect();
     AssertJUnit.assertFalse(controller.isConnected());
 
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
index c5b3b3d..7a90fe3 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkFlapping.java
@@ -22,7 +22,6 @@ package org.apache.helix.manager.zk;
 import java.util.Date;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.I0Itec.zkclient.IZkStateListener;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.SystemPropertyKeys;
@@ -31,6 +30,7 @@ import org.apache.helix.ZkTestHelper;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.zookeeper.IZkStateListener;
 import org.apache.helix.model.LiveInstance;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.testng.Assert;
@@ -50,7 +50,7 @@ public class TestZkFlapping extends ZkUnitTestBase {
     }
 
     @Override
-    public void handleNewSession() {
+    public void handleNewSession(final String sessionId) {
     }
 
     @Override


Mime
View raw message