helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l...@apache.org
Subject [5/5] helix git commit: Improve CallbackHandler by avoiding unnessary re-subscripe to the data change event.Resubscribe to zk changes only when there is any child chanage, with async subscription to ensure not missing any new child paths.
Date Fri, 06 Apr 2018 00:04:08 GMT
Improve CallbackHandler by avoiding unnessary re-subscripe to the data change event.Resubscribe
to zk changes only when there is any child chanage, with async subscription to ensure not
missing any new child paths.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/477264e7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/477264e7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/477264e7

Branch: refs/heads/master
Commit: 477264e72165e706719071fa2c91d13f77ccbea4
Parents: 1a2937c
Author: Lei Xia <lxia@linkedin.com>
Authored: Sat Mar 24 20:55:17 2018 -0700
Committer: Lei Xia <lxia@linkedin.com>
Committed: Thu Apr 5 16:56:55 2018 -0700

----------------------------------------------------------------------
 .../helix/common/ClusterEventBlockingQueue.java |  54 +++----
 .../helix/common/ClusterEventProcessor.java     |  52 ++-----
 .../helix/common/DedupEventBlockingQueue.java   | 139 +++++++++++++++++++
 .../helix/common/DedupEventProcessor.java       |  68 +++++++++
 .../helix/manager/zk/CallbackHandler.java       | 139 +++++++++++++------
 .../helix/manager/zk/zookeeper/ZkClient.java    |   1 +
 .../helix/spectator/RoutingTableProvider.java   |   2 +-
 7 files changed, 326 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/477264e7/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
b/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
index 075edf9..437fea0 100644
--- a/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
+++ b/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java
@@ -18,9 +18,6 @@ package org.apache.helix.common;
  * specific language governing permissions and limitations
  * under the License.
  */
-
-import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.helix.controller.stages.ClusterEvent;
@@ -28,33 +25,31 @@ import org.apache.helix.controller.stages.ClusterEventType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 /**
  * A blocking queue of ClusterEvent objects to be used by the controller pipeline. This prevents
  * multiple events of the same type from flooding the controller and preventing progress
from being
  * made. This queue has no capacity. This class is meant to be a limited implementation of
the
  * {@link BlockingQueue} interface.
+ *
+ * This class is deprecated, please use {@link org.apache.helix.common.DedupEventBlockingQueue}.
  */
+@Deprecated
 public class ClusterEventBlockingQueue {
   private static final Logger LOG = LoggerFactory.getLogger(ClusterEventBlockingQueue.class);
-  private final Map<ClusterEventType, ClusterEvent> _eventMap;
-  private final Queue<ClusterEvent> _eventQueue;
+
+  private DedupEventBlockingQueue<ClusterEventType, ClusterEvent> _eventQueue;
 
   /**
    * Instantiate the queue
    */
   public ClusterEventBlockingQueue() {
-    _eventMap = Maps.newHashMap();
-    _eventQueue = Lists.newLinkedList();
+    _eventQueue = new DedupEventBlockingQueue();
   }
 
   /**
    * Remove all events from the queue
    */
-  public synchronized void clear() {
-    _eventMap.clear();
+  public void clear() {
     _eventQueue.clear();
   }
 
@@ -62,19 +57,10 @@ public class ClusterEventBlockingQueue {
    * Add a single event to the queue, overwriting events with the same name
    * @param event ClusterEvent event to add
    */
-  public synchronized void put(ClusterEvent event) {
-    if (!_eventMap.containsKey(event.getEventType())) {
-      // only insert if there isn't a same-named event already present
-      boolean result = _eventQueue.offer(event);
-      if (!result) {
-        return;
-      }
-    }
-    // always overwrite in case this is a FINALIZE
-    _eventMap.put(event.getEventType(), event);
+  public void put(ClusterEvent event) {
+    _eventQueue.put(event.getEventType(), event);
     LOG.debug("Putting event " + event.getEventType());
     LOG.debug("Event queue size: " + _eventQueue.size());
-    notify();
   }
 
   /**
@@ -83,29 +69,21 @@ public class ClusterEventBlockingQueue {
    * @return ClusterEvent at the front of the queue
    * @throws InterruptedException if the wait for elements was interrupted
    */
-  public synchronized ClusterEvent take() throws InterruptedException {
-    while (_eventQueue.isEmpty()) {
-      wait();
-    }
-    ClusterEvent queuedEvent = _eventQueue.poll();
-    if (queuedEvent != null) {
-      LOG.debug("Taking event " + queuedEvent.getEventType());
+  public ClusterEvent take() throws InterruptedException {
+    ClusterEvent event = _eventQueue.take();
+    if (event != null) {
+      LOG.debug("Taking event " + event.getEventType());
       LOG.debug("Event queue size: " + _eventQueue.size());
-      return _eventMap.remove(queuedEvent.getEventType());
     }
-    return null;
+    return event;
   }
 
   /**
    * Get at the head of the queue without removing it
    * @return ClusterEvent at the front of the queue, or null if none available
    */
-  public synchronized ClusterEvent peek() {
-    ClusterEvent queuedEvent = _eventQueue.peek();
-    if (queuedEvent != null) {
-      return _eventMap.get(queuedEvent.getEventType());
-    }
-    return queuedEvent;
+  public ClusterEvent peek() {
+    return _eventQueue.peek();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/477264e7/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
index e4ceb85..abda7af 100644
--- a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
+++ b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
@@ -1,62 +1,26 @@
 package org.apache.helix.common;
 
-import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.helix.controller.stages.ClusterEvent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.helix.controller.stages.ClusterEventType;
 /**
  * A generic extended single-thread class to handle ClusterEvent (multiple-producer/single
consumer
  * style).
+ *
+ * This class is deprecated, please use {@link org.apache.helix.common.DedupEventProcessor}.
  */
-public abstract class ClusterEventProcessor extends Thread {
-  private static final Logger logger = LoggerFactory.getLogger(ClusterEventProcessor.class);
-
-  protected final ClusterEventBlockingQueue _eventQueue;
-  protected final String _clusterName;
-  protected final String _processorName;
+@Deprecated
+public abstract class ClusterEventProcessor
+    extends DedupEventProcessor<ClusterEventType, ClusterEvent> {
 
   public ClusterEventProcessor(String clusterName) {
     this(clusterName, "Helix-ClusterEventProcessor");
   }
 
   public ClusterEventProcessor(String clusterName, String processorName) {
-    super(processorName + "-" + clusterName);
-    _processorName = processorName;
-    _eventQueue = new ClusterEventBlockingQueue();
-    _clusterName = clusterName;
-  }
-
-  @Override
-  public void run() {
-    logger.info("START " + _processorName + " thread for cluster " + _clusterName);
-    while (!isInterrupted()) {
-      try {
-        ClusterEvent event = _eventQueue.take();
-        handleEvent(event);
-      } catch (InterruptedException e) {
-        logger.warn(_processorName + " thread interrupted", e);
-        interrupt();
-      } catch (ZkInterruptedException e) {
-        logger.warn(_processorName + " thread caught a ZK connection interrupt", e);
-        interrupt();
-      } catch (ThreadDeath death) {
-        throw death;
-      } catch (Throwable t) {
-        logger.error(_processorName + " thread failed while running the controller pipeline",
t);
-      }
-    }
-    logger.info("END " + _processorName + " thread");
+    super(clusterName, processorName);
   }
 
-  protected abstract void handleEvent(ClusterEvent event);
-
   public void queueEvent(ClusterEvent event) {
-    _eventQueue.put(event);
-  }
-
-  public void shutdown() {
-    _eventQueue.clear();
-    this.interrupt();
+    _eventQueue.put(event.getEventType(), event);
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/477264e7/helix-core/src/main/java/org/apache/helix/common/DedupEventBlockingQueue.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/DedupEventBlockingQueue.java
b/helix-core/src/main/java/org/apache/helix/common/DedupEventBlockingQueue.java
new file mode 100644
index 0000000..0810456
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/DedupEventBlockingQueue.java
@@ -0,0 +1,139 @@
+package org.apache.helix.common;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * A blocking queue of events, which automatically deduplicate events with the same "type"
within
+ * the queue, i.e, when putting an event into the queue, if there is already an event with
the
+ * same type existing in the queue, the new event won't be inserted into the queue.
+ * This class is meant to be a limited implementation of the {@link BlockingQueue} interface.
+ *
+ * T -- the Type of an event.
+ * E -- the event itself.
+ */
+public class DedupEventBlockingQueue<T, E> {
+  private final Map<T, Entry<T, E>> _eventMap;
+  private final Queue<Entry> _eventQueue;
+
+  class Entry <T, E> {
+    private T _type;
+    private E _event;
+
+    Entry (T type, E event) {
+      _type = type;
+      _event = event;
+    }
+
+    T getType() {
+      return _type;
+    }
+
+    E getEvent() {
+      return _event;
+    }
+  }
+
+  /**
+   * Instantiate the queue
+   */
+  public DedupEventBlockingQueue() {
+    _eventMap = Maps.newHashMap();
+    _eventQueue = Lists.newLinkedList();
+  }
+
+  /**
+   * Remove all events from the queue
+   */
+  public synchronized void clear() {
+    _eventMap.clear();
+    _eventQueue.clear();
+  }
+
+  /**
+   * Add a single event to the queue, overwriting events with the same name
+   */
+  public synchronized void put(T type, E event) {
+    Entry entry = new Entry(type, event);
+
+    if (!_eventMap.containsKey(entry.getType())) {
+      // only insert to the queue if there isn't a same-typed event already present
+      boolean result = _eventQueue.offer(entry);
+      if (!result) {
+        return;
+      }
+    }
+    // always overwrite the existing entry in the map in case the entry is different
+    _eventMap.put((T) entry.getType(), entry);
+    notify();
+  }
+
+  /**
+   * Remove an element from the front of the queue, blocking if none is available. This method
+   * will return the most recent event seen with the oldest enqueued event name.
+   * @return ClusterEvent at the front of the queue
+   * @throws InterruptedException if the wait for elements was interrupted
+   */
+  public synchronized E take() throws InterruptedException {
+    while (_eventQueue.isEmpty()) {
+      wait();
+    }
+    Entry entry = _eventQueue.poll();
+    if (entry != null) {
+      entry = _eventMap.remove(entry.getType());
+      return (E) entry.getEvent();
+    }
+    return null;
+  }
+
+  /**
+   * Get at the head of the queue without removing it
+   * @return ClusterEvent at the front of the queue, or null if none available
+   */
+  public synchronized E peek() {
+    Entry entry = _eventQueue.peek();
+    if (entry != null) {
+      entry = _eventMap.get(entry.getType());
+      return (E) entry.getEvent();
+    }
+    return null;
+  }
+
+  /**
+   * Get the queue size
+   * @return integer size of the queue
+   */
+  public int size() {
+    return _eventQueue.size();
+  }
+
+  /**
+   * Check if the queue is empty
+   * @return true if events are not present, false otherwise
+   */
+  public boolean isEmpty() {
+    return _eventQueue.isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/477264e7/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
new file mode 100644
index 0000000..b656364
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/common/DedupEventProcessor.java
@@ -0,0 +1,68 @@
+package org.apache.helix.common;
+
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A generic extended single-thread class to handle event with events with the same type
de-duplicated (multiple-producer/single consumer
+ * style).
+ *
+ * T -- Type of the event.
+ * E -- The event itself.
+ */
+public abstract class DedupEventProcessor<T, E> extends Thread {
+  private static final Logger logger = LoggerFactory.getLogger(DedupEventProcessor.class);
+
+  protected final DedupEventBlockingQueue<T, E> _eventQueue;
+  protected final String _clusterName;
+  protected final String _processorName;
+
+  public DedupEventProcessor(String processorName) {
+    this(new String(), processorName);
+  }
+
+  public DedupEventProcessor(String clusterName, String processorName) {
+    super(processorName + "-" + clusterName);
+    _processorName = processorName;
+    _eventQueue = new DedupEventBlockingQueue<>();
+    _clusterName = clusterName;
+  }
+
+  public DedupEventProcessor() {
+    this(new String(), "Default-DedupEventProcessor");
+  }
+
+  @Override
+  public void run() {
+    logger.info("START " + _processorName + " thread for cluster " + _clusterName);
+    while (!isInterrupted()) {
+      try {
+        E event = _eventQueue.take();
+        handleEvent(event);
+      } catch (InterruptedException e) {
+        logger.warn(_processorName + " thread interrupted", e);
+        interrupt();
+      } catch (ZkInterruptedException e) {
+        logger.warn(_processorName + " thread caught a ZK connection interrupt", e);
+        interrupt();
+      } catch (ThreadDeath death) {
+        throw death;
+      } catch (Throwable t) {
+        logger.error(_processorName + " thread failed while running the controller pipeline",
t);
+      }
+    }
+    logger.info("END " + _processorName + " thread");
+  }
+
+  protected abstract void handleEvent(E event);
+
+  public void queueEvent(T eventType, E event) {
+    _eventQueue.put(eventType, event);
+  }
+
+  public void shutdown() {
+    _eventQueue.clear();
+    this.interrupt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/477264e7/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index 958f989..c3b8a4c 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -57,6 +57,7 @@ import org.apache.helix.NotificationContext.Type;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyPathConfig;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.common.DedupEventProcessor;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
@@ -100,6 +101,43 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
   private boolean _preFetchEnabled = true;
   private HelixCallbackMonitor _monitor;
   private Thread _batchProcessThread;  // TODO: change this to use DedupEventProcessor -
Lei.
+  private boolean _watchChild = true;  // Whether we should subscribe to the child znode's
data change.
+  private static DedupEventProcessor SubscribeChangeEventProcessor;
+
+  static {
+    SubscribeChangeEventProcessor =
+        new DedupEventProcessor<CallbackHandler, SubscribeChangeEvent>("",
+            "CallbackHanlder-AsycSubscribe") {
+          @Override protected void handleEvent(SubscribeChangeEvent event) {
+            logger.info("Resubscribe change to " + event.path + " for listener " + event.listener);
+            try {
+              event.handler.subscribeForChanges(event.callbackType, event.path, event.watchChild);
+            } catch (Exception e) {
+              logger.error("Failed to resubscribe change to " + event.path + " for listener
"
+                  + event.listener, e);
+            }
+          }
+        };
+
+    SubscribeChangeEventProcessor.start();
+  }
+
+  class SubscribeChangeEvent {
+    final CallbackHandler handler;
+    final String path;
+    final NotificationContext.Type callbackType;
+    final Object listener;
+    final boolean watchChild;
+
+    SubscribeChangeEvent(CallbackHandler handler, NotificationContext.Type callbackType,
+        String path, boolean watchChild, Object listener) {
+      this.handler = handler;
+      this.path = path;
+      this.callbackType = callbackType;
+      this.listener = listener;
+      this.watchChild = watchChild;
+    }
+  }
 
   /**
    * maintain the expected notification types
@@ -125,18 +163,23 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
               .name());
     }
 
-    this._manager = manager;
-    this._accessor = manager.getHelixDataAccessor();
-    this._zkClient = client;
-    this._propertyKey = propertyKey;
-    this._path = propertyKey.getPath();
-    this._listener = listener;
-    this._eventTypes = new HashSet<>(Arrays.asList(eventTypes));
-    this._changeType = changeType;
-    this._lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
-    this._queue = new LinkedBlockingQueue<>(1000);
-
-    this._monitor = monitor;
+    _manager = manager;
+    _accessor = manager.getHelixDataAccessor();
+    _zkClient = client;
+    _propertyKey = propertyKey;
+    _path = propertyKey.getPath();
+    _listener = listener;
+    _eventTypes = new HashSet<>(Arrays.asList(eventTypes));
+    _changeType = changeType;
+    _lastNotificationTimeStamp = new AtomicLong(System.nanoTime());
+    _queue = new LinkedBlockingQueue<>(1000);
+    _monitor = monitor;
+
+    if (_changeType == MESSAGE || _changeType == MESSAGES_CONTROLLER || _changeType == CONTROLLER)
{
+      _watchChild = false;
+    } else {
+      _watchChild = true;
+    }
 
     parseListenerProperties();
 
@@ -314,16 +357,23 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
             "Callback handler received event in wrong order. Listener: " + _listener + ",
path: "
                 + _path + ", expected types: " + _expectTypes + " but was " + type);
         return;
+
+      }
+      _expectTypes = nextNotificationType.get(type);
+
+      if (type == Type.INIT || type == Type.FINALIZE) {
+        subscribeForChanges(changeContext.getType(), _path, _watchChild);
+      } else {
+        // put SubscribeForChange run in async thread to reduce the latency of zk callback
handling.
+        subscribeForChangesAsyn(changeContext.getType(), _path, _watchChild);
       }
       _expectTypes = nextNotificationType.get(type);
 
       if (_changeType == IDEAL_STATE) {
         IdealStateChangeListener idealStateChangeListener = (IdealStateChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true);
         List<IdealState> idealStates = preFetch(_propertyKey);
         idealStateChangeListener.onIdealStateChange(idealStates, changeContext);
-      } else if (_changeType == ChangeType.INSTANCE_CONFIG) {
-        subscribeForChanges(changeContext, _path, true);
+      } else if (_changeType == INSTANCE_CONFIG) {
         if (_listener instanceof ConfigChangeListener) {
           ConfigChangeListener configChangeListener = (ConfigChangeListener) _listener;
           List<InstanceConfig> configs = preFetch(_propertyKey);
@@ -333,14 +383,12 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
           List<InstanceConfig> configs = preFetch(_propertyKey);
           listener.onInstanceConfigChange(configs, changeContext);
         }
-      } else if (_changeType == ChangeType.RESOURCE_CONFIG) {
-        subscribeForChanges(changeContext, _path, true);
+      } else if (_changeType == RESOURCE_CONFIG) {
         ResourceConfigChangeListener listener = (ResourceConfigChangeListener) _listener;
         List<ResourceConfig> configs = preFetch(_propertyKey);
         listener.onResourceConfigChange(configs, changeContext);
 
-      } else if (_changeType == ChangeType.CLUSTER_CONFIG) {
-        subscribeForChanges(changeContext, _path, true);
+      } else if (_changeType == CLUSTER_CONFIG) {
         ClusterConfigChangeListener listener = (ClusterConfigChangeListener) _listener;
         ClusterConfig config = null;
         if (_preFetchEnabled) {
@@ -349,46 +397,39 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
         listener.onClusterConfigChange(config, changeContext);
 
       } else if (_changeType == CONFIG) {
-        subscribeForChanges(changeContext, _path, true);
         ScopedConfigChangeListener listener = (ScopedConfigChangeListener) _listener;
         List<HelixProperty> configs = preFetch(_propertyKey);
         listener.onConfigChange(configs, changeContext);
 
       } else if (_changeType == LIVE_INSTANCE) {
         LiveInstanceChangeListener liveInstanceChangeListener = (LiveInstanceChangeListener)
_listener;
-        subscribeForChanges(changeContext, _path, true);
         List<LiveInstance> liveInstances = preFetch(_propertyKey);
         liveInstanceChangeListener.onLiveInstanceChange(liveInstances, changeContext);
 
       } else if (_changeType == CURRENT_STATE) {
         CurrentStateChangeListener currentStateChangeListener = (CurrentStateChangeListener)
_listener;
-        subscribeForChanges(changeContext, _path, true);
         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
         List<CurrentState> currentStates = preFetch(_propertyKey);
         currentStateChangeListener.onStateChange(instanceName, currentStates, changeContext);
 
       } else if (_changeType == MESSAGE) {
         MessageListener messageListener = (MessageListener) _listener;
-        subscribeForChanges(changeContext, _path, false);
         String instanceName = PropertyPathConfig.getInstanceNameFromPath(_path);
         List<Message> messages = preFetch(_propertyKey);
         messageListener.onMessage(instanceName, messages, changeContext);
 
       } else if (_changeType == MESSAGES_CONTROLLER) {
         MessageListener messageListener = (MessageListener) _listener;
-        subscribeForChanges(changeContext, _path, false);
         List<Message> messages = preFetch(_propertyKey);
         messageListener.onMessage(_manager.getInstanceName(), messages, changeContext);
 
       } else if (_changeType == EXTERNAL_VIEW || _changeType == TARGET_EXTERNAL_VIEW) {
         ExternalViewChangeListener externalViewListener = (ExternalViewChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, true);
         List<ExternalView> externalViewList = preFetch(_propertyKey);
         externalViewListener.onExternalViewChange(externalViewList, changeContext);
 
-      } else if (_changeType == ChangeType.CONTROLLER) {
+      } else if (_changeType == CONTROLLER) {
         ControllerChangeListener controllerChangelistener = (ControllerChangeListener) _listener;
-        subscribeForChanges(changeContext, _path, false);
         controllerChangelistener.onControllerChange(changeContext);
       } else {
         logger.warn("Unknown change type: " + _changeType);
@@ -414,13 +455,13 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
     }
   }
 
-  private void subscribeChildChange(String path, NotificationContext context) {
-    NotificationContext.Type type = context.getType();
-    if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK)
{
+  private void subscribeChildChange(String path, NotificationContext.Type callbackType) {
+    if (callbackType == NotificationContext.Type.INIT
+        || callbackType == NotificationContext.Type.CALLBACK) {
       logger.info(
           _manager.getInstanceName() + " subscribes child-change. path: " + path + ", listener:
" + _listener);
       _zkClient.subscribeChildChanges(path, this);
-    } else if (type == NotificationContext.Type.FINALIZE) {
+    } else if (callbackType == NotificationContext.Type.FINALIZE) {
       logger.info(
           _manager.getInstanceName() + " unsubscribe child-change. path: " + path + ", listener:
" + _listener);
 
@@ -428,14 +469,14 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
     }
   }
 
-  private void subscribeDataChange(String path, NotificationContext context) {
-    NotificationContext.Type type = context.getType();
-    if (type == NotificationContext.Type.INIT || type == NotificationContext.Type.CALLBACK)
{
+  private void subscribeDataChange(String path, NotificationContext.Type callbackType) {
+    if (callbackType == NotificationContext.Type.INIT
+        || callbackType == NotificationContext.Type.CALLBACK) {
       logger.info(
           _manager.getInstanceName() + " subscribe data-change. path: " + path + ", listener:
"
               + _listener);
       _zkClient.subscribeDataChanges(path, this);
-    } else if (type == NotificationContext.Type.FINALIZE) {
+    } else if (callbackType == NotificationContext.Type.FINALIZE) {
       logger.info(
           _manager.getInstanceName() + " unsubscribe data-change. path: " + path + ", listener:
"
               + _listener);
@@ -444,24 +485,29 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
     }
   }
 
-  // TODO watchParent is always true. consider remove it
-  private void subscribeForChanges(NotificationContext context, String path, boolean watchChild)
{
-    long start = System.currentTimeMillis();
+  /** Subscribe Changes in asynchronously */
+  private void subscribeForChangesAsyn(NotificationContext.Type callbackType, String path,
boolean watchChild) {
+    SubscribeChangeEvent subscribeEvent =
+        new SubscribeChangeEvent(this, callbackType, path, watchChild, _listener);
+    SubscribeChangeEventProcessor.queueEvent(subscribeEvent.handler, subscribeEvent);
+  }
 
+  private void subscribeForChanges(NotificationContext.Type callbackType, String path,
+      boolean watchChild) {
+    long start = System.currentTimeMillis();
     if (_eventTypes.contains(EventType.NodeDataChanged) || _eventTypes
         .contains(EventType.NodeCreated) || _eventTypes.contains(EventType.NodeDeleted))
{
       if (logger.isDebugEnabled()) {
         logger.debug("Subscribing data change listener to path:" + path);
       }
-      subscribeDataChange(path, context);
+      subscribeDataChange(path, callbackType);
     }
 
     if (_eventTypes.contains(EventType.NodeChildrenChanged)) {
       if (logger.isDebugEnabled()) {
         logger.debug("Subscribing child change listener to path:" + path);
       }
-
-      subscribeChildChange(path, context);
+      subscribeChildChange(path, callbackType);
       if (watchChild) {
         if (logger.isDebugEnabled()) {
           logger.debug("Subscribing data change listener to all children for path:" + path);
@@ -484,19 +530,19 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
               if (bucketSize > 0) {
                 // subscribe both data-change and child-change on bucketized parent node
                 // data-change gives a delete-callback which is used to remove watch
-                subscribeChildChange(childPath, context);
-                subscribeDataChange(childPath, context);
+                subscribeChildChange(childPath, callbackType);
+                subscribeDataChange(childPath, callbackType);
 
                 // subscribe data-change on bucketized child
                 List<String> bucketizedChildNames = _zkClient.getChildren(childPath);
                 if (bucketizedChildNames != null) {
                   for (String bucketizedChildName : bucketizedChildNames) {
                     String bucketizedChildPath = childPath + "/" + bucketizedChildName;
-                    subscribeDataChange(bucketizedChildPath, context);
+                    subscribeDataChange(bucketizedChildPath, callbackType);
                   }
                 }
               } else {
-                subscribeDataChange(childPath, context);
+                subscribeDataChange(childPath, callbackType);
               }
             }
             break;
@@ -506,7 +552,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
             if (childNames != null) {
               for (String childName : childNames) {
                 String childPath = path + "/" + childName;
-                subscribeDataChange(childPath, context);
+                subscribeDataChange(childPath, callbackType);
               }
             }
             break;
@@ -617,6 +663,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener
{
           changeContext.setType(NotificationContext.Type.CALLBACK);
           changeContext.setPathChanged(parentPath);
           changeContext.setChangeType(_changeType);
+          subscribeForChanges(changeContext.getType(), _path, _watchChild);
           enqueueTask(changeContext);
         }
       }

http://git-wip-us.apache.org/repos/asf/helix/blob/477264e7/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index d6a65ee..af242da 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -876,6 +876,7 @@ public class ZkClient implements Watcher {
     try {
       delete(path);
     } catch (Exception e) {
+      LOG.error("Failed to delete " + path, e);
       throw new HelixException("Failed to delete " + path, e);
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/477264e7/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 8907922..4076697 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -505,7 +505,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener,
Instanc
     private final RoutingDataCache _dataCache;
 
     public RouterUpdater(String clusterName, PropertyType sourceDataType) {
-      super("Helix-RouterUpdater-event_process");
+      super("Helix-RouterUpdater");
       _dataCache = new RoutingDataCache(clusterName, sourceDataType);
     }
 


Mime
View raw message