hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1199343 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/
Date Tue, 08 Nov 2011 16:52:57 GMT
Author: tjungblut
Date: Tue Nov  8 16:52:57 2011
New Revision: 1199343

URL: http://svn.apache.org/viewvc?rev=1199343&view=rev
Log:
[HAMA-465] add combiners and IO to localrunner

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1199343&r1=1199342&r2=1199343&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Nov  8 16:52:57 2011
@@ -15,6 +15,7 @@ Release 0.4 - Unreleased
 
   BUG FIXES
 
+    HAMA-465: LocalJobRunner should support combiners and IO
     HAMA-459: GroomServerStatus.countTask() always returns 1 (edwardyoon)
     HAMA-432: Add statusUpdate() method to BSPPeerProtocol (edwardyoon)
     HAMA-437: PiEstimator is not working in Local Mode (Thomas Jungblut)
@@ -24,6 +25,7 @@ Release 0.4 - Unreleased
     HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)
 
   IMPROVEMENTS
+
     HAMA-461: Extract a message service from BSPPeer (tjungblut)
     HAMA-463: Integrate checkpoint with bsp task (chl501)
     HAMA-457: Refactoring of BSPPeerImpl (tjungblut)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1199343&r1=1199342&r2=1199343&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Nov  8 16:52:57 2011
@@ -17,20 +17,24 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.DataInputStream;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.Map;
+import java.util.List;
 import java.util.Map.Entry;
-import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
@@ -38,11 +42,19 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJobClient.RawSplit;
 import org.apache.hama.bsp.BSPMaster.State;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.MessageManagerFactory;
+import org.apache.hama.bsp.sync.SyncClient;
+import org.apache.hama.bsp.sync.SyncServiceFactory;
+import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.JobSubmissionProtocol;
-import org.apache.hama.util.KeyValuePair;
+import org.apache.hama.util.BSPNetUtils;
 
 /**
  * A multithreaded local BSP runner that can be used for debugging and local
@@ -53,13 +65,12 @@ public class LocalBSPRunner implements J
 
   private static final String IDENTIFIER = "localrunner";
   private static String WORKING_DIR = "/user/hama/bsp/";
-  protected static volatile ThreadPoolExecutor threadPool;
-  protected static int threadPoolSize;
+  protected static volatile ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors
+      .newCachedThreadPool();
+
+  @SuppressWarnings("rawtypes")
   protected static final LinkedList<Future<BSP>> futureList = new LinkedList<Future<BSP>>();
-  protected static CyclicBarrier barrier;
 
-  protected HashMap<String, LocalGroom> localGrooms = new HashMap<String, LocalGroom>();
-  protected String[] allPeers;
   protected String jobFile;
   protected String jobName;
 
@@ -68,27 +79,21 @@ public class LocalBSPRunner implements J
   protected Configuration conf;
   protected FileSystem fs;
 
+  private static long superStepCount = 0L;
+
+  // this is used for not-input driven job
+  private int maxTasks;
+
   public LocalBSPRunner(Configuration conf) throws IOException {
     super();
     this.conf = conf;
+
+    maxTasks = conf.getInt("bsp.local.tasks.maximum", 20);
+
     String path = conf.get("bsp.local.dir");
     if (path != null && !path.isEmpty()) {
       WORKING_DIR = path;
     }
-
-    threadPoolSize = conf.getInt("bsp.local.tasks.maximum", 20);
-    threadPool = (ThreadPoolExecutor) Executors
-        .newFixedThreadPool(threadPoolSize);
-    barrier = new CyclicBarrier(threadPoolSize);
-
-    for (int i = 0; i < threadPoolSize; i++) {
-      String name = IDENTIFIER + " " + i;
-      localGrooms.put(name, new LocalGroom(name));
-    }
-    
-    allPeers = localGrooms.keySet().toArray(
-        new String[localGrooms.keySet().size()]);
-
   }
 
   @Override
@@ -106,8 +111,9 @@ public class LocalBSPRunner implements J
   public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException {
     this.jobFile = jobFile;
 
-    if (fs == null)
+    if (fs == null) {
       this.fs = FileSystem.get(conf);
+    }
 
     // add the resource to the current configuration, because add resouce in
     // HamaConfigurations constructor (ID,FILE) does not take local->HDFS
@@ -115,31 +121,43 @@ public class LocalBSPRunner implements J
     // configuration, which yields into failure.
     conf.addResource(fs.open(new Path(jobFile)));
 
+    conf.setClass(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
+        LocalMessageManager.class, MessageManager.class);
+    conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS, LocalSyncClient.class,
+        SyncClient.class);
+
     BSPJob job = new BSPJob(new HamaConfiguration(conf), jobID);
-    job.setNumBspTask(threadPoolSize);
+    currentJobStatus = new JobStatus(jobID, System.getProperty("user.name"),
+        0L, JobStatus.RUNNING);
+
+    int numBspTask = job.getNumBspTask();
+
+    String jobSplit = conf.get("bsp.job.split.file");
+
+    BSPJobClient.RawSplit[] splits = null;
+    if (jobSplit != null) {
 
-    this.jobName = job.getJobName();
-    currentJobStatus = new JobStatus(jobID, System.getProperty("user.name"), 0,
-        JobStatus.RUNNING);
-    for (int i = 0; i < threadPoolSize; i++) {
-      String name = IDENTIFIER + " " + i;
-      LocalGroom localGroom = new LocalGroom(name);
-      localGrooms.put(name, localGroom);
-      futureList.add(threadPool.submit(new BSPRunner(conf, job, ReflectionUtils
-          .newInstance(job.getBspClass(), conf), localGroom)));
+      DataInputStream splitFile = fs.open(new Path(jobSplit));
+
+      try {
+        splits = BSPJobClient.readSplitFile(splitFile);
+      } finally {
+        splitFile.close();
+      }
     }
+
+    for (int i = 0; i < numBspTask; i++) {
+      futureList.add(threadPool.submit(new BSPRunner(new Configuration(conf),
+          job, i, splits)));
+    }
+
     new Thread(new ThreadObserver(currentJobStatus)).start();
     return currentJobStatus;
   }
 
   @Override
   public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
-    Map<String, GroomServerStatus> map = new HashMap<String, GroomServerStatus>();
-    for (Entry<String, LocalGroom> entry : localGrooms.entrySet()) {
-      map.put(entry.getKey(), new GroomServerStatus(entry.getKey(),
-          new ArrayList<TaskStatus>(0), 0, 0, "", entry.getKey()));
-    }
-    return new ClusterStatus(map, threadPoolSize, threadPoolSize, State.RUNNING);
+    return new ClusterStatus(maxTasks, maxTasks, maxTasks, State.RUNNING);
   }
 
   @Override
@@ -150,10 +168,8 @@ public class LocalBSPRunner implements J
 
   @Override
   public JobStatus getJobStatus(BSPJobID jobid) throws IOException {
-    if (currentJobStatus == null) {
-      currentJobStatus = new JobStatus(jobid, System.getProperty("user.name"),
-          0L, JobStatus.RUNNING);
-    }
+    currentJobStatus.setSuperstepCount(superStepCount);
+    currentJobStatus.setprogress(superStepCount);
     return currentJobStatus;
   }
 
@@ -189,33 +205,54 @@ public class LocalBSPRunner implements J
   }
 
   // this class will spawn a new thread and executes the BSP
+  @SuppressWarnings({ "deprecation", "rawtypes" })
   class BSPRunner implements Callable<BSP> {
 
-    Configuration conf;
-    BSPJob job;
-    BSP bsp;
-    LocalGroom groom;
+    private Configuration conf;
+    private BSPJob job;
+    private int id;
+    private BSP bsp;
+    private RawSplit[] splits;
 
-    public BSPRunner(Configuration conf, BSPJob job, BSP bsp, LocalGroom groom) {
+    public BSPRunner(Configuration conf, BSPJob job, int id, RawSplit[] splits) {
       super();
       this.conf = conf;
       this.job = job;
-      this.bsp = bsp;
-      this.groom = groom;
+      this.id = id;
+      this.splits = splits;
+
+      conf.setInt(Constants.PEER_PORT, id);
+
+      bsp = (BSP) ReflectionUtils.newInstance(
+          job.getConf().getClass("bsp.work.class", BSP.class), job.getConf());
+
     }
 
     // deprecated until 0.5.0, then it will be removed.
-    @SuppressWarnings("deprecation")
-    public void run() {
+    @SuppressWarnings("unchecked")
+    public void run() throws Exception {
+
+      String splitname = null;
+      BytesWritable realBytes = null;
+      if (splits != null) {
+        splits[id].getClassName();
+        realBytes = splits[id].getBytes();
+      }
+
+      BSPPeerImpl peer = new BSPPeerImpl(job, conf, new TaskAttemptID(
+          new TaskID(job.getJobID(), id), id), new LocalUmbilical(), id,
+          splitname, realBytes);
+
       bsp.setConf(conf);
       try {
-        bsp.setup(groom);
-        // TODO 
-//        bsp.bsp(groom, null, null);
+        bsp.setup(peer);
+        bsp.bsp(peer);
       } catch (Exception e) {
         LOG.error("Exception during BSP execution!", e);
       }
-      bsp.cleanup(groom);
+      bsp.cleanup(peer);
+      peer.clear();
+      peer.close();
     }
 
     @Override
@@ -237,7 +274,8 @@ public class LocalBSPRunner implements J
     @Override
     public void run() {
       boolean success = true;
-      for (Future<BSP> future : futureList) {
+      for (@SuppressWarnings("rawtypes")
+      Future<BSP> future : futureList) {
         try {
           future.get();
         } catch (InterruptedException e) {
@@ -260,132 +298,197 @@ public class LocalBSPRunner implements J
 
   }
 
-  final class LocalGroom implements BSPPeer {
-    private long superStepCount = 0;
-    private final ConcurrentLinkedQueue<BSPMessage> localMessageQueue = new ConcurrentLinkedQueue<BSPMessage>();
-    // outgoing queue
-    private final Map<String, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue<BSPMessage>>();
-    private final String peerName;
+  public static class LocalMessageManager implements MessageManager {
+
+    private static final ConcurrentHashMap<InetSocketAddress, LocalMessageManager> managerMap = new ConcurrentHashMap<InetSocketAddress, LocalBSPRunner.LocalMessageManager>();
 
-    public LocalGroom(String peerName) throws IOException {
-      this.peerName = peerName;
+    private final HashMap<InetSocketAddress, LinkedList<BSPMessage>> localOutgoingMessages = new HashMap<InetSocketAddress, LinkedList<BSPMessage>>();
+    private static final ConcurrentHashMap<String, InetSocketAddress> socketCache = new ConcurrentHashMap<String, InetSocketAddress>();
+    private final LinkedBlockingDeque<BSPMessage> localIncomingMessages = new LinkedBlockingDeque<BSPMessage>();
+
+    @Override
+    public void init(Configuration conf, InetSocketAddress peerAddress) {
+      managerMap.put(peerAddress, this);
     }
 
     @Override
-    public void send(String peerName, BSPMessage msg) throws IOException {
-      if (this.peerName.equals(peerName)) {
-//        put(msg);
+    public void close() {
+
+    }
+
+    @Override
+    public BSPMessage getCurrentMessage() throws IOException {
+      if (localIncomingMessages.isEmpty()) {
+        return null;
       } else {
-        // put this into a outgoing queue
-        if (outgoingQueues.get(peerName) == null) {
-          outgoingQueues.put(peerName, new ConcurrentLinkedQueue<BSPMessage>());
-        }
-        outgoingQueues.get(peerName).add(msg);
+        return localIncomingMessages.pop();
       }
     }
 
-//    @Override
-//    public void put(BSPMessage msg) throws IOException {
-//      localMessageQueue.add(msg);
-//    }
+    @Override
+    public void send(String peerName, BSPMessage msg) throws IOException {
+      LinkedList<BSPMessage> msgs = localOutgoingMessages.get(peerName);
+      if (msgs == null) {
+        msgs = new LinkedList<BSPMessage>();
+      }
+      msgs.add(msg);
+
+      InetSocketAddress inetSocketAddress = socketCache.get(peerName);
+      if (inetSocketAddress == null) {
+        inetSocketAddress = BSPNetUtils.getAddress(peerName);
+        socketCache.put(peerName, inetSocketAddress);
+      }
+
+      localOutgoingMessages.put(inetSocketAddress, msgs);
+    }
 
     @Override
-    public BSPMessage getCurrentMessage() throws IOException {
-      return localMessageQueue.poll();
+    public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
+      return localOutgoingMessages.entrySet().iterator();
+    }
+
+    @Override
+    public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+        throws IOException {
+      for (BSPMessage value : bundle.getMessages()) {
+        managerMap.get(addr).localIncomingMessages.add(value);
+      }
+    }
+
+    @Override
+    public void clearOutgoingQueues() {
+      localOutgoingMessages.clear();
     }
 
     @Override
     public int getNumCurrentMessages() {
-      return localMessageQueue.size();
+      return localIncomingMessages.size();
     }
 
+  }
+
+  public static class LocalUmbilical implements BSPPeerProtocol {
+
     @Override
-    public void sync() throws InterruptedException {
-      // wait until all threads reach this barrier
-      barrierSync();
-      // send the messages
-      for (Entry<String, ConcurrentLinkedQueue<BSPMessage>> entry : outgoingQueues
-          .entrySet()) {
-        String peerName = entry.getKey();
-//        for (BSPMessage msg : entry.getValue())
-//          try {
-//            localGrooms.get(peerName).put(msg);
-//          } catch (IOException e) {
-//            LOG.error("Putting message \"" + msg.toString() + "\" failed! ", e);
-//          }
-      }
-      // clear the local outgoing queue
-      outgoingQueues.clear();
-      // sync again to avoid data inconsistency
-      barrierSync();
-      incrementSuperSteps();
+    public long getProtocolVersion(String protocol, long clientVersion)
+        throws IOException {
+      return 0;
     }
 
-    private void barrierSync() throws InterruptedException {
-      try {
-        barrier.await();
-      } catch (BrokenBarrierException e) {
-        throw new InterruptedException("Barrier has been broken!" + e);
-      }
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    @Override
+    public Task getTask(TaskAttemptID taskid) throws IOException {
+      return null;
+    }
+
+    @Override
+    public boolean ping(TaskAttemptID taskid) throws IOException {
+      return false;
+    }
+
+    @Override
+    public void done(TaskAttemptID taskid, boolean shouldBePromoted)
+        throws IOException {
+
     }
 
-    private void incrementSuperSteps() {
-      currentJobStatus.setprogress(superStepCount++);
-      currentJobStatus.setSuperstepCount(currentJobStatus.progress());
+    @Override
+    public void fsError(TaskAttemptID taskId, String message)
+        throws IOException {
+
     }
 
     @Override
-    public long getSuperstepCount() {
-      return superStepCount;
+    public void fatalError(TaskAttemptID taskId, String message)
+        throws IOException {
+
     }
 
     @Override
-    public String getPeerName() {
-      return peerName;
+    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
+        throws IOException, InterruptedException {
+      // does not need to be synchronized, because it is just an information.
+      superStepCount = taskStatus.getSuperstepCount();
+      return true;
     }
 
     @Override
-    public String[] getAllPeerNames() {
-      return allPeers;
+    public int getAssignedPortNum(TaskAttemptID taskid) {
+      return 0;
     }
 
+  }
+
+  public static class LocalSyncClient implements SyncClient {
+    // note that this is static, because we will have multiple peers
+    private static CyclicBarrier barrier;
+    private static List<String> peers = Collections
+        .synchronizedList(new ArrayList<String>());
+    private String[] hosts;
+
+    private int tasks;
+
     @Override
-    public void clear() {
-      localMessageQueue.clear();
+    public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
+        throws Exception {
+      tasks = conf.getInt("bsp.peers.num", 1);
+
+      synchronized (LocalSyncClient.class) {
+        if (barrier == null) {
+          barrier = new CyclicBarrier(tasks);
+          LOG.info("Setting up a new barrier for " + tasks + " tasks!");
+        }
+      }
     }
 
     @Override
-    public Configuration getConfiguration() {
-      return conf;
+    public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws Exception {
+      barrier.await();
     }
 
     @Override
-    public String getPeerName(int index) {
-      return allPeers[index];
+    public void leaveBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        long superstep) throws Exception {
+      // in our initial superstep we have to set the tasks to an array
+      if (superstep == -1l) {
+        hosts = peers.toArray(new String[peers.size()]);
+        Arrays.sort(hosts);
+      }
+      barrier.await();
     }
 
     @Override
-    public int getNumPeers() {
-      return allPeers.length;
+    public void register(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+      peers.add(hostAddress + ":" + port);
     }
 
     @Override
-    public void write(Object key, Object value) throws IOException {
-      // TODO Auto-generated method stub
-      
+    public String[] getAllPeerNames(TaskAttemptID taskId) {
+      return hosts;
     }
 
     @Override
-    public boolean readNext(Object key, Object value) throws IOException {
-      // TODO Auto-generated method stub
-      return false;
+    public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
+        String hostAddress, long port) {
+
     }
 
     @Override
-    public KeyValuePair readNext() throws IOException {
-      // TODO Auto-generated method stub
-      return null;
+    public void stopServer() {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+
     }
 
   }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1199343&r1=1199342&r2=1199343&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java Tue Nov  8 16:52:57 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hama.bsp.BSPMessage;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.util.BSPNetUtils;
 
 /**
  * Implementation of the {@link HadoopMessageManager}.
@@ -94,7 +95,7 @@ public class HadoopMessageManagerImpl im
     if (peerSocketCache.containsKey(peerName)) {
       targetPeerAddress = peerSocketCache.get(peerName);
     } else {
-      targetPeerAddress = getAddress(peerName);
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
       peerSocketCache.put(peerName, targetPeerAddress);
     }
     LinkedList<BSPMessage> queue = outgoingQueues.get(targetPeerAddress);
@@ -105,17 +106,6 @@ public class HadoopMessageManagerImpl im
     outgoingQueues.put(targetPeerAddress, queue);
   }
 
-  private InetSocketAddress getAddress(String peerName) {
-    String[] peerAddrParts = peerName.split(":");
-    if (peerAddrParts.length != 2) {
-      throw new ArrayIndexOutOfBoundsException(
-          "Peername must consist of exactly ONE \":\"! Given peername was: "
-              + peerName);
-    }
-    return new InetSocketAddress(peerAddrParts[0],
-        Integer.valueOf(peerAddrParts[1]));
-  }
-
   @Override
   public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
     return this.outgoingQueues.entrySet().iterator();

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java?rev=1199343&r1=1199342&r2=1199343&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java Tue Nov  8 16:52:57 2011
@@ -18,6 +18,7 @@
 package org.apache.hama.util;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 
 import org.apache.mina.util.AvailablePortFinder;
@@ -59,4 +60,22 @@ public class BSPNetUtils {
     return startPort;
   }
 
+  /**
+   * Gets a new InetSocketAddress from the given peerName. peerName must contain
+   * a colon to distinct between host and port.
+   * 
+   * @param peerName
+   * @return
+   */
+  public static InetSocketAddress getAddress(String peerName) {
+    String[] peerAddrParts = peerName.split(":");
+    if (peerAddrParts.length != 2) {
+      throw new ArrayIndexOutOfBoundsException(
+          "Peername must consist of exactly ONE \":\"! Given peername was: "
+              + peerName);
+    }
+    return new InetSocketAddress(peerAddrParts[0],
+        Integer.valueOf(peerAddrParts[1]));
+  }
+
 }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1199343&r1=1199342&r2=1199343&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Tue Nov  8 16:52:57 2011
@@ -34,7 +34,7 @@ import org.apache.hama.HamaConfiguration
 public class TestBSPMasterGroomServer extends HamaCluster {
 
   private static Log LOG = LogFactory.getLog(TestBSPMasterGroomServer.class);
-  private static String TMP_OUTPUT = "/tmp/test-example/";
+  static String TMP_OUTPUT = "/tmp/test-example/";
   private HamaConfiguration configuration;
   private String TEST_JOB = "src/test/java/testjar/testjob.jar";
 

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java?rev=1199343&r1=1199342&r2=1199343&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestLocalRunner.java Tue Nov  8 16:52:57 2011
@@ -21,22 +21,34 @@ import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 
 public class TestLocalRunner extends TestCase {
 
-  public void testSubmitJob() throws Exception {
-    Configuration configuration = new Configuration();
-    configuration.set("bsp.local.dir", "/tmp/hama-test");
-    BSPJob bsp = new BSPJob(new HamaConfiguration(configuration));
-    bsp.setJobName("Test Serialize Printing");
-    bsp.setBspClass(testjar.ClassSerializePrinting.HelloBSP.class);
-
-    FileSystem fileSys = FileSystem.get(configuration);
-
-    // FIXME
-    // assertTrue(bsp.waitForCompletion(true));
-    //TestBSPMasterGroomServer.checkOutput(fileSys, configuration, 20);
+  public void testOutputJob() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("bsp.local.dir", "/tmp/hama-test");
+    BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+    bsp.setJobName("Test Serialize Printing with Output");
+    bsp.setBspClass(IOSerializePrinting.class);
+
+    conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+    bsp.setNumBspTask(2);
+    bsp.setInputFormat(NullInputFormat.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(LongWritable.class);
+    bsp.setOutputValueClass(Text.class);
+    bsp.setOutputPath(new Path(TestBSPMasterGroomServer.TMP_OUTPUT));
+
+    FileSystem fileSys = FileSystem.get(conf);
+
+    if (bsp.waitForCompletion(true)) {
+      TestBSPMasterGroomServer.checkOutput(fileSys, conf, 2);
+    }
   }
 
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java?rev=1199343&r1=1199342&r2=1199343&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SerializePrinting.java Tue Nov  8 16:52:57 2011
@@ -54,7 +54,7 @@ public class SerializePrinting {
         BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable> peer)
         throws IOException, KeeperException, InterruptedException {
 
-      LOG.info(peer.getAllPeerNames());
+      LOG.debug(peer.getAllPeerNames());
       int i = 0;
       for (String otherPeer : peer.getAllPeerNames()) {
         String peerName = peer.getPeerName();



Mime
View raw message