helix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kisho...@apache.org
Subject [2/2] [HELIX-13] Initial commit for near real time rsync replicated file system
Date Fri, 07 Dec 2012 09:22:43 GMT
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
new file mode 100644
index 0000000..1fec4e3
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/SetupCluster.java
@@ -0,0 +1,58 @@
+package org.apache.helix.filestore;
+
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.StateModelConfigGenerator;
+
+public class SetupCluster
+{
+  public static final String DEFAULT_CLUSTER_NAME = "file-store-test";
+  public static final String DEFAULT_RESOURCE_NAME = "repository";
+  public static final int DEFAULT_PARTITION_NUMBER = 1;
+  public static final String DEFAULT_STATE_MODEL = "MasterSlave";
+
+  public static void main(String[] args)
+  {
+    if (args.length < 1)
+    {
+      System.err.println("USAGE: java SetupConsumerCluster zookeeperAddress (e.g. localhost:2181)");
+      System.exit(1);
+    }
+
+    final String zkAddr = args[0];
+    final String clusterName = DEFAULT_CLUSTER_NAME;
+
+    ZkClient zkclient = null;
+    try
+    {
+      zkclient = new ZkClient(zkAddr, ZkClient.DEFAULT_SESSION_TIMEOUT,
+          ZkClient.DEFAULT_CONNECTION_TIMEOUT, new ZNRecordSerializer());
+      ZKHelixAdmin admin = new ZKHelixAdmin(zkclient);
+
+      // add cluster
+      admin.addCluster(clusterName, true);
+
+      // add state model definition
+      StateModelConfigGenerator generator = new StateModelConfigGenerator();
+      admin.addStateModelDef(clusterName, DEFAULT_STATE_MODEL,
+          new StateModelDefinition(generator.generateConfigForOnlineOffline()));
+
+      // add resource "topic" which has 6 partitions
+      String resourceName = DEFAULT_RESOURCE_NAME;
+      admin.addResource(clusterName, resourceName, DEFAULT_PARTITION_NUMBER, DEFAULT_STATE_MODEL,
+          IdealStateModeProperty.AUTO_REBALANCE.toString());
+      
+      admin.rebalance(clusterName, resourceName, 1);
+
+    } finally
+    {
+      if (zkclient != null)
+      {
+        zkclient.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/StartClusterManager.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/StartClusterManager.java
b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/StartClusterManager.java
new file mode 100644
index 0000000..0b14111
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/StartClusterManager.java
@@ -0,0 +1,42 @@
+package org.apache.helix.filestore;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+
+public class StartClusterManager
+{
+  public static void main(String[] args)
+  {
+    if (args.length < 1)
+    {
+      System.err.println("USAGE: java StartClusterManager zookeeperAddress (e.g. localhost:2181)");
+      System.exit(1);
+    }
+    
+    final String clusterName = SetupCluster.DEFAULT_CLUSTER_NAME;
+    final String zkAddr = args[0];
+    
+    try
+    {
+      final HelixManager manager = HelixControllerMain.startHelixController(zkAddr, clusterName,
null,
+                                                        HelixControllerMain.STANDALONE);
+      
+      Runtime.getRuntime().addShutdownHook(new Thread()
+      {
+        @Override
+        public void run()
+        {
+          System.out.println("Shutting down cluster manager: " + manager.getInstanceName());
+          manager.disconnect();
+        }
+      });
+      
+      Thread.currentThread().join();
+    }
+    catch (Exception e)
+    {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342ea1c2/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Test.java
----------------------------------------------------------------------
diff --git a/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Test.java
b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Test.java
new file mode 100644
index 0000000..e9e7bca
--- /dev/null
+++ b/recipes/rsync-replicated-file-system/src/main/java/org/apache/helix/filestore/Test.java
@@ -0,0 +1,143 @@
+package org.apache.helix.filestore;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class Test
+{
+  public static void main(String[] args) throws InterruptedException
+  {
+    while(true){
+    ExecutorService service = Executors.newFixedThreadPool(400);
+    List<Callable<Object>> list = new ArrayList<Callable<Object>>();
+    for (int i = 0; i < 256; i++)
+    {
+      MyRunnable runnable = new MyRunnable();
+      list.add(runnable);
+      new Thread(runnable).start();
+      // service.submit(runnable);
+    }
+    
+//    List<Future<Object>> invokeAll = service.invokeAll(list);
+//    service.shutdownNow();
+    System.out.println("RUN -------------");
+    Thread.sleep(5000);
+    }
+  }
+}
+
+class MyRunnable implements Callable<Object>,Runnable
+{
+
+  // AtomicReference<Object> ref = new AtomicReference<Object>(new Object());
+  class Ref
+  {
+    Object obj;
+
+    public Ref(Object obj)
+    {
+      this.obj = obj;
+    }
+
+    void set(Object obj)
+    {
+      this.obj = obj;
+    }
+
+    Object get()
+    {
+      return obj;
+    }
+  }
+
+  Ref ref = new Ref(new Object());
+  Object lock = new Object();
+
+  //@Override
+  public Object call1() throws Exception
+  {
+    long start = System.currentTimeMillis();
+    System.out.println(start);
+    synchronized (lock)
+    {
+
+      if (ref.get() != null)
+      {
+        try
+        {
+          int sum = 0;
+          List<Float> array = new ArrayList<Float>();
+          for (int i = 0; i < 5000; i++)
+          {
+            array.add((float) Math.random());
+          }
+          Collections.sort(array);
+        } catch (Exception e)
+        {
+          e.printStackTrace();
+        }
+        ref.set(null);
+      }
+    }
+    long end = System.currentTimeMillis();
+    System.out.println("time:" + (end - start));
+    return new Object();
+
+  }
+
+  @Override
+  public Object call() throws Exception
+  {
+    long start = System.currentTimeMillis();
+    System.out.println(start);
+    try
+    {
+//      int sum = 0;
+//      List<Float> array = new ArrayList<Float>();
+//      for (int i = 0; i < 5000; i++)
+//      {
+//        array.add((float) Math.random());
+//      }
+      //Collections.sort(array);
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+    ref.set(null);
+    long end = System.currentTimeMillis();
+    System.out.println("time:" + (end - start));
+    return new Object();
+
+  }
+
+  @Override
+  public void run()
+  {
+    long start = System.currentTimeMillis();
+    System.out.println(start);
+    try
+    {
+      int sum = 0;
+      List<Float> array = new ArrayList<Float>();
+      for (int i = 0; i < 5000; i++)
+      {
+        array.add((float) Math.random());
+      }
+      Collections.sort(array);
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    }
+    ref.set(null);
+    long end = System.currentTimeMillis();
+    System.out.println("time:" + (end - start));    
+  }
+}
+


Mime
View raw message