hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1202506 - in /incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples: ./ graph/ graph/partitioning/
Date Wed, 16 Nov 2011 02:01:55 GMT
Author: edwardyoon
Date: Wed Nov 16 02:01:55 2011
New Revision: 1202506

URL: http://svn.apache.org/viewvc?rev=1202506&view=rev
Log:
Fix sssp example

Added:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java
Removed:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRankBase.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsBase.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/partitioning/
Modified:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
(original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
Wed Nov 16 02:01:55 2011
@@ -20,6 +20,7 @@
 package org.apache.hama.examples;
 
 import org.apache.hadoop.util.ProgramDriver;
+import org.apache.hama.examples.graph.ShortestPaths;
 
 public class ExampleDriver {
 
@@ -27,6 +28,7 @@ public class ExampleDriver {
     ProgramDriver pgd = new ProgramDriver();
     try {
       pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
+      pgd.addClass("sssp", ShortestPaths.class, "Single Shortest Path");
 
       pgd.driver(args);
     } catch (Throwable e) {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
(original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/PageRank.java
Wed Nov 16 02:01:55 2011
@@ -18,54 +18,75 @@
 package org.apache.hama.examples.graph;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.DoubleMessage;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.util.KeyValuePair;
 import org.apache.zookeeper.KeeperException;
 
-public class PageRank extends PageRankBase {
+public class PageRank extends
+    BSP<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable> {
   public static final Log LOG = LogFactory.getLog(PageRank.class);
 
-  private final HashMap<Vertex, List<Vertex>> adjacencyList = new HashMap<Vertex,
List<Vertex>>();
-  private final HashMap<String, Vertex> lookupMap = new HashMap<String, Vertex>();
+  private final HashMap<Vertex, Vertex[]> adjacencyList = new HashMap<Vertex, Vertex[]>();
+  private final HashMap<String, Vertex> vertexLookupMap = new HashMap<String, Vertex>();
   private final HashMap<Vertex, Double> tentativePagerank = new HashMap<Vertex,
Double>();
   // backup of the last pagerank to determine the error
   private final HashMap<Vertex, Double> lastTentativePagerank = new HashMap<Vertex,
Double>();
-  private String[] peerNames;
+
+  protected static int MAX_ITERATIONS = 30;
+  protected static String masterTaskName;
+  protected static double ALPHA;
+  protected static int numOfVertices;
+  protected static double DAMPING_FACTOR = 0.85;
+  protected static double EPSILON = 0.001;
 
   @Override
-  public void setup(BSPPeer peer) {
-    Configuration conf = peer.getConfiguration();
-    numOfVertices = Integer.parseInt(conf.get("num.vertices"));
+  public void setup(
+      BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable> peer)
+      throws IOException {
+    // map our stuff into ram
+
+    KeyValuePair<Vertex, ShortestPathVertexArrayWritable> next = null;
+    while ((next = peer.readNext()) != null) {
+      adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue()
+          .toArray());
+      vertexLookupMap.put(next.getKey().getName(), next.getKey());
+    }
+
+    // normally this is the global number of vertices
+    numOfVertices = vertexLookupMap.size();
     DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
     ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
     EPSILON = Double.parseDouble(conf.get("epsilon.error"));
     MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
-    peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
+    masterTaskName = peer.getPeerName(0);
   }
 
   @Override
-  public void bsp(BSPPeer peer) throws IOException, KeeperException,
-      InterruptedException {
-    String master = peer.getConfiguration().get(MASTER_TASK);
-    // setup the datasets
-    PageRankBase.mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
-        tentativePagerank, lookupMap);
+  public void bsp(
+      BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable> peer)
+      throws IOException, KeeperException, InterruptedException {
 
     // while the error not converges against epsilon do the pagerank stuff
     double error = 1.0;
@@ -83,7 +104,10 @@ public class PageRank extends PageRankBa
         HashMap<Vertex, Double> sumMap = new HashMap<Vertex, Double>();
         DoubleMessage msg = null;
         while ((msg = (DoubleMessage) peer.getCurrentMessage()) != null) {
-          Vertex k = lookupMap.get(msg.getTag());
+          Vertex k = vertexLookupMap.get(msg.getTag());
+          if (k == null) {
+            LOG.fatal("If you see this, partitioning has totally failed.");
+          }
           if (!sumMap.containsKey(k)) {
             sumMap.put(k, msg.getData());
           } else {
@@ -100,7 +124,7 @@ public class PageRank extends PageRankBa
 
         // determine the error and send this to the master
         double err = determineError();
-        error = broadcastError(peer, master, err);
+        error = broadcastError(peer, err);
       }
       // in every step send the tentative pagerank of a vertex to its
       // adjacent vertices
@@ -111,18 +135,29 @@ public class PageRank extends PageRankBa
       iteration++;
     }
 
-    // Clears all queues entries.
+    // Clears all queues entries after we finished.
     peer.clear();
-    // finally save the chunk of pageranks
-    PageRankBase.savePageRankMap(peer, peer.getConfiguration(),
-        lastTentativePagerank);
   }
 
-  private double broadcastError(BSPPeer peer, String master, double error)
-      throws IOException, KeeperException, InterruptedException {
-    peer.send(master, new DoubleMessage("", error));
+  @Override
+  public void cleanup(
+      BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable> peer)
{
+    try {
+      for (Entry<Vertex, Double> row : tentativePagerank.entrySet()) {
+        peer.write(new Text(row.getKey().getName()),
+            new DoubleWritable(row.getValue()));
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private double broadcastError(
+      BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable> peer,
+      double error) throws IOException, KeeperException, InterruptedException {
+    peer.send(masterTaskName, new DoubleMessage("", error));
     peer.sync();
-    if (peer.getPeerName().equals(master)) {
+    if (peer.getPeerName().equals(masterTaskName)) {
       double errorSum = 0.0;
       int count = 0;
       DoubleMessage message;
@@ -157,15 +192,16 @@ public class PageRank extends PageRankBa
     }
   }
 
-  private void sendMessageToNeighbors(BSPPeer peer, Vertex v)
-      throws IOException {
-    List<Vertex> outgoingEdges = adjacencyList.get(v);
+  private void sendMessageToNeighbors(
+      BSPPeer<Vertex, ShortestPathVertexArrayWritable, Text, DoubleWritable> peer,
+      Vertex v) throws IOException {
+    Vertex[] outgoingEdges = adjacencyList.get(v);
     for (Vertex adjacent : outgoingEdges) {
-      int mod = Math.abs(adjacent.getId() % peerNames.length);
+      int mod = Math.abs(adjacent.hashCode() % peer.getNumPeers());
       // send a message of the tentative pagerank divided by the size of
       // the outgoing edges to all adjacents
-      peer.send(peerNames[mod], new DoubleMessage(adjacent.getName(),
-          tentativePagerank.get(v) / outgoingEdges.size()));
+      peer.send(peer.getPeerName(mod), new DoubleMessage(adjacent.getName(),
+          tentativePagerank.get(v) / outgoingEdges.length));
     }
   }
 
@@ -184,10 +220,14 @@ public class PageRank extends PageRankBa
     }
 
     HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    BSPJob job = new BSPJob(conf);
+    job.setOutputPath(new Path("pagerank/output"));
+
     // set the defaults
     conf.set("damping.factor", "0.85");
     conf.set("epsilon.error", "0.000001");
 
+    boolean inputGiven = false;
     if (args.length < 2) {
       System.out.println("You have to provide a damping factor and an error!");
       System.out.println("Try using 0.85 0.001 as parameter!");
@@ -198,17 +238,13 @@ public class PageRank extends PageRankBa
       LOG.info("Set damping factor to " + args[0]);
       LOG.info("Set epsilon error to " + args[1]);
       if (args.length > 2) {
-        conf.set("out.path", args[2]);
         LOG.info("Set output path to " + args[2]);
+        job.setOutputPath(new Path(args[2]));
         if (args.length == 4) {
-          conf.set("in.path", args[3]);
+          job.setInputPath(new Path(args[3]));
           LOG.info("Using custom input at " + args[3]);
-        } else {
-          LOG.info("Running default example graph!");
+          inputGiven = true;
         }
-      } else {
-        conf.set("out.path", "pagerank/output");
-        LOG.info("Set output path to default of pagerank/output!");
       }
     }
 
@@ -218,30 +254,43 @@ public class PageRank extends PageRankBa
     // leave the iterations on default
     conf.set("max.iterations", "0");
 
-    Collection<String> activeGrooms = cluster.getActiveGroomNames().keySet();
-    String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
-
-    if (conf.get("in.path") == null) {
-      conf = PageRankBase.partitionExample(new Path(conf.get("out.path")),
-          conf, grooms);
-    } else {
-      conf = PageRankBase.partitionTextFile(new Path(conf.get("in.path")),
-          conf, grooms);
+    if (!inputGiven) {
+      Path tmp = new Path("pagerank/input");
+      FileSystem.get(conf).delete(tmp, true);
+      //ShortestPathsGraphLoader.loadGraph(conf, tmp);
+      job.setInputPath(tmp);
     }
 
-    BSPJob job = new BSPJob(conf);
-    job.setNumBspTask(cluster.getGroomServers());
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setPartitioner(HashPartitioner.class);
+    job.setOutputFormat(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+
+    job.setNumBspTask(cluster.getMaxTasks());
     job.setBspClass(PageRank.class);
     job.setJarByClass(PageRank.class);
     job.setJobName("Pagerank");
     if (job.waitForCompletion(true)) {
-      PageRankBase.printOutput(FileSystem.get(conf), conf);
+      printOutput(FileSystem.get(conf), conf);
     }
   }
 
-  @Override
-  public void cleanup(BSPPeer peer) {
-    // TODO Auto-generated method stub
-    
+  static void printOutput(FileSystem fs, Configuration conf) throws IOException {
+    LOG.info("-------------------- RESULTS --------------------");
+    FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
+    for (FileStatus status : stati) {
+      if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
+        Path path = status.getPath();
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+        Text key = new Text();
+        DoubleWritable value = new DoubleWritable();
+        while (reader.next(key, value)) {
+          LOG.info(key.toString() + " | " + value.get());
+        }
+        reader.close();
+      }
+    }
   }
+
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
(original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertex.java
Wed Nov 16 02:01:55 2011
@@ -24,7 +24,7 @@ import java.io.IOException;
 public final class ShortestPathVertex extends Vertex {
 
   private int weight;
-  private Integer cost;
+  private int cost = Integer.MAX_VALUE;
 
   public ShortestPathVertex() {
   }
@@ -34,7 +34,7 @@ public final class ShortestPathVertex ex
     this.weight = weight;
   }
 
-  public ShortestPathVertex(int weight, String name, Integer cost) {
+  public ShortestPathVertex(int weight, String name, int cost) {
     super(name);
     this.weight = weight;
     this.cost = cost;
@@ -44,7 +44,7 @@ public final class ShortestPathVertex ex
     return name;
   }
 
-  public Integer getCost() {
+  public int getCost() {
     return cost;
   }
 
@@ -52,10 +52,6 @@ public final class ShortestPathVertex ex
     this.cost = cost;
   }
 
-  public int getId() {
-    return id;
-  }
-
   public int getWeight() {
     return weight;
   }

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java?rev=1202506&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java
(added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathVertexArrayWritable.java
Wed Nov 16 02:01:55 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+import org.apache.hadoop.io.ArrayWritable;
+
+public class ShortestPathVertexArrayWritable extends ArrayWritable {
+
+  public ShortestPathVertexArrayWritable() {
+    super(ShortestPathVertex.class);
+  }
+
+}

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
(original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPaths.java
Wed Nov 16 02:01:55 2011
@@ -18,56 +18,47 @@
 package org.apache.hama.examples.graph;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.fs.FileStatus;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BooleanMessage;
 import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.IntegerMessage;
-import org.apache.hama.bsp.OutputCollector;
-import org.apache.hama.bsp.RecordReader;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.util.KeyValuePair;
 import org.apache.zookeeper.KeeperException;
 
-public class ShortestPaths extends ShortestPathsBase {
-
+public class ShortestPaths extends
+    BSP<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable> {
   public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
 
-  private final HashMap<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList
= new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
+  public static final String START_VERTEX = "shortest.paths.start.vertex.name";
   private final HashMap<String, ShortestPathVertex> vertexLookupMap = new HashMap<String,
ShortestPathVertex>();
-  private String[] peerNames;
+  private final HashMap<ShortestPathVertex, ShortestPathVertex[]> adjacencyList = new
HashMap<ShortestPathVertex, ShortestPathVertex[]>();
+  private String masterTask;
 
   @Override
-  public void bsp(BSPPeer peer)
+  public void bsp(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable>
peer)
       throws IOException, KeeperException, InterruptedException {
-    // map our input into ram
-    mapAdjacencyList(peer.getConfiguration(), peer, adjacencyList,
-        vertexLookupMap);
-    // parse the configuration to get the peerNames
-    parsePeerNames(peer.getConfiguration());
-    // get our master groom
-    String master = peer.getConfiguration().get(MASTER_TASK);
-
-    // initial message bypass
-    ShortestPathVertex v = vertexLookupMap.get(peer.getConfiguration().get(
-        SHORTEST_PATHS_START_VERTEX_ID));
-    if (v != null) {
-      v.setCost(0);
-      sendMessageToNeighbors(peer, v);
-    }
-
     boolean updated = true;
     while (updated) {
       int updatesMade = 0;
@@ -85,23 +76,51 @@ public class ShortestPaths extends Short
         }
       }
       // synchonize with all grooms if there were updates
-      updated = broadcastUpdatesMade(peer, master, updatesMade);
+      updated = broadcastUpdatesMade(peer, updatesMade);
       // send updates to the adjacents of the updated vertices
       for (ShortestPathVertex vertex : updatedQueue) {
         sendMessageToNeighbors(peer, vertex);
       }
     }
-    // finished, finally save our map to DFS.
-    saveVertexMap(peer.getConfiguration(), peer, adjacencyList);
   }
 
-  /**
-   * Parses the peer names to fix inconsistency in bsp peer names from context.
-   * 
-   * @param conf
-   */
-  private void parsePeerNames(Configuration conf) {
-    peerNames = conf.get(BSP_PEERS).split(";");
+  @Override
+  public void setup(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable>
peer)
+      throws IOException, KeeperException, InterruptedException {
+
+    KeyValuePair<ShortestPathVertex, ShortestPathVertexArrayWritable> next = null;
+    while ((next = peer.readNext()) != null) {
+      adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue()
+          .toArray());
+      vertexLookupMap.put(next.getKey().getName(), next.getKey());
+    }
+
+    masterTask = peer.getPeerName(0);
+
+    // initial message bypass
+    ShortestPathVertex startVertex = vertexLookupMap.get(peer
+        .getConfiguration().get(START_VERTEX));
+
+    if (startVertex != null) {
+      startVertex.setCost(0);
+      sendMessageToNeighbors(peer, startVertex);
+    }
+  }
+
+  @Override
+  public void cleanup(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable>
peer) {
+    // write our map into hdfs
+    for (Entry<ShortestPathVertex, ShortestPathVertex[]> entry : adjacencyList
+        .entrySet()) {
+      try {
+        peer.write(new Text(entry.getKey().getName()), new IntWritable(entry
+            .getKey().getCost()));
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
   }
 
   /**
@@ -118,11 +137,12 @@ public class ShortestPaths extends Short
    * @throws KeeperException
    * @throws InterruptedException
    */
-  private boolean broadcastUpdatesMade(BSPPeer peer, String master, int updates)
-      throws IOException, KeeperException, InterruptedException {
-    peer.send(master, new IntegerMessage(peer.getPeerName(), updates));
+  private boolean broadcastUpdatesMade(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable>
peer,
+      int updates) throws IOException, KeeperException, InterruptedException {
+    peer.send(masterTask, new IntegerMessage(peer.getPeerName(), updates));
     peer.sync();
-    if (peer.getPeerName().equals(master)) {
+    if (peer.getPeerName().equals(masterTask)) {
       int count = 0;
       IntegerMessage message;
       while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
@@ -145,103 +165,88 @@ public class ShortestPaths extends Short
    * to. <br/>
    * It sends the current cost to the adjacent vertex + the edge weight. If cost
    * will be infinity we just going to send infinity, because summing the weight
-   * will cause an integer overflow resulting in negative weights.
+   * will cause an integer overflow resulting in negative cost.
    * 
    * @param peer The peer we got through the BSP method.
    * @param id The vertex to all adjacent vertices the new cost has to be send.
    * @throws IOException
    */
-  private void sendMessageToNeighbors(BSPPeer peer, ShortestPathVertex id)
-      throws IOException {
-    List<ShortestPathVertex> outgoingEdges = adjacencyList.get(id);
+  private void sendMessageToNeighbors(
+      BSPPeer<ShortestPathVertex, ShortestPathVertexArrayWritable, Text, IntWritable>
peer,
+      ShortestPathVertex id) throws IOException {
+    ShortestPathVertex[] outgoingEdges = adjacencyList.get(id);
     for (ShortestPathVertex adjacent : outgoingEdges) {
-      int mod = Math.abs((adjacent.getId() % peer.getAllPeerNames().length));
-      peer.send(peerNames[mod], new IntegerMessage(adjacent.getName(), id
-          .getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
-          + adjacent.getWeight()));
+      int mod = Math.abs((adjacent.hashCode() % peer.getAllPeerNames().length));
+      peer.send(peer.getPeerName(mod), new IntegerMessage(adjacent.getName(),
+          id.getCost() == Integer.MAX_VALUE ? id.getCost() : id.getCost()
+              + adjacent.getWeight()));
     }
   }
 
   public static void printUsage() {
-    System.out.println("Single Source Shortest Path Example:");
-    System.out
-        .println("<Startvertex name> <optional: output path> <optional: path
to own adjacency list textfile!>");
+    System.out.println("Usage: <startNode> <output path> <input path>");
+  }
+
+  public static void printOutput(Configuration conf) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    LOG.info("-------------------- RESULTS --------------------");
+    FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
+    for (FileStatus status : stati) {
+      if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
+        Path path = status.getPath();
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+        Text key = new Text();
+        IntWritable value = new IntWritable();
+        int x = 0;
+        while (reader.next(key, value)) {
+          LOG.info(key.toString() + " | " + value.get());
+          x++;
+          if(x > 3)
+           break;
+        }
+        reader.close();
+      }
+    }
   }
 
   public static void main(String[] args) throws IOException,
       InterruptedException, ClassNotFoundException, InstantiationException,
       IllegalAccessException {
 
-    printUsage();
+    if (args.length < 3) {
+      printUsage();
+      System.exit(-1);
+    }
 
     // BSP job configuration
     HamaConfiguration conf = new HamaConfiguration();
-    conf.set(SHORTEST_PATHS_START_VERTEX_ID, "Frankfurt");
-    System.out.println("Setting default start vertex to \"Frankfurt\"!");
-    conf.set(OUT_PATH, "sssp/output");
-    Path adjacencyListPath = null;
-
-    if (args.length > 0) {
-      conf.set(SHORTEST_PATHS_START_VERTEX_ID, args[0]);
-      System.out.println("Setting start vertex to " + args[0] + "!");
-
-      if (args.length > 1) {
-        conf.set(OUT_PATH, args[1]);
-        System.out.println("Using new output folder: " + args[1]);
-      }
-
-      if (args.length > 2) {
-        adjacencyListPath = new Path(args[2]);
-      }
-
-    }
-
-    Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = null;
-    if (adjacencyListPath == null)
-      adjacencyList = ShortestPathsGraphLoader.loadGraph();
-
     BSPJob bsp = new BSPJob(conf, ShortestPaths.class);
     // Set the job name
     bsp.setJobName("Single Source Shortest Path");
-    bsp.setBspClass(ShortestPaths.class);
-
-    // Set the task size as a number of GroomServer
-    BSPJobClient jobClient = new BSPJobClient(conf);
-    ClusterStatus cluster = jobClient.getClusterStatus(true);
 
-    Collection<String> activeGrooms = cluster.getActiveGroomNames().keySet();
-    String[] grooms = activeGrooms.toArray(new String[activeGrooms.size()]);
+    conf.set(START_VERTEX, args[0]);
+    bsp.setOutputPath(new Path(args[1]));
+    bsp.setInputPath(new Path(args[2]));
 
-    LOG.info("Starting data partitioning...");
-    if (adjacencyList == null) {
-      conf = (HamaConfiguration) partition(conf, adjacencyListPath, grooms);
-    } else {
-      conf = (HamaConfiguration) partitionExample(conf, adjacencyList, grooms);
-    }
-    LOG.info("Finished!");
+    bsp.setBspClass(ShortestPaths.class);
+    bsp.setInputFormat(SequenceFileInputFormat.class);
+    bsp.setPartitioner(HashPartitioner.class);
+    bsp.setOutputFormat(SequenceFileOutputFormat.class);
+    bsp.setOutputKeyClass(Text.class);
+    bsp.setOutputValueClass(IntWritable.class);
 
-    bsp.setNumBspTask(cluster.getGroomServers());
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(false);
+    // Use max tasks
+    bsp.setNumBspTask(6);
 
     long startTime = System.currentTimeMillis();
     if (bsp.waitForCompletion(true)) {
+      printOutput(conf);
       System.out.println("Job Finished in "
           + (double) (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
-      printOutput(FileSystem.get(conf), conf);
     }
   }
 
-  @Override
-  public void cleanup(BSPPeer peer) {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void setup(BSPPeer peer) throws IOException, KeeperException,
-      InterruptedException {
-    // TODO Auto-generated method stub
-
-  }
-
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
(original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
Wed Nov 16 02:01:55 2011
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.examples.graph;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-public class ShortestPathsGraphLoader {
-  
-  static Map<ShortestPathVertex, List<ShortestPathVertex>> loadGraph() {
-
-    Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex,
List<ShortestPathVertex>>();
-    String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg",
-        "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
-        "Muenchen" };
-
-    for (String city : cities) {
-      if (city.equals("Frankfurt")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(85, "Mannheim"));
-        list.add(new ShortestPathVertex(173, "Kassel"));
-        list.add(new ShortestPathVertex(217, "Wuerzburg"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      } else if (city.equals("Stuttgart")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(183, "Nuernberg"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      } else if (city.equals("Kassel")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(502, "Muenchen"));
-        list.add(new ShortestPathVertex(173, "Frankfurt"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      } else if (city.equals("Erfurt")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(186, "Wuerzburg"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      } else if (city.equals("Wuerzburg")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(217, "Frankfurt"));
-        list.add(new ShortestPathVertex(168, "Erfurt"));
-        list.add(new ShortestPathVertex(103, "Nuernberg"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      } else if (city.equals("Mannheim")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(80, "Karlsruhe"));
-        list.add(new ShortestPathVertex(85, "Frankfurt"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      } else if (city.equals("Karlsruhe")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(250, "Augsburg"));
-        list.add(new ShortestPathVertex(80, "Mannheim"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      } else if (city.equals("Augsburg")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(250, "Karlsruhe"));
-        list.add(new ShortestPathVertex(84, "Muenchen"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      } else if (city.equals("Nuernberg")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(183, "Stuttgart"));
-        list.add(new ShortestPathVertex(167, "Muenchen"));
-        list.add(new ShortestPathVertex(103, "Wuerzburg"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      } else if (city.equals("Muenchen")) {
-        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
-        list.add(new ShortestPathVertex(167, "Nuernberg"));
-        list.add(new ShortestPathVertex(173, "Kassel"));
-        list.add(new ShortestPathVertex(84, "Augsburg"));
-        adjacencyList.put(new ShortestPathVertex(0, city), list);
-      }
-    }
-    return adjacencyList;
-  }
-
-}

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java?rev=1202506&r1=1202505&r2=1202506&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java
(original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/Vertex.java
Wed Nov 16 02:01:55 2011
@@ -21,11 +21,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hama.examples.graph.partitioning.PartitionableWritable;
+import org.apache.hadoop.io.Writable;
 
-public class Vertex implements PartitionableWritable {
+public class Vertex implements Writable {
 
-  protected int id;
   protected String name;
 
   public Vertex() {
@@ -35,24 +34,21 @@ public class Vertex implements Partition
   public Vertex(String name) {
     super();
     this.name = name;
-    this.id = name.hashCode();
   }
 
   @Override
   public void readFields(DataInput in) throws IOException {
-    this.id = in.readInt();
     this.name = in.readUTF();
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    out.writeInt(id);
     out.writeUTF(name);
   }
 
   @Override
   public int hashCode() {
-    return id;
+    return name.hashCode();
   }
 
   @Override
@@ -69,11 +65,6 @@ public class Vertex implements Partition
     return true;
   }
 
-  @Override
-  public int getId() {
-    return id;
-  }
-
   public String getName() {
     return name;
   }

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java?rev=1202506&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java
(added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/graph/VertexArrayWritable.java
Wed Nov 16 02:01:55 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+import org.apache.hadoop.io.ArrayWritable;
+
+public class VertexArrayWritable extends ArrayWritable {
+
+  public VertexArrayWritable() {
+    super(Vertex.class);
+  }
+
+}



Mime
View raw message