hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1675244 - in /hama/trunk/graph/src/main/java/org/apache/hama/graph: GraphJobMessage.java GraphJobRunner.java Vertex.java
Date Wed, 22 Apr 2015 01:56:34 GMT
Author: edwardyoon
Date: Wed Apr 22 01:56:34 2015
New Revision: 1675244

URL: http://svn.apache.org/r1675244
Log:
minor code optimization

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1675244&r1=1675243&r2=1675244&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Wed Apr 22 01:56:34
2015
@@ -91,6 +91,10 @@ public final class GraphJobMessage imple
   public MapWritable getMap() {
     return map;
   }
+  
+  public void setVertexId(WritableComparable<?> vertexId) {
+    this.vertexId = vertexId;
+  }
 
   public WritableComparable<?> getVertexId() {
     return vertexId;

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1675244&r1=1675243&r2=1675244&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Apr 22 01:56:34
2015
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -117,7 +118,7 @@ public final class GraphJobRunner<V exte
   private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
 
   private RejectedExecutionHandler retryHandler = new RetryRejectedExecutionHandler();
-  
+
   @Override
   public final void setup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
@@ -170,10 +171,6 @@ public final class GraphJobRunner<V exte
 
       // loop over vertices and do their computation
       doSuperstep(firstVertexMessage, peer);
-
-      if (isMasterTask(peer)) {
-        peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
-      }
     }
 
   }
@@ -268,7 +265,10 @@ public final class GraphJobRunner<V exte
         + " looping: " + (System.currentTimeMillis() - loopStartTime) + " ms");
 
     executor.shutdown();
-    while (!executor.isTerminated()) {
+    try {
+      executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.error(e);
     }
 
     for (V v : vertices.getNotComputedVertices()) {
@@ -307,16 +307,19 @@ public final class GraphJobRunner<V exte
         .newCachedThreadPool();
     executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
     executor.setRejectedExecutionHandler(retryHandler);
-    
+
     for (Vertex<V, E, M> v : vertices.getValues()) {
       Runnable worker = new ComputeRunnable(v);
       executor.execute(worker);
     }
 
     executor.shutdown();
-    while (!executor.isTerminated()) {
+    try {
+      executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.error(e);
     }
-
+    
     getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
     finishSuperstep();
@@ -411,8 +414,6 @@ public final class GraphJobRunner<V exte
     EDGE_VALUE_CLASS = edgeValueClass;
   }
 
-  private Map<String, GraphJobMessage> messages = new HashMap<String, GraphJobMessage>();
-
   /**
    * Loads vertices into memory of each peer.
    */
@@ -420,6 +421,8 @@ public final class GraphJobRunner<V exte
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
+    final Map<String, GraphJobMessage> messages = new HashMap<String, GraphJobMessage>();
+    
     VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable,
Writable, V, E, M>) ReflectionUtils
         .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
             VertexInputReader.class));
@@ -428,7 +431,7 @@ public final class GraphJobRunner<V exte
         .newCachedThreadPool();
     executor.setMaximumPoolSize(conf.getInt(DEFAULT_THREAD_POOL_SIZE, 256));
     executor.setRejectedExecutionHandler(retryHandler);
-    
+
     try {
       KeyValuePair<Writable, Writable> next = null;
       while ((next = peer.readNext()) != null) {
@@ -462,8 +465,7 @@ public final class GraphJobRunner<V exte
       peer.send(e.getKey(), e.getValue());
     }
     messages.clear();
-    messages = null;
-
+    
     peer.sync();
 
     GraphJobMessage msg;
@@ -480,8 +482,7 @@ public final class GraphJobRunner<V exte
       }
     }
     executor.shutdown();
-    while (!executor.isTerminated()) {
-    }
+    executor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
 
     LOG.info(vertices.size() + " vertices are loaded into "
         + peer.getPeerName());
@@ -646,7 +647,8 @@ public final class GraphJobRunner<V exte
     if (storage.containsKey(vertexID)) {
       storage.get(vertexID).add(msg);
     } else {
-      storage.put(vertexID, new GraphJobMessage(vertexID, msg));
+      // To save bit memory we don't set vertexID twice
+      storage.put(vertexID, new GraphJobMessage(null, msg));
     }
   }
 
@@ -656,6 +658,8 @@ public final class GraphJobRunner<V exte
     Iterator<Entry<V, GraphJobMessage>> it = storage.entrySet().iterator();
     while (it.hasNext()) {
       Entry<V, GraphJobMessage> e = it.next();
+      it.remove();
+      
       if (combiner != null && e.getValue().getNumOfValues() > 1) {
         peer.send(
             getHostName(e.getKey()),
@@ -663,9 +667,15 @@ public final class GraphJobRunner<V exte
                 .combine(getIterableMessages(e.getValue().getValuesBytes(), e
                     .getValue().getNumOfValues())))));
       } else {
+        // set vertexID
+        e.getValue().setVertexId(e.getKey());
         peer.send(getHostName(e.getKey()), e.getValue());
       }
-      it.remove();
+    }
+    storage.clear();
+    
+    if (isMasterTask(peer)) {
+      peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
     }
   }
 
@@ -673,7 +683,7 @@ public final class GraphJobRunner<V exte
     ByteArrayOutputStream a = new ByteArrayOutputStream();
     DataOutputStream b = new DataOutputStream(a);
     writable.write(b);
-
+    a.close();
     return a.toByteArray();
   }
 
@@ -732,7 +742,7 @@ public final class GraphJobRunner<V exte
    *         edge.
    */
   public String getHostName(V vertexID) {
-    return peer.getPeerName(getPartitioner().getPartition(vertexID, null,
+    return peer.getPeerName(partitioner.getPartition(vertexID, null,
         peer.getNumPeers()));
   }
 
@@ -758,13 +768,6 @@ public final class GraphJobRunner<V exte
   }
 
   /**
-   * @return the defined partitioner instance.
-   */
-  public final Partitioner<V, M> getPartitioner() {
-    return partitioner;
-  }
-
-  /**
    * Gets the last aggregated value at the given index. The index is dependend
    * on how the aggregators were configured during job setup phase.
    * 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1675244&r1=1675243&r2=1675244&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Wed Apr 22 01:56:34 2015
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Counters.Counter;
-import org.apache.hama.bsp.Partitioner;
 
 /**
  * Vertex is a abstract definition of Google Pregel Vertex. For implementing a
@@ -76,14 +75,15 @@ public abstract class Vertex<V extends W
 
   @Override
   public void sendMessage(Edge<V, E> e, M msg) throws IOException {
-    runner.sendMessage(e.getDestinationVertexID(), GraphJobRunner.serialize(msg));
+    runner.sendMessage(e.getDestinationVertexID(),
+        GraphJobRunner.serialize(msg));
   }
 
   @Override
   public void sendMessage(V destinationVertexID, M msg) throws IOException {
     runner.sendMessage(destinationVertexID, GraphJobRunner.serialize(msg));
   }
-  
+
   @Override
   public void sendMessageToNeighbors(M msg) throws IOException {
     final List<Edge<V, E>> outEdges = this.getEdges();
@@ -109,12 +109,8 @@ public abstract class Vertex<V extends W
     vertex.setVertexID(vertexID);
 
     msg.put(GraphJobRunner.FLAG_VERTEX_INCREASE, vertex);
-    // Find the proper partition to host the new vertex.
-    int partition = getPartitioner().getPartition(vertexID, value,
-        runner.getPeer().getNumPeers());
-    String destPeer = runner.getPeer().getAllPeerNames()[partition];
-
-    runner.getPeer().send(destPeer, new GraphJobMessage(msg));
+    runner.getPeer().send(runner.getHostName(vertexID),
+        new GraphJobMessage(msg));
 
     alterVertexCounter(1);
   }
@@ -182,13 +178,6 @@ public abstract class Vertex<V extends W
     return runner.getPeer();
   }
 
-  /**
-   * @return the configured partitioner instance to message vertices.
-   */
-  public Partitioner<V, M> getPartitioner() {
-    return runner.getPartitioner();
-  }
-
   @Override
   public long getTotalNumVertices() {
     return runner.getNumberVertices();
@@ -345,7 +334,7 @@ public abstract class Vertex<V extends W
   protected GraphJobRunner<V, E, M> getRunner() {
     return runner;
   }
-  
+
   @Override
   public void aggregate(int index, M value) throws IOException {
     this.runner.getAggregationRunner().aggregateVertex(index, oldValue, value);
@@ -364,7 +353,7 @@ public abstract class Vertex<V extends W
   public M getAggregatedValue(int index) {
     return (M) runner.getLastAggregatedValue(index);
   }
-  
+
   /**
    * Get the number of aggregated vertices in the last superstep. Or null if no
    * aggregator is available.You have to supply an index, the index is defined
@@ -381,7 +370,7 @@ public abstract class Vertex<V extends W
   public Counter getCounter(Enum<?> name) {
     return runner.getPeer().getCounter(name);
   }
-  
+
   @Override
   public Counter getCounter(String group, String name) {
     return runner.getPeer().getCounter(group, name);



Mime
View raw message