helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zzh...@apache.org
Subject [1/2] git commit: HELIX-134: refactor helix manager to handle zk session expiry more reliably
Date Fri, 21 Jun 2013 20:58:14 GMT
Updated Branches:
  refs/heads/master a97981813 -> 3059f7b5b


HELIX-134: refactor helix manager to handle zk session expiry more reliably


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/94367e1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/94367e1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/94367e1c

Branch: refs/heads/master
Commit: 94367e1c7840b5c35a84f0ee440d443047c6dbaa
Parents: 02dcd64
Author: zzhang <zzhang5@uci.edu>
Authored: Fri Jun 21 13:57:48 2013 -0700
Committer: zzhang <zzhang5@uci.edu>
Committed: Fri Jun 21 13:57:48 2013 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |   1 +
 .../ParticipantHealthReportCollectorImpl.java   |  16 +-
 .../helix/manager/zk/AbstractManager.java       | 714 +++++++++++++++++++
 .../helix/manager/zk/CallbackHandler.java       |   5 +
 .../helix/manager/zk/ControllerManager.java     | 210 ++++++
 .../manager/zk/ControllerManagerHelper.java     | 123 ++++
 .../zk/DistributedControllerManager.java        | 209 ++++++
 .../manager/zk/DistributedLeaderElection.java   | 160 +++++
 .../helix/manager/zk/ParticipantManager.java    | 205 ++++++
 .../manager/zk/ParticipantManagerHelper.java    | 276 +++++++
 .../messaging/handling/HelixTaskExecutor.java   |  68 +-
 .../TestDistributedControllerManager.java       | 102 +++
 .../manager/TestParticipantManager.java         |  86 +++
 13 files changed, 2152 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index ff45c74..a898160 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -153,6 +153,7 @@ public class GenericHelixController implements
     }
   }
   
+  // TODO who should stop this timer
   /**
    * Starts the rebalancing timer with the specified period. Start the timer if necessary;
    * If the period is smaller than the current period, cancel the current timer and use 

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
index 1b337fd..14c12de 100644
--- a/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
+++ b/helix-core/src/main/java/org/apache/helix/healthcheck/ParticipantHealthReportCollectorImpl.java
@@ -27,6 +27,7 @@ import java.util.TimerTask;
 
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.alerts.StatsHolder;
@@ -34,7 +35,7 @@ import org.apache.helix.model.HealthStat;
 import org.apache.log4j.Logger;
 
 
-public class ParticipantHealthReportCollectorImpl implements
+public class ParticipantHealthReportCollectorImpl extends HelixTimerTask implements
     ParticipantHealthReportCollector
 {
   private final LinkedList<HealthReportProvider> _healthReportProviderList = new LinkedList<HealthReportProvider>();
@@ -58,12 +59,13 @@ public class ParticipantHealthReportCollectorImpl implements
     addHealthReportProvider(new DefaultHealthReportProvider());
   }
 
+  @Override
   public void start()
   {
     if (_timer == null)
     {
       _timer = new Timer(true);
-      _timer.scheduleAtFixedRate(new HealthCheckInfoReportingTask(),
+      _timer.scheduleAtFixedRate(this,
           new Random().nextInt(DEFAULT_REPORT_LATENCY), DEFAULT_REPORT_LATENCY);
     }
     else
@@ -124,6 +126,7 @@ public class ParticipantHealthReportCollectorImpl implements
 
   }
 
+  @Override
   public void stop()
   {
     _logger.info("Stop HealthCheckInfoReportingTask");
@@ -179,12 +182,9 @@ public class ParticipantHealthReportCollectorImpl implements
     }
   }
 
-  class HealthCheckInfoReportingTask extends TimerTask
+  @Override
+  public void run()
   {
-    @Override
-    public void run()
-    {
-      transmitHealthReports();
-    }
+    transmitHealthReports();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
new file mode 100644
index 0000000..165f639
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractManager.java
@@ -0,0 +1,714 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IZkStateListener;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.LiveInstanceInfoProvider;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ScopedConfigChangeListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper.States;
+
+public abstract class AbstractManager implements HelixManager, IZkStateListener {
+  private static Logger LOG = Logger.getLogger(AbstractManager.class);
+
+  final String _zkAddress;
+  final String _clusterName;
+  final String _instanceName;
+  final InstanceType _instanceType;
+  final int _sessionTimeout;
+  final List<PreConnectCallback> _preConnectCallbacks;
+  final List<CallbackHandler> _handlers;
+  final HelixManagerProperties _properties;
+  
+  /**
+   * helix version#
+   */
+  final String _version;
+
+  ZkClient _zkclient = null;
+  final DefaultMessagingService _messagingService;
+
+  BaseDataAccessor<ZNRecord> _baseDataAccessor;
+  ZKHelixDataAccessor _dataAccessor;
+  final Builder _keyBuilder;
+  ConfigAccessor _configAccessor;
+  ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
+  LiveInstanceInfoProvider _liveInstanceInfoProvider = null;
+  final List<HelixTimerTask> _timerTasks = new ArrayList<HelixTimerTask>();
+
+
+  volatile String _sessionId;
+  
+  /**
+   * Keep track of the timestamps at which the zk state became Disconnected.
+   * If the zk state becomes Disconnected more than _maxDisconnectThreshold times
+   * within a _flappingTimeWindowMs window, disconnect the helix manager
+   */
+  final List<Long> _disconnectTimeHistory = new LinkedList<Long>();
+   
+  final int _flappingTimeWindowMs; 
+  final int _maxDisconnectThreshold;
+
+  
+  public AbstractManager(String zkAddress, String clusterName, String instanceName,
+      InstanceType instanceType) {
+    
+    LOG.info("Create a zk-based cluster manager. zkSvr: " + zkAddress + ", clusterName: " + clusterName
+        + ", instanceName: " + instanceName + ", type: " + instanceType);
+
+    _zkAddress = zkAddress;
+    _clusterName = clusterName;
+    _instanceType = instanceType;
+    _instanceName = instanceName;
+    _preConnectCallbacks = new LinkedList<PreConnectCallback>();
+    _handlers = new ArrayList<CallbackHandler>();
+    _properties = new HelixManagerProperties("cluster-manager-version.properties");
+    _version = _properties.getVersion();
+
+    _keyBuilder = new Builder(clusterName);
+    _messagingService = new DefaultMessagingService(this);
+
+    
+    /**
+     * use system property if available
+     */
+    _flappingTimeWindowMs = getSystemPropertyAsInt("helixmanager.flappingTimeWindow", 
+        ZKHelixManager.FLAPPING_TIME_WINDIOW);
+    
+    _maxDisconnectThreshold = getSystemPropertyAsInt("helixmanager.maxDisconnectThreshold", 
+         ZKHelixManager.MAX_DISCONNECT_THRESHOLD);
+ 
+    _sessionTimeout = getSystemPropertyAsInt("zk.session.timeout", ZkClient.DEFAULT_SESSION_TIMEOUT);
+
+  }
+  
+  private int getSystemPropertyAsInt(String propertyKey, int propertyDefaultValue) {
+    String valueString = System.getProperty(propertyKey, "" + propertyDefaultValue);
+    
+    try
+    {
+      int value = Integer.parseInt(valueString);
+      if (value > 0) {
+        return value;
+      }
+    }
+    catch (NumberFormatException e)
+    {
+      LOG.warn("Exception while parsing property: " + propertyKey + ", string: " + valueString
+          + ", using default value: " + propertyDefaultValue);
+    }
+    
+    return propertyDefaultValue;
+  }
+  
+  /**
+   * each type of helix manager should implement its own handle-new-session logic
+   */
+//  public abstract void handleNewSession();
+  
+  @Override
+  public void connect() throws Exception {
+    LOG.info("ClusterManager.connect()");
+    if (isConnected())
+    {
+      LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName
+          + " already connected. skip connect");
+      return;
+    }
+
+    try
+    {
+      createClient();
+      _messagingService.onConnected();
+    }
+    catch (Exception e)
+    {
+      LOG.error("fail to connect " + _instanceName, e);
+      disconnect();
+      throw e;
+    } 
+  }
+
+  @Override
+  public boolean isConnected() {
+    if (_zkclient == null) {
+      return false;
+    }
+    ZkConnection zkconnection = (ZkConnection) _zkclient.getConnection();
+    if (zkconnection != null) {
+      States state = zkconnection.getZookeeperState();
+      return state == States.CONNECTED;
+    }
+    return false;
+  }
+
+  /**
+   * specific disconnect logic for each helix-manager type
+   */
+  abstract void doDisconnect();
+  
+  /**
+   * This function can be called when the connection is in a bad state (e.g. flapping),
+   * in which case isConnected() could be false and we still want to disconnect from the cluster.
+   */
+  @Override
+  public void disconnect() {
+    LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from "
+        + _clusterName);
+
+    try {
+      /**
+       * stop all timer tasks
+       */
+      stopTimerTasks();
+      
+      /**
+       * shutdown thread pool first to avoid reset() being invoked in the middle of state
+       * transition
+       */
+      _messagingService.getExecutor().shutdown();
+      
+      // TODO reset user defined handlers only
+      resetHandlers();
+  
+      _dataAccessor.shutdown();
+  
+      doDisconnect();
+  
+      _zkclient.unsubscribeAll();
+    } finally {
+      _zkclient.close();
+      LOG.info("Cluster manager: " + _instanceName + " disconnected");
+    }    
+  }
+
+  @Override
+  public void addIdealStateChangeListener(IdealStateChangeListener listener) throws Exception {
+    addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE, 
+        new EventType[] { EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated });    
+  }
+
+  @Override
+  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception {
+    addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE, 
+        new EventType[] { EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });    
+  }
+
+  @Override
+  public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
+    addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, 
+        new EventType[] { EventType.NodeChildrenChanged });    
+  }
+
+  @Override
+  public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
+      throws Exception {
+    addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, 
+        new EventType[] { EventType.NodeChildrenChanged });    
+  }
+
+  @Override
+  public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
+      throws Exception {
+    Builder keyBuilder = new Builder(_clusterName);
+
+    PropertyKey propertyKey = null;
+    switch(scope)
+    {
+    case CLUSTER:
+      propertyKey = keyBuilder.clusterConfigs();
+      break;
+    case PARTICIPANT:
+      propertyKey = keyBuilder.instanceConfigs();
+      break;
+    case RESOURCE:
+      propertyKey = keyBuilder.resourceConfigs();
+      break;
+    default:
+      break;
+    }
+
+    if (propertyKey != null)
+    {
+      addListener(listener, propertyKey, ChangeType.CONFIG, 
+          new EventType[] { EventType.NodeChildrenChanged });
+    } else
+    {
+      LOG.error("Can't add listener to config scope: " + scope);
+    }    
+  }
+
+  @Override
+  public void addMessageListener(MessageListener listener, String instanceName) {
+    addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE, 
+        new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });    
+  }
+
+  @Override
+  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
+      String instanceName, String sessionId) throws Exception {
+    addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId), ChangeType.CURRENT_STATE,
+        new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });    
+  }
+
+  @Override
+  public void addHealthStateChangeListener(HealthStateChangeListener listener, String instanceName)
+      throws Exception {
+    addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH,
+        new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });    
+  }
+
+  @Override
+  public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
+    addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW,
+        new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });    
+  }
+
+  @Override
+  public void addControllerListener(ControllerChangeListener listener) {
+    addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER,
+        new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
+  }
+
+  void addControllerMessageListener(MessageListener listener)
+  {
+    addListener(listener, new Builder(_clusterName).controllerMessages(), ChangeType.MESSAGES_CONTROLLER,
+          new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated });
+  }
+
+  @Override
+  public boolean removeListener(PropertyKey key, Object listener) {
+    LOG.info("Removing listener: " + listener + " on path: " + key.getPath() 
+        + " from cluster: " + _clusterName + " by instance: " + _instanceName);
+
+    synchronized (this)
+    {
+      List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>();
+      for (CallbackHandler handler : _handlers)
+      {
+        // compare property-key path and listener reference
+        if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener))
+        {
+          toRemove.add(handler);
+        }
+      }
+      
+      _handlers.removeAll(toRemove);
+      
+      // handler.reset() may modify the handlers list, so do it outside the iteration
+      for (CallbackHandler handler : toRemove) {
+        handler.reset();
+      }
+    }
+
+    return true;  
+  }
+
+  @Override
+  public HelixDataAccessor getHelixDataAccessor() {
+    checkConnected();
+    return _dataAccessor;
+  }
+
+  @Override
+  public ConfigAccessor getConfigAccessor() {
+    checkConnected();
+    return _configAccessor;
+  }
+
+  @Override
+  public String getClusterName() {
+    return _clusterName;
+  }
+
+  @Override
+  public String getInstanceName() {
+    return _instanceName;
+  }
+
+  @Override
+  public String getSessionId() {
+    checkConnected();
+    return _sessionId;
+  }
+
+  @Override
+  public long getLastNotificationTime() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public HelixAdmin getClusterManagmentTool() {
+    checkConnected();
+    if (_zkclient != null)
+    {
+      return new ZKHelixAdmin(_zkclient);
+    }
+    
+    LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null");
+    return null;
+  }
+
+  @Override
+  public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
+    checkConnected();
+
+    if (_helixPropertyStore == null)
+    {
+      String path =
+          PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
+
+      _helixPropertyStore =
+          new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient),
+                                             path,
+                                             null);
+    }
+
+    return _helixPropertyStore;
+  }
+
+  @Override
+  public ClusterMessagingService getMessagingService() {
+    // The caller can register message handler factories on messaging service before the
+    // helix manager is connected. Thus we do not check connected here
+    return _messagingService;
+  }
+
+  @Override
+  public ParticipantHealthReportCollector getHealthReportCollector() {
+    // helix-participant will override this
+    return null;
+  }
+
+  @Override
+  public InstanceType getInstanceType() {
+    return _instanceType;
+  }
+
+  @Override
+  public String getVersion() {
+    return _version;
+  }
+
+  @Override
+  public HelixManagerProperties getProperties() {
+    return _properties;
+  }
+
+  @Override
+  public StateMachineEngine getStateMachineEngine() {
+    // helix-participant will override this
+    return null;
+  }
+
+  @Override
+  public abstract boolean isLeader();
+
+  @Override
+  public void startTimerTasks() {
+    for (HelixTimerTask task : _timerTasks)
+    {
+      task.start();
+    }
+
+  }
+
+  @Override
+  public void stopTimerTasks() {
+    for (HelixTimerTask task : _timerTasks)
+    {
+      task.stop();
+    }
+    
+  }
+
+  @Override
+  public void addPreConnectCallback(PreConnectCallback callback) {
+    LOG.info("Adding preconnect callback: " + callback);
+    _preConnectCallbacks.add(callback);    
+  }
+
+  @Override
+  public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
+    _liveInstanceInfoProvider = liveInstanceInfoProvider;    
+  }
+
+  /**
+   * wait until we get a non-zero session-id. note that we might lose zkconnection
+   * right after we read session-id. but it's ok to get stale session-id and we will have 
+   * another handle-new-session callback to correct this. 
+   */
+  protected void waitUntilConnected() {
+    boolean isConnected;
+    do {
+      isConnected = _zkclient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT,
+          TimeUnit.MILLISECONDS);
+      if (!isConnected) {
+        LOG.error("fail to connect zkserver: " + _zkAddress + " in "
+            + ZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
+            + ", clusterName: " + _clusterName);
+        continue;
+      }
+      
+      ZkConnection zkConnection = ((ZkConnection) _zkclient.getConnection());
+      _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId());
+
+      /**
+       * at the time we read session-id, zkconnection might be lost again
+       * wait until we get a non-zero session-id
+       */
+    } while ("0".equals(_sessionId));
+    
+    LOG.info("Handling new session, session id: " + _sessionId
+        + ", instance: " + _instanceName + ", instanceTye: " + _instanceType
+        + ", cluster: " + _clusterName
+        + ", zkconnection: " + ((ZkConnection) _zkclient.getConnection()).getZookeeper());
+  }
+  
+  protected void checkConnected()
+  {
+    if (!isConnected())
+    {
+      throw new HelixException("ClusterManager not connected. Call clusterManager.connect()");
+    }
+  }
+
+  protected void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, EventType[] eventType)
+  {
+    checkConnected();
+
+    PropertyType type = propertyKey.getType();
+
+    synchronized (this)
+    {
+      for (CallbackHandler handler : _handlers)
+      {
+        // compare property-key path and listener reference
+        if (handler.getPath().equals(propertyKey.getPath()) && handler.getListener().equals(listener))
+        {
+          LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath() + " already exists. skip add");
+          return;
+        }
+      }
+      
+      CallbackHandler newHandler = new CallbackHandler(this, _zkclient, propertyKey, 
+          listener, eventType, changeType);
+      
+      _handlers.add(newHandler);
+      LOG.info("Added listener: " + listener + " for type: " + type + " to path: " + newHandler.getPath());
+    }
+  }
+  
+  protected void initHandlers(List<CallbackHandler> handlers)
+  {
+    synchronized (this)
+    {
+      if (handlers != null)
+      {
+        for (CallbackHandler handler : handlers)
+        {
+          handler.init();
+          LOG.info("init handler: " + handler.getPath() + ", " + handler.getListener());
+        }
+      }
+    }
+  }
+  
+  protected void resetHandlers()
+  {
+    synchronized (this)
+    {
+      if (_handlers != null) {
+        // get a copy of the list and iterate over the copy list
+        // in case handler.reset() modifies the original handler list
+        List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>();
+        tmpHandlers.addAll(_handlers);
+  
+        for (CallbackHandler handler : tmpHandlers)
+        {
+          handler.reset();
+          LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener());
+        }
+      }
+    }
+  }
+
+  /**
+   * different helix-manager types may override this to return a cache-enabled base-data-accessor
+   * 
+   * @param baseDataAccessor
+   * @return
+   */
+  BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
+    return baseDataAccessor;
+  }
+  
+  void createClient() throws Exception {
+    PathBasedZkSerializer zkSerializer =
+        ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer())
+                               .build();
+
+    _zkclient = new ZkClient(_zkAddress, _sessionTimeout, ZkClient.DEFAULT_CONNECTION_TIMEOUT, zkSerializer);
+
+    ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkclient);
+
+    _baseDataAccessor = createBaseDataAccessor(baseDataAccessor);
+
+    _dataAccessor =
+        new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor);
+    _configAccessor = new ConfigAccessor(_zkclient);
+    
+    int retryCount = 0;
+
+    _zkclient.subscribeStateChanges(this);
+    while (retryCount < 3)
+    {
+      try
+      {
+        _zkclient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS);
+        handleStateChanged(KeeperState.SyncConnected);
+        handleNewSession();
+        break;
+      }
+      catch (HelixException e)
+      {
+        LOG.error("fail to createClient.", e);
+        throw e;
+      }
+      catch (Exception e)
+      {
+        retryCount++;
+
+        LOG.error("fail to createClient. retry " + retryCount, e);
+        if (retryCount == 3)
+        {
+          throw e;
+        }
+      }
+    }
+  }
+
+  // TODO separate out flapping detection code
+  @Override
+  public void handleStateChanged(KeeperState state) throws Exception {
+    switch (state)
+    {
+    case SyncConnected:
+      ZkConnection zkConnection = (ZkConnection) _zkclient.getConnection();
+      LOG.info("KeeperState: " + state + ", zookeeper:" + zkConnection.getZookeeper());
+      break;
+    case Disconnected:
+      LOG.info("KeeperState:" + state + ", disconnectedSessionId: "
+          + _sessionId + ", instance: " + _instanceName + ", type: "
+          + _instanceType);
+
+      /**
+       * Track the time stamp that the disconnected happens, then check history and see if
+       * we should disconnect the helix-manager
+       */
+      _disconnectTimeHistory.add(System.currentTimeMillis());
+      if (isFlapping())
+      {
+        LOG.error("instanceName: " + _instanceName + " is flapping. diconnect it. "
+          + " maxDisconnectThreshold: " + _maxDisconnectThreshold 
+          + " disconnects in " + _flappingTimeWindowMs + "ms."); 
+        disconnect();
+      }
+      break;
+    case Expired:
+      LOG.info("KeeperState:" + state + ", expiredSessionId: " + _sessionId + ", instance: "
+          + _instanceName + ", type: " + _instanceType);
+      break;
+    default:
+      break;
+    }    
+  }
+  
+  /**
+   * If the zk state has changed to Disconnected more than _maxDisconnectThreshold times within the
+   * previous _flappingTimeWindowMs ms, we assume something is wrong and disconnect the helix manager from zk.
+   */
+  private boolean isFlapping()
+  {
+    if(_disconnectTimeHistory.size() == 0)
+    {
+      return false;
+    }
+    long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1);
+    
+    // Remove disconnect history timestamp that are older than _flappingTimeWindowMs ago
+    while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp)
+    {
+      _disconnectTimeHistory.remove(0);
+    }
+    return _disconnectTimeHistory.size() > _maxDisconnectThreshold;
+  }
+
+  /**
+   * controller should override it to return a list of timers that need to start/stop when leadership changes
+   * 
+   * @return
+   */
+  protected List<HelixTimerTask> getControllerHelixTimerTasks() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index cfe40c1..533650f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -41,6 +41,7 @@ import org.apache.helix.ExternalViewChangeListener;
 import org.apache.helix.HealthStateChangeListener;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.IdealStateChangeListener;
@@ -88,6 +89,10 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
                          EventType[] eventTypes,
                          ChangeType changeType)
   {
+    if (listener == null) {
+      throw new HelixException("listener could not be null");
+    }
+    
     this._manager = manager;
     this._accessor = manager.getHelixDataAccessor();
     this._zkClient = client;

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
new file mode 100644
index 0000000..43fa149
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManager.java
@@ -0,0 +1,210 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Timer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.InstanceType;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.monitoring.ZKPathDataDumpTask;
+import org.apache.helix.monitoring.mbeans.HelixStageLatencyMonitor;
+import org.apache.helix.participant.DistClusterControllerElection;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+public class ControllerManager extends AbstractManager {
+  private static final Logger LOG = Logger.getLogger(ControllerManager.class);
+
+  final GenericHelixController _controller = new GenericHelixController();
+
+  // TODO merge into GenericHelixController
+  private CallbackHandler _leaderElectionHandler = null;
+
+  /**
+   * Timer task that schedules a {@link ZKPathDataDumpTask} on its own daemon timer.
+   * The dump work is done by the scheduled task; {@link #run()} itself is a no-op.
+   */
+  static class StatusDumpTask extends HelixTimerTask {
+    Timer _timer = null;
+    final ZkClient zkclient;
+    final AbstractManager helixController;
+
+    public StatusDumpTask(ZkClient zkclient, AbstractManager helixController) {
+      this.zkclient = zkclient;
+      this.helixController = helixController;
+    }
+
+    /** Schedules the dump task unless a timer is already running. */
+    @Override
+    public void start() {
+      long initialDelay = 30 * 60 * 1000;
+      long period = 120 * 60 * 1000;
+      int timeThresholdNoChange = 180 * 60 * 1000;
+
+      if (_timer == null) {
+        _timer = new Timer(true);
+        _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(helixController,
+                                                          zkclient,
+                                                          timeThresholdNoChange),
+                                   initialDelay,
+                                   period);
+      }
+    }
+
+    /** Cancels the timer, if any, so that a later start() creates a fresh one. */
+    @Override
+    public void stop() {
+      if (_timer != null) {
+        _timer.cancel();
+        _timer = null;
+      }
+    }
+
+    @Override
+    public void run() {
+      // intentionally empty: the periodic work runs in the ZKPathDataDumpTask scheduled by start()
+    }
+  }
+
+  /**
+   * Creates a controller-only manager (InstanceType.CONTROLLER) and adds its timer tasks.
+   */
+  public ControllerManager(String zkAddress, String clusterName, String instanceName) {
+    super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER);
+
+    _timerTasks.add(new HealthStatsAggregationTask(this));
+    _timerTasks.add(new StatusDumpTask(_zkclient, this));
+  }
+
+  @Override
+  protected List<HelixTimerTask> getControllerHelixTimerTasks() {
+    return _timerTasks;
+  }
+
+  /**
+   * Handles a new zk session: resets the leader-election handler and all handlers, then
+   * re-inits (or first creates) the leader-election handler and inits the user handlers.
+   */
+  @Override
+  public void handleNewSession() throws Exception {
+    waitUntilConnected();
+
+    /**
+     * reset all handlers, make sure cleanup completed for previous session
+     * disconnect if fail to cleanup
+     */
+    if (_leaderElectionHandler != null) {
+      _leaderElectionHandler.reset();
+    }
+    // TODO reset user defined handlers only
+    resetHandlers();
+
+    /**
+     * from here on, we are dealing with new session
+     */
+    if (_leaderElectionHandler != null) {
+      _leaderElectionHandler.init();
+    } else {
+      _leaderElectionHandler = new CallbackHandler(this,
+                                                   _zkclient,
+                                                   _keyBuilder.controller(),
+                                  new DistributedLeaderElection(this, _controller),
+                                  new EventType[] { EventType.NodeChildrenChanged,
+                                      EventType.NodeDeleted, EventType.NodeCreated },
+                                  ChangeType.CONTROLLER);
+    }
+
+    /**
+     * init user defined handlers only
+     */
+    List<CallbackHandler> userHandlers = new ArrayList<CallbackHandler>();
+    for (CallbackHandler handler : _handlers) {
+      Object listener = handler.getListener();
+      if (!listener.equals(_messagingService.getExecutor())
+          && !listener.equals(_controller)) {
+        userHandlers.add(handler);
+      }
+    }
+    initHandlers(userHandlers);
+  }
+
+  @Override
+  void doDisconnect() {
+    if (_leaderElectionHandler != null) {
+      _leaderElectionHandler.reset();
+    }
+  }
+
+  /**
+   * @return true only if connected and the leader node names this instance and session;
+   *         false otherwise, including when the leader node cannot be read
+   */
+  @Override
+  public boolean isLeader() {
+    if (!isConnected()) {
+      return false;
+    }
+
+    try {
+      LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
+      if (leader != null) {
+        String leaderName = leader.getInstanceName();
+        String sessionId = leader.getSessionId();
+        if (leaderName != null && leaderName.equals(_instanceName)
+            && sessionId != null && sessionId.equals(_sessionId)) {
+          return true;
+        }
+      }
+    } catch (Exception e) {
+      // best-effort check: report "not leader" but keep the failure visible in the log
+      LOG.warn("Failed to read leader of cluster: " + _clusterName, e);
+    }
+    return false;
+  }
+
+  /**
+   * helix-controller uses a write-through cache for external-view
+   */
+  @Override
+  BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
+    String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, _clusterName);
+    return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
+                                                Arrays.asList(extViewPath));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
new file mode 100644
index 0000000..d724095
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ControllerManagerHelper.java
@@ -0,0 +1,123 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.log4j.Logger;
+
+/**
+ * helper class for controller manager
+ *
+ */
+public class ControllerManagerHelper {
+  private static final Logger LOG = Logger.getLogger(ControllerManagerHelper.class);
+
+  final AbstractManager _manager;
+  final DefaultMessagingService _messagingService;
+  final List<HelixTimerTask> _controllerTimerTasks;
+
+  public ControllerManagerHelper(AbstractManager manager) {
+    _manager = manager;
+    _messagingService = (DefaultMessagingService)manager.getMessagingService();
+    // AbstractManager's default getControllerHelixTimerTasks() returns null; normalize to empty
+    List<HelixTimerTask> timerTasks = manager.getControllerHelixTimerTasks();
+    _controllerTimerTasks = (timerTasks != null)
+        ? timerTasks : java.util.Collections.<HelixTimerTask>emptyList();
+  }
+
+  /**
+   * Registers the message executor as controller message listener, registers the default
+   * message handler factories, and adds the controller as a change listener.
+   * @param controller listener to register; failures are logged, not rethrown
+   */
+  public void addListenersToController(GenericHelixController controller) {
+    try {
+      // setup controller message listener and register message handlers
+      _manager.addControllerMessageListener(_messagingService.getExecutor());
+      MessageHandlerFactory defaultControllerMsgHandlerFactory =
+          new DefaultControllerMessageHandlerFactory();
+      _messagingService.getExecutor()
+                       .registerMessageHandlerFactory(defaultControllerMsgHandlerFactory.getMessageType(),
+                                                      defaultControllerMsgHandlerFactory);
+      MessageHandlerFactory defaultSchedulerMsgHandlerFactory =
+          new DefaultSchedulerMessageHandlerFactory(_manager);
+      _messagingService.getExecutor()
+                       .registerMessageHandlerFactory(defaultSchedulerMsgHandlerFactory.getMessageType(),
+                                                      defaultSchedulerMsgHandlerFactory);
+      MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory =
+          new DefaultParticipantErrorMessageHandlerFactory(_manager);
+      _messagingService.getExecutor()
+                       .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(),
+                                                      defaultParticipantErrorMessageHandlerFactory);
+
+      // setup generic-controller
+      _manager.addConfigChangeListener(controller);
+      _manager.addLiveInstanceChangeListener(controller);
+      _manager.addIdealStateChangeListener(controller);
+      // no need for controller to listen on external-view
+      // _manager.addExternalViewChangeListener(controller);
+      _manager.addControllerListener(controller);
+    } catch (ZkInterruptedException e) {
+      LOG.warn("zk connection is interrupted during HelixManagerMain.addListenersToController(). "
+              + e);
+    } catch (Exception e) {
+      LOG.error("Error when creating HelixManagerContollerMonitor", e);
+    }
+  }
+
+  /**
+   * Removes the controller from the instance-config, live-instance, ideal-state and controller
+   * keys, and the message executor from the controller-message key.
+   * @param controller listener to remove
+   */
+  public void removeListenersFromController(GenericHelixController controller) {
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(_manager.getClusterName());
+    // reset generic-controller
+    _manager.removeListener(keyBuilder.instanceConfigs(), controller);
+    _manager.removeListener(keyBuilder.liveInstances(), controller);
+    _manager.removeListener(keyBuilder.idealStates(), controller);
+    _manager.removeListener(keyBuilder.controller(), controller);
+
+    // reset controller message listener (message handler factories stay registered)
+    _manager.removeListener(keyBuilder.controllerMessages(), _messagingService.getExecutor());
+  }
+
+  /** Starts every controller timer task; a no-op when the manager supplied none. */
+  public void startControllerTimerTasks() {
+    for (HelixTimerTask task : _controllerTimerTasks) {
+      task.start();
+    }
+  }
+
+  /** Stops every controller timer task; a no-op when the manager supplied none. */
+  public void stopControllerTimerTasks() {
+    for (HelixTimerTask task : _controllerTimerTasks) {
+      task.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
new file mode 100644
index 0000000..92b4f66
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedControllerManager.java
@@ -0,0 +1,209 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixTimerTask;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.healthcheck.HealthStatsAggregationTask;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.manager.zk.ControllerManager.StatusDumpTask;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.participant.DistClusterControllerElection;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+
+public class DistributedControllerManager extends AbstractManager {
+  private static final Logger LOG = Logger.getLogger(DistributedControllerManager.class);
+
+  final StateMachineEngine _stateMachineEngine;
+  final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
+
+  CallbackHandler _leaderElectionHandler = null;
+  final GenericHelixController _controller = new GenericHelixController();
+
+  /**
+   * hold timer tasks for controller only
+   * we need to add/remove controller timer tasks during handle new session
+   */
+  final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<HelixTimerTask>();
+
+  /**
+   * Creates a CONTROLLER_PARTICIPANT manager; controller-only timer tasks are kept separately.
+   */
+  public DistributedControllerManager(String zkAddress, String clusterName, String instanceName) {
+    super(zkAddress, clusterName, instanceName, InstanceType.CONTROLLER_PARTICIPANT);
+
+    _stateMachineEngine = new HelixStateMachineEngine(this);
+    _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
+
+    _timerTasks.add(_participantHealthInfoCollector);
+
+    _controllerTimerTasks.add(new HealthStatsAggregationTask(this));
+    _controllerTimerTasks.add(new ControllerManager.StatusDumpTask(_zkclient, this));
+  }
+
+  @Override
+  public ParticipantHealthReportCollector getHealthReportCollector() {
+    checkConnected();
+    return _participantHealthInfoCollector;
+  }
+
+  @Override
+  public StateMachineEngine getStateMachineEngine() {
+    return _stateMachineEngine;
+  }
+
+  @Override
+  protected List<HelixTimerTask> getControllerHelixTimerTasks() {
+    return _controllerTimerTasks;
+  }
+
+  /**
+   * Handles a new zk session. Steps, in order: previous-session cleanup, cluster join,
+   * live-instance and message setup, leader election, timer tasks, user handlers.
+   */
+  @Override
+  public void handleNewSession() throws Exception {
+    waitUntilConnected();
+
+    ParticipantManagerHelper participantHelper
+      = new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
+
+    /**
+     * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
+     * disconnect if fail to cleanup
+     */
+    stopTimerTasks();
+    if (_leaderElectionHandler != null) {
+      _leaderElectionHandler.reset();
+    }
+    resetHandlers();
+
+    /**
+     * clean up write-through cache
+     */
+    _baseDataAccessor.reset();
+
+    /**
+     * from here on, we are dealing with new session
+     */
+    if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
+      throw new HelixException("Cluster structure is not set up for cluster: "
+          + _clusterName);
+    }
+
+    /**
+     * auto-join
+     */
+    participantHelper.joinCluster();
+
+    /**
+     * Invoke PreConnectCallbacks
+     */
+    for (PreConnectCallback callback : _preConnectCallbacks) {
+      callback.onPreConnect();
+    }
+
+    participantHelper.createLiveInstance();
+    participantHelper.carryOverPreviousCurrentState();
+    participantHelper.setupMsgHandler();
+
+    /**
+     * leader election
+     */
+    if (_leaderElectionHandler != null) {
+      _leaderElectionHandler.init();
+    } else {
+      _leaderElectionHandler = new CallbackHandler(this,
+                                                   _zkclient,
+                                                   _keyBuilder.controller(),
+                                  new DistributedLeaderElection(this, _controller),
+                                  new EventType[] { EventType.NodeChildrenChanged,
+                                      EventType.NodeDeleted, EventType.NodeCreated },
+                                  ChangeType.CONTROLLER);
+    }
+
+    /**
+     * start health-check timer task
+     */
+    participantHelper.createHealthCheckPath();
+    startTimerTasks();
+
+    /**
+     * init user defined handlers only
+     */
+    List<CallbackHandler> userHandlers = new ArrayList<CallbackHandler>();
+    for (CallbackHandler handler : _handlers) {
+      Object listener = handler.getListener();
+      if (!listener.equals(_messagingService.getExecutor())
+          && !listener.equals(_dataAccessor)
+          && !listener.equals(_controller)) {
+        userHandlers.add(handler);
+      }
+    }
+    initHandlers(userHandlers);
+  }
+
+  @Override
+  void doDisconnect() {
+    if (_leaderElectionHandler != null) {
+      _leaderElectionHandler.reset();
+    }
+  }
+
+  /**
+   * @return true only if connected and the leader node names this instance and session
+   */
+  @Override
+  public boolean isLeader() {
+    if (!isConnected()) {
+      return false;
+    }
+
+    try {
+      LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
+      if (leader != null) {
+        String leaderName = leader.getInstanceName();
+        String sessionId = leader.getSessionId();
+        if (leaderName != null && leaderName.equals(_instanceName)
+            && sessionId != null && sessionId.equals(_sessionId)) {
+          return true;
+        }
+      }
+    } catch (Exception e) {
+      // best-effort check: report "not leader" but keep the failure visible in the log
+      LOG.warn("Failed to read leader of cluster: " + _clusterName, e);
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
new file mode 100644
index 0000000..ab1da23
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java
@@ -0,0 +1,160 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.controller.restlet.ZKPropertyTransferServer;
+import org.apache.helix.model.LeaderHistory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.participant.DistClusterControllerElection;
+import org.apache.log4j.Logger;
+
+/**
+ * do distributed leader election
+ * 
+ */
+public class DistributedLeaderElection implements ControllerChangeListener {
+  private static Logger LOG = Logger.getLogger(DistributedLeaderElection.class);
+
+  final AbstractManager _manager;
+  final GenericHelixController _controller;
+
+  public DistributedLeaderElection(AbstractManager manager, GenericHelixController controller) {
+    _manager = manager;
+    _controller = controller;
+  }
+
+  /**
+   * may be accessed by multiple threads: zk-client thread and
+   * ZkHelixManager.disconnect()->reset() TODO: Refactor accessing
+   * HelixMaangerMain class statically
+   */
+  @Override
+  public synchronized void onControllerChange(NotificationContext changeContext) {
+    HelixManager manager = changeContext.getManager();
+    if (manager == null) {
+      LOG.error("missing attributes in changeContext. requires HelixManager");
+      return;
+    }
+
+    InstanceType type = manager.getInstanceType();
+    if (type != InstanceType.CONTROLLER && type != InstanceType.CONTROLLER_PARTICIPANT) {
+      LOG.error("fail to become controller because incorrect instanceType (was " + type.toString()
+          + ", requires CONTROLLER | CONTROLLER_PARTICIPANT)");
+      return;
+    }
+
+    ControllerManagerHelper controllerHelper = new ControllerManagerHelper(_manager);
+    try {
+      if (changeContext.getType().equals(NotificationContext.Type.INIT)
+          || changeContext.getType().equals(NotificationContext.Type.CALLBACK)) {
+        HelixDataAccessor accessor = manager.getHelixDataAccessor();
+        Builder keyBuilder = accessor.keyBuilder();
+
+        while (accessor.getProperty(keyBuilder.controllerLeader()) == null) {
+          boolean success = tryUpdateController(manager);
+          if (success) {
+            updateHistory(manager);
+            _manager._baseDataAccessor.reset();
+            controllerHelper.addListenersToController(_controller);
+            controllerHelper.startControllerTimerTasks();
+          }
+        }
+      } else if (changeContext.getType().equals(NotificationContext.Type.FINALIZE)) {
+        controllerHelper.stopControllerTimerTasks();
+        controllerHelper.removeListenersFromController(_controller);
+        
+        /**
+         * clear write-through cache
+         */
+        _manager._baseDataAccessor.reset();
+      }
+
+    } catch (Exception e) {
+      LOG.error("Exception when trying to become leader", e);
+    }
+  }
+
+  private boolean tryUpdateController(HelixManager manager) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LiveInstance leader = new LiveInstance(manager.getInstanceName());
+    try {
+      leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+      leader.setSessionId(manager.getSessionId());
+      leader.setHelixVersion(manager.getVersion());
+      if (ZKPropertyTransferServer.getInstance() != null) {
+        String zkPropertyTransferServiceUrl = ZKPropertyTransferServer.getInstance()
+            .getWebserviceUrl();
+        if (zkPropertyTransferServiceUrl != null) {
+          leader.setWebserviceUrl(zkPropertyTransferServiceUrl);
+        }
+      } else {
+        LOG.warn("ZKPropertyTransferServer instnace is null");
+      }
+      boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader);
+      if (success) {
+        return true;
+      } else {
+        LOG.info("Unable to become leader probably because some other controller becames the leader");
+      }
+    } catch (Exception e) {
+      LOG.error(
+          "Exception when trying to updating leader record in cluster:" + manager.getClusterName()
+              + ". Need to check again whether leader node has been created or not", e);
+    }
+
+    leader = accessor.getProperty(keyBuilder.controllerLeader());
+    if (leader != null) {
+      String leaderSessionId = leader.getSessionId();
+      LOG.info("Leader exists for cluster: " + manager.getClusterName() + ", currentLeader: "
+          + leader.getInstanceName() + ", leaderSessionId: " + leaderSessionId);
+
+      if (leaderSessionId != null && leaderSessionId.equals(manager.getSessionId())) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private void updateHistory(HelixManager manager) {
+    HelixDataAccessor accessor = manager.getHelixDataAccessor();
+    Builder keyBuilder = accessor.keyBuilder();
+
+    LeaderHistory history = accessor.getProperty(keyBuilder.controllerLeaderHistory());
+    if (history == null) {
+      history = new LeaderHistory(PropertyType.HISTORY.toString());
+    }
+    history.updateHistory(manager.getClusterName(), manager.getInstanceName());
+    accessor.setProperty(keyBuilder.controllerLeaderHistory(), history);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
new file mode 100644
index 0000000..fc54f08
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java
@@ -0,0 +1,205 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.ConfigChangeListener;
+import org.apache.helix.ControllerChangeListener;
+import org.apache.helix.CurrentStateChangeListener;
+import org.apache.helix.ExternalViewChangeListener;
+import org.apache.helix.HealthStateChangeListener;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.IdealStateChangeListener;
+import org.apache.helix.InstanceConfigChangeListener;
+import org.apache.helix.InstanceType;
+import org.apache.helix.LiveInstanceChangeListener;
+import org.apache.helix.LiveInstanceInfoProvider;
+import org.apache.helix.MessageListener;
+import org.apache.helix.PreConnectCallback;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathConfig;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ScopedConfigChangeListener;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.HelixConstants.ChangeType;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
+import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.DistClusterControllerElection;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.data.Stat;
+
+public class ParticipantManager extends AbstractManager {
+
+  private static final Logger LOG = Logger.getLogger(ParticipantManager.class);
+
+  /**
+   * state-transition message handler factory for helix-participant
+   */
+  final StateMachineEngine _stateMachineEngine;
+
+  final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
+
+  /** Creates a PARTICIPANT manager and adds its health-report collector to the timer tasks. */
+  public ParticipantManager(String zkAddress, String clusterName, String instanceName) {
+    super(zkAddress, clusterName, instanceName, InstanceType.PARTICIPANT);
+
+    _stateMachineEngine = new HelixStateMachineEngine(this);
+    _participantHealthInfoCollector = new ParticipantHealthReportCollectorImpl(this, _instanceName);
+
+    _timerTasks.add(_participantHealthInfoCollector);
+  }
+
+  @Override
+  public ParticipantHealthReportCollector getHealthReportCollector() {
+    checkConnected();
+    return _participantHealthInfoCollector;
+  }
+
+  @Override
+  public StateMachineEngine getStateMachineEngine() {
+    return _stateMachineEngine;
+  }
+
+  /**
+   * Handles a new zk session; the step comments in the body give the order of operations.
+   * @throws HelixException if the cluster structure is not set up
+   */
+  @Override
+  public void handleNewSession() {
+    waitUntilConnected();
+
+    /**
+     * stop timer tasks, reset all handlers, make sure cleanup completed for previous session
+     * disconnect if cleanup fails
+     */
+    stopTimerTasks();
+    resetHandlers();
+
+    /**
+     * clear write-through cache
+     */
+    _baseDataAccessor.reset();
+
+    /**
+     * from here on, we are dealing with new session
+     */
+    if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
+      throw new HelixException("Cluster structure is not set up for cluster: "
+          + _clusterName);
+    }
+
+    /**
+     * auto-join
+     */
+    ParticipantManagerHelper participantHelper
+          = new ParticipantManagerHelper(this, _zkclient, _sessionTimeout);
+    participantHelper.joinCluster();
+
+    /**
+     * Invoke PreConnectCallbacks
+     */
+    for (PreConnectCallback callback : _preConnectCallbacks) {
+      callback.onPreConnect();
+    }
+
+    participantHelper.createLiveInstance();
+
+    participantHelper.carryOverPreviousCurrentState();
+
+    /**
+     * setup message listener
+     */
+    participantHelper.setupMsgHandler();
+
+    /**
+     * start health check timer task
+     */
+    participantHelper.createHealthCheckPath();
+    startTimerTasks();
+
+    /**
+     * init user defined handlers only
+     */
+    List<CallbackHandler> userHandlers = new ArrayList<CallbackHandler>();
+    for (CallbackHandler handler : _handlers) {
+      Object listener = handler.getListener();
+      if (!listener.equals(_messagingService.getExecutor())
+          && !listener.equals(_dataAccessor)) {
+        userHandlers.add(handler);
+      }
+    }
+    initHandlers(userHandlers);
+  }
+
+  /**
+   * helix-participant uses a write-through cache for current-state
+   */
+  @Override
+  BaseDataAccessor<ZNRecord> createBaseDataAccessor(ZkBaseDataAccessor<ZNRecord> baseDataAccessor) {
+    String curStatePath = PropertyPathConfig.getPath(PropertyType.CURRENTSTATES,
+        _clusterName,
+        _instanceName);
+    return new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor,
+                         Arrays.asList(curStatePath));
+  }
+
+  @Override
+  public boolean isLeader() {
+    return false;
+  }
+
+  /**
+   * disconnect logic for helix-participant
+   */
+  @Override
+  void doDisconnect() {
+    // nothing for participant
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
new file mode 100644
index 0000000..4f9d138
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java
@@ -0,0 +1,276 @@
+package org.apache.helix.manager.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.lang.management.ManagementFactory;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.messaging.DefaultMessagingService;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.helix.participant.HelixStateMachineEngine;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * helper class for participant-manager
+ *
+ * Groups the ZooKeeper-side setup steps for a participant: joining the cluster,
+ * creating the live-instance node, carrying over current-states from previous
+ * sessions, registering the message handlers and creating the health-check path.
+ * All collaborators are captured from the manager once, in the constructor.
+ */
+public class ParticipantManagerHelper {
+  private static final Logger LOG = Logger.getLogger(ParticipantManagerHelper.class);
+
+  final ZkClient _zkclient;
+  final AbstractManager _manager;
+  final PropertyKey.Builder _keyBuilder;
+  final String _clusterName;
+  final String _instanceName;
+  final String _sessionId;
+  final int _sessionTimeout;
+  final ConfigAccessor _configAccessor;
+  final InstanceType _instanceType;
+  final HelixAdmin _helixAdmin;
+  final ZKHelixDataAccessor _dataAccessor;
+  final DefaultMessagingService _messagingService;
+  final StateMachineEngine _stateMachineEngine;
+
+  /**
+   * Snapshots everything the helper needs from the manager.
+   *
+   * @param manager        manager this helper works for; its data accessor must be a
+   *                       {@link ZKHelixDataAccessor} and its messaging service a
+   *                       {@link DefaultMessagingService} (both are cast here)
+   * @param zkclient       zk client used for direct znode operations
+   * @param sessionTimeout session timeout, used in milliseconds when waiting for a
+   *                       stale live-instance to disappear
+   */
+  public ParticipantManagerHelper(AbstractManager manager, ZkClient zkclient, int sessionTimeout) {
+    _zkclient = zkclient;
+    _manager = manager;
+    _clusterName = manager.getClusterName();
+    _instanceName = manager.getInstanceName();
+    _keyBuilder = new PropertyKey.Builder(_clusterName);
+    _sessionId = manager.getSessionId();
+    _sessionTimeout = sessionTimeout;
+    _configAccessor = manager.getConfigAccessor();
+    _instanceType = manager.getInstanceType();
+    _helixAdmin = manager.getClusterManagmentTool();
+    _dataAccessor = (ZKHelixDataAccessor) manager.getHelixDataAccessor();
+    _messagingService = (DefaultMessagingService) manager.getMessagingService();
+    _stateMachineEngine = manager.getStateMachineEngine();
+  }
+
+  /**
+   * Makes sure the instance is set up in the cluster.
+   * If it is not and the cluster config allows participant auto-join, an
+   * enabled instance config is added, with host name and port derived from the
+   * instance name (split at the last "_").
+   *
+   * @throws HelixException if the instance is not set up and auto-join is off
+   */
+  public void joinCluster() {
+    // Read cluster config and see if instance can auto join the cluster
+    boolean autoJoin = false;
+    try {
+      HelixConfigScope scope = new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(
+          _manager.getClusterName()).build();
+      autoJoin = Boolean.parseBoolean(_configAccessor.get(scope, HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN));
+      LOG.info("instance: " + _instanceName + " auto-joining " + _clusterName + " is " + autoJoin);
+    } catch (Exception e) {
+      // best effort: autoJoin stays false, but leave a trace of why the config was unreadable
+      LOG.warn("fail to read " + HelixManager.ALLOW_PARTICIPANT_AUTO_JOIN + " for cluster: "
+          + _clusterName + ", auto-join is treated as false", e);
+    }
+
+    if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
+      if (!autoJoin) {
+        throw new HelixException("Initial cluster structure is not set up for instance: "
+            + _instanceName + ", instanceType: " + _instanceType);
+      } else {
+        LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
+        InstanceConfig instanceConfig = new InstanceConfig(_instanceName);
+        // default to the whole name as host and an empty port when there is no "_" separator
+        String hostName = _instanceName;
+        String port = "";
+        int lastPos = _instanceName.lastIndexOf("_");
+        if (lastPos > 0) {
+          hostName = _instanceName.substring(0, lastPos);
+          port = _instanceName.substring(lastPos + 1);
+        }
+        instanceConfig.setHostName(hostName);
+        instanceConfig.setPort(port);
+        instanceConfig.setInstanceEnabled(true);
+        _helixAdmin.addInstance(_clusterName, instanceConfig);
+      }
+    }
+  }
+
+  /**
+   * Creates the ephemeral live-instance node for this instance.
+   * If a node already exists:
+   * - owned by the current session: only its session-id field is refreshed if stale
+   * - owned by another session: wait (session timeout + 5s) for it to go away,
+   *   then try to create the node one last time
+   * - vanished while being checked: retry the create
+   *
+   * @throws HelixException if the final create attempt fails
+   */
+  public void createLiveInstance() {
+    String liveInstancePath = _keyBuilder.liveInstance(_instanceName).getPath();
+    LiveInstance liveInstance = new LiveInstance(_instanceName);
+    liveInstance.setSessionId(_sessionId);
+    liveInstance.setHelixVersion(_manager.getVersion());
+    liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName());
+
+    boolean retry;
+    do {
+      retry = false;
+      try {
+        _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
+      } catch (ZkNodeExistsException e) {
+        LOG.warn("found another instance with same instanceName: " + _instanceName
+            + " in cluster " + _clusterName);
+
+        Stat stat = new Stat();
+        ZNRecord record = _zkclient.readData(liveInstancePath, stat, true);
+        if (record == null) {
+          /**
+           * live-instance is gone as we check it, retry create live-instance
+           */
+          retry = true;
+        } else {
+          String ephemeralOwner = Long.toHexString(stat.getEphemeralOwner());
+          if (ephemeralOwner.equals(_sessionId)) {
+            /**
+             * update sessionId field in live-instance if necessary
+             */
+            LiveInstance curLiveInstance = new LiveInstance(record);
+            if (!curLiveInstance.getSessionId().equals(_sessionId)) {
+              /**
+               * in last handle-new-session,
+               * live-instance is created by new zkconnection with stale session-id inside
+               * just update session-id field
+               */
+              curLiveInstance.setSessionId(_sessionId);
+              // write the record of the object that was just updated, so the new
+              // session-id is persisted whether or not LiveInstance shares "record"
+              _zkclient.writeData(liveInstancePath, curLiveInstance.getRecord());
+            }
+          } else {
+            /**
+             * wait for a while, in case previous helix-participant exits unexpectedly
+             * and its live-instance still hangs around until session timeout
+             */
+            try {
+              TimeUnit.MILLISECONDS.sleep(_sessionTimeout + 5000);
+            } catch (InterruptedException ex) {
+              // only logged: the final create attempt below still runs
+              LOG.warn("Sleep interrupted while waiting for previous live-instance to go away.",
+                  ex);
+            }
+            /**
+             * give a last try after exit while loop
+             */
+            retry = true;
+            break;
+          }
+        }
+      }
+    } while (retry);
+
+    /**
+     * give a last shot
+     */
+    if (retry) {
+      try {
+        // must be ephemeral like the first attempt: a persistent live-instance node
+        // would stay behind after this session ends
+        _zkclient.createEphemeral(liveInstancePath, liveInstance.getRecord());
+      } catch (Exception e) {
+        String errorMessage = "instance: " + _instanceName
+            + " already has a live-instance in cluster " + _clusterName;
+        LOG.error(errorMessage, e);
+        throw new HelixException(errorMessage);
+      }
+    }
+  }
+
+  /**
+   * carry over current-states from last sessions
+   * set to initial state for current session only when state doesn't exist in current session
+   *
+   * Current-states without a state-model-definition reference, or whose
+   * definition cannot be read, are logged and skipped. Afterwards the
+   * current-state parent nodes of all other sessions are deleted.
+   */
+  public void carryOverPreviousCurrentState() {
+    List<String> sessions = _dataAccessor.getChildNames(_keyBuilder.sessions(_instanceName));
+
+    for (String session : sessions) {
+      if (session.equals(_sessionId)) {
+        continue;
+      }
+
+      List<CurrentState> lastCurStates =
+          _dataAccessor.getChildValues(_keyBuilder.currentStates(_instanceName, session));
+
+      for (CurrentState lastCurState : lastCurStates) {
+        LOG.info("Carrying over old session: " + session + ", resource: "
+            + lastCurState.getId() + " to current session: " + _sessionId);
+        String stateModelDefRef = lastCurState.getStateModelDefRef();
+        if (stateModelDefRef == null) {
+          LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: "
+              + lastCurState);
+          continue;
+        }
+        StateModelDefinition stateModel =
+            _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef));
+        if (stateModel == null) {
+          // getProperty yields null for a missing node; without a definition there is
+          // no initial state to fall back to, so skip instead of failing with an NPE
+          LOG.error("skip carry-over because state model definition: " + stateModelDefRef
+              + " is not found. previous current-state: " + lastCurState);
+          continue;
+        }
+
+        String curStatePath = _keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName()).getPath();
+        _dataAccessor.getBaseDataAccessor().update(curStatePath,
+            new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState), AccessOption.PERSISTENT);
+      }
+    }
+
+    /**
+     * remove previous current state parent nodes
+     */
+    for (String session : sessions) {
+      if (session.equals(_sessionId)) {
+        continue;
+      }
+
+      String path = _keyBuilder.currentStates(_instanceName, session).getPath();
+      LOG.info("Removing current states from previous sessions. path: " + path);
+      _zkclient.deleteRecursive(path);
+    }
+  }
+
+  /**
+   * Registers the state-machine engine as handler factory for STATE_TRANSITION
+   * messages, adds the task executor as message listener and the data accessor
+   * as controller listener, and registers the scheduled-task state model factory
+   * under the scheduler task queue name.
+   */
+  public void setupMsgHandler() {
+    _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(),
+        _stateMachineEngine);
+    _manager.addMessageListener(_messagingService.getExecutor(), _instanceName);
+    _manager.addControllerListener(_dataAccessor);
+
+    ScheduledTaskStateModelFactory stStateModelFactory
+        = new ScheduledTaskStateModelFactory(_messagingService.getExecutor());
+    _stateMachineEngine.registerStateModelFactory(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE,
+        stStateModelFactory);
+  }
+
+  /**
+   * create zk path for health check info
+   * TODO move it to cluster-setup
+   *
+   * The path (including missing parents) is only created when it does not exist yet.
+   */
+  public void createHealthCheckPath() {
+    String healthCheckInfoPath =
+        _dataAccessor.keyBuilder().healthReports(_instanceName).getPath();
+    if (!_zkclient.exists(healthCheckInfoPath)) {
+      _zkclient.createPersistent(healthCheckInfoPath, true);
+      LOG.info("Created healthcheck info path " + healthCheckInfoPath);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
index ba9b400..22ca9ba 100644
--- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
+++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java
@@ -419,6 +419,58 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     accessor.setChildren(readMsgKeys, readMsgs);
   }
 
+  /**
+   * remove message-handler factory from map, shutdown the associated executor
+   *
+   * @param type message type whose executor and handler factory are removed
+   * @throws HelixException if the executor is not terminated within 200ms
+   */
+  private void unregisterMessageHandlerFactory(String type) {
+    // stop the executor registered for this message type, if there is one
+    ExecutorService pool = _executorMap.remove(type);
+    if (pool != null) {
+      List<Runnable> neverRun = pool.shutdownNow();
+      LOG.info(neverRun.size() + " tasks never executed for msgType: "
+          + type + ". tasks: " + neverRun);
+
+      // an interrupted wait is only logged; it is not treated as a failed shutdown
+      boolean terminated = true;
+      try {
+        terminated = pool.awaitTermination(200, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOG.error("interruped when waiting for executor-service shutdown for msgType: " + type, e);
+      }
+
+      if (!terminated) {
+        LOG.error("executor-service for msgType: " + type
+            + " is not fully terminated in 200ms. will disconnect helix-participant");
+        throw new HelixException("fail to unregister msg-handler for msgType: " + type);
+      }
+    }
+
+    // reset state-model
+    MessageHandlerFactory factory = _handlerFactoryMap.remove(type);
+    if (factory != null) {
+      factory.reset();
+    }
+  }
+  
+  /**
+   * Handles a FINALIZE notification: under the lock, unregisters the handler
+   * factory (and shuts down the executor) of every message type that has an
+   * executor, then clears the task map.
+   * A HelixException thrown by unregisterMessageHandlerFactory (executor not
+   * terminated in time) is not caught here and propagates to the caller.
+   */
+  private void reset() {
+    LOG.info("Get FINALIZE notification");
+
+    // shutdown all executor-services
+    synchronized (_lock)
+    {
+      // iterate over a snapshot of the keys: unregisterMessageHandlerFactory()
+      // removes entries from _executorMap while this loop is running
+      for (String msgType : new java.util.ArrayList<String>(_executorMap.keySet()))
+      {
+        unregisterMessageHandlerFactory(msgType);
+      }
+
+      // clear task-map, all tasks should be terminated by now
+      _taskMap.clear();
+    }
+  }
+  
   @Override
   public void onMessage(String instanceName,
                         List<Message> messages,
@@ -429,21 +481,7 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor
     // TODO: see if we should have a separate notification call for resetting
     if (changeContext.getType() == Type.FINALIZE)
     {
-      LOG.info("Get FINALIZE notification");
-      for (MessageHandlerFactory factory : _handlerFactoryMap.values())
-      {
-        factory.reset();
-      }
-      // Cancel all scheduled tasks
-      synchronized (_lock)
-      {
-          for (MessageTaskInfo info : _taskMap.values())
-          {
-            cancelTask(info._task);
-          }
-        _taskMap.clear();
-      }
-      return;
+      reset();
     }
 
     if (messages == null || messages.size() == 0)

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
new file mode 100644
index 0000000..319633b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestDistributedControllerManager.java
@@ -0,0 +1,102 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ControllerManager;
+import org.apache.helix.manager.zk.DistributedControllerManager;
+import org.apache.helix.manager.zk.ParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for {@link DistributedControllerManager}: starts two
+ * distributed controllers on one cluster, disconnects the first and checks
+ * that leadership moves to the second.
+ */
+public class TestDistributedControllerManager extends ZkIntegrationTestBase {
+
+  /**
+   * Sets up a 2-node MasterSlave cluster, connects both distributed controllers,
+   * verifies the cluster state, then disconnects them one after the other and
+   * checks the live-instance and controller-leader nodes after each step.
+   */
+  @Test
+  public void simpleIntegrationTest() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            4, // partitions per resource
+                            n, // number of nodes
+                            2, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    DistributedControllerManager[] distributedControllers = new DistributedControllerManager[n];
+    for (int i = 0; i < n; i++) {
+      int port = 12918 + i;
+      distributedControllers[i] = new DistributedControllerManager(ZK_ADDR, clusterName, "localhost_" + port);
+      distributedControllers[i].getStateMachineEngine().registerStateModelFactory("MasterSlave", new MockMSModelFactory());
+      distributedControllers[i].connect();
+    }
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // disconnect first distributed-controller, and verify second takes leadership
+    distributedControllers[0].disconnect();
+
+    // verify leader changes to localhost_12919
+    Thread.sleep(100);
+    result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    Assert.assertNotNull(leader);
+    Assert.assertEquals(leader.getId(), "localhost_12919");
+
+    // clean up
+    distributedControllers[1].disconnect();
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12919")));
+    Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+    // closing banner: marks the end of the test (it used to print "START" a second time)
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/94367e1c/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
new file mode 100644
index 0000000..efa3138
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestParticipantManager.java
@@ -0,0 +1,86 @@
+package org.apache.helix.integration.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.integration.ZkIntegrationTestBase;
+import org.apache.helix.manager.zk.ControllerManager;
+import org.apache.helix.manager.zk.ParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.participant.MockMSModelFactory;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for {@link ParticipantManager}: connects one participant and
+ * one controller, verifies the cluster state, and checks that disconnecting
+ * both removes the live-instance and controller-leader nodes.
+ */
+public class TestParticipantManager extends ZkIntegrationTestBase {
+
+  /**
+   * Sets up a 1-node MasterSlave cluster, connects a participant and a
+   * controller, verifies the cluster state, then disconnects both and asserts
+   * that their live-instance and leader nodes are gone.
+   */
+  @Test
+  public void simpleIntegrationTest() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 1;
+
+    System.out.println("START " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+                            "localhost", // participant name prefix
+                            "TestDB", // resource name prefix
+                            1, // resources
+                            4, // partitions per resource
+                            n, // number of nodes
+                            1, // replicas
+                            "MasterSlave",
+                            true); // do rebalance
+
+    ParticipantManager participant = new ParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
+    participant.getStateMachineEngine().registerStateModelFactory("MasterSlave", new MockMSModelFactory());
+    participant.connect();
+
+    ControllerManager controller = new ControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.connect();
+
+    boolean result =
+        ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+                                                                                 clusterName));
+    Assert.assertTrue(result);
+
+    // cleanup
+    controller.disconnect();
+    participant.disconnect();
+
+    // verify all live-instances and leader nodes are gone
+    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    Assert.assertNull(accessor.getProperty(keyBuilder.liveInstance("localhost_12918")));
+    Assert.assertNull(accessor.getProperty(keyBuilder.controllerLeader()));
+
+    // closing banner: marks the end of the test (it used to print "START" a second time)
+    System.out.println("END " + clusterName + " at "
+        + new Date(System.currentTimeMillis()));
+  }
+}


Mime
View raw message