hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1200240 - in /incubator/hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
Date Thu, 10 Nov 2011 09:33:59 GMT
Author: tjungblut
Date: Thu Nov 10 09:33:59 2011
New Revision: 1200240

URL: http://svn.apache.org/viewvc?rev=1200240&view=rev
Log:
[HAMA-471] Peer names should be sorted by task id


Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1200240&r1=1200239&r2=1200240&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu Nov 10 09:33:59 2011
@@ -25,7 +25,8 @@ Release 0.4 - Unreleased
     HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)
 
   IMPROVEMENTS
-
+   
+    HAMA-471: Peer names should be sorted by task id (tjungblut)
     HAMA-461: Extract a message service from BSPPeer (tjungblut)
     HAMA-463: Integrate checkpoint with bsp task (chl501)
     HAMA-457: Refactoring of BSPPeerImpl (tjungblut)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1200240&r1=1200239&r2=1200240&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Nov 10 09:33:59 2011
@@ -20,13 +20,9 @@ package org.apache.hama.bsp;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -80,6 +76,7 @@ public class LocalBSPRunner implements J
   protected FileSystem fs;
 
   private static long superStepCount = 0L;
+  private static String[] peerNames;
 
   // this is used for not-input driven job
   private int maxTasks;
@@ -146,7 +143,9 @@ public class LocalBSPRunner implements J
       }
     }
 
+    peerNames = new String[numBspTask];
     for (int i = 0; i < numBspTask; i++) {
+      peerNames[i] = "local:" + i;
       futureList.add(threadPool.submit(new BSPRunner(new Configuration(conf),
           job, i, splits)));
     }
@@ -221,7 +220,9 @@ public class LocalBSPRunner implements J
       this.id = id;
       this.splits = splits;
 
+      // set the peer port to the id, to prevent collision
       conf.setInt(Constants.PEER_PORT, id);
+      conf.set(Constants.PEER_HOST, "local");
 
       bsp = (BSP) ReflectionUtils.newInstance(
           job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
@@ -426,10 +427,6 @@ public class LocalBSPRunner implements J
   public static class LocalSyncClient implements SyncClient {
     // note that this is static, because we will have multiple peers
     private static CyclicBarrier barrier;
-    private static List<String> peers = Collections
-        .synchronizedList(new ArrayList<String>());
-    private String[] hosts;
-
     private int tasks;
 
     @Override
@@ -454,23 +451,17 @@ public class LocalBSPRunner implements J
     @Override
     public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
         long superstep) throws Exception {
-      // in our initial superstep we have to set the tasks to an array
-      if (superstep == -1l) {
-        hosts = peers.toArray(new String[peers.size()]);
-        Arrays.sort(hosts);
-      }
       barrier.await();
     }
 
     @Override
     public void register(BSPJobID jobId, TaskAttemptID taskId,
         String hostAddress, long port) {
-      peers.add(hostAddress + ":" + port);
     }
 
     @Override
     public String[] getAllPeerNames(TaskAttemptID taskId) {
-      return hosts;
+      return peerNames;
     }
 
     @Override

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java?rev=1200240&r1=1200239&r2=1200240&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncClient.java Thu Nov 10 09:33:59 2011
@@ -72,7 +72,7 @@ public interface SyncClient {
 
   /**
    * Returns all registered tasks within the sync daemon. They have to be
-   * ordered descending.
+   * ordered ascending by their task id.
    * 
    * @param taskId the tasks ID
    * @return an <b>ordered</b> string array of host:port pairs of all tasks

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1200240&r1=1200239&r2=1200240&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Thu Nov 10 09:33:59 2011
@@ -17,10 +17,16 @@
  */
 package org.apache.hama.bsp.sync;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,15 +44,17 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
 /**
- * This client class abstracts the use of our zookeeper sync code. <br/>
- * <br/>
- * TODO maybe extract an abstract class and let the subclasses implement
- * enter-/leaveBarrier so we can have multiple implementations, just like
- * goldenorb.
+ * This client class abstracts the use of our zookeeper sync code.
  * 
  */
 public class ZooKeeperSyncClientImpl implements SyncClient, Watcher {
 
+  /*
+   * TODO maybe extract an abstract class and let the subclasses implement
+   * enter-/leaveBarrier so we can have multiple implementations, just like
+   * goldenorb.
+   */
+
   public static final Log LOG = LogFactory
       .getLog(ZooKeeperSyncClientImpl.class);
 
@@ -236,7 +244,7 @@ public class ZooKeeperSyncClientImpl imp
     } catch (InterruptedException e) {
       LOG.error(e);
     }
-    registerTask(zk, jobId, hostAddress, port);
+    registerTask(zk, jobId, hostAddress, port, taskId);
   }
 
   /**
@@ -248,12 +256,16 @@ public class ZooKeeperSyncClientImpl imp
    * @param taskId
    * @param hostAddress
    * @param port
+   * @param taskId
    */
   public static void registerTask(ZooKeeper zk, BSPJobID jobId,
-      String hostAddress, long port) {
+      String hostAddress, long port, TaskAttemptID taskId) {
+
+    byte[] taskIdBytes = serializeTaskId(taskId);
+
     try {
       zk.create("/" + jobId.toString() + "/" + hostAddress + ":" + port,
-          new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+          taskIdBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
     } catch (KeeperException e) {
       LOG.error(e);
     } catch (InterruptedException e) {
@@ -261,19 +273,71 @@ public class ZooKeeperSyncClientImpl imp
     }
   }
 
+  private static byte[] serializeTaskId(TaskAttemptID taskId) {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(bos);
+    try {
+      taskId.write(out);
+    } catch (IOException e) {
+      LOG.error(e);
+    } finally {
+      try {
+        out.close();
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+    }
+    return bos.toByteArray();
+  }
+
+  public static TaskAttemptID deserializeTaskId(byte[] arr) {
+    ByteArrayInputStream bis = new ByteArrayInputStream(arr);
+    DataInputStream in = new DataInputStream(bis);
+    TaskAttemptID id = new TaskAttemptID();
+    try {
+      id.readFields(in);
+    } catch (IOException e) {
+      LOG.error(e);
+    } finally {
+      try {
+        in.close();
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+    }
+    return id;
+  }
+
   @Override
   public String[] getAllPeerNames(TaskAttemptID taskId) {
     if (allPeers == null) {
+      TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
       try {
         allPeers = zk.getChildren("/" + taskId.getJobID().toString(), this)
             .toArray(new String[0]);
+
+        for (String s : allPeers) {
+          byte[] data = zk.getData(
+              "/" + taskId.getJobID().toString() + "/" + s, this, null);
+          TaskAttemptID thatTask = deserializeTaskId(data);
+          LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
+              + thatTask.getTaskID().getId() + " : " + s);
+          sortedMap.put(thatTask.getTaskID().getId(), s);
+        }
+
       } catch (Exception e) {
         LOG.error(e);
-        throw new NullPointerException("All peer names could not be retrieved!");
+        throw new RuntimeException("All peer names could not be retrieved!");
       }
-      // don't forget to sort the peers, since zookeeper does not care about
-      // ordering the children.
-      Arrays.sort(allPeers);
+
+      allPeers = new String[sortedMap.size()];
+      int count = 0;
+      for (Entry<Integer, String> entry : sortedMap.entrySet()) {
+        allPeers[count++] = entry.getValue();
+        LOG.debug("TASK mapping from zookeeper: " + entry.getKey() + " : "
+            + entry.getValue() + " at index " + (count-1));
+      }
+
     }
     return allPeers;
   }



Mime
View raw message