hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1430108 - in /hama/trunk: examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Date Tue, 08 Jan 2013 01:39:20 GMT
Author: edwardyoon
Date: Tue Jan  8 01:39:19 2013
New Revision: 1430108

URL: http://svn.apache.org/viewvc?rev=1430108&view=rev
Log:
HAMA-709: Set tasks to 1

Modified:
    hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1430108&r1=1430107&r2=1430108&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Tue
Jan  8 01:39:19 2013
@@ -59,7 +59,7 @@ public class MindistSearchTest extends T
   public void testMindistSearch() throws Exception {
     generateTestData();
     try {
-      MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "2" });
+      MindistSearch.main(new String[] { INPUT, OUTPUT, "30", "1" });
 
       verifyResult();
     } finally {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1430108&r1=1430107&r2=1430108&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Jan  8 01:39:19
2013
@@ -105,6 +105,8 @@ public final class GraphJobRunner<V exte
 
   }
 
+  Map<V, List<M>> messages = null;
+
   @Override
   public final void bsp(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -118,7 +120,7 @@ public final class GraphJobRunner<V exte
       peer.sync();
 
       // note that the messages must be parsed here
-      final Map<V, List<M>> messages = parseMessages(peer);
+      messages = parseMessages(peer);
       // master needs to update
       doMasterUpdates(peer);
       // if aggregators say we don't have updates anymore, break
@@ -126,7 +128,7 @@ public final class GraphJobRunner<V exte
         break;
       }
       // loop over vertices and do their computation
-      doSuperstep(messages, peer);
+      doSuperstep(peer);
 
       if (isMasterTask(peer)) {
         peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
@@ -175,7 +177,7 @@ public final class GraphJobRunner<V exte
    * Do the main logic of a superstep, namely checking if vertices are active,
    * feeding compute with messages and controlling combiners/aggregators.
    */
-  private void doSuperstep(Map<V, List<M>> messages,
+  private void doSuperstep(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
@@ -202,10 +204,14 @@ public final class GraphJobRunner<V exte
           activeVertices++;
         }
       }
+
+      msgs = null;
+      messages.remove(vertex.getVertexID());
     }
 
     aggregationRunner.sendAggregatorValues(peer, activeVertices);
     iteration++;
+    messages = new HashMap<V, List<M>>();
   }
 
   /**
@@ -331,8 +337,8 @@ public final class GraphJobRunner<V exte
   @SuppressWarnings("unchecked")
   private void repair(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      boolean selfReference) throws IOException,
-      SyncException, InterruptedException {
+      boolean selfReference) throws IOException, SyncException,
+      InterruptedException {
 
     Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
 
@@ -408,7 +414,8 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     GraphJobMessage msg = null;
-    final Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
+    Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
+    
     while ((msg = peer.getCurrentMessage()) != null) {
       // either this is a vertex message or a directive that must be read
       // as map
@@ -445,6 +452,7 @@ public final class GraphJobRunner<V exte
       }
 
     }
+    
     return msgMap;
   }
 



Mime
View raw message