hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r768864 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/ src/java/org/apache/hama/algebra/ src/test/org/apache/hama/
Date Mon, 27 Apr 2009 04:25:12 GMT
Author: edwardyoon
Date: Mon Apr 27 04:25:11 2009
New Revision: 768864

URL: http://svn.apache.org/viewvc?rev=768864&view=rev
Log:
Find the maximum absolute row sum using MapReduce

Added:
    incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormReduce.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
    incubator/hama/trunk/src/test/org/apache/hama/TestSparseMatrix.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=768864&r1=768863&r2=768864&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Mon Apr 27 04:25:11 2009
@@ -4,6 +4,7 @@
 
   NEW FEATURES
   
+    HAMA-171: Find the maximum absolute row sum using MapReduce (edwardyoon)
     HAMA-174: Compute the transpose of a matrix (edwardyoon)
     HAMA-162: Add Graph using Sparse Matrix (edwardyoon)
     HAMA-151: Add multiplication example of file matrices (edwardyoon)

Modified: incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java?rev=768864&r1=768863&r2=768864&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/AbstractMatrix.java Mon Apr 27 04:25:11 2009
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -40,12 +42,17 @@
 import org.apache.hadoop.hbase.mapred.TableMap;
 import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.algebra.MatrixNormMap;
+import org.apache.hama.algebra.MatrixNormReduce;
 import org.apache.hama.algebra.TransposeMap;
 import org.apache.hama.algebra.TransposeReduce;
 import org.apache.hama.io.VectorUpdate;
@@ -60,7 +67,7 @@
 public abstract class AbstractMatrix implements Matrix {
   static int tryPathLength = Constants.DEFAULT_PATH_LENGTH;
   static final Logger LOG = Logger.getLogger(AbstractMatrix.class);
-
+  
   protected HamaConfiguration config;
   protected HBaseAdmin admin;
   // a matrix just need a table path to point to the table which stores matrix.
@@ -155,6 +162,54 @@
     return this.table;
   }
 
+  protected double getNorm1() throws IOException {
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("norm1 MR job : " + this.getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+    
+    final FileSystem fs = FileSystem.get(jobConf);
+    final Path outDir = new Path(new Path(getType() + "_TMP_norm1_dir"), "out");
+    if(fs.exists(outDir)) 
+      fs.delete(outDir, true);
+    FileOutputFormat.setOutputPath(jobConf, outDir);
+
+    MatrixNormMap.initJob(this.getPath(), MatrixNormMap.class, IntWritable.class,
+        DoubleWritable.class, jobConf);
+    MatrixNormReduce.initJob(outDir.toString(), MatrixNormReduce.class, jobConf);
+    JobManager.execute(jobConf);
+
+    //read outputs
+    Path inFile = new Path(outDir, "reduce-out");
+    IntWritable numInside = new IntWritable();
+    DoubleWritable max = new DoubleWritable();
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, inFile, jobConf);
+    try {
+      reader.next(numInside, max);
+    } finally {
+      reader.close();
+    }
+
+    fs.delete(outDir, true);
+    return max.get();
+  }
+  
+  protected double getMaxvalue() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  protected double getInfinity() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  protected double getFrobenius() {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+  
   /** {@inheritDoc} */
   public int getRows() throws IOException {
     Cell rows = null;

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=768864&r1=768863&r2=768864&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Mon Apr 27 04:25:11 2009
@@ -244,7 +244,7 @@
     jobConf.setSpeculativeExecution(false);
     jobConf.set("matrix.column", String.valueOf(n));
     jobConf.set("matrix.type", TABLE_PREFIX);
-    
+
     jobConf.setInputFormat(SequenceFileInputFormat.class);
     final FileSystem fs = FileSystem.get(jobConf);
     int interval = m / conf.getNumMapTasks();
@@ -328,7 +328,8 @@
    * @throws IOException
    */
   public DenseVector getRow(int i) throws IOException {
-    return new DenseVector(table.getRow(BytesUtil.getRowIndex(i), new byte[][] { Bytes.toBytes(Constants.COLUMN)
}));
+    return new DenseVector(table.getRow(BytesUtil.getRowIndex(i),
+        new byte[][] { Bytes.toBytes(Constants.COLUMN) }));
   }
 
   /**
@@ -387,7 +388,7 @@
     if (this.getColumns() < column)
       increaseColumns();
 
-    for(Map.Entry<Writable, Writable> e : vector.getEntries().entrySet()) {
+    for (Map.Entry<Writable, Writable> e : vector.getEntries().entrySet()) {
       int key = ((IntWritable) e.getKey()).get();
       double value = ((DoubleEntry) e.getValue()).getValue();
       VectorUpdate update = new VectorUpdate(key);
@@ -486,18 +487,20 @@
   public DenseMatrix mult(Matrix B) throws IOException {
     ensureForMultiplication(B);
     DenseMatrix result = new DenseMatrix(config);
-    
-    for(int i = 0; i < this.getRows(); i++) {
+
+    for (int i = 0; i < this.getRows(); i++) {
       JobConf jobConf = new JobConf(config);
-      jobConf.setJobName("multiplication MR job : " + result.getPath() + " " + i);
+      jobConf.setJobName("multiplication MR job : " + result.getPath() + " "
+          + i);
 
       jobConf.setNumMapTasks(config.getNumMapTasks());
       jobConf.setNumReduceTasks(config.getNumReduceTasks());
 
-      DenseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(), DenseMatrixVectorMultMap.class,
-          IntWritable.class, MapWritable.class, jobConf);
-      DenseMatrixVectorMultReduce.initJob(result.getPath(), DenseMatrixVectorMultReduce.class,
+      DenseMatrixVectorMultMap.initJob(i, this.getPath(), B.getPath(),
+          DenseMatrixVectorMultMap.class, IntWritable.class, MapWritable.class,
           jobConf);
+      DenseMatrixVectorMultReduce.initJob(result.getPath(),
+          DenseMatrixVectorMultReduce.class, jobConf);
       JobManager.execute(jobConf);
     }
 
@@ -568,8 +571,14 @@
    * @throws IOException
    */
   public double norm(Norm type) throws IOException {
-    // TODO Auto-generated method stub
-    return 0;
+    if (type == Norm.One)
+      return getNorm1();
+    else if (type == Norm.Frobenius)
+      return getFrobenius();
+    else if (type == Norm.Infinity)
+      return getInfinity();
+    else
+      return getMaxvalue();
   }
 
   /**

Modified: incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java?rev=768864&r1=768863&r2=768864&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SparseMatrix.java Mon Apr 27 04:25:11 2009
@@ -251,10 +251,22 @@
     return null;
   }
 
-  @Override
+  /**
+   * Computes the given norm of the matrix
+   * 
+   * @param type
+   * @return norm of the matrix
+   * @throws IOException
+   */
   public double norm(Norm type) throws IOException {
-    // TODO Auto-generated method stub
-    return 0;
+    if (type == Norm.One)
+      return getNorm1();
+    else if (type == Norm.Frobenius)
+      return getFrobenius();
+    else if (type == Norm.Infinity)
+      return getInfinity();
+    else
+      return getMaxvalue();
   }
 
   @Override

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormMap.java?rev=768864&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormMap.java Mon Apr 27 04:25:11 2009
@@ -0,0 +1,70 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.mapred.VectorInputFormat;
+
+public class MatrixNormMap extends MapReduceBase implements
+    Mapper<IntWritable, MapWritable, IntWritable, DoubleWritable> {
+  private IntWritable nKey = new IntWritable(-1);
+  private DoubleWritable nValue = new DoubleWritable();
+  
+  public static void initJob(String path, Class<MatrixNormMap> map,
+      Class<IntWritable> outputKeyClass, Class<DoubleWritable> outputValueClass,
+      JobConf jobConf) {
+    jobConf.setMapOutputValueClass(outputValueClass);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapperClass(map);
+
+    jobConf.setInputFormat(VectorInputFormat.class);
+    FileInputFormat.addInputPaths(jobConf, path);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+  }
+
+  @Override
+  public void map(IntWritable key, MapWritable value,
+      OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+      throws IOException {
+
+    double rowSum = 0;
+    for(Map.Entry<Writable, Writable> e : value.entrySet()) {
+      rowSum += Math.abs(((DoubleEntry) e.getValue()).getValue());
+    }
+    nValue.set(rowSum);
+    
+    output.collect(nKey, nValue);
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormReduce.java?rev=768864&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/MatrixNormReduce.java Mon Apr 27 04:25:11 2009
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+
+public class MatrixNormReduce extends MapReduceBase implements
+    Reducer<IntWritable, DoubleWritable, IntWritable, DoubleWritable> {
+  private double max = 0;
+  private String outDir = "";
+  private JobConf conf;
+  private static final String OUTPUT = "hama.multiplication.matrix.a";
+
+  public void configure(JobConf job) {
+    outDir = job.get(OUTPUT, "");
+    conf = job;
+  }
+
+  public static void initJob(String path, Class<MatrixNormReduce> reducer,
+      JobConf jobConf) {
+    jobConf.setOutputFormat(SequenceFileOutputFormat.class);
+    jobConf.setReducerClass(reducer);
+    jobConf.setOutputKeyClass(IntWritable.class);
+    jobConf.setOutputValueClass(DoubleWritable.class);
+    jobConf.set(OUTPUT, path);
+  }
+
+  @Override
+  public void reduce(IntWritable key, Iterator<DoubleWritable> values,
+      OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
+      throws IOException {
+
+    while (values.hasNext()) {
+      max = Math.max(values.next().get(), max);
+    }
+
+  }
+
+  /**
+   * Reduce task done, Writes the largest element of the passed array
+   */
+  @Override
+  public void close() throws IOException {
+    // write output to a file
+    Path outFile = new Path(outDir, "reduce-out");
+    FileSystem fileSys = FileSystem.get(conf);
+    SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+        outFile, IntWritable.class, DoubleWritable.class, CompressionType.NONE);
+    writer.append(new IntWritable(-1), new DoubleWritable(max));
+    writer.close();
+  }
+}

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=768864&r1=768863&r2=768864&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Mon Apr 27 04:25:11 2009
@@ -29,6 +29,7 @@
 
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Matrix.Norm;
 import org.apache.hama.io.DoubleEntry;
 import org.apache.hama.util.RandomVariable;
 import org.apache.log4j.Logger;
@@ -233,6 +234,11 @@
     assertEquals(value, result.get(0, 0));
   }
   
+  public void testNorm1() throws IOException {
+    double norm1 = m1.norm(Norm.One);
+    assertEquals(norm1, verifyNorm1());
+  }
+  
   public void testSetRow() throws IOException {
     Vector v = new DenseVector();
     double[] entries = new double[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 0 };
@@ -272,7 +278,7 @@
     
     assertEquals(m1.getColumns(), SIZE + 1);
   }
-
+  
   public void testEnsureForAddition() {
     try {
       m1.add(m2);
@@ -334,6 +340,7 @@
         loadTest3.close();
     }
   }
+  
   public void testForceCreate() throws IOException {
     String path2 = m2.getPath();
     // save m2 to aliase2
@@ -399,4 +406,18 @@
       }
     }
   }
+
+  private double verifyNorm1() throws IOException {
+    double[] rowSum = new double[m1.getRows()];
+    for (int i = 0; i < m1.getRows(); i++) {
+      for (int j = 0; j < m1.getColumns(); j++) {
+        rowSum[i] += Math.abs(m1.get(i, j));
+      }
+    }
+
+    double max = 0;
+    for (int i = 0; i < rowSum.length; ++i)
+      max = Math.max(rowSum[i], max);
+    return max;
+  }
 }

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestSparseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestSparseMatrix.java?rev=768864&r1=768863&r2=768864&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestSparseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestSparseMatrix.java Mon Apr 27 04:25:11 2009
@@ -26,6 +26,7 @@
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
 
+import org.apache.hama.Matrix.Norm;
 import org.apache.log4j.Logger;
 
 public class TestSparseMatrix extends TestCase {
@@ -62,13 +63,13 @@
 
   public void testTranspose() throws IOException {
     SparseMatrix trans = (SparseMatrix) m1.transpose();
-    for(int i = 0; i < trans.getRows(); i++) {
-      for(int j = 0; j < trans.getColumns(); j++) {
+    for (int i = 0; i < trans.getRows(); i++) {
+      for (int j = 0; j < trans.getColumns(); j++) {
         assertEquals(trans.get(i, j), m1.get(j, i));
       }
     }
   }
-  
+
   public void testSparsity() throws IOException {
     boolean appeared = false;
     for (int i = 0; i < m1.getRows(); i++) {
@@ -91,6 +92,11 @@
     verifyMultResult(m1, m2, result);
   }
 
+  public void testNorm1() throws IOException {
+    double norm1 = m1.norm(Norm.One);
+    assertEquals(norm1, verifyNorm1());
+  }
+
   /**
    * Verifying multiplication result
    * 
@@ -118,4 +124,18 @@
       }
     }
   }
+
+  private double verifyNorm1() throws IOException {
+    double[] rowSum = new double[m1.getRows()];
+    for (int i = 0; i < m1.getRows(); i++) {
+      for (int j = 0; j < m1.getColumns(); j++) {
+        rowSum[i] += Math.abs(m1.get(i, j));
+      }
+    }
+
+    double max = 0;
+    for (int i = 0; i < rowSum.length; ++i)
+      max = Math.max(rowSum[i], max);
+    return max;
+  }
 }



Mime
View raw message