helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: zzh...@apache.org
Subject: git commit: [HELIX-109] Review Helix model package, more accessor changes
Date: Sun, 01 Sep 2013 01:09:57 GMT
Updated Branches:
  refs/heads/helix-logical-model c3c13a62e -> 75b534ddb


[HELIX-109] Review Helix model package, more accessor changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/75b534dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/75b534dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/75b534dd

Branch: refs/heads/helix-logical-model
Commit: 75b534ddb562d45e416859f9f864efab2530f2d3
Parents: c3c13a6
Author: zzhang <zzhang@apache.org>
Authored: Sat Aug 31 18:09:34 2013 -0700
Committer: zzhang <zzhang@apache.org>
Committed: Sat Aug 31 18:09:34 2013 -0700

----------------------------------------------------------------------
 .../controller/GenericHelixController.java      |  14 +-
 .../stages/CompatibilityCheckStage.java         |   2 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java    | 809 ++++++++++---------
 .../manager/zk/ZkCacheBaseDataAccessor.java     |  55 +-
 .../java/org/apache/helix/ZkUnitTestBase.java   |   4 +-
 .../stages/TestRebalancePipeline.java           |  13 +-
 6 files changed, 481 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/75b534dd/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 7d69152..314733f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -179,6 +179,7 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
 
       // rebalance pipeline
       Pipeline rebalancePipeline = new Pipeline();
+      rebalancePipeline.addStage(new CompatibilityCheckStage());
       rebalancePipeline.addStage(new ResourceComputationStage());
       rebalancePipeline.addStage(new CurrentStateComputationStage());
       rebalancePipeline.addStage(new RebalanceIdealStateStage());
@@ -192,14 +193,10 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       Pipeline externalViewPipeline = new Pipeline();
       externalViewPipeline.addStage(new ExternalViewComputeStage());
 
-      // backward compatibility check
-      Pipeline liveInstancePipeline = new Pipeline();
-      liveInstancePipeline.addStage(new CompatibilityCheckStage());
-
       registry.register("idealStateChange", dataRefresh, rebalancePipeline);
       registry.register("currentStateChange", dataRefresh, rebalancePipeline, externalViewPipeline);
       registry.register("configChange", dataRefresh, rebalancePipeline);
-      registry.register("liveInstanceChange", dataRefresh, liveInstancePipeline, rebalancePipeline,
+      registry.register("liveInstanceChange", dataRefresh, rebalancePipeline,
           externalViewPipeline);
 
       registry.register("messageChange", dataRefresh, rebalancePipeline);
@@ -208,13 +205,6 @@ public class GenericHelixController implements ConfigChangeListener, IdealStateC
       registry
           .register("periodicalRebalance", dataRefresh, rebalancePipeline, externalViewPipeline);
 
-      // health stats pipeline
-      // Pipeline healthStatsAggregationPipeline = new Pipeline();
-      // StatsAggregationStage statsStage = new StatsAggregationStage();
-      // healthStatsAggregationPipeline.addStage(new ReadHealthDataStage());
-      // healthStatsAggregationPipeline.addStage(statsStage);
-      // registry.register("healthChange", healthStatsAggregationPipeline);
-
       return registry;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/75b534dd/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index c4fb496..50adf97 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -54,7 +54,7 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
             "incompatible participant. pipeline will not continue. " + "controller: "
                 + manager.getInstanceName() + ", controllerVersion: " + properties.getVersion()
                 + ", minimumSupportedParticipantVersion: "
-                + properties.getProperty("miminum_supported_version.participant")
+                + properties.getProperty("minimum_supported_version.participant")
                 + ", participant: " + liveInstance.getInstanceName() + ", participantVersion: "
                 + participantVersion;
         LOG.error(errorMsg);

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/75b534dd/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 2ba76a3..0f690db 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -49,31 +49,36 @@ import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree;
 
 public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
+  /**
+   * return code for zk operations
+   */
   enum RetCode {
     OK,
     NODE_EXISTS,
+    NONODE,
     ERROR
   }
 
   /**
-   * struct holding return information
+   * structure holding return information
    */
   public class AccessResult {
-    RetCode _retCode;
-    List<String> _pathCreated;
+    final RetCode _retCode;
+    final List<String> _pathCreated;
+
+    final Stat _stat;
 
-    Stat _stat;
+    final T _resultValue;
 
-    /**
-     * used by update only
-     */
-    T _updatedValue;
+    public AccessResult(RetCode retCode) {
+      this(retCode, null, null, null);
+    }
 
-    public AccessResult() {
-      _retCode = RetCode.ERROR;
-      _pathCreated = new ArrayList<String>();
-      _stat = new Stat();
-      _updatedValue = null;
+    public AccessResult(RetCode retCode, List<String> pathCreated, Stat stat, T resultValue) {
+      _retCode = retCode;
+      _pathCreated = pathCreated;
+      _stat = stat;
+      _resultValue = resultValue;
     }
   }
 
@@ -86,7 +91,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   /**
-   * sync create
+   * sync create a znode
    */
   @Override
   public boolean create(String path, T record, int options) {
@@ -95,32 +100,40 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   /**
-   * sync create
+   * sync create a znode. create parent znodes if necessary
+   * @param path path to create
+   * @param record value to create, null for no value
+   * @param options
+   * @return
    */
   public AccessResult doCreate(String path, T record, int options) {
-    AccessResult result = new AccessResult();
+    if (path == null) {
+      throw new NullPointerException("path can't be null");
+    }
+
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null) {
-      LOG.error("Invalid create mode. options: " + options);
-      result._retCode = RetCode.ERROR;
-      return result;
+      throw new IllegalArgumentException("Invalid create options: " + options);
     }
 
     boolean retry;
+    List<String> pathCreated = null;
     do {
       retry = false;
       try {
         _zkClient.create(path, record, mode);
-        result._pathCreated.add(path);
+        if (pathCreated == null) {
+          pathCreated = new ArrayList<String>();
+        }
+        pathCreated.add(path);
 
-        result._retCode = RetCode.OK;
-        return result;
+        return new AccessResult(RetCode.OK, pathCreated, null, null);
       } catch (ZkNoNodeException e) {
         // this will happen if parent node does not exist
         String parentPath = HelixUtil.getZkParentPath(path);
         try {
           AccessResult res = doCreate(parentPath, null, AccessOption.PERSISTENT);
-          result._pathCreated.addAll(res._pathCreated);
+          pathCreated = res._pathCreated;
           RetCode rc = res._retCode;
           if (rc == RetCode.OK || rc == RetCode.NODE_EXISTS) {
             // if parent node created/exists, retry
@@ -128,26 +141,22 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
           }
         } catch (Exception e1) {
           LOG.error("Exception while creating path: " + parentPath, e1);
-          result._retCode = RetCode.ERROR;
-          return result;
+          return new AccessResult(RetCode.ERROR, pathCreated, null, null);
         }
       } catch (ZkNodeExistsException e) {
         LOG.warn("Node already exists. path: " + path);
-        result._retCode = RetCode.NODE_EXISTS;
-        return result;
+        return new AccessResult(RetCode.NODE_EXISTS);
       } catch (Exception e) {
         LOG.error("Exception while creating path: " + path, e);
-        result._retCode = RetCode.ERROR;
-        return result;
+        return new AccessResult(RetCode.ERROR);
       }
     } while (retry);
 
-    result._retCode = RetCode.OK;
-    return result;
+    return new AccessResult(RetCode.OK, pathCreated, null, null);
   }
 
   /**
-   * sync set
+   * sync set a znode
    */
   @Override
   public boolean set(String path, T record, int options) {
@@ -155,7 +164,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   /**
-   * sync set
+   * sync set a znode with expect version
    */
   @Override
   public boolean set(String path, T record, int expectVersion, int options) {
@@ -168,35 +177,39 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   /**
-   * sync set
+   * sync set a znode, create parent paths if necessary
+   * @param path
+   * @param record
+   * @param expectVersion
+   * @param options
    */
   public AccessResult doSet(String path, T record, int expectVersion, int options) {
-    AccessResult result = new AccessResult();
+    if (path == null) {
+      throw new NullPointerException("path can't be null");
+    }
 
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null) {
-      LOG.error("Invalid set mode. options: " + options);
-      result._retCode = RetCode.ERROR;
-      return result;
+      throw new IllegalArgumentException("Invalid set options: " + options);
     }
 
+    Stat stat = null;
+    List<String> pathCreated = null;
     boolean retry;
     do {
       retry = false;
       try {
-        Stat stat = _zkClient.writeDataGetStat(path, record, expectVersion);
-        DataTree.copyStat(stat, result._stat);
+        stat = _zkClient.writeDataGetStat(path, record, expectVersion);
       } catch (ZkNoNodeException e) {
         // node not exists, try create if expectedVersion == -1; in this case, stat will not be set
         if (expectVersion != -1) {
           LOG.error("Could not create node if expectVersion != -1, was " + expectVersion);
-          result._retCode = RetCode.ERROR;
-          return result;
+          return new AccessResult(RetCode.ERROR);
         }
         try {
           // may create recursively
           AccessResult res = doCreate(path, record, options);
-          result._pathCreated.addAll(res._pathCreated);
+          pathCreated = res._pathCreated;
           RetCode rc = res._retCode;
           switch (rc) {
           case OK:
@@ -207,29 +220,25 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
             break;
           default:
             LOG.error("Fail to set path by creating: " + path);
-            result._retCode = RetCode.ERROR;
-            return result;
+            return new AccessResult(RetCode.ERROR, pathCreated, null, null);
           }
         } catch (Exception e1) {
           LOG.error("Exception while setting path by creating: " + path, e);
-          result._retCode = RetCode.ERROR;
-          return result;
+          return new AccessResult(RetCode.ERROR, pathCreated, null, null);
         }
       } catch (ZkBadVersionException e) {
         throw e;
       } catch (Exception e) {
         LOG.error("Exception while setting path: " + path, e);
-        result._retCode = RetCode.ERROR;
-        return result;
+        return new AccessResult(RetCode.ERROR, pathCreated, null, null);
       }
     } while (retry);
 
-    result._retCode = RetCode.OK;
-    return result;
+    return new AccessResult(RetCode.OK, pathCreated, stat, null);
   }
 
   /**
-   * sync update
+   * sync update a znode
    */
   @Override
   public boolean update(String path, DataUpdater<T> updater, int options) {
@@ -238,27 +247,32 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   /**
-   * sync update
+   * sync update a znode, create parent paths if necessary
+   * @param path
+   * @param updater
+   * @param options
    */
   public AccessResult doUpdate(String path, DataUpdater<T> updater, int options) {
-    AccessResult result = new AccessResult();
+    if (path == null || updater == null) {
+      throw new NullPointerException("path|updater can't be null");
+    }
+
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null) {
-      LOG.error("Invalid update mode. options: " + options);
-      result._retCode = RetCode.ERROR;
-      return result;
+      throw new IllegalArgumentException("Invalid update options: " + options);
     }
 
     boolean retry;
+    Stat setStat = null;
     T updatedData = null;
+    List<String> pathCreated = null;
     do {
       retry = false;
       try {
         Stat readStat = new Stat();
         T oldData = (T) _zkClient.readData(path, readStat);
         T newData = updater.update(oldData);
-        Stat setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion());
-        DataTree.copyStat(setStat, result._stat);
+        setStat = _zkClient.writeDataGetStat(path, newData, readStat.getVersion());
 
         updatedData = newData;
       } catch (ZkBadVersionException e) {
@@ -268,7 +282,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         try {
           T newData = updater.update(null);
           AccessResult res = doCreate(path, newData, options);
-          result._pathCreated.addAll(res._pathCreated);
+          pathCreated = res._pathCreated;
           RetCode rc = res._retCode;
           switch (rc) {
           case OK:
@@ -279,31 +293,30 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
             break;
           default:
             LOG.error("Fail to update path by creating: " + path);
-            result._retCode = RetCode.ERROR;
-            return result;
+            return new AccessResult(RetCode.ERROR, pathCreated, null, null);
           }
         } catch (Exception e1) {
           LOG.error("Exception while updating path by creating: " + path, e1);
-          result._retCode = RetCode.ERROR;
-          return result;
+          return new AccessResult(RetCode.ERROR, pathCreated, null, null);
         }
       } catch (Exception e) {
         LOG.error("Exception while updating path: " + path, e);
-        result._retCode = RetCode.ERROR;
-        return result;
+        return new AccessResult(RetCode.ERROR, pathCreated, null, null);
       }
     } while (retry);
 
-    result._retCode = RetCode.OK;
-    result._updatedValue = updatedData;
-    return result;
+    return new AccessResult(RetCode.OK, pathCreated, setStat, updatedData);
   }
 
   /**
-   * sync get
+   * sync get a znode
    */
   @Override
   public T get(String path, Stat stat, int options) {
+    if (path == null) {
+      throw new NullPointerException("path can't be null");
+    }
+
     T data = null;
     try {
       data = (T) _zkClient.readData(path, stat);
@@ -316,38 +329,77 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   /**
-   * async get
+   * async get a list of znodes
    */
   @Override
   public List<T> get(List<String> paths, List<Stat> stats, int options) {
+    if (paths == null) {
+      throw new NullPointerException("paths can't be null");
+    }
+
+    if (stats != null && stats.size() > 0) {
+      throw new IllegalArgumentException(
+          "stats list is an output parameter and should be empty, but was: " + stats);
+    }
+
     boolean[] needRead = new boolean[paths.size()];
     Arrays.fill(needRead, true);
 
-    return get(paths, stats, needRead);
+    List<AccessResult> accessResults = doGet(paths, needRead);
+    List<T> values = new ArrayList<T>();
+
+    for (AccessResult accessResult : accessResults) {
+      values.add(accessResult._resultValue);
+      if (stats != null) {
+        stats.add(accessResult._stat);
+      }
+    }
+
+    return values;
   }
 
   /**
-   * async get
+   * async get a list of znodes
    */
-  List<T> get(List<String> paths, List<Stat> stats, boolean[] needRead) {
-    if (paths == null || paths.size() == 0) {
-      return Collections.emptyList();
+  List<AccessResult> doGet(List<String> paths, boolean[] needRead) {
+    if (paths == null || needRead == null) {
+      throw new NullPointerException("paths|needRead can't be null");
     }
 
-    // init stats
-    if (stats != null) {
-      stats.clear();
-      stats.addAll(Collections.<Stat> nCopies(paths.size(), null));
+    final int size = paths.size();
+    if (size != needRead.length) {
+      throw new IllegalArgumentException(
+          "paths and needRead should of equal size, but paths size: " + size + ", needRead size: "
+              + needRead.length);
+    }
+
+    for (int i = 0; i < size; i++) {
+      if (!needRead[i]) {
+        continue;
+      }
+
+      if (paths.get(i) == null) {
+        throw new NullPointerException("path[" + i + "] can't be null, but was: " + paths);
+      }
     }
 
+    if (size == 0) {
+      return Collections.emptyList();
+    }
+
+    // init all results to null
+    List<AccessResult> results =
+        new ArrayList<AccessResult>(Collections.<AccessResult> nCopies(size, null));
+
     long startT = System.nanoTime();
 
     try {
       // issue asyn get requests
-      GetDataCallbackHandler[] cbList = new GetDataCallbackHandler[paths.size()];
-      for (int i = 0; i < paths.size(); i++) {
-        if (!needRead[i])
+      GetDataCallbackHandler[] cbList = new GetDataCallbackHandler[size];
+      for (int i = 0; i < size; i++) {
+        if (!needRead[i]) {
           continue;
+        }
 
         String path = paths.get(i);
         cbList[i] = new GetDataCallbackHandler();
@@ -355,38 +407,46 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       }
 
       // wait for completion
-      for (int i = 0; i < cbList.length; i++) {
-        if (!needRead[i])
+      for (int i = 0; i < size; i++) {
+        if (!needRead[i]) {
           continue;
+        }
 
         GetDataCallbackHandler cb = cbList[i];
         cb.waitForSuccess();
       }
 
       // construct return results
-      List<T> records = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
-
       for (int i = 0; i < paths.size(); i++) {
-        if (!needRead[i])
+        if (!needRead[i]) {
           continue;
+        }
 
         GetDataCallbackHandler cb = cbList[i];
-        if (Code.get(cb.getRc()) == Code.OK) {
+        switch (Code.get(cb.getRc())) {
+        case OK: {
           @SuppressWarnings("unchecked")
-          T record = (T) _zkClient.deserialize(cb._data, paths.get(i));
-          records.set(i, record);
-          if (stats != null) {
-            stats.set(i, cb._stat);
-          }
+          T value = (T) _zkClient.deserialize(cb._data, paths.get(i));
+          results.set(i, new AccessResult(RetCode.OK, null, cb._stat, value));
+          break;
+        }
+        case NONODE: {
+          results.set(i, new AccessResult(RetCode.NONODE));
+          break;
+        }
+        default: {
+          results.set(i, new AccessResult(RetCode.ERROR));
+          break;
+        }
         }
       }
 
-      return records;
+      return results;
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("getData_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace("getData_async, size: " + size + ", paths: " + paths + ", time: "
+            + (endT - startT) + " ns");
       }
     }
   }
@@ -396,6 +456,11 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
    */
   @Override
   public List<T> getChildren(String parentPath, List<Stat> stats, int options) {
+    if (stats != null && stats.size() > 0) {
+      throw new IllegalArgumentException(
+          "stats list is an output parameter and should be empty, but was: " + stats);
+    }
+
     try {
       // prepare child paths
       List<String> childNames = getChildNames(parentPath, options);
@@ -409,25 +474,21 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         paths.add(path);
       }
 
-      // remove null record
-      List<Stat> curStats = new ArrayList<Stat>(paths.size());
-      List<T> records = get(paths, curStats, options);
-      Iterator<T> recordIter = records.iterator();
-      Iterator<Stat> statIter = curStats.iterator();
-      while (statIter.hasNext()) {
-        recordIter.next();
-        if (statIter.next() == null) {
-          statIter.remove();
-          recordIter.remove();
-        }
-      }
+      boolean[] needRead = new boolean[paths.size()];
+      Arrays.fill(needRead, true);
 
-      if (stats != null) {
-        stats.clear();
-        stats.addAll(curStats);
+      List<AccessResult> results = doGet(paths, needRead);
+      List<T> values = new ArrayList<T>();
+      for (AccessResult result : results) {
+        if (result._retCode == RetCode.OK) {
+          values.add(result._resultValue);
+          if (stats != null) {
+            stats.add(result._stat);
+          }
+        }
       }
 
-      return records;
+      return values;
     } catch (ZkNoNodeException e) {
       return Collections.emptyList();
     }
@@ -480,166 +541,218 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   /**
    * async create. give up on error other than NONODE
    */
-  CreateCallbackHandler[] create(List<String> paths, List<T> records, boolean[] needCreate,
-      List<List<String>> pathsCreated, int options) {
-    if ((records != null && records.size() != paths.size()) || needCreate.length != paths.size()
-        || (pathsCreated != null && pathsCreated.size() != paths.size())) {
+  List<AccessResult> doCreate(List<String> paths, List<T> records, boolean[] needCreate, int options) {
+    if (paths == null) {
+      throw new NullPointerException("paths can't be null");
+    }
+
+    for (int i = 0; i < paths.size(); i++) {
+      if (!needCreate[i]) {
+        continue;
+      }
+
+      if (paths.get(i) == null) {
+        throw new NullPointerException("path[" + i + "] can't be null, but was: " + paths);
+      }
+    }
+
+    if (records != null && records.size() != paths.size()) {
       throw new IllegalArgumentException(
-          "paths, records, needCreate, and pathsCreated should be of same size");
+          "paths and records should be of same size, but paths size: " + paths.size()
+              + ", records size: " + records.size());
     }
 
-    CreateCallbackHandler[] cbList = new CreateCallbackHandler[paths.size()];
+    if (needCreate == null) {
+      throw new NullPointerException("needCreate can't be null");
+    }
+
+    if (needCreate.length != paths.size()) {
+      throw new IllegalArgumentException(
+          "paths and needCreate should be of same size, but paths size: " + paths.size()
+              + ", needCreate size: " + needCreate.length);
+    }
 
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null) {
-      LOG.error("Invalid async set mode. options: " + options);
-      return cbList;
+      throw new IllegalArgumentException("Invalid async set options: " + options);
     }
 
+    CreateCallbackHandler[] cbList = new CreateCallbackHandler[paths.size()];
+    List<List<String>> pathsCreated =
+        new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
+    RetCode retCodes[] = new RetCode[paths.size()];
+
     boolean retry;
     do {
       retry = false;
 
       for (int i = 0; i < paths.size(); i++) {
-        if (!needCreate[i])
+        if (!needCreate[i]) {
           continue;
+        }
 
         String path = paths.get(i);
-        T record = records == null ? null : records.get(i);
+        T record = (records == null ? null : records.get(i));
         cbList[i] = new CreateCallbackHandler();
+
         _zkClient.asyncCreate(path, record, mode, cbList[i]);
       }
 
       List<String> parentPaths =
           new ArrayList<String>(Collections.<String> nCopies(paths.size(), null));
-      boolean failOnNoNode = false;
+      boolean failOnNoParentNode = false;
 
       for (int i = 0; i < paths.size(); i++) {
-        if (!needCreate[i])
+        if (!needCreate[i]) {
           continue;
+        }
 
         CreateCallbackHandler cb = cbList[i];
         cb.waitForSuccess();
         String path = paths.get(i);
 
-        if (Code.get(cb.getRc()) == Code.NONODE) {
+        Code code = Code.get(cb.getRc());
+        switch (code) {
+        case NONODE: {
+          // we will try create parent nodes
           String parentPath = HelixUtil.getZkParentPath(path);
           parentPaths.set(i, parentPath);
-          failOnNoNode = true;
-        } else {
-          // if create succeed or fail on error other than NONODE,
-          // give up
+          failOnNoParentNode = true;
+          break;
+        }
+        case NODEEXISTS: {
+          retCodes[i] = RetCode.NODE_EXISTS;
           needCreate[i] = false;
-
-          // if succeeds, record what paths we've created
-          if (Code.get(cb.getRc()) == Code.OK && pathsCreated != null) {
-            if (pathsCreated.get(i) == null) {
-              pathsCreated.set(i, new ArrayList<String>());
-            }
-            pathsCreated.get(i).add(path);
+          break;
+        }
+        case OK: {
+          retCodes[i] = RetCode.OK;
+          if (pathsCreated.get(i) == null) {
+            pathsCreated.set(i, new ArrayList<String>());
           }
+          pathsCreated.get(i).add(path);
+          needCreate[i] = false;
+          break;
+        }
+        default: {
+          retCodes[i] = RetCode.ERROR;
+          needCreate[i] = false;
+          break;
+        }
         }
       }
 
-      if (failOnNoNode) {
-        boolean[] needCreateParent = Arrays.copyOf(needCreate, needCreate.length);
-
-        CreateCallbackHandler[] parentCbList =
-            create(parentPaths, null, needCreateParent, pathsCreated, AccessOption.PERSISTENT);
-        for (int i = 0; i < parentCbList.length; i++) {
-          CreateCallbackHandler parentCb = parentCbList[i];
-          if (parentCb == null)
+      if (failOnNoParentNode) {
+        List<AccessResult> createParentResults =
+            doCreate(parentPaths, null, Arrays.copyOf(needCreate, needCreate.length), AccessOption.PERSISTENT);
+        for (int i = 0; i < createParentResults.size(); i++) {
+          if (!needCreate[i]) {
             continue;
-
-          Code rc = Code.get(parentCb.getRc());
+          }
 
           // if parent is created, retry create child
-          if (rc == Code.OK || rc == Code.NODEEXISTS) {
+          AccessResult result = createParentResults.get(i);
+          pathsCreated.set(i, result._pathCreated);
+
+          if (result._retCode == RetCode.OK || result._retCode == RetCode.NODE_EXISTS) {
             retry = true;
-            break;
+          } else {
+            retCodes[i] = RetCode.ERROR;
+            needCreate[i] = false;
           }
         }
       }
     } while (retry);
 
-    return cbList;
+    List<AccessResult> results = new ArrayList<AccessResult>();
+    for (int i = 0; i < paths.size(); i++) {
+      results.add(new AccessResult(retCodes[i], pathsCreated.get(i), null, null));
+    }
+    return results;
   }
 
+  // TODO: rename to create
   /**
-   * async create
-   * TODO: rename to create
+   * async create multiple znodes
    */
   @Override
   public boolean[] createChildren(List<String> paths, List<T> records, int options) {
     boolean[] success = new boolean[paths.size()];
 
-    CreateMode mode = AccessOption.getMode(options);
-    if (mode == null) {
-      LOG.error("Invalid async create mode. options: " + options);
-      return success;
-    }
-
     boolean[] needCreate = new boolean[paths.size()];
     Arrays.fill(needCreate, true);
-    List<List<String>> pathsCreated =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(paths.size(), null));
 
     long startT = System.nanoTime();
     try {
+      List<AccessResult> results = doCreate(paths, records, needCreate, options);
 
-      CreateCallbackHandler[] cbList = create(paths, records, needCreate, pathsCreated, options);
-
-      for (int i = 0; i < cbList.length; i++) {
-        CreateCallbackHandler cb = cbList[i];
-        success[i] = (Code.get(cb.getRc()) == Code.OK);
+      for (int i = 0; i < paths.size(); i++) {
+        AccessResult result = results.get(i);
+        success[i] = (result._retCode == RetCode.OK);
       }
 
       return success;
-
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace("create_async, size: " + paths.size() + ", paths: " + paths + ", time: "
+            + (endT - startT) + " ns");
       }
     }
   }
 
+  // TODO: rename to set
   /**
-   * async set
-   * TODO: rename to set
+   * async set multiple znodes
    */
   @Override
   public boolean[] setChildren(List<String> paths, List<T> records, int options) {
-    return set(paths, records, null, null, options);
+    List<AccessResult> results = doSet(paths, records, options);
+    boolean[] success = new boolean[paths.size()];
+    for (int i = 0; i < paths.size(); i++) {
+      success[i] = (results.get(i)._retCode == RetCode.OK);
+    }
+    return success;
   }
 
   /**
    * async set, give up on error other than NoNode
    */
-  boolean[] set(List<String> paths, List<T> records, List<List<String>> pathsCreated,
-      List<Stat> stats, int options) {
-    if (paths == null || paths.size() == 0) {
-      return new boolean[0];
+  List<AccessResult> doSet(List<String> paths, List<T> records, int options) {
+    if (paths == null) {
+      throw new NullPointerException("paths can't be null");
     }
 
-    if ((records != null && records.size() != paths.size())
-        || (pathsCreated != null && pathsCreated.size() != paths.size())) {
-      throw new IllegalArgumentException("paths, records, and pathsCreated should be of same size");
+    for (String path : paths) {
+      if (path == null) {
+        throw new NullPointerException("path can't be null, but was: " + paths);
+      }
     }
 
-    boolean[] success = new boolean[paths.size()];
+    final int size = paths.size();
+    if (records != null && records.size() != size) {
+      throw new IllegalArgumentException(
+          "paths and records should be of same size, but paths size: " + size
+              + ", records size: " + records.size());
+    }
 
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null) {
-      LOG.error("Invalid async set mode. options: " + options);
-      return success;
+      throw new IllegalArgumentException("Invalid async set options: " + options);
+    }
+
+    if (size == 0) {
+      return Collections.emptyList();
     }
 
-    List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
-    SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
-    CreateCallbackHandler[] createCbList = null;
-    boolean[] needSet = new boolean[paths.size()];
+    Stat[] setStats = new Stat[size];
+    RetCode[] retCodes = new RetCode[size];
+    List<List<String>> pathsCreated =
+        new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+
+    SetDataCallbackHandler[] setCbList = new SetDataCallbackHandler[size];
+
+    boolean[] needSet = new boolean[size];
     Arrays.fill(needSet, true);
 
     long startT = System.nanoTime();
@@ -649,94 +762,92 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       do {
         retry = false;
 
-        for (int i = 0; i < paths.size(); i++) {
-          if (!needSet[i])
+        for (int i = 0; i < size; i++) {
+          if (!needSet[i]) {
             continue;
+          }
 
           String path = paths.get(i);
-          T record = records.get(i);
-          cbList[i] = new SetDataCallbackHandler();
-          _zkClient.asyncSetData(path, record, -1, cbList[i]);
+          T record = (records == null ? null : records.get(i));
+          setCbList[i] = new SetDataCallbackHandler();
+          _zkClient.asyncSetData(path, record, -1, setCbList[i]);
 
         }
 
         boolean failOnNoNode = false;
 
-        for (int i = 0; i < cbList.length; i++) {
-          SetDataCallbackHandler cb = cbList[i];
+        for (int i = 0; i < size; i++) {
+          SetDataCallbackHandler cb = setCbList[i];
           cb.waitForSuccess();
           Code rc = Code.get(cb.getRc());
           switch (rc) {
-          case OK:
-            setStats.set(i, cb.getStat());
+          case OK: {
+            setStats[i] = cb.getStat();
+            retCodes[i] = RetCode.OK;
             needSet[i] = false;
             break;
-          case NONODE:
+          }
+          case NONODE: {
             // if fail on NoNode, try create the node
             failOnNoNode = true;
             break;
-          default:
+          }
+          default: {
             // if fail on error other than NoNode, give up
+            retCodes[i] = RetCode.ERROR;
             needSet[i] = false;
             break;
           }
+          }
         }
 
         // if failOnNoNode, try create
         if (failOnNoNode) {
-          boolean[] needCreate = Arrays.copyOf(needSet, needSet.length);
-          createCbList = create(paths, records, needCreate, pathsCreated, options);
-          for (int i = 0; i < createCbList.length; i++) {
-            CreateCallbackHandler createCb = createCbList[i];
-            if (createCb == null) {
+          List<AccessResult> createResults =
+              doCreate(paths, records, Arrays.copyOf(needSet, size), options);
+          for (int i = 0; i < size; i++) {
+            if (!needSet[i]) {
               continue;
             }
 
-            Code rc = Code.get(createCb.getRc());
-            switch (rc) {
-            case OK:
-              setStats.set(i, ZNode.ZERO_STAT);
+            AccessResult createResult = createResults.get(i);
+            RetCode code = createResult._retCode;
+            pathsCreated.set(i, createResult._pathCreated);
+
+            switch (code) {
+            case OK: {
+              setStats[i] = ZNode.ZERO_STAT;
+              retCodes[i] = RetCode.OK;
               needSet[i] = false;
               break;
-            case NODEEXISTS:
+            }
+            case NODE_EXISTS: {
               retry = true;
               break;
-            default:
-              // if creation fails on error other than NodeExists
-              // no need to retry set
+            }
+            default: {
+              // creation fails on error other than NodeExists
+              retCodes[i] = RetCode.ERROR;
               needSet[i] = false;
               break;
             }
+            }
           }
         }
       } while (retry);
 
       // construct return results
-      for (int i = 0; i < cbList.length; i++) {
-        SetDataCallbackHandler cb = cbList[i];
-
-        Code rc = Code.get(cb.getRc());
-        if (rc == Code.OK) {
-          success[i] = true;
-        } else if (rc == Code.NONODE) {
-          CreateCallbackHandler createCb = createCbList[i];
-          if (Code.get(createCb.getRc()) == Code.OK) {
-            success[i] = true;
-          }
-        }
+      List<AccessResult> results = new ArrayList<AccessResult>();
+      for (int i = 0; i < size; i++) {
+        results.add(new AccessResult(retCodes[i], pathsCreated.get(i), setStats[i], null));
       }
 
-      if (stats != null) {
-        stats.clear();
-        stats.addAll(setStats);
-      }
-
-      return success;
+      return results;
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace("setData_async, size: " + size + ", paths: " + paths + ", time: "
+            + (endT - startT) + " ns");
       }
     }
   }
@@ -748,43 +859,59 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   @Override
   public boolean[] updateChildren(List<String> paths, List<DataUpdater<T>> updaters, int options) {
 
-    List<T> updateData = update(paths, updaters, null, null, options);
-    boolean[] success = new boolean[paths.size()]; // init to false
+    List<AccessResult> results = doUpdate(paths, updaters, options);
+    boolean[] success = new boolean[paths.size()];
     for (int i = 0; i < paths.size(); i++) {
-      T data = updateData.get(i);
-      success[i] = (data != null);
+      success[i] = (results.get(i)._retCode == RetCode.OK);
     }
+
     return success;
   }
 
   /**
-   * async update
-   * return: updatedData on success or null on fail
+   * async update multiple znodes
    */
-  List<T> update(List<String> paths, List<DataUpdater<T>> updaters,
-      List<List<String>> pathsCreated, List<Stat> stats, int options) {
-    if (paths == null || paths.size() == 0) {
-      LOG.error("paths is null or empty");
-      return Collections.emptyList();
+  List<AccessResult> doUpdate(List<String> paths, List<DataUpdater<T>> updaters, int options) {
+    if (paths == null || updaters == null) {
+      throw new NullPointerException("paths|updaters can't be null");
+    }
+
+    for (String path : paths) {
+      if (path == null) {
+        throw new NullPointerException("path can't be null, but was: " + paths);
+      }
     }
 
-    if (updaters.size() != paths.size()
-        || (pathsCreated != null && pathsCreated.size() != paths.size())) {
-      throw new IllegalArgumentException("paths, updaters, and pathsCreated should be of same size");
+    for (DataUpdater<T> updater : updaters) {
+      if (updater == null) {
+        throw new NullPointerException("updater can't be null, but was: " + updaters + ", paths: "
+            + paths);
+      }
     }
 
-    List<Stat> setStats = new ArrayList<Stat>(Collections.<Stat> nCopies(paths.size(), null));
-    List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(paths.size(), null));
+    final int size = paths.size();
+    if (updaters.size() != size) {
+      throw new IllegalArgumentException(
+          "paths and updaters should be of same size, but paths size: " + size
+              + ", updaters size: " + updaters.size());
+    }
 
     CreateMode mode = AccessOption.getMode(options);
     if (mode == null) {
-      LOG.error("Invalid update mode. options: " + options);
-      return updateData;
+      throw new IllegalArgumentException("Invalid update options: " + options);
+    }
+
+    if (size == 0) {
+      return Collections.emptyList();
     }
 
-    SetDataCallbackHandler[] cbList = new SetDataCallbackHandler[paths.size()];
-    CreateCallbackHandler[] createCbList = null;
-    boolean[] needUpdate = new boolean[paths.size()];
+    Stat[] updateStats = new Stat[size];
+    RetCode[] retCodes = new RetCode[size];
+    List<List<String>> pathsCreated =
+        new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
+    List<T> updateData = new ArrayList<T>(Collections.<T> nCopies(size, null));
+
+    boolean[] needUpdate = new boolean[size];
     Arrays.fill(needUpdate, true);
 
     long startT = System.nanoTime();
@@ -793,90 +920,102 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
       boolean retry;
       do {
         retry = false;
-        boolean[] needCreate = new boolean[paths.size()]; // init'ed with false
+        SetDataCallbackHandler[] setCbList = new SetDataCallbackHandler[size];
+        boolean[] needCreate = new boolean[size]; // init'ed with false
         boolean failOnNoNode = false;
 
         // asycn read all data
-        List<Stat> curStats = new ArrayList<Stat>();
-        List<T> curDataList = get(paths, curStats, Arrays.copyOf(needUpdate, needUpdate.length));
+        List<AccessResult> readResults = doGet(paths, Arrays.copyOf(needUpdate, size));
 
         // async update
-        List<T> newDataList = new ArrayList<T>();
-        for (int i = 0; i < paths.size(); i++) {
+        List<T> newDataList = new ArrayList<T>(Collections.<T> nCopies(size, null));
+        for (int i = 0; i < size; i++) {
           if (!needUpdate[i]) {
-            newDataList.add(null);
             continue;
           }
           String path = paths.get(i);
           DataUpdater<T> updater = updaters.get(i);
-          T newData = updater.update(curDataList.get(i));
-          newDataList.add(newData);
-          Stat curStat = curStats.get(i);
-          if (curStat == null) {
+          AccessResult readResult = readResults.get(i);
+          T newData = updater.update(readResult._resultValue);
+          newDataList.set(i, newData);
+          if (readResult._retCode == RetCode.NONODE) {
             // node not exists
             failOnNoNode = true;
             needCreate[i] = true;
           } else {
-            cbList[i] = new SetDataCallbackHandler();
-            _zkClient.asyncSetData(path, newData, curStat.getVersion(), cbList[i]);
+            setCbList[i] = new SetDataCallbackHandler();
+            _zkClient.asyncSetData(path, newData, readResult._stat.getVersion(), setCbList[i]);
           }
         }
 
         // wait for completion
         boolean failOnBadVersion = false;
 
-        for (int i = 0; i < paths.size(); i++) {
-          SetDataCallbackHandler cb = cbList[i];
-          if (cb == null)
+        for (int i = 0; i < size; i++) {
+          SetDataCallbackHandler cb = setCbList[i];
+          if (cb == null) {
             continue;
+          }
 
           cb.waitForSuccess();
 
           switch (Code.get(cb.getRc())) {
-          case OK:
+          case OK: {
             updateData.set(i, newDataList.get(i));
-            setStats.set(i, cb.getStat());
+            updateStats[i] = cb.getStat();
+            retCodes[i] = RetCode.OK;
             needUpdate[i] = false;
             break;
-          case NONODE:
+          }
+          case NONODE: {
             failOnNoNode = true;
             needCreate[i] = true;
             break;
-          case BADVERSION:
+          }
+          case BADVERSION: {
             failOnBadVersion = true;
             break;
-          default:
-            // if fail on error other than NoNode or BadVersion
-            // will not retry
+          }
+          default: {
+            // fail on error other than NoNode or BadVersion
             needUpdate[i] = false;
+            retCodes[i] = RetCode.ERROR;
             break;
           }
+          }
         }
 
         // if failOnNoNode, try create
         if (failOnNoNode) {
-          createCbList = create(paths, newDataList, needCreate, pathsCreated, options);
-          for (int i = 0; i < paths.size(); i++) {
-            CreateCallbackHandler createCb = createCbList[i];
-            if (createCb == null) {
+          List<AccessResult> createResults =
+              doCreate(paths, newDataList, Arrays.copyOf(needCreate, size), options);
+          for (int i = 0; i < size; i++) {
+            if (!needCreate[i]) {
               continue;
             }
 
-            switch (Code.get(createCb.getRc())) {
-            case OK:
+            AccessResult result = createResults.get(i);
+            pathsCreated.set(i, result._pathCreated);
+
+            switch (result._retCode) {
+            case OK: {
               needUpdate[i] = false;
               updateData.set(i, newDataList.get(i));
-              setStats.set(i, ZNode.ZERO_STAT);
+              updateStats[i] = ZNode.ZERO_STAT;
+              retCodes[i] = RetCode.OK;
               break;
-            case NODEEXISTS:
+            }
+            case NODE_EXISTS: {
               retry = true;
               break;
-            default:
-              // if fail on error other than NodeExists
-              // will not retry
+            }
+            default: {
+              // fail on error other than NodeExists
+              retCodes[i] = RetCode.ERROR;
               needUpdate[i] = false;
               break;
             }
+            }
           }
         }
 
@@ -886,24 +1025,24 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
         }
       } while (retry);
 
-      if (stats != null) {
-        stats.clear();
-        stats.addAll(setStats);
+      List<AccessResult> results = new ArrayList<AccessResult>();
+      for (int i = 0; i < size; i++) {
+        results.add(new AccessResult(retCodes[i], pathsCreated.get(i), updateStats[i], updateData
+            .get(i)));
       }
-
-      return updateData;
+      return results;
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("setData_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace("updateData_async, size: " + size + ", paths: " + paths + ", time: "
+            + (endT - startT) + " ns");
       }
     }
 
   }
 
   /**
-   * async exists
+   * async test existence on multiple znodes
    */
   @Override
   public boolean[] exists(List<String> paths, int options) {
@@ -918,12 +1057,15 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
   }
 
   /**
-   * async getStat
+   * async get stats of multiple znodes
    */
   @Override
   public Stat[] getStats(List<String> paths, int options) {
-    if (paths == null || paths.size() == 0) {
-      LOG.error("paths is null or empty");
+    if (paths == null) {
+      throw new NullPointerException("paths can't be null");
+    }
+
+    if (paths.size() == 0) {
       return new Stat[0];
     }
 
@@ -949,18 +1091,22 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace("exists_async, size: " + paths.size() + ", paths: " + paths + ", time: "
+            + (endT - startT) + " ns");
       }
     }
   }
 
   /**
-   * async remove
+   * async remove multiple znodes
    */
   @Override
   public boolean[] remove(List<String> paths, int options) {
-    if (paths == null || paths.size() == 0) {
+    if (paths == null) {
+      throw new NullPointerException("paths can't be null");
+    }
+
+    if (paths.size() == 0) {
       return new boolean[0];
     }
 
@@ -987,8 +1133,8 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     } finally {
       long endT = System.nanoTime();
       if (LOG.isTraceEnabled()) {
-        LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths.get(0)
-            + ",... time: " + (endT - startT) + " ns");
+        LOG.trace("delete_async, size: " + paths.size() + ", paths: " + paths + ", time: "
+            + (endT - startT) + " ns");
       }
     }
   }
@@ -1025,69 +1171,6 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
     _zkClient.unsubscribeChildChanges(path, childListener);
   }
 
-  // simple test
-  public static void main(String[] args) {
-    ZkClient zkclient = new ZkClient("localhost:2191");
-    zkclient.setZkSerializer(new ZNRecordSerializer());
-    ZkBaseDataAccessor<ZNRecord> accessor = new ZkBaseDataAccessor<ZNRecord>(zkclient);
-
-    // test async create
-    List<String> createPaths = Arrays.asList("/test/child1/child1", "/test/child2/child2");
-    List<ZNRecord> createRecords = Arrays.asList(new ZNRecord("child1"), new ZNRecord("child2"));
-
-    boolean[] needCreate = new boolean[createPaths.size()];
-    Arrays.fill(needCreate, true);
-    List<List<String>> pathsCreated =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(createPaths.size(), null));
-    accessor.create(createPaths, createRecords, needCreate, pathsCreated, AccessOption.PERSISTENT);
-    System.out.println("pathsCreated: " + pathsCreated);
-
-    // test async set
-    List<String> setPaths = Arrays.asList("/test/setChild1/setChild1", "/test/setChild2/setChild2");
-    List<ZNRecord> setRecords = Arrays.asList(new ZNRecord("setChild1"), new ZNRecord("setChild2"));
-
-    pathsCreated =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(setPaths.size(), null));
-    boolean[] success =
-        accessor.set(setPaths, setRecords, pathsCreated, null, AccessOption.PERSISTENT);
-    System.out.println("pathsCreated: " + pathsCreated);
-    System.out.println("setSuccess: " + Arrays.toString(success));
-
-    // test async update
-    List<String> updatePaths =
-        Arrays.asList("/test/updateChild1/updateChild1", "/test/setChild2/setChild2");
-    class TestUpdater implements DataUpdater<ZNRecord> {
-      final ZNRecord _newData;
-
-      public TestUpdater(ZNRecord newData) {
-        _newData = newData;
-      }
-
-      @Override
-      public ZNRecord update(ZNRecord currentData) {
-        return _newData;
-
-      }
-    }
-    List<DataUpdater<ZNRecord>> updaters =
-        Arrays.asList((DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild1")),
-            (DataUpdater<ZNRecord>) new TestUpdater(new ZNRecord("updateChild2")));
-
-    pathsCreated =
-        new ArrayList<List<String>>(Collections.<List<String>> nCopies(updatePaths.size(), null));
-
-    List<ZNRecord> updateRecords =
-        accessor.update(updatePaths, updaters, pathsCreated, null, AccessOption.PERSISTENT);
-    for (int i = 0; i < updatePaths.size(); i++) {
-      success[i] = updateRecords.get(i) != null;
-    }
-    System.out.println("pathsCreated: " + pathsCreated);
-    System.out.println("updateSuccess: " + Arrays.toString(success));
-
-    System.out.println("CLOSING");
-    zkclient.close();
-  }
-
   /**
    * Reset
    */

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/75b534dd/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 5e7355b..8a00127 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -35,7 +35,9 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
 import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
 import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor.AccessResult;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
 import org.apache.helix.store.HelixPropertyListener;
 import org.apache.helix.store.HelixPropertyStore;
@@ -250,7 +252,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         cache.lockWrite();
         ZkBaseDataAccessor<T>.AccessResult result =
             _baseAccessor.doSet(serverPath, data, expectVersion, options);
-        boolean success = result._retCode == RetCode.OK;
+        boolean success = (result._retCode == RetCode.OK);
 
         updateCache(cache, result._pathCreated, success, serverPath, data, result._stat);
 
@@ -279,7 +281,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         ZkBaseDataAccessor<T>.AccessResult result =
             _baseAccessor.doUpdate(serverPath, updater, options);
         boolean success = (result._retCode == RetCode.OK);
-        updateCache(cache, result._pathCreated, success, serverPath, result._updatedValue,
+        updateCache(cache, result._pathCreated, success, serverPath, result._resultValue,
             result._stat);
 
         return success;
@@ -421,17 +423,15 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
         cache.lockWrite();
         boolean[] needCreate = new boolean[size];
         Arrays.fill(needCreate, true);
-        List<List<String>> pathsCreatedList =
-            new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
-        CreateCallbackHandler[] createCbList =
-            _baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options);
+        List<ZkBaseDataAccessor<T>.AccessResult> results =
+            _baseAccessor.doCreate(serverPaths, records, needCreate, options);
 
         boolean[] success = new boolean[size];
         for (int i = 0; i < size; i++) {
-          CreateCallbackHandler cb = createCbList[i];
-          success[i] = (Code.get(cb.getRc()) == Code.OK);
+          ZkBaseDataAccessor<T>.AccessResult result = results.get(i);
+          success[i] = (result._retCode == RetCode.OK);
 
-          updateCache(cache, pathsCreatedList.get(i), success[i], serverPaths.get(i),
+          updateCache(cache, results.get(i)._pathCreated, success[i], serverPaths.get(i),
               records.get(i), ZNode.ZERO_STAT);
         }
 
@@ -454,15 +454,14 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
     if (cache != null) {
       try {
         cache.lockWrite();
-        List<Stat> setStats = new ArrayList<Stat>();
-        List<List<String>> pathsCreatedList =
-            new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
-        boolean[] success =
-            _baseAccessor.set(serverPaths, records, pathsCreatedList, setStats, options);
+        List<ZkBaseDataAccessor<T>.AccessResult> results =
+            _baseAccessor.doSet(serverPaths, records, options);
 
+        boolean[] success = new boolean[size];
         for (int i = 0; i < size; i++) {
-          updateCache(cache, pathsCreatedList.get(i), success[i], serverPaths.get(i),
-              records.get(i), setStats.get(i));
+          success[i] = (results.get(i)._retCode == RetCode.OK);
+          updateCache(cache, results.get(i)._pathCreated, success[i], serverPaths.get(i),
+              records.get(i), results.get(i)._stat);
         }
 
         return success;
@@ -484,23 +483,17 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
       try {
         cache.lockWrite();
 
-        List<Stat> setStats = new ArrayList<Stat>();
         boolean[] success = new boolean[size];
         List<List<String>> pathsCreatedList =
             new ArrayList<List<String>>(Collections.<List<String>> nCopies(size, null));
-        List<T> updateData =
-            _baseAccessor.update(serverPaths, updaters, pathsCreatedList, setStats, options);
 
-        // System.out.println("updateChild: ");
-        // for (T data : updateData)
-        // {
-        // System.out.println(data);
-        // }
+        List<ZkBaseDataAccessor<T>.AccessResult> results = _baseAccessor.doUpdate(serverPaths, updaters, options);
 
         for (int i = 0; i < size; i++) {
-          success[i] = (updateData.get(i) != null);
-          updateCache(cache, pathsCreatedList.get(i), success[i], serverPaths.get(i),
-              updateData.get(i), setStats.get(i));
+          ZkBaseDataAccessor<T>.AccessResult result = results.get(i);
+          success[i] = (result._retCode == RetCode.OK);
+            updateCache(cache, pathsCreatedList.get(i), success[i], serverPaths.get(i),
+              result._resultValue, results.get(i)._stat);
         }
         return success;
       } finally {
@@ -590,11 +583,13 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
       if (needRead) {
         cache.lockWrite();
         try {
-          List<T> readRecords = _baseAccessor.get(serverPaths, readStats, needReads);
+          List<ZkBaseDataAccessor<T>.AccessResult> readResults =
+              _baseAccessor.doGet(serverPaths, needReads);
           for (int i = 0; i < size; i++) {
             if (needReads[i]) {
-              records.set(i, readRecords.get(i));
-              cache.update(serverPaths.get(i), readRecords.get(i), readStats.get(i));
+              records.set(i, readResults.get(i)._resultValue);
+              readStats.set(i, readResults.get(i)._stat);
+              cache.update(serverPaths.get(i), records.get(i), readStats.get(i));
             }
           }
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/75b534dd/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
index 2425020..9a484d6 100644
--- a/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
+++ b/helix-core/src/test/java/org/apache/helix/ZkUnitTestBase.java
@@ -304,14 +304,14 @@ public class ZkUnitTestBase {
 
   protected void setupLiveInstances(String clusterName, int[] liveInstances) {
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     for (int i = 0; i < liveInstances.length; i++) {
       String instance = "localhost_" + liveInstances[i];
       LiveInstance liveInstance = new LiveInstance(instance);
       liveInstance.setSessionId("session_" + liveInstances[i]);
-      liveInstance.setHelixVersion("0.0.0");
+      liveInstance.setHelixVersion("0.4.0");
       accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/75b534dd/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index 33dce89..7cd942e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -54,7 +54,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
 
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
     ClusterEvent event = new ClusterEvent("testEvent");
@@ -127,10 +127,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
-    HelixManager manager = new DummyClusterManager(clusterName, accessor);
-    ClusterEvent event = new ClusterEvent("testEvent");
-
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     final String resourceName = "testResource_dup";
     String[] resourceGroups = new String[] {
       resourceName
@@ -202,7 +199,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
@@ -297,7 +294,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
     System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
 
     HelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     HelixManager manager = new DummyClusterManager(clusterName, accessor);
     ClusterEvent event = new ClusterEvent("testEvent");
     event.addAttribute("helixmanager", manager);
@@ -367,7 +364,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
   protected void setCurrentState(String clusterName, String instance, String resourceGroupName,
       String resourceKey, String sessionId, String state) {
     ZKHelixDataAccessor accessor =
-        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor(_gZkClient));
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
     Builder keyBuilder = accessor.keyBuilder();
 
     CurrentState curState = new CurrentState(resourceGroupName);


Mime
View raw message