helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [02/17] git commit: [HELIX-380] Incompatibility issue with HELIX_PROPERTYSTORE, rb=18053
Date Fri, 11 Jul 2014 19:57:57 GMT
[HELIX-380] Incompatibility issue with HELIX_PROPERTYSTORE, rb=18053


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/35e3ca10
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/35e3ca10
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/35e3ca10

Branch: refs/heads/master
Commit: 35e3ca10ef16f8e594ee5823978e5041afdbf987
Parents: fdee2dd
Author: zzhang <zzhang5@uci.edu>
Authored: Thu Feb 13 16:19:46 2014 -0800
Committer: Kanak Biscuitwala <kanak@apache.org>
Committed: Fri Jul 11 10:42:06 2014 -0700

----------------------------------------------------------------------
 .../apache/helix/manager/zk/ZKHelixManager.java |   7 +-
 .../helix/manager/zk/ZkBaseDataAccessor.java    |   3 +
 .../manager/zk/ZkCacheBaseDataAccessor.java     |   5 +-
 .../store/zk/AutoFallbackPropertyStore.java     | 332 ++++++++++
 .../store/zk/TestAutoFallbackPropertyStore.java | 625 +++++++++++++++++++
 .../zk/TestZkManagerWithAutoFallbackStore.java  | 115 ++++
 6 files changed, 1081 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/35e3ca10/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index d446430..3bc1985 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -70,6 +70,7 @@ import org.apache.helix.model.builder.ConfigScopeBuilder;
 import org.apache.helix.monitoring.ZKPathDataDumpTask;
 import org.apache.helix.participant.HelixStateMachineEngine;
 import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.store.zk.AutoFallbackPropertyStore;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
@@ -622,10 +623,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener
{
 
     if (_helixPropertyStore == null) {
       String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName);
-
+      String fallbackPath = String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE");
       _helixPropertyStore =
-          new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient),
path,
-              null);
+          new AutoFallbackPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient),
path,
+              fallbackPath);
     }
 
     return _helixPropertyStore;

http://git-wip-us.apache.org/repos/asf/helix/blob/35e3ca10/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index 724e299..e978a23 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -84,6 +84,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T>
{
   private final ZkClient _zkClient;
 
   public ZkBaseDataAccessor(ZkClient zkClient) {
+    if (zkClient == null) {
+      throw new NullPointerException("zkclient is null");
+    }
     _zkClient = zkClient;
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/35e3ca10/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 57bdc21..1fe5b8d 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -506,11 +506,10 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
{
   @Override
   public boolean[] exists(List<String> paths, int options) {
     final int size = paths.size();
-    List<String> serverPaths = prependChroot(paths);
 
     boolean exists[] = new boolean[size];
     for (int i = 0; i < size; i++) {
-      exists[i] = exists(serverPaths.get(i), options);
+      exists[i] = exists(paths.get(i), options);
     }
     return exists;
   }
@@ -656,7 +655,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T>
{
 
     List<String> paths = new ArrayList<String>();
     for (String childName : childNames) {
-      String path = parentPath + "/" + childName;
+      String path = parentPath.equals("/") ? "/" + childName : parentPath + "/" + childName;
       paths.add(path);
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/35e3ca10/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java
b/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java
new file mode 100644
index 0000000..02fa5bc
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java
@@ -0,0 +1,332 @@
+package org.apache.helix.store.zk;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Property store that does auto fallback to an old location.
+ * Assuming no concurrent updates
+ */
+public class AutoFallbackPropertyStore<T> extends ZkHelixPropertyStore<T> {
+  private static Logger LOG = Logger.getLogger(AutoFallbackPropertyStore.class);
+
+  private final ZkHelixPropertyStore<T> _fallbackStore;
+
+  public AutoFallbackPropertyStore(ZkBaseDataAccessor<T> accessor, String root, String
fallbackRoot) {
+    super(accessor, root, null);
+
+    if (accessor.exists(fallbackRoot, 0)) {
+      _fallbackStore = new ZkHelixPropertyStore<T>(accessor, fallbackRoot, null);
+    } else {
+      LOG.info("fallbackRoot: " + fallbackRoot
+          + " doesn't exist, skip creating fallback property store");
+      _fallbackStore = null;
+    }
+
+  }
+
+  @Override
+  public boolean update(String path, DataUpdater<T> updater, int options) {
+    if (_fallbackStore == null) {
+      return super.update(path, updater, options);
+    } else {
+      Stat stat = super.getStat(path, options);
+      if (stat == null) {
+        // create znode at new location with fallback-value
+        T fallbackValue = _fallbackStore.get(path, null, options);
+        boolean succeed = super.create(path, fallbackValue, AccessOption.PERSISTENT);
+        if (!succeed) {
+          LOG.error("Can't update " + path + " since there are concurrent updates");
+          return false;
+        }
+      }
+      return super.update(path, updater, options);
+    }
+  }
+
+  @Override
+  public boolean exists(String path, int options) {
+    if (_fallbackStore == null) {
+      return super.exists(path, options);
+    } else {
+      boolean exist = super.exists(path, options);
+      if (!exist) {
+        exist = _fallbackStore.exists(path, options);
+      }
+      return exist;
+    }
+  }
+
+  @Override
+  public boolean remove(String path, int options) {
+    if (_fallbackStore != null) {
+      _fallbackStore.remove(path, options);
+    }
+    return super.remove(path, options);
+  }
+
+  @Override
+  public T get(String path, Stat stat, int options) {
+    if (_fallbackStore == null) {
+      return super.get(path, stat, options);
+    } else {
+      T value = super.get(path, stat, options);
+      if (value == null) {
+        value = _fallbackStore.get(path, stat, options);
+      }
+
+      return value;
+    }
+  }
+
+  @Override
+  public Stat getStat(String path, int options) {
+    if (_fallbackStore == null) {
+      return super.getStat(path, options);
+    } else {
+      Stat stat = super.getStat(path, options);
+
+      if (stat == null) {
+        stat = _fallbackStore.getStat(path, options);
+      }
+      return stat;
+    }
+  }
+
+  @Override
+  public boolean[] updateChildren(List<String> paths, List<DataUpdater<T>>
updaters, int options) {
+    if (_fallbackStore == null) {
+      return super.updateChildren(paths, updaters, options);
+    } else {
+      Stat[] stats = super.getStats(paths, options);
+      Map<String, Integer> fallbackMap = new HashMap<String, Integer>();
+      Map<String, Integer> updateMap = new HashMap<String, Integer>();
+      for (int i = 0; i < paths.size(); i++) {
+        String path = paths.get(i);
+        if (stats[i] == null) {
+          fallbackMap.put(path, i);
+        } else {
+          updateMap.put(path, i);
+        }
+      }
+
+      if (fallbackMap.size() > 0) {
+        List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet());
+        List<T> fallbackValues = _fallbackStore.get(fallbackPaths, null, options);
+        boolean createSucceed[] =
+            super.createChildren(fallbackPaths, fallbackValues, AccessOption.PERSISTENT);
+
+        for (int i = 0; i < fallbackPaths.size(); i++) {
+          String fallbackPath = fallbackPaths.get(i);
+          if (createSucceed[i]) {
+            updateMap.put(fallbackPath, fallbackMap.get(fallbackPath));
+          } else {
+            LOG.error("Can't update " + fallbackPath + " since there are concurrent updates");
+          }
+        }
+      }
+
+      boolean succeed[] = new boolean[paths.size()]; // all init'ed to false
+      if (updateMap.size() > 0) {
+        List<String> updatePaths = new ArrayList<String>(updateMap.keySet());
+        List<DataUpdater<T>> subUpdaters = new ArrayList<DataUpdater<T>>();
+        for (int i = 0; i < updatePaths.size(); i++) {
+          String updatePath = updatePaths.get(i);
+          subUpdaters.add(updaters.get(updateMap.get(updatePath)));
+        }
+
+        boolean updateSucceed[] = super.updateChildren(updatePaths, subUpdaters, options);
+        for (int i = 0; i < updatePaths.size(); i++) {
+          String updatePath = updatePaths.get(i);
+          if (updateSucceed[i]) {
+            succeed[updateMap.get(updatePath)] = true;
+          }
+        }
+      }
+
+      return succeed;
+    }
+  }
+
+  @Override
+  public boolean[] exists(List<String> paths, int options) {
+    if (_fallbackStore == null) {
+      return super.exists(paths, options);
+    } else {
+      boolean[] exists = super.exists(paths, options);
+
+      Map<String, Integer> fallbackMap = new HashMap<String, Integer>();
+      for (int i = 0; i < paths.size(); i++) {
+        boolean exist = exists[i];
+        if (!exist) {
+          fallbackMap.put(paths.get(i), i);
+        }
+      }
+
+      if (fallbackMap.size() > 0) {
+        List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet());
+
+        boolean[] fallbackExists = _fallbackStore.exists(fallbackPaths, options);
+        for (int i = 0; i < fallbackPaths.size(); i++) {
+          String fallbackPath = fallbackPaths.get(i);
+          int j = fallbackMap.get(fallbackPath);
+          exists[j] = fallbackExists[i];
+        }
+      }
+
+      return exists;
+    }
+  }
+
+  @Override
+  public boolean[] remove(List<String> paths, int options) {
+    if (_fallbackStore != null) {
+      _fallbackStore.remove(paths, options);
+    }
+    return super.remove(paths, options);
+  }
+
+  @Override
+  public List<T> get(List<String> paths, List<Stat> stats, int options)
{
+    if (_fallbackStore == null) {
+      return super.get(paths, stats, options);
+    } else {
+      List<T> values = super.get(paths, stats, options);
+
+      Map<String, Integer> fallbackMap = new HashMap<String, Integer>();
+      for (int i = 0; i < paths.size(); i++) {
+        T value = values.get(i);
+        if (value == null) {
+          fallbackMap.put(paths.get(i), i);
+        }
+      }
+
+      if (fallbackMap.size() > 0) {
+        List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet());
+        List<Stat> fallbackStats = new ArrayList<Stat>();
+        List<T> fallbackValues = _fallbackStore.get(fallbackPaths, fallbackStats, options);
+        for (int i = 0; i < fallbackPaths.size(); i++) {
+          String fallbackPath = fallbackPaths.get(i);
+          int j = fallbackMap.get(fallbackPath);
+          values.set(j, fallbackValues.get(i));
+          if (stats != null) {
+            stats.set(j, fallbackStats.get(i));
+          }
+        }
+      }
+
+      return values;
+    }
+  }
+
+  @Override
+  public Stat[] getStats(List<String> paths, int options) {
+    if (_fallbackStore == null) {
+      return super.getStats(paths, options);
+    } else {
+      Stat[] stats = super.getStats(paths, options);
+
+      Map<String, Integer> fallbackMap = new HashMap<String, Integer>();
+      for (int i = 0; i < paths.size(); i++) {
+        Stat stat = stats[i];
+        if (stat == null) {
+          fallbackMap.put(paths.get(i), i);
+        }
+      }
+
+      if (fallbackMap.size() > 0) {
+        List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet());
+
+        Stat[] fallbackStats = _fallbackStore.getStats(fallbackPaths, options);
+        for (int i = 0; i < fallbackPaths.size(); i++) {
+          String fallbackPath = fallbackPaths.get(i);
+          int j = fallbackMap.get(fallbackPath);
+          stats[j] = fallbackStats[i];
+        }
+      }
+
+      return stats;
+    }
+  }
+
+  @Override
+  public List<String> getChildNames(String parentPath, int options) {
+    if (_fallbackStore == null) {
+      return super.getChildNames(parentPath, options);
+    } else {
+      List<String> childs = super.getChildNames(parentPath, options);
+      List<String> fallbackChilds = _fallbackStore.getChildNames(parentPath, options);
+
+      if (childs == null && fallbackChilds == null) {
+        return null;
+      }
+
+      // merge two child lists
+      Set<String> allChildSet = new HashSet<String>();
+      if (childs != null) {
+        allChildSet.addAll(childs);
+      }
+
+      if (fallbackChilds != null) {
+        allChildSet.addAll(fallbackChilds);
+      }
+
+      List<String> allChilds = new ArrayList<String>(allChildSet);
+      return allChilds;
+    }
+  }
+
+  @Override
+  public void start() {
+    if (_fallbackStore != null) {
+      _fallbackStore.start();
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (_fallbackStore != null) {
+      _fallbackStore.stop();
+    }
+
+    super.stop();
+  }
+
+  @Override
+  public void reset() {
+    if (_fallbackStore != null) {
+      _fallbackStore.reset();
+    }
+
+    super.reset();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/35e3ca10/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java
b/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java
new file mode 100644
index 0000000..d4cb658
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java
@@ -0,0 +1,625 @@
+package org.apache.helix.store.zk;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.PropertyType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.zookeeper.data.Stat;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestAutoFallbackPropertyStore extends ZkUnitTestBase {
+
+  class MyDataUpdater implements DataUpdater<ZNRecord> {
+    final String _id;
+
+    public MyDataUpdater(String id) {
+      _id = id;
+    }
+
+    @Override
+    public ZNRecord update(ZNRecord currentData) {
+      if (currentData == null) {
+        currentData = new ZNRecord(_id);
+      } else {
+        currentData.setSimpleField("key", "value");
+      }
+      return currentData;
+    }
+  }
+
+  @Test
+  public void testSingleUpdateUsingFallbackPath() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0 under fallbackRoot
+    for (int i = 0; i < 1; i++) {
+      String path = String.format("%s/%d", fallbackRoot, i);
+      baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    String path = String.format("/%d", 0);
+    Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+        "Should not exist under new location");
+    Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0),
+        "Should exist under fallback location");
+
+    boolean succeed = store.update(path, new MyDataUpdater("new0"), AccessOption.PERSISTENT);
+    Assert.assertTrue(succeed);
+
+    // fallback path should remain unchanged
+    ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null, 0);
+    Assert.assertNotNull(record);
+    Assert.assertEquals(record.getId(), "0");
+    Assert.assertNull(record.getSimpleField("key"));
+
+    // new path should have simple field set
+    record = baseAccessor.get(String.format("%s%s", root, path), null, 0);
+    Assert.assertNotNull(record);
+    Assert.assertEquals(record.getId(), "0");
+    Assert.assertNotNull(record.getSimpleField("key"));
+    Assert.assertEquals(record.getSimpleField("key"), "value");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testSingleUpdateUsingNewPath() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0 under both fallbackRoot and root
+    for (int i = 0; i < 1; i++) {
+      String path = String.format("%s/%d", root, i);
+      baseAccessor.create(path, new ZNRecord("new" + i), AccessOption.PERSISTENT);
+
+      path = String.format("%s/%d", fallbackRoot, i);
+      baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    String path = String.format("/%d", 0);
+    Assert.assertTrue(baseAccessor.exists(String.format("%s%s", root, path), 0),
+        "Should exist under new location");
+    Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0),
+        "Should exist under fallback location");
+
+    boolean succeed = store.update(path, new MyDataUpdater("0"), AccessOption.PERSISTENT);
+    Assert.assertTrue(succeed);
+
+    ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null, 0);
+    Assert.assertNotNull(record);
+    Assert.assertEquals(record.getId(), "0");
+    Assert.assertNull(record.getSimpleField("key"));
+
+    record = baseAccessor.get(String.format("%s%s", root, path), null, 0);
+    Assert.assertNotNull(record);
+    Assert.assertEquals(record.getId(), "new0");
+    Assert.assertNotNull(record.getSimpleField("key"));
+    Assert.assertEquals(record.getSimpleField("key"), "value");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testMultiUpdateUsingFallbackPath() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0-9 under fallbackRoot
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("%s/%d", fallbackRoot, i);
+      baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    List<String> paths = new ArrayList<String>();
+    List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("/%d", i);
+      Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+          "Should not exist under new location");
+      Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0),
+          "Should exist under fallback location");
+      paths.add(path);
+      updaters.add(new MyDataUpdater("new" + i));
+    }
+
+    boolean succeed[] = store.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(succeed[i]);
+      String path = paths.get(i);
+
+      // fallback path should remain unchanged
+      ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null,
0);
+      Assert.assertNotNull(record);
+      Assert.assertEquals(record.getId(), "" + i);
+      Assert.assertNull(record.getSimpleField("key"));
+
+      // new path should have simple field set
+      record = baseAccessor.get(String.format("%s%s", root, path), null, 0);
+      Assert.assertNotNull(record);
+      Assert.assertEquals(record.getId(), "" + i);
+      Assert.assertNotNull(record.getSimpleField("key"));
+      Assert.assertEquals(record.getSimpleField("key"), "value");
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testMultiUpdateUsingNewath() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0-9 under both fallbackRoot and new root
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("%s/%d", fallbackRoot, i);
+      baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+
+      path = String.format("%s/%d", root, i);
+      baseAccessor.create(path, new ZNRecord("new" + i), AccessOption.PERSISTENT);
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    List<String> paths = new ArrayList<String>();
+    List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>();
+    for (int i = 0; i < 20; i++) {
+      String path = String.format("/%d", i);
+      if (i < 10) {
+        Assert.assertTrue(baseAccessor.exists(String.format("%s%s", root, path), 0),
+            "Should exist under new location");
+        Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path),
0),
+            "Should exist under fallback location");
+      } else {
+        Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+            "Should not exist under new location");
+        Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path),
0),
+            "Should not exist under fallback location");
+      }
+      paths.add(path);
+      updaters.add(new MyDataUpdater("new" + i));
+    }
+
+    boolean succeed[] = store.updateChildren(paths, updaters, AccessOption.PERSISTENT);
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(succeed[i]);
+      String path = paths.get(i);
+
+      // fallback path should remain unchanged
+      if (i < 10) {
+        ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null,
0);
+        Assert.assertNotNull(record);
+        Assert.assertEquals(record.getId(), "" + i);
+        Assert.assertNull(record.getSimpleField("key"));
+      } else {
+        Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path),
0),
+            "Should not exist under fallback location");
+      }
+
+      // new path should have simple field set
+      ZNRecord record = baseAccessor.get(String.format("%s%s", root, path), null, 0);
+      Assert.assertNotNull(record);
+      Assert.assertEquals(record.getId(), "new" + i);
+      if (i < 10) {
+        Assert.assertNotNull(record.getSimpleField("key"));
+        Assert.assertEquals(record.getSimpleField("key"), "value");
+      } else {
+        Assert.assertNull(record.getSimpleField("key"));
+      }
+
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testSingleSet() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0 under fallbackRoot
+    for (int i = 0; i < 1; i++) {
+      String path = String.format("%s/%d", fallbackRoot, i);
+      baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    String path = String.format("/%d", 0);
+    Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+        "Should not exist under new location");
+    Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0),
+        "Should exist under fallback location");
+    ZNRecord record = new ZNRecord("new0");
+    boolean succeed = store.set(path, record, AccessOption.PERSISTENT);
+    Assert.assertTrue(succeed);
+
+    record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null, 0);
+    Assert.assertNotNull(record);
+    Assert.assertEquals(record.getId(), "0");
+
+    record = baseAccessor.get(String.format("%s%s", root, path), null, 0);
+    Assert.assertNotNull(record);
+    Assert.assertEquals(record.getId(), "new0");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testMultiSet() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0-9 under fallbackRoot
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("%s/%d", fallbackRoot, i);
+      baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    List<String> paths = new ArrayList<String>();
+    List<ZNRecord> records = new ArrayList<ZNRecord>();
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("/%d", i);
+      Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+          "Should not exist under new location");
+      Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0),
+          "Should exist under fallback location");
+      paths.add(path);
+      ZNRecord record = new ZNRecord("new" + i);
+      records.add(record);
+    }
+
+    boolean succeed[] = store.setChildren(paths, records, AccessOption.PERSISTENT);
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(succeed[i]);
+      String path = String.format("/%d", i);
+      ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null,
0);
+      Assert.assertNotNull(record);
+      Assert.assertEquals(record.getId(), Integer.toString(i));
+
+      record = baseAccessor.get(String.format("%s%s", root, path), null, 0);
+      Assert.assertNotNull(record);
+      Assert.assertEquals(record.getId(), "new" + i);
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testSingleGetOnFallbackPath() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0 under fallbackRoot
+    for (int i = 0; i < 1; i++) {
+      String path = String.format("%s/%d", fallbackRoot, i);
+      baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    String path = String.format("/%d", 0);
+    Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+        "Should not exist under new location");
+    Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0),
+        "Should exist under fallback location");
+
+    // test single exist
+    boolean exist = store.exists(path, 0);
+    Assert.assertTrue(exist);
+
+    // test single getStat
+    Stat stat = store.getStat(path, 0);
+    Assert.assertNotNull(stat);
+
+    // test single get
+    ZNRecord record = store.get(path, null, 0);
+    Assert.assertNotNull(record);
+    Assert.assertEquals(record.getId(), "0");
+    Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+        "Should not exist under new location after get");
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+  }
+
+  @Test
+  void testMultiGetOnFallbackPath() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0-9 under fallbackRoot
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("%s/%d", fallbackRoot, i);
+      baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    List<String> paths = new ArrayList<String>();
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("/%d", i);
+      Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+          "Should not exist under new location");
+      Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0),
+          "Should exist under fallback location");
+      paths.add(path);
+    }
+
+    // test multi-exist
+    boolean exists[] = store.exists(paths, 0);
+    for (int i = 0; i < paths.size(); i++) {
+      Assert.assertTrue(exists[i]);
+    }
+
+    // test multi-getStat
+    Stat stats[] = store.getStats(paths, 0);
+    for (int i = 0; i < paths.size(); i++) {
+      Assert.assertNotNull(stats[i]);
+    }
+
+    // test multi-get
+    List<ZNRecord> records = store.get(paths, null, 0);
+    Assert.assertNotNull(records);
+    Assert.assertEquals(records.size(), 10);
+    for (int i = 0; i < 10; i++) {
+      ZNRecord record = records.get(i);
+      String path = paths.get(i);
+      Assert.assertNotNull(record);
+      Assert.assertEquals(record.getId(), Integer.toString(i));
+      Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+          "Should not exist under new location after get");
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testFailOnSingleGet() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    String path = String.format("/%d", 0);
+    Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+        "Should not exist under new location");
+    Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0),
+        "Should not exist under fallback location");
+
+    // test single exist
+    boolean exist = store.exists(path, 0);
+    Assert.assertFalse(exist);
+
+    // test single getStat
+    Stat stat = store.getStat(path, 0);
+    Assert.assertNull(stat);
+
+    // test single get
+    ZNRecord record = store.get(path, null, 0);
+    Assert.assertNull(record);
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testFailOnMultiGet() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0-9 under fallbackRoot
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("%s/%d", fallbackRoot, i);
+      baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    List<String> paths = new ArrayList<String>();
+    for (int i = 0; i < 20; i++) {
+      String path = String.format("/%d", i);
+      Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+          "Should not exist under new location");
+      if (i < 10) {
+        Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path),
0),
+            "Should exist under fallback location");
+      } else {
+        Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path),
0),
+            "Should not exist under fallback location");
+      }
+      paths.add(path);
+    }
+
+    // test multi-exist
+    boolean exists[] = store.exists(paths, 0);
+    for (int i = 0; i < paths.size(); i++) {
+      if (i < 10) {
+        Assert.assertTrue(exists[i]);
+      } else {
+        Assert.assertFalse(exists[i]);
+      }
+    }
+
+    // test multi-getStat
+    Stat stats[] = store.getStats(paths, 0);
+    for (int i = 0; i < paths.size(); i++) {
+      if (i < 10) {
+        Assert.assertNotNull(stats[i]);
+      } else {
+        Assert.assertNull(stats[i]);
+      }
+    }
+
+    // test multi-get
+    List<ZNRecord> records = store.get(paths, null, 0);
+    Assert.assertNotNull(records);
+    Assert.assertEquals(records.size(), 20);
+    for (int i = 0; i < 20; i++) {
+      ZNRecord record = records.get(i);
+      String path = paths.get(i);
+      if (i < 10) {
+        Assert.assertNotNull(record);
+        Assert.assertEquals(record.getId(), Integer.toString(i));
+      } else {
+        Assert.assertNull(record);
+      }
+      Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+          "Should not exist under new location after get");
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test
+  public void testGetChildren() {
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name());
+    String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE");
+    ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient);
+
+    // create 0-9 under fallbackRoot and 10-19 under root
+    for (int i = 0; i < 20; i++) {
+
+      if (i < 10) {
+        String path = String.format("%s/%d", fallbackRoot, i);
+        baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+      } else {
+        String path = String.format("%s/%d", root, i);
+        baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT);
+      }
+    }
+
+    AutoFallbackPropertyStore<ZNRecord> store =
+        new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot);
+
+    List<String> paths = new ArrayList<String>();
+    for (int i = 0; i < 20; i++) {
+      String path = String.format("/%d", i);
+      if (i < 10) {
+        Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path),
0),
+            "Should exist under fallback location");
+        Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0),
+            "Should not exist under new location");
+
+      } else {
+        Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path),
0),
+            "Should not exist under fallback location");
+        Assert.assertTrue(baseAccessor.exists(String.format("%s%s", root, path), 0),
+            "Should exist under new location");
+
+      }
+      paths.add(path);
+    }
+
+    List<String> childs = store.getChildNames("/", 0);
+    Assert.assertNotNull(childs);
+    Assert.assertEquals(childs.size(), 20);
+    for (int i = 0; i < 20; i++) {
+      String child = childs.get(i);
+      Assert.assertTrue(childs.contains(child));
+    }
+
+    List<ZNRecord> records = store.getChildren("/", null, 0);
+    Assert.assertNotNull(records);
+    Assert.assertEquals(records.size(), 20);
+    for (int i = 0; i < 20; i++) {
+      ZNRecord record = records.get(i);
+      Assert.assertNotNull(record);
+      String id = record.getId();
+      Assert.assertTrue(childs.contains(id));
+    }
+
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/35e3ca10/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
new file mode 100644
index 0000000..549ce70
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java
@@ -0,0 +1,115 @@
+package org.apache.helix.store.zk;
+
+import java.util.Date;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestZkManagerWithAutoFallbackStore extends ZkUnitTestBase {
+  @Test
+  public void testBasic() throws Exception {
+    // Logger.getRootLogger().setLevel(Level.INFO);
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+    int n = 2;
+
+    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        32, // partitions per resource
+        n, // number of nodes
+        2, // replicas
+        "MasterSlave", false); // do rebalance
+
+    // start participants
+    MockParticipantManager[] participants = new MockParticipantManager[n];
+    for (int i = 0; i < 1; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].syncStart();
+    }
+
+    // add some data to fallback path: HELIX_PROPERTYSTORE
+    BaseDataAccessor<ZNRecord> accessor =
+        participants[0].getHelixDataAccessor().getBaseDataAccessor();
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("/%s/HELIX_PROPERTYSTORE/%d", clusterName, i);
+      ZNRecord record = new ZNRecord("" + i);
+      record.setSimpleField("key1", "value1");
+      accessor.set(path, record, AccessOption.PERSISTENT);
+    }
+
+    ZkHelixPropertyStore<ZNRecord> store = participants[0].getHelixPropertyStore();
+
+    // read shall use fallback paths
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("/%d", i);
+      ZNRecord record = store.get(path, null, 0);
+      Assert.assertNotNull(record);
+      Assert.assertEquals(record.getId(), "" + i);
+      Assert.assertNotNull(record.getSimpleField("key1"));
+      Assert.assertEquals(record.getSimpleField("key1"), "value1");
+    }
+
+    // update shall update new paths
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("/%d", i);
+      store.update(path, new DataUpdater<ZNRecord>() {
+
+        @Override
+        public ZNRecord update(ZNRecord currentData) {
+          if (currentData != null) {
+            currentData.setSimpleField("key2", "value2");
+          }
+          return currentData;
+        }
+      }, AccessOption.PERSISTENT);
+    }
+
+    for (int i = 0; i < 10; i++) {
+      String path = String.format("/%d", i);
+      ZNRecord record = store.get(path, null, 0);
+      Assert.assertNotNull(record);
+      Assert.assertEquals(record.getId(), "" + i);
+      Assert.assertNotNull(record.getSimpleField("key1"));
+      Assert.assertEquals(record.getSimpleField("key1"), "value1");
+      Assert.assertNotNull(record.getSimpleField("key2"));
+      Assert.assertEquals(record.getSimpleField("key2"), "value2");
+    }
+
+    // set shall use new path
+    for (int i = 10; i < 20; i++) {
+      String path = String.format("/%d", i);
+      ZNRecord record = new ZNRecord("" + i);
+      record.setSimpleField("key3", "value3");
+      store.set(path, record, AccessOption.PERSISTENT);
+    }
+
+    for (int i = 10; i < 20; i++) {
+      String path = String.format("/%d", i);
+      ZNRecord record = store.get(path, null, 0);
+      Assert.assertNotNull(record);
+      Assert.assertEquals(record.getId(), "" + i);
+      Assert.assertNotNull(record.getSimpleField("key3"));
+      Assert.assertEquals(record.getSimpleField("key3"), "value3");
+    }
+
+    participants[0].syncStop();
+    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+}


Mime
View raw message