hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1085673 - in /incubator/hama/trunk: CHANGES.txt src/java/org/apache/hama/bsp/BSPPeer.java
Date Sat, 26 Mar 2011 09:26:36 GMT
Author: edwardyoon
Date: Sat Mar 26 09:26:35 2011
New Revision: 1085673

URL: http://svn.apache.org/viewvc?rev=1085673&view=rev
Log:
Reduce overhead of local communications

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1085673&r1=1085672&r2=1085673&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Sat Mar 26 09:26:35 2011
@@ -52,6 +52,7 @@ Trunk (unreleased changes)
 
   IMPROVEMENTS
  
+    HAMA-369: Reduce overhead of local communications (Miklos Erdelyi)
     HAMA-366: Remove unnecessary dependencies (Tommaso Teofili)
     HAMA-365: TestBSPPeer doesn't check if the correct 
                        number of messages have been received (Miklos Erdelyi via edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1085673&r1=1085672&r2=1085673&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Sat Mar 26 09:26:35 2011
@@ -61,7 +61,12 @@ public class BSPPeer implements Watcher,
 
   private final Map<InetSocketAddress, BSPPeerInterface> peers = new ConcurrentHashMap<InetSocketAddress,
BSPPeerInterface>();
   private final Map<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> outgoingQueues
= new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>();
-  private final ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+  private ConcurrentLinkedQueue<BSPMessage> localQueue = new ConcurrentLinkedQueue<BSPMessage>();
+  private ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration =
+     new ConcurrentLinkedQueue<BSPMessage>();
+  private final Map<String, InetSocketAddress> peerSocketCache =
+     new ConcurrentHashMap<String, InetSocketAddress>();
+
   private SortedSet<String> allPeerNames = new TreeSet<String>();
   private InetSocketAddress peerAddress;
   private TaskStatus currentTaskStatus;
@@ -140,14 +145,26 @@ public class BSPPeer implements Watcher,
    */
   @Override
   public void send(String peerName, BSPMessage msg) throws IOException {
-    LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
-    ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues
-        .get(getAddress(peerName));
-    if (queue == null) {
-      queue = new ConcurrentLinkedQueue<BSPMessage>();
+    if (peerName.equals(getPeerName())) {
+      LOG.debug("Local send bytes (" + msg.getData().toString() + ")");
+      localQueueForNextIteration.add(msg);
+    } else {
+      LOG.debug("Send bytes (" + msg.getData().toString() + ") to " + peerName);
+      InetSocketAddress targetPeerAddress = null;
+      // Get socket for target peer.
+      if (peerSocketCache.containsKey(peerName)) {
+        targetPeerAddress = peerSocketCache.get(peerName);
+      } else {
+        targetPeerAddress = getAddress(peerName);
+        peerSocketCache.put(peerName, targetPeerAddress);
+      }
+      ConcurrentLinkedQueue<BSPMessage> queue = outgoingQueues.get(targetPeerAddress);
+      if (queue == null) {
+        queue = new ConcurrentLinkedQueue<BSPMessage>();
+      }
+      queue.add(msg);
+      outgoingQueues.put(targetPeerAddress, queue);
     }
-    queue.add(msg);
-    outgoingQueues.put(getAddress(peerName), queue);
   }
 
   /*
@@ -189,6 +206,15 @@ public class BSPPeer implements Watcher,
 
     currentTaskStatus.incrementSuperstepCount();
     leaveBarrier();
+
+    // Add non-processed messages from this iteration for the next's queue.
+    while (!localQueue.isEmpty()) {
+      BSPMessage message = localQueue.poll();
+      localQueueForNextIteration.add(message);
+    }
+    // Switch local queues.
+    localQueue = localQueueForNextIteration;
+    localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
   }
 
   protected boolean enterBarrier() throws KeeperException, InterruptedException {
@@ -273,7 +299,7 @@ public class BSPPeer implements Watcher,
 
   @Override
   public void put(BSPMessage msg) throws IOException {
-    this.localQueue.add(msg);
+    this.localQueueForNextIteration.add(msg);
   }
 
   @Override



Mime
View raw message