hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1198674 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/message/
Date Mon, 07 Nov 2011 09:39:31 GMT
Author: tjungblut
Date: Mon Nov  7 09:39:30 2011
New Revision: 1198674

URL: http://svn.apache.org/viewvc?rev=1198674&view=rev
Log:
[HAMA-461] extract message service

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
  (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
  (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
  (with props)
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
  (with props)
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
  (with props)
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1198674&r1=1198673&r2=1198674&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Nov  7 09:39:30 2011
@@ -24,7 +24,7 @@ Release 0.4 - Unreleased
     HAMA-421: Maven build issues using proxy (Joe Crobak via edwardyoon)
 
   IMPROVEMENTS
-  
+    HAMA-461: Extract a message service from BSPPeer (tjungblut)
     HAMA-463: Integrate checkpoint with bsp task (chl501)
     HAMA-457: Refactoring of BSPPeerImpl (tjungblut)
     HAMA-448: Restructure BSP API (Thomas Jungblut via edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1198674&r1=1198673&r2=1198674&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Mon Nov  7 09:39:30 2011
@@ -21,14 +21,12 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.Constants;
-import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.util.KeyValuePair;
 
 /**
  * BSP communication interface.
  */
-public interface BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
-    HamaRPCProtocolVersion, Constants {
+public interface BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Constants {
 
   /**
    * Send a data with a tag to another BSPSlave corresponding to hostname.
@@ -42,22 +40,6 @@ public interface BSPPeer<KEYIN, VALUEIN,
   public void send(String peerName, BSPMessage msg) throws IOException;
 
   /**
-   * Puts a message to local queue.
-   * 
-   * @param msg
-   * @throws IOException
-   */
-  public void put(BSPMessage msg) throws IOException;
-
-  /**
-   * Puts a bundle of messages to local queue.
-   * 
-   * @param messages
-   * @throws IOException
-   */
-  public void put(BSPMessageBundle messages) throws IOException;
-
-  /**
    * @return A message from the peer's received messages queue (a FIFO).
    * @throws IOException
    */

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1198674&r1=1198673&r2=1198674&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Mon Nov  7 09:39:30 2011
@@ -20,10 +20,8 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
-import java.util.Map;
+import java.util.LinkedList;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,10 +31,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.MessageManagerFactory;
 import org.apache.hama.bsp.sync.SyncClient;
 import org.apache.hama.bsp.sync.SyncServiceFactory;
 import org.apache.hama.ipc.BSPPeerProtocol;
@@ -48,21 +46,12 @@ import org.apache.hama.util.KeyValuePair
 public class BSPPeerImpl<KEYIN, VALUEIN, KEYOUT, VALUEOUT> implements
     BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
 
-  public static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
+  private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   private final Configuration conf;
   private final FileSystem fs;
   private BSPJob bspJob;
 
-  private volatile Server server = null;
-
-  private final Map<InetSocketAddress, BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>
peers = new ConcurrentHashMap<InetSocketAddress, BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>>();
-  private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues
= new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
-  private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
-  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
-  private final Map<String, InetSocketAddress> peerSocketCache = new ConcurrentHashMap<String,
InetSocketAddress>();
-
-  private InetSocketAddress peerAddress;
   private TaskStatus currentTaskStatus;
 
   private TaskAttemptID taskId;
@@ -72,6 +61,7 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
 
   // SYNC
   private SyncClient syncClient;
+  private MessageManager messenger;
 
   // IO
   private int partition;
@@ -81,6 +71,8 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   private RecordReader<KEYIN, VALUEIN> in;
   private RecordWriter<KEYOUT, VALUEOUT> outWriter;
 
+  private InetSocketAddress peerAddress;
+
   /**
    * Protected default constructor for LocalBSPRunner.
    */
@@ -139,22 +131,14 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     setCurrentTaskStatus(new TaskStatus(taskId.getJobID(), taskId, 0,
         TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
         TaskStatus.Phase.STARTING));
+
+    messenger = MessageManagerFactory.getMessageManager(conf);
+    messenger.init(conf, peerAddress);
+    
   }
 
   @SuppressWarnings("unchecked")
   public void initialize() throws Exception {
-    try {
-      if (LOG.isDebugEnabled())
-        LOG.debug("reinitialize(): " + getPeerName());
-      this.server = RPC.getServer(this, peerAddress.getHostName(),
-          peerAddress.getPort(), conf);
-      server.start();
-      LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
-          + peerAddress.getPort());
-    } catch (IOException e) {
-      LOG.error("Fail to start RPC server!", e);
-    }
-
     syncClient = SyncServiceFactory.getSyncClient(conf);
     syncClient.init(conf, taskId.getJobID(), taskId);
 
@@ -198,32 +182,12 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
 
   @Override
   public BSPMessage getCurrentMessage() throws IOException {
-    return localQueue.poll();
+    return messenger.getCurrentMessage();
   }
 
-  /*
-   * (non-Javadoc)
-   * @see org.apache.hama.bsp.BSPPeerInterface#send(java.net.InetSocketAddress,
-   * org.apache.hadoop.io.Writable, org.apache.hadoop.io.Writable)
-   */
   @Override
   public void send(String peerName, BSPMessage msg) throws IOException {
-    LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
-    InetSocketAddress targetPeerAddress = null;
-    // Get socket for target peer.
-    if (peerSocketCache.containsKey(peerName)) {
-      targetPeerAddress = peerSocketCache.get(peerName);
-    } else {
-      targetPeerAddress = getAddress(peerName);
-      peerSocketCache.put(peerName, targetPeerAddress);
-    }
-    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
-        .get(targetPeerAddress);
-    if (queue == null) {
-      queue = new ConcurrentLinkedQueue<BSPMessage>();
-    }
-    queue.add(msg);
-    outgoingQueues.put(targetPeerAddress, queue);
+    messenger.send(peerName, msg);
   }
 
   private String checkpointedPath() {
@@ -260,52 +224,33 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   public void sync() throws InterruptedException {
     try {
       enterBarrier();
-      Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>>
it = this.outgoingQueues
-          .entrySet().iterator();
+      Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> it = messenger
+          .getMessageIterator();
 
       while (it.hasNext()) {
-        Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
-            .next();
+        Entry<InetSocketAddress, LinkedList<BSPMessage>> entry = it.next();
+        final InetSocketAddress addr = entry.getKey();
+        final Iterable<BSPMessage> messages = entry.getValue();
 
-        BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> peer = getBSPPeerConnection(entry
-            .getKey());
-        Iterable<BSPMessage> messages = entry.getValue();
-        BSPMessageBundle bundle = new BSPMessageBundle();
-
-        if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
-            Combiner.class)) {
-          Combiner combiner = (Combiner) ReflectionUtils.newInstance(
-              conf.getClass("bsp.combiner.class", Combiner.class), conf);
-
-          bundle = combiner.combine(messages);
-        } else {
-          for (BSPMessage message : messages) {
-            bundle.addMessage(message);
-          }
-        }
+        final BSPMessageBundle bundle = combineMessages(messages);
 
         if (conf.getBoolean("bsp.checkpoint.enabled", false)) {
           checkpoint(checkpointedPath(), bundle);
         }
 
-        peer.put(bundle);
+        // remove this message during runtime to save a bit of memory
+        it.remove();
+
+        messenger.transfer(addr, bundle);
       }
 
       leaveBarrier();
       currentTaskStatus.incrementSuperstepCount();
       umbilical.statusUpdate(taskId, currentTaskStatus);
-      
+
       // Clear outgoing queues.
-      clearOutgoingQueues();
+      messenger.clearOutgoingQueues();
 
-      // Add non-processed messages from this iteration for the next's queue.
-      while (!localQueue.isEmpty()) {
-        BSPMessage message = localQueue.poll();
-        localQueueForNextIteration.add(message);
-      }
-      // Switch local queues.
-      localQueue = localQueueForNextIteration;
-      localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
     } catch (Exception e) {
       LOG.fatal(
           "Caught exception during superstep "
@@ -313,6 +258,22 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     }
   }
 
+  private BSPMessageBundle combineMessages(Iterable<BSPMessage> messages) {
+    if (!conf.getClass("bsp.combiner.class", Combiner.class).equals(
+        Combiner.class)) {
+      Combiner combiner = (Combiner) ReflectionUtils.newInstance(
+          conf.getClass("bsp.combiner.class", Combiner.class), conf);
+
+      return combiner.combine(messages);
+    } else {
+      BSPMessageBundle bundle = new BSPMessageBundle();
+      for (BSPMessage message : messages) {
+        bundle.addMessage(message);
+      }
+      return bundle;
+    }
+  }
+
   protected void enterBarrier() throws Exception {
     syncClient.enterBarrier(taskId.getJobID(), taskId,
         currentTaskStatus.getSuperstepCount());
@@ -323,11 +284,6 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
         currentTaskStatus.getSuperstepCount());
   }
 
-  public void clear() {
-    this.localQueue.clear();
-    this.outgoingQueues.clear();
-  }
-
   public void close() throws Exception {
     if (in != null) {
       in.close();
@@ -337,41 +293,14 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     }
     this.clear();
     syncClient.close();
-    if (server != null) {
-      server.stop();
-    }
-  }
 
-  @Override
-  public void put(BSPMessage msg) throws IOException {
-    this.localQueueForNextIteration.add(msg);
-  }
+    messenger.close();
 
-  @Override
-  public void put(BSPMessageBundle messages) throws IOException {
-    for (BSPMessage message : messages.getMessages()) {
-      this.localQueueForNextIteration.add(message);
-    }
   }
 
   @Override
-  public long getProtocolVersion(String arg0, long arg1) throws IOException {
-    return BSPPeer.versionID;
-  }
-
-  @SuppressWarnings("unchecked")
-  protected BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> getBSPPeerConnection(
-      InetSocketAddress addr) throws NullPointerException, IOException {
-    BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> peer;
-    peer = peers.get(addr);
-    if (peer == null) {
-      synchronized (this.peers) {
-        peer = (BSPPeer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>) RPC.getProxy(
-            BSPPeer.class, BSPPeer.versionID, addr, this.conf);
-        this.peers.put(addr, peer);
-      }
-    }
-    return peer;
+  public void clear() {
+    messenger.clearOutgoingQueues();
   }
 
   /**
@@ -381,17 +310,6 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     return peerAddress.getHostName() + ":" + peerAddress.getPort();
   }
 
-  private InetSocketAddress getAddress(String peerName) {
-    String[] peerAddrParts = peerName.split(":");
-    if (peerAddrParts.length != 2) {
-      throw new ArrayIndexOutOfBoundsException(
-          "Peername must consist of exactly ONE \":\"! Given peername was: "
-              + peerName);
-    }
-    return new InetSocketAddress(peerAddrParts[0],
-        Integer.valueOf(peerAddrParts[1]));
-  }
-
   @Override
   public String[] getAllPeerNames() {
     initPeerNames();
@@ -419,8 +337,9 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   /**
    * @return the number of messages
    */
+  @Override
   public int getNumCurrentMessages() {
-    return localQueue.size();
+    return messenger.getNumCurrentMessages();
   }
 
   /**
@@ -440,20 +359,6 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
   }
 
   /**
-   * @return the size of local queue
-   */
-  public int getLocalQueueSize() {
-    return localQueue.size();
-  }
-
-  /**
-   * @return the size of outgoing queue
-   */
-  public int getOutgoingQueueSize() {
-    return outgoingQueues.size();
-  }
-
-  /**
    * Gets the job configuration.
    * 
    * @return the conf
@@ -462,20 +367,6 @@ public class BSPPeerImpl<KEYIN, VALUEIN,
     return conf;
   }
 
-  /**
-   * Clears local queue
-   */
-  public void clearLocalQueue() {
-    this.localQueue.clear();
-  }
-
-  /**
-   * Clears outgoing queues
-   */
-  public void clearOutgoingQueues() {
-    this.outgoingQueues.clear();
-  }
-
   /*
    * IO STUFF
    */

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1198674&r1=1198673&r2=1198674&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon Nov  7 09:39:30 2011
@@ -274,7 +274,7 @@ public class LocalBSPRunner implements J
     @Override
     public void send(String peerName, BSPMessage msg) throws IOException {
       if (this.peerName.equals(peerName)) {
-        put(msg);
+//        put(msg);
       } else {
         // put this into a outgoing queue
         if (outgoingQueues.get(peerName) == null) {
@@ -284,10 +284,10 @@ public class LocalBSPRunner implements J
       }
     }
 
-    @Override
-    public void put(BSPMessage msg) throws IOException {
-      localMessageQueue.add(msg);
-    }
+//    @Override
+//    public void put(BSPMessage msg) throws IOException {
+//      localMessageQueue.add(msg);
+//    }
 
     @Override
     public BSPMessage getCurrentMessage() throws IOException {
@@ -307,12 +307,12 @@ public class LocalBSPRunner implements J
       for (Entry<String, ConcurrentLinkedQueue<BSPMessage>> entry : outgoingQueues
           .entrySet()) {
         String peerName = entry.getKey();
-        for (BSPMessage msg : entry.getValue())
-          try {
-            localGrooms.get(peerName).put(msg);
-          } catch (IOException e) {
-            LOG.error("Putting message \"" + msg.toString() + "\" failed! ", e);
-          }
+//        for (BSPMessage msg : entry.getValue())
+//          try {
+//            localGrooms.get(peerName).put(msg);
+//          } catch (IOException e) {
+//            LOG.error("Putting message \"" + msg.toString() + "\" failed! ", e);
+//          }
       }
       // clear the local outgoing queue
       outgoingQueues.clear();
@@ -355,17 +355,6 @@ public class LocalBSPRunner implements J
     }
 
     @Override
-    public long getProtocolVersion(String protocol, long clientVersion)
-        throws IOException {
-      return 3;
-    }
-
-
-    @Override
-    public void put(BSPMessageBundle messages) throws IOException {
-    }
-
-    @Override
     public Configuration getConfiguration() {
       return conf;
     }

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java?rev=1198674&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java Mon Nov  7 09:39:30 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.ipc.HamaRPCProtocolVersion;
+
+/**
+ * Hadoop RPC Interface for messaging.
+ * 
+ */
+public interface HadoopMessageManager extends HamaRPCProtocolVersion {
+
+  /**
+   * This method puts a message for the next iteration. Accessed concurrently
+   * from protocol, this must be synchronized internally.
+   * 
+   * @param msg
+   */
+  public void put(BSPMessage msg);
+
+  /**
+   * This method puts a messagebundle for the next iteration. Accessed
+   * concurrently from protocol, this must be synchronized internally.
+   * 
+   * @param messages
+   */
+  public void put(BSPMessageBundle messages);
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java?rev=1198674&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
Mon Nov  7 09:39:30 2011
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+
+/**
+ * Implementation of the {@link HadoopMessageManager}.
+ * 
+ */
+public class HadoopMessageManagerImpl implements MessageManager,
+    HadoopMessageManager {
+
+  private static final Log LOG = LogFactory
+      .getLog(HadoopMessageManagerImpl.class);
+
+  private Server server = null;
+  private Configuration conf;
+
+  private final HashMap<InetSocketAddress, HadoopMessageManager> peers = new HashMap<InetSocketAddress,
HadoopMessageManager>();
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String,
InetSocketAddress>();
+
+  private final HashMap<InetSocketAddress, LinkedList<BSPMessage>> outgoingQueues
= new HashMap<InetSocketAddress, LinkedList<BSPMessage>>();
+  private Deque<BSPMessage> localQueue = new LinkedList<BSPMessage>();
+  // this must be a synchronized implementation: this is accessed per RPC
+  private final ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new
ConcurrentLinkedQueue<BSPMessage>();
+
+  @Override
+  public void init(Configuration conf, InetSocketAddress peerAddress) {
+    this.conf = conf;
+    startRPCServer(conf, peerAddress);
+  }
+
+  private void startRPCServer(Configuration conf, InetSocketAddress peerAddress) {
+    try {
+      this.server = RPC.getServer(this, peerAddress.getHostName(),
+          peerAddress.getPort(), conf);
+      server.start();
+      LOG.info(" BSPPeer address:" + peerAddress.getHostName() + " port:"
+          + peerAddress.getPort());
+    } catch (IOException e) {
+      LOG.error("Fail to start RPC server!", e);
+      throw new RuntimeException("RPC Server could not be launched!");
+    }
+  }
+
+  @Override
+  public void close() {
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  @Override
+  public BSPMessage getCurrentMessage() throws IOException {
+    return localQueue.poll();
+  }
+
+  @Override
+  public void send(String peerName, BSPMessage msg) throws IOException {
+    LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+    LinkedList<BSPMessage> queue = outgoingQueues.get(targetPeerAddress);
+    if (queue == null) {
+      queue = new LinkedList<BSPMessage>();
+    }
+    queue.add(msg);
+    outgoingQueues.put(targetPeerAddress, queue);
+  }
+
+  private InetSocketAddress getAddress(String peerName) {
+    String[] peerAddrParts = peerName.split(":");
+    if (peerAddrParts.length != 2) {
+      throw new ArrayIndexOutOfBoundsException(
+          "Peername must consist of exactly ONE \":\"! Given peername was: "
+              + peerName);
+    }
+    return new InetSocketAddress(peerAddrParts[0],
+        Integer.valueOf(peerAddrParts[1]));
+  }
+
+  @Override
+  public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator()
{
+    return this.outgoingQueues.entrySet().iterator();
+  }
+
+  protected HadoopMessageManager getBSPPeerConnection(InetSocketAddress addr)
+      throws IOException {
+    HadoopMessageManager peer = peers.get(addr);
+    if (peer == null) {
+      peer = (HadoopMessageManager) RPC.getProxy(HadoopMessageManager.class,
+          HadoopMessageManager.versionID, addr, this.conf);
+      this.peers.put(addr, peer);
+    }
+    return peer;
+  }
+
+  @Override
+  public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+      throws IOException {
+
+    HadoopMessageManager bspPeerConnection = this.getBSPPeerConnection(addr);
+
+    if (bspPeerConnection == null) {
+      throw new IllegalArgumentException("Can not find " + addr.toString()
+          + " to transfer messages to!");
+    } else {
+      bspPeerConnection.put(bundle);
+    }
+  }
+
+  @Override
+  public void clearOutgoingQueues() {
+    this.outgoingQueues.clear();
+    localQueue.addAll(localQueueForNextIteration);
+    localQueueForNextIteration.clear();
+  }
+
+  @Override
+  public void put(BSPMessage msg) {
+    this.localQueueForNextIteration.add(msg);
+  }
+
+  @Override
+  public void put(BSPMessageBundle messages) {
+    for (BSPMessage message : messages.getMessages()) {
+      this.localQueueForNextIteration.add(message);
+    }
+  }
+
+  @Override
+  public int getNumCurrentMessages() {
+    return localQueue.size();
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    return versionID;
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HadoopMessageManagerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1198674&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
Mon Nov  7 09:39:30 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+
+/**
+ * This manager takes care of the messaging. It is responsible for launching a
+ * server if needed and dealing with incoming data.
+ * 
+ */
+public interface MessageManager {
+
+  /**
+   * Init can be used to start servers and initialize internal state.
+   * 
+   * @param conf
+   * @param peerAddress
+   */
+  public void init(Configuration conf, InetSocketAddress peerAddress);
+
+  /**
+   * Close is called after a task ran. Should be used to clean up things, e.g.
+   * stop a server.
+   */
+  public void close();
+
+  /**
+   * Get the current message.
+   * 
+   * @return
+   * @throws IOException
+   */
+  public BSPMessage getCurrentMessage() throws IOException;
+
+  /**
+   * Send a message to the peer.
+   * 
+   * @param peerName
+   * @param msg
+   * @throws IOException
+   */
+  public void send(String peerName, BSPMessage msg) throws IOException;
+
+  /**
+   * Returns an iterator of messages grouped by peer.
+   * 
+   * @return
+   */
+  public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator();
+
+  /**
+   * This is the real transferring to a host with a bundle.
+   * 
+   * @param addr
+   * @param bundle
+   * @throws IOException
+   */
+  public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+      throws IOException;
+
+  /**
+   * Clears the outgoing queue. Can be used to switch queues.
+   */
+  public void clearOutgoingQueues();
+
+  /**
+   * Gets the number of messages in the current queue.
+   * 
+   * @return
+   */
+  public int getNumCurrentMessages();
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1198674&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
(added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
Mon Nov  7 09:39:30 2011
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class MessageManagerFactory {
+  // Configuration key naming the MessageManager implementation class to instantiate.
+  public static final String MESSAGE_MANAGER_CLASS = "hama.messanger.class";
+
+  /**
+   * Instantiates the configured MessageManager via reflection (default: HadoopMessageManagerImpl).
+   * @param conf configuration read for MESSAGE_MANAGER_CLASS and handed to the new instance
+   * @return a new instance of the configured class, cast to MessageManager
+   * @throws ClassNotFoundException if the configured class name cannot be resolved
+   */
+  public static MessageManager getMessageManager(Configuration conf)
+      throws ClassNotFoundException {
+    return (MessageManager) ReflectionUtils.newInstance(conf
+        .getClassByName(conf.get(MESSAGE_MANAGER_CLASS,
+            org.apache.hama.bsp.message.HadoopMessageManagerImpl.class
+                .getCanonicalName())), conf);
+  }
+
+}

Propchange: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1198674&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Mon Nov  7 09:39:30 2011
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.IntegerMessage;
+import org.apache.hama.util.BSPNetUtils;
+
+public class TestHadoopMessageManager extends TestCase {
+  // Round-trips one IntegerMessage: send, bundle, transfer, then read it back.
+  public void testMessaging() throws Exception {
+    Configuration conf = new Configuration();
+    MessageManager messageManager = MessageManagerFactory
+        .getMessageManager(conf);
+    // This test sets no hama.messanger.class, so the factory default is expected.
+    assertTrue(messageManager instanceof HadoopMessageManagerImpl);
+
+    InetSocketAddress peer = new InetSocketAddress(
+        BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort());
+    messageManager.init(conf, peer);
+    String peerName = peer.getHostName() + ":" + peer.getPort();
+    // Queue a message addressed to the same host:port the manager was initialised with.
+    messageManager.send(peerName, new IntegerMessage("test", 1337));
+
+    Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> messageIterator
= messageManager
+        .getMessageIterator();
+    // The first entry of the iterator is expected to be keyed by that peer.
+    Entry<InetSocketAddress, LinkedList<BSPMessage>> entry = messageIterator
+        .next();
+
+    assertEquals(entry.getKey(), peer);
+
+    assertTrue(entry.getValue().size() == 1);
+    // Copy the queued messages into a bundle and transfer it to the peer.
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    for (BSPMessage msg : entry.getValue()) {
+      bundle.addMessage(msg);
+    }
+
+    messageManager.transfer(peer, bundle);
+    
+    messageManager.clearOutgoingQueues();
+    // After the transfer exactly one message should be available to read.
+    assertTrue(messageManager.getNumCurrentMessages() == 1);
+    BSPMessage currentMessage = messageManager.getCurrentMessage();
+
+    assertTrue(currentMessage instanceof IntegerMessage);
+
+    IntegerMessage rec = (IntegerMessage) currentMessage;
+    // Tag and payload must match what was sent above.
+    assertEquals(rec.getTag(), "test");
+    assertEquals(rec.getData(), Integer.valueOf(1337));
+
+  }
+}

Propchange: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message