helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [13/38] helix git commit: participant syncs session id to controller
Date Wed, 08 Feb 2017 17:59:48 GMT
participant syncs session id to controller


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2981bbd1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2981bbd1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2981bbd1

Branch: refs/heads/helix-0.6.x
Commit: 2981bbd11dc869f5235e85e71f1b30368ef7596e
Parents: 9bf4437
Author: Boyan Li <boli@linkedin.com>
Authored: Fri Aug 5 11:07:15 2016 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Sun Feb 5 19:15:16 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixManager.java     |  5 ++
 .../apache/helix/manager/zk/ZKHelixManager.java | 23 ++++--
 .../messaging/DefaultMessagingService.java      |  4 +-
 .../messaging/handling/HelixTaskExecutor.java   | 75 ++++++++++++++---
 .../java/org/apache/helix/model/Message.java    |  3 +-
 .../src/test/java/org/apache/helix/Mocks.java   |  6 ++
 .../controller/stages/DummyClusterManager.java  |  7 ++
 .../TestSyncSessionToController.java            | 84 ++++++++++++++++++++
 .../helix/participant/MockZKHelixManager.java   |  6 ++
 9 files changed, 194 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/HelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixManager.java b/helix-core/src/main/java/org/apache/helix/HelixManager.java
index 25c29e6..7b574aa 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixManager.java
@@ -253,6 +253,11 @@ public interface HelixManager {
   StateMachineEngine getStateMachineEngine();
 
   /**
+   * @return the session start time
+   */
+  Long getSessionStartTime();
+
+  /**
    * Check if the cluster manager is the leader
    * @return true if this is a controller and a leader of the cluster
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index bef2eac..f9d03c3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -79,7 +79,7 @@ import org.apache.zookeeper.ZooKeeper.States;
 public class ZKHelixManager implements HelixManager, IZkStateListener {
   private static Logger LOG = Logger.getLogger(ZKHelixManager.class);
 
-  public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec
+  public static final int FLAPPING_TIME_WINDOW = 300000; // Default to 300 sec
   public static final int MAX_DISCONNECT_THRESHOLD = 5;
   public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
 
@@ -125,6 +125,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   private final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
 
   private final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
+  private Long _sessionStartTime;
 
   /**
    * controller fields
@@ -206,7 +207,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
      */
     _flappingTimeWindowMs =
         getSystemPropertyAsInt("helixmanager.flappingTimeWindow",
-            ZKHelixManager.FLAPPING_TIME_WINDIOW);
+            ZKHelixManager.FLAPPING_TIME_WINDOW);
 
     _maxDisconnectThreshold =
         getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold",
@@ -326,9 +327,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   @Override
   public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception {
     addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE,
-        new EventType[] {
-            EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated
-        });
+        new EventType[]{EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated});
   }
 
   @Override
@@ -510,8 +509,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   public void connect() throws Exception {
     LOG.info("ClusterManager.connect()");
     if (isConnected()) {
-      LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName
-          + " already connected. skip connect");
+      LOG.warn(
+          "Cluster manager: " + _instanceName + " for cluster: " + _clusterName + " already connected. skip connect");
       return;
     }
 
@@ -520,6 +519,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     case CONTROLLER_PARTICIPANT:
       if (_controller == null) {
         _controller = new GenericHelixController();
+        _messagingService.getExecutor().setController(_controller);
       }
       break;
     default:
@@ -567,6 +567,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
     } finally {
       _zkclient.close();
       _zkclient = null;
+      _sessionStartTime = null;
       LOG.info("Cluster manager: " + _instanceName + " disconnected");
 
       if (_controller != null) {
@@ -903,6 +904,8 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
      * setup message listener
      */
     participantHelper.setupMsgHandler();
+
+    _sessionStartTime = System.currentTimeMillis();
   }
 
   void handleNewSessionAsController() {
@@ -927,4 +930,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener {
   @Override
   public void handleSessionEstablishmentError(Throwable var1) throws Exception {
   }
+
+  @Override
+  public Long getSessionStartTime() {
+    return _sessionStartTime;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
index 1121689..f000f69 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java
@@ -56,7 +56,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
   public DefaultMessagingService(HelixManager manager) {
     _manager = manager;
     _evaluator = new CriteriaEvaluator();
-    _taskExecutor = new HelixTaskExecutor();
+    _taskExecutor = new HelixTaskExecutor(this);
     _asyncCallbackService = new AsyncCallbackService();
     _taskExecutor.registerMessageHandlerFactory(MessageType.TASK_REPLY.toString(),
         _asyncCallbackService);
@@ -187,7 +187,7 @@ public class DefaultMessagingService implements ClusterMessagingService {
 
   private List<Message> generateMessagesForController(Message message) {
     List<Message> messages = new ArrayList<Message>();
-    String id = UUID.randomUUID().toString();
+    String id = (message.getMsgId() == null) ? UUID.randomUUID().toString() : message.getMsgId();
     Message newMessage = new Message(message.getRecord(), id);
     newMessage.setMsgId(id);
     newMessage.setSrcName(_manager.getInstanceName());

http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index bc536fd..0431770 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -34,20 +34,25 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.Criteria;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
 import org.apache.helix.MessageListener;
 import org.apache.helix.NotificationContext;
 import org.apache.helix.NotificationContext.MapKey;
 import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageState;
 import org.apache.helix.model.Message.MessageType;
@@ -104,7 +109,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   public static final String MAX_THREADS = "maxThreads";
 
   private MessageQueueMonitor _messageQueueMonitor;
-
+  private ClusterMessagingService _messagingService;
+  private GenericHelixController _controller;
+  private Long _lastSessionSyncTime;
+  private static final int SESSION_SYNC_INTERVAL = 2000; // 2 seconds
+  private static final String SESSION_SYNC = "SESSION-SYNC";
   /**
    * Map of MsgType->MsgHandlerFactoryRegistryItem
    */
@@ -135,6 +144,11 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     startMonitorThread();
   }
 
+  public HelixTaskExecutor(ClusterMessagingService messagingService) {
+    this();
+    _messagingService = messagingService;
+  }
+
   @Override
   public void registerMessageHandlerFactory(String type, MessageHandlerFactory factory) {
     registerMessageHandlerFactory(type, factory, DEFAULT_PARALLEL_TASKS);
@@ -170,6 +184,10 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     }
   }
 
+  public void setController(GenericHelixController controller) {
+    _controller = controller;
+  }
+
   public ParticipantMonitor getParticipantMonitor() {
     return _monitor;
   }
@@ -386,8 +404,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
           _taskMap.remove(taskId);
           return true;
         } else {
-          _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class,
-              "fail to cancel task: " + taskId,
+          _statusUpdateUtil.logInfo(message, HelixTaskExecutor.class, "fail to cancel task: " + taskId,
               notificationContext.getManager().getHelixDataAccessor());
         }
       } else {
@@ -403,8 +420,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
   public void finishTask(MessageTask task) {
     Message message = task.getMessage();
     String taskId = task.getTaskId();
-    LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message
-        .getExecuteStartTimeStamp()));
+    LOG.info("message finished: " + taskId + ", took " + (new Date().getTime() - message.getExecuteStartTimeStamp()));
 
     synchronized (_lock) {
       if (_taskMap.containsKey(taskId)) {
@@ -471,8 +487,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       item.factory().reset();
     }
 
-    LOG.info("Unregistered message handler factory for type: " + type + ", factory: "
-        + item.factory() + ", pool: " + pool);
+    LOG.info(
+        "Unregistered message handler factory for type: " + type + ", factory: " + item.factory() + ", pool: " + pool);
   }
 
   void reset() {
@@ -502,6 +518,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
       LOG.warn("Task: " + taskId + " fails to terminate. Message: " + info._task.getMessage());
     }
     _taskMap.clear();
+
+    _lastSessionSyncTime = null;
   }
 
   void init() {
@@ -526,6 +544,28 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
     }
   }
 
+  private void syncSessionToController(HelixManager manager) {
+    if (_lastSessionSyncTime == null ||
+            System.currentTimeMillis() - _lastSessionSyncTime > SESSION_SYNC_INTERVAL) { // > delay since last sync
+      HelixDataAccessor accessor = manager.getHelixDataAccessor();
+      PropertyKey key = new Builder(manager.getClusterName()).controllerMessage(SESSION_SYNC);
+      if (accessor.getProperty(key) == null) {
+        Message msg = new Message(MessageType.PARTICIPANT_SESSION_CHANGE, SESSION_SYNC);
+        msg.setSrcName(manager.getInstanceName());
+        msg.setTgtSessionId("*");
+        msg.setMsgState(MessageState.NEW);
+        msg.setMsgId(SESSION_SYNC);
+
+        Criteria cr = new Criteria();
+        cr.setRecipientInstanceType(InstanceType.CONTROLLER);
+        cr.setSessionSpecific(false);
+
+        manager.getMessagingService().send(cr, msg);
+        _lastSessionSyncTime = System.currentTimeMillis();
+      }
+    }
+  }
+
   @Override
   public void onMessage(String instanceName, List<Message> messages,
       NotificationContext changeContext) {
@@ -596,8 +636,25 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor {
                 + message.getMsgId();
         LOG.warn(warningMessage);
         accessor.removeProperty(message.getKey(keyBuilder, instanceName));
-        _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage,
-            accessor);
+        _statusUpdateUtil.logWarning(message, HelixStateMachineEngine.class, warningMessage, accessor);
+
+        // Proactively send a session sync message from participant to controller
+        // upon session mismatch after a new session is established
+        if (manager.getInstanceType() == InstanceType.PARTICIPANT
+            || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT) {
+          if (message.getCreateTimeStamp() > manager.getSessionStartTime()) {
+            syncSessionToController(manager);
+          }
+        }
+        continue;
+      }
+
+      if ((manager.getInstanceType() == InstanceType.CONTROLLER || manager.getInstanceType() == InstanceType.CONTROLLER_PARTICIPANT)
+          && MessageType.PARTICIPANT_SESSION_CHANGE.name().equals(message.getMsgType())) {
+        PropertyKey key = new Builder(manager.getClusterName()).liveInstances();
+        List<LiveInstance> liveInstances = manager.getHelixDataAccessor().getChildValues(key);
+        _controller.onLiveInstanceChange(liveInstances, changeContext);
+        accessor.removeProperty(message.getKey(keyBuilder, instanceName));
         continue;
       }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/main/java/org/apache/helix/model/Message.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java
index 31f50ab..2901b1c 100644
--- a/helix-core/src/main/java/org/apache/helix/model/Message.java
+++ b/helix-core/src/main/java/org/apache/helix/model/Message.java
@@ -48,7 +48,8 @@ public class Message extends HelixProperty {
     CONTROLLER_MSG,
     TASK_REPLY,
     NO_OP,
-    PARTICIPANT_ERROR_REPORT
+    PARTICIPANT_ERROR_REPORT,
+    PARTICIPANT_SESSION_CHANGE
   };
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/test/java/org/apache/helix/Mocks.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java
index 7d27693..9688c0d 100644
--- a/helix-core/src/test/java/org/apache/helix/Mocks.java
+++ b/helix-core/src/test/java/org/apache/helix/Mocks.java
@@ -31,6 +31,7 @@ import org.I0Itec.zkclient.DataUpdater;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.messaging.handling.HelixTaskExecutor;
@@ -456,6 +457,11 @@ public class Mocks {
       return null;
     }
 
+    @Override
+    public Long getSessionStartTime() {
+      return 0L;
+    }
+
   }
 
   public static class MockAccessor implements HelixDataAccessor {

http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
index b54ddcc..0167487 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/DummyClusterManager.java
@@ -39,6 +39,7 @@ import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.participant.StateMachineEngine;
@@ -252,4 +253,10 @@ public class DummyClusterManager implements HelixManager {
     // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public Long getSessionStartTime() {
+    return 0L;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java b/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java
new file mode 100644
index 0000000..36ecf24
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestSyncSessionToController.java
@@ -0,0 +1,84 @@
+package org.apache.helix.integration;
+
+import java.util.Date;
+import java.util.List;
+
+import org.apache.helix.InstanceType;
+import org.apache.helix.MessageListener;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.Message;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestSyncSessionToController extends ZkIntegrationTestBase  {
+  @Test
+  public void testSyncSessionToController() throws Exception {
+    System.out.println("START testSyncSessionToController at " + new Date(System.currentTimeMillis()));
+
+    String clusterName = getShortClassName();
+    MockParticipantManager[] participants = new MockParticipantManager[5];
+    int resourceNb = 10;
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        resourceNb, // resources
+        1, // partitions per resource
+        5, // number of nodes
+        1, // replicas
+        "MasterSlave", true); // do rebalance
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // start participants
+    for (int i = 0; i < 5; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    ZKHelixManager zkHelixManager = new ZKHelixManager(clusterName, "controllerMessageListener", InstanceType.CONTROLLER, ZK_ADDR);
+    zkHelixManager.connect();
+    MockMessageListener mockMessageListener = new MockMessageListener();
+    zkHelixManager.addControllerMessageListener(mockMessageListener);
+
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName);
+    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+    String path = keyBuilder.liveInstance("localhost_12918").getPath();
+    Stat stat = new Stat();
+    ZNRecord data = accessor.get(path, stat, 2);
+    data.getSimpleFields().put("SESSION_ID", "invalid-id");
+    accessor.set(path, data, 2);
+    Thread.sleep(2000);
+
+    Assert.assertTrue(mockMessageListener.isSessionSyncMessageSent());
+  }
+
+  class MockMessageListener implements MessageListener {
+    private boolean sessionSyncMessageSent = false;
+
+    @Override
+    public void onMessage(String instanceName, List<Message> messages, NotificationContext changeContext) {
+      for (Message message: messages) {
+        if (message.getMsgId().equals("SESSION-SYNC")) {
+          sessionSyncMessageSent = true;
+        }
+      }
+    }
+
+    public boolean isSessionSyncMessageSent() {
+      return sessionSyncMessageSent;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/2981bbd1/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
index 2b6f757..db6974e 100644
--- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
+++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java
@@ -41,6 +41,7 @@ import org.apache.helix.PreConnectCallback;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.ScopedConfigChangeListener;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.GenericHelixController;
 import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -259,4 +260,9 @@ public class MockZKHelixManager implements HelixManager {
     return null;
   }
 
+  @Override
+  public Long getSessionStartTime() {
+    return 0L;
+  }
+
 }


Mime
View raw message