hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1673392 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Date Tue, 14 Apr 2015 08:52:49 GMT
Author: edwardyoon
Date: Tue Apr 14 08:52:48 2015
New Revision: 1673392

URL: http://svn.apache.org/r1673392
Log: (empty)

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1673392&r1=1673391&r2=1673392&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Apr 14 08:52:48 2015
@@ -237,18 +237,18 @@ public final class GraphJobRunner<V exte
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
 
-    ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
-        "hama.graph.thread.num", 1000));
+    ExecutorService executor = Executors.newFixedThreadPool((peer
+        .getNumCurrentMessages() / conf.getInt(
+        "hama.graph.threadpool.percentage", 10)) + 1);
 
     while (currentMessage != null) {
-      Runnable worker = new ComputeRunnable(
-          vertices.get((V) currentMessage.getVertexId()),
-          (Iterable<M>) currentMessage.getIterableMessages());
+      Runnable worker = new ComputeRunnable(vertices.get((V) currentMessage
+          .getVertexId()), (Iterable<M>) currentMessage.getIterableMessages());
       executor.execute(worker);
-      
+
       currentMessage = peer.getCurrentMessage();
     }
-    
+
     executor.shutdown();
     while (!executor.isTerminated()) {
     }
@@ -278,17 +278,18 @@ public final class GraphJobRunner<V exte
     this.changedVertexCnt = 0;
     vertices.startSuperstep();
 
-    ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
-        "hama.graph.thread.num", 1000));
-    
-    for(Vertex<V, E, M> v : vertices.getValues()) {
-      Runnable worker = new ComputeRunnable(v, Collections.singleton(v.getValue()));
+    ExecutorService executor = Executors.newFixedThreadPool(vertices.size()
+        / conf.getInt("hama.graph.threadpool.percentage", 10));
+
+    for (Vertex<V, E, M> v : vertices.getValues()) {
+      Runnable worker = new ComputeRunnable(v, Collections.singleton(v
+          .getValue()));
       executor.execute(worker);
     }
     executor.shutdown();
     while (!executor.isTerminated()) {
     }
-    
+
     vertices.finishSuperstep();
     getAggregationRunner().sendAggregatorValues(peer, 1, this.changedVertexCnt);
     iteration++;
@@ -306,11 +307,11 @@ public final class GraphJobRunner<V exte
     @Override
     public void run() {
       try {
-          // call once at initial superstep
-          vertex.setup(conf);
-        
-          vertex.compute(msgs);
-          vertices.finishVertexComputation(vertex);
+        // call once at initial superstep
+        vertex.setup(conf);
+
+        vertex.compute(msgs);
+        vertices.finishVertexComputation(vertex);
       } catch (IOException e) {
         e.printStackTrace();
       }
@@ -379,9 +380,8 @@ public final class GraphJobRunner<V exte
         .newInstance(conf.getClass(Constants.RUNTIME_PARTITION_RECORDCONVERTER,
             VertexInputReader.class));
 
-    ExecutorService executor = Executors.newFixedThreadPool(conf.getInt(
-        "hama.graph.thread.num", 1000));
-    
+    ExecutorService executor = Executors.newFixedThreadPool(1000);
+
     try {
       KeyValuePair<Writable, Writable> next = null;
       while ((next = peer.readNext()) != null) {
@@ -406,7 +406,7 @@ public final class GraphJobRunner<V exte
     } catch (Exception e) {
       e.printStackTrace();
     }
-    
+
     peer.sync();
 
     GraphJobMessage msg;
@@ -421,7 +421,7 @@ public final class GraphJobRunner<V exte
     LOG.info(vertices.size() + " vertices are loaded into "
         + peer.getPeerName());
   }
-  
+
   class LoadWorker implements Runnable {
     Vertex<V, E, M> vertex;
 



Mime
View raw message