hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1449666 [2/2] - in /hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama...
Date Mon, 25 Feb 2013 11:40:14 GMT
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java Mon Feb
25 11:40:13 2013
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPPeer;
+
+import com.google.common.collect.AbstractIterator;
+
+/**
+ * The rationale behind this class is that it polls messages if they are
+ * requested and once it finds a message that is not addressed to this vertex,
+ * it breaks the iteration. The message that was polled and doesn't belong to
+ * the vertex is returned by {@link #getOverflowMessage()}.
+ */
+public final class VertexMessageIterable<V, T> implements Iterable<T> {
+
+  private final V vertexID;
+  private final BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+
+  private GraphJobMessage overflow;
+  private GraphJobMessage currentMessage;
+
+  private Iterator<T> currentIterator;
+
+  public VertexMessageIterable(GraphJobMessage currentMessage, V vertexID,
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+    this.currentMessage = currentMessage;
+    this.vertexID = vertexID;
+    this.peer = peer;
+    setupIterator();
+  }
+
+  private void setupIterator() {
+    currentIterator = new AbstractIterator<T>() {
+      @SuppressWarnings("unchecked")
+      @Override
+      protected T computeNext() {
+        // spool back the current message
+        if (currentMessage != null) {
+          GraphJobMessage tmp = currentMessage;
+          // set it to null, so we don't send it over and over again
+          currentMessage = null;
+          return (T) tmp.getVertexValue();
+        }
+
+        try {
+          GraphJobMessage msg = peer.getCurrentMessage();
+          if (msg != null) {
+            if (msg.getVertexId().equals(vertexID)) {
+              return (T) msg.getVertexValue();
+            } else {
+              overflow = msg;
+            }
+          }
+          return endOfData();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  public GraphJobMessage getOverflowMessage() {
+    // check if iterable was completely consumed
+    while (currentIterator.hasNext()) {
+      currentIterator.next();
+    }
+    return overflow;
+  }
+
+  @Override
+  public Iterator<T> iterator() {
+    return currentIterator;
+  }
+
+}

Copied: hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java (from r1449611,
hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java)
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java?p2=hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java&p1=hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java&r1=1449611&r2=1449666&rev=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java Mon Feb 25 11:40:13
2013
@@ -17,12 +17,12 @@
  */
 package org.apache.hama.graph;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.util.Iterator;
+import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * VerticesInfo interface encapsulates the storage of vertices in a BSP Task.
@@ -31,13 +31,52 @@ import org.apache.hadoop.io.WritableComp
  * @param <E> Edge cost object type
  * @param <M> Vertex value object type
  */
-public interface IVerticesInfo<V extends WritableComparable<V>, E extends Writable,
M extends Writable>
-    extends Iterable<Vertex<V, E, M>> {
+public interface VerticesInfo<V extends WritableComparable<? super V>, E extends
Writable, M extends Writable> {
+
+  /**
+   * Initialization of internal structures.
+   */
+  public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
+      TaskAttemptID attempt) throws IOException;
+
+  /**
+   * Cleanup of internal structures.
+   */
+  public void cleanup(Configuration conf, TaskAttemptID attempt)
+      throws IOException;
 
   /**
    * Add a vertex to the underlying structure.
    */
-  public void addVertex(Vertex<V, E, M> vertex);
+  public void addVertex(Vertex<V, E, M> vertex) throws IOException;
+
+  /**
+   * Finish the additions, from this point on the implementations should close
+   * the adds and throw exceptions in case something is added after this call.
+   */
+  public void finishAdditions();
+
+  /**
+   * Called once a superstep starts.
+   */
+  public void startSuperstep() throws IOException;
+
+  /**
+   * Called once a superstep has completed.
+   */
+  public void finishSuperstep() throws IOException;
+
+  /**
+   * Must be called once a vertex is guaranteed not to change any more and can
+   * safely be persisted to a secondary storage.
+   */
+  public void finishVertexComputation(Vertex<V, E, M> vertex)
+      throws IOException;
+
+  /**
+   * @return true if all vertices are added.
+   */
+  public boolean isFinishedAdditions();
 
   /**
    * @return the number of vertices added to the underlying structure.
@@ -45,12 +84,6 @@ public interface IVerticesInfo<V extends
    */
   public int size();
 
-  @Override
-  public Iterator<Vertex<V, E, M>> iterator();
-
-  // to be added and documented soon
-  public void recoverState(DataInput in);
-
-  public void saveState(DataOutput out);
+  public IDSkippingIterator<V, E, M> skippingIterator();
 
 }

Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java (added)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java Mon Feb
25 11:40:13 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.junit.Test;
+
+public class TestAbsDiffAggregator extends TestCase {
+
+  @Test
+  public void testAggregator() {
+    AbsDiffAggregator diff = new AbsDiffAggregator();
+    diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
+    diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
+    diff.aggregate(null, null, new DoubleWritable(5));
+
+    // 0, because this is totally worthless for diffs
+    assertEquals(0, diff.getTimesAggregated().get());
+    assertEquals(6, (int) diff.getValue().get());
+
+  }
+
+}

Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java (added)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java Mon Feb
25 11:40:13 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.junit.Test;
+
+public class TestAverageAggregator extends TestCase {
+
+  @Test
+  public void testAggregator() {
+    AverageAggregator diff = new AverageAggregator();
+    diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
+    diff.aggregateInternal();
+    diff.aggregate(null, new DoubleWritable(5), new DoubleWritable(2));
+    diff.aggregateInternal();
+    diff.aggregate(null, null, new DoubleWritable(5));
+    diff.aggregateInternal();
+
+    assertEquals(3, diff.getTimesAggregated().get());
+    DoubleWritable x = diff.finalizeAggregation();
+    assertEquals(2, (int) x.get());
+
+  }
+
+}

Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java (added)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java Mon Feb
25 11:40:13 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.graph.example.PageRank;
+import org.apache.hama.graph.example.PageRank.PageRankVertex;
+import org.junit.Test;
+
+public class TestDiskVerticesInfo extends TestCase {
+
+  @Test
+  public void testDiskVerticesInfoLifeCycle() throws Exception {
+    DiskVerticesInfo<Text, NullWritable, DoubleWritable> info = new DiskVerticesInfo<Text,
NullWritable, DoubleWritable>();
+    Configuration conf = new Configuration();
+    conf.set(GraphJob.VERTEX_CLASS_ATTR, PageRankVertex.class.getName());
+    conf.set(GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR,
+        NullWritable.class.getName());
+    conf.set(GraphJob.VERTEX_ID_CLASS_ATTR, Text.class.getName());
+    conf.set(GraphJob.VERTEX_VALUE_CLASS_ATTR, DoubleWritable.class.getName());
+    GraphJobRunner.<Text, NullWritable, DoubleWritable> initClasses(conf);
+    TaskAttemptID attempt = new TaskAttemptID("omg", 1, 1, 0);
+    try {
+      ArrayList<PageRankVertex> list = new ArrayList<PageRankVertex>();
+
+      for (int i = 0; i < 10; i++) {
+        PageRankVertex v = new PageRank.PageRankVertex();
+        v.setVertexID(new Text(i + ""));
+        if (i % 2 == 0) {
+          v.setValue(new DoubleWritable(i * 2));
+        }
+        v.addEdge(new Edge<Text, NullWritable>(new Text((10 - i) + ""), null));
+
+        list.add(v);
+      }
+
+      info.init(null, conf, attempt);
+      for (PageRankVertex v : list) {
+        info.addVertex(v);
+      }
+
+      info.finishAdditions();
+
+      assertEquals(10, info.size());
+      // now we want to iterate and check if the result can properly be obtained
+
+      int index = 0;
+      IDSkippingIterator<Text, NullWritable, DoubleWritable> iterator = info
+          .skippingIterator();
+      while (iterator.hasNext()) {
+        Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+        PageRankVertex pageRankVertex = list.get(index);
+        assertEquals(pageRankVertex.getVertexID().toString(), next
+            .getVertexID().toString());
+        if (index % 2 == 0) {
+          assertEquals((int) next.getValue().get(), index * 2);
+        } else {
+          assertNull(next.getValue());
+        }
+        assertEquals(next.isHalted(), false);
+        // check edges
+        List<Edge<Text, NullWritable>> edges = next.getEdges();
+        assertEquals(1, edges.size());
+        Edge<Text, NullWritable> edge = edges.get(0);
+        assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
+            .toString(), edge.getDestinationVertexID().toString());
+        assertNull(edge.getValue());
+
+        index++;
+      }
+      assertEquals(index, list.size());
+      info.finishSuperstep();
+      // iterate again and compute so vertices change internally
+      iterator = info.skippingIterator();
+      info.startSuperstep();
+      while (iterator.hasNext()) {
+        Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+        // override everything with constant 2
+        next.setValue(new DoubleWritable(2));
+        if (Integer.parseInt(next.getVertexID().toString()) == 3) {
+          next.voteToHalt();
+        }
+        info.finishVertexComputation(next);
+      }
+      info.finishSuperstep();
+
+      index = 0;
+      // now reread
+      info.startSuperstep();
+      iterator = info.skippingIterator();
+      while (iterator.hasNext()) {
+        Vertex<Text, NullWritable, DoubleWritable> next = iterator.next();
+        PageRankVertex pageRankVertex = list.get(index);
+        assertEquals(pageRankVertex.getVertexID().toString(), next
+            .getVertexID().toString());
+        assertEquals((int) next.getValue().get(), 2);
+        // check edges
+        List<Edge<Text, NullWritable>> edges = next.getEdges();
+        assertEquals(1, edges.size());
+        Edge<Text, NullWritable> edge = edges.get(0);
+        assertEquals(pageRankVertex.getEdges().get(0).getDestinationVertexID()
+            .toString(), edge.getDestinationVertexID().toString());
+        assertNull(edge.getValue());
+        if (index == 3) {
+          assertEquals(true, next.isHalted());
+        }
+
+        index++;
+      }
+      assertEquals(index, list.size());
+
+    } finally {
+      info.cleanup(conf, attempt);
+    }
+
+  }
+}

Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java (added)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java Mon Feb
25 11:40:13 2013
@@ -0,0 +1,50 @@
+package org.apache.hama.graph;
+
+import java.util.List;
+import java.util.PriorityQueue;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestGraphJobMessage extends TestCase {
+
+  @Test
+  public void testPriorityQueue() {
+    PriorityQueue<GraphJobMessage> prio = new PriorityQueue<GraphJobMessage>();
+    prio.addAll(getMessages());
+
+    GraphJobMessage poll = prio.poll();
+    assertEquals(true, poll.isMapMessage());
+    poll = prio.poll();
+    assertEquals(true, poll.isVertexMessage());
+    assertEquals("1", poll.getVertexId().toString());
+
+    poll = prio.poll();
+    assertEquals(true, poll.isVertexMessage());
+    assertEquals("2", poll.getVertexId().toString());
+
+    poll = prio.poll();
+    assertEquals(true, poll.isVertexMessage());
+    assertEquals("3", poll.getVertexId().toString());
+
+    assertTrue(prio.isEmpty());
+  }
+
+  public List<GraphJobMessage> getMessages() {
+    GraphJobMessage mapMsg = new GraphJobMessage(new MapWritable());
+    GraphJobMessage vertexMsg1 = new GraphJobMessage(new Text("1"),
+        new IntWritable());
+    GraphJobMessage vertexMsg2 = new GraphJobMessage(new Text("2"),
+        new IntWritable());
+    GraphJobMessage vertexMsg3 = new GraphJobMessage(new Text("3"),
+        new IntWritable());
+    return Lists.newArrayList(mapMsg, vertexMsg1, vertexMsg2, vertexMsg3);
+  }
+
+}

Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java (added)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java Mon Feb
25 11:40:13 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Test;
+
+public class TestMinMaxAggregator extends TestCase {
+
+  @Test
+  public void testMinAggregator() {
+    MinAggregator diff = new MinAggregator();
+    diff.aggregate(null, new IntWritable(5));
+    diff.aggregate(null, new IntWritable(25));
+    assertEquals(5, diff.getValue().get());
+
+  }
+
+  @Test
+  public void testMaxAggregator() {
+    MaxAggregator diff = new MaxAggregator();
+    diff.aggregate(null, new IntWritable(5));
+    diff.aggregate(null, new IntWritable(25));
+    assertEquals(25, diff.getValue().get());
+  }
+
+}

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Mon Feb 25
11:40:13 2013
@@ -44,12 +44,11 @@ public class TestSubmitGraphJob extends 
       "yahoo.com\tnasa.gov\tstackoverflow.com",
       "twitter.com\tgoogle.com\tfacebook.com",
       "nasa.gov\tyahoo.com\tstackoverflow.com",
-      "youtube.com\tgoogle.com\tyahoo.com" };
+      "youtube.com\tgoogle.com\tyahoo.com", "google.com" };
 
   private static String INPUT = "/tmp/pagerank/real-tmp.seq";
   private static String OUTPUT = "/tmp/pagerank/real-out";
 
-  @SuppressWarnings("unchecked")
   @Override
   public void testSubmitJob() throws Exception {
 
@@ -60,6 +59,7 @@ public class TestSubmitGraphJob extends 
     bsp.setOutputPath(new Path(OUTPUT));
     BSPJobClient jobClient = new BSPJobClient(configuration);
     configuration.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 6000);
+    configuration.set("hama.graph.self.ref", "true");
     ClusterStatus cluster = jobClient.getClusterStatus(false);
     assertEquals(this.numOfGroom, cluster.getGroomServers());
     LOG.info("Client finishes execution job.");
@@ -67,11 +67,8 @@ public class TestSubmitGraphJob extends 
     bsp.setVertexClass(PageRank.PageRankVertex.class);
     // set the defaults
     bsp.setMaxIteration(30);
-    // FIXME why is the sum correct when 1-ALPHA instead of ALPHA itself?
-    bsp.set("hama.pagerank.alpha", "0.25");
 
-    bsp.setAggregatorClass(AverageAggregator.class,
-        PageRank.DanglingNodeAggregator.class);
+    bsp.setAggregatorClass(AverageAggregator.class);
 
     bsp.setInputFormat(SequenceFileInputFormat.class);
     bsp.setInputKeyClass(Text.class);

Added: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java (added)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java Mon Feb 25
11:40:13 2013
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.junit.Test;
+
+public class TestSumAggregator extends TestCase {
+
+  @Test
+  public void testAggregator() {
+    SumAggregator diff = new SumAggregator();
+    diff.aggregate(null, new DoubleWritable(5));
+    diff.aggregate(null, new DoubleWritable(5));
+    assertEquals(10, (int) diff.getValue().get());
+
+  }
+
+}

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Mon Feb 25
11:40:13 2013
@@ -18,7 +18,6 @@
 package org.apache.hama.graph.example;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -31,7 +30,6 @@ import org.apache.hama.bsp.HashPartition
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.bsp.TextOutputFormat;
-import org.apache.hama.graph.AbstractAggregator;
 import org.apache.hama.graph.AverageAggregator;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
@@ -49,8 +47,6 @@ public class PageRank {
     static double DAMPING_FACTOR = 0.85;
     static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
 
-    int numEdges;
-
     @Override
     public void setup(Configuration conf) {
       String val = conf.get("hama.pagerank.alpha");
@@ -61,30 +57,20 @@ public class PageRank {
       if (val != null) {
         MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
       }
-      numEdges = this.getEdges().size();
     }
 
     @Override
-    public void compute(Iterator<DoubleWritable> messages) throws IOException {
+    public void compute(Iterable<DoubleWritable> messages) throws IOException {
       // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
         this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
       } else if (this.getSuperstepCount() >= 1) {
-        DoubleWritable danglingNodeContribution = getLastAggregatedValue(1);
         double sum = 0;
-        while (messages.hasNext()) {
-          DoubleWritable msg = messages.next();
+        for (DoubleWritable msg : messages) {
           sum += msg.get();
         }
-        if (danglingNodeContribution == null) {
-          double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
-          this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
-        } else {
-          double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
-          this.setValue(new DoubleWritable(alpha
-              + (DAMPING_FACTOR * (sum + danglingNodeContribution.get()
-                  / this.getNumVertices()))));
-        }
+        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+        this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
       }
 
       // if we have not reached our global error yet, then proceed.
@@ -94,34 +80,10 @@ public class PageRank {
         voteToHalt();
         return;
       }
+
       // in each superstep we are going to send a new rank to our neighbours
       sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
-          / numEdges));
-    }
-
-  }
-
-  public static class DanglingNodeAggregator
-      extends
-      AbstractAggregator<DoubleWritable, Vertex<Text, NullWritable, DoubleWritable>>
{
-
-    double danglingNodeSum;
-
-    @Override
-    public void aggregate(Vertex<Text, NullWritable, DoubleWritable> vertex,
-        DoubleWritable value) {
-      if (vertex != null) {
-        if (vertex.getEdges().size() == 0) {
-          danglingNodeSum += value.get();
-        }
-      } else {
-        danglingNodeSum += value.get();
-      }
-    }
-
-    @Override
-    public DoubleWritable getValue() {
-      return new DoubleWritable(danglingNodeSum);
+          / this.getEdges().size()));
     }
 
   }
@@ -142,7 +104,6 @@ public class PageRank {
     }
   }
 
-  @SuppressWarnings("unchecked")
   public static GraphJob createJob(String[] args, HamaConfiguration conf)
       throws IOException {
     GraphJob pageJob = new GraphJob(conf, PageRank.class);
@@ -155,15 +116,17 @@ public class PageRank {
     // set the defaults
     pageJob.setMaxIteration(30);
     pageJob.set("hama.pagerank.alpha", "0.85");
+    // reference each vertex to itself, because we don't have a dangling node
+    // contribution here
+    pageJob.set("hama.graph.self.ref", "true");
     pageJob.set("hama.graph.max.convergence.error", "0.001");
 
     if (args.length == 3) {
       pageJob.setNumBspTask(Integer.parseInt(args[2]));
     }
 
-    // error, dangling node probability sum
-    pageJob.setAggregatorClass(AverageAggregator.class,
-        DanglingNodeAggregator.class);
+    // error
+    pageJob.setAggregatorClass(AverageAggregator.class);
 
     // Vertex reader
     pageJob.setVertexInputReaderClass(PagerankSeqReader.class);



Mime
View raw message