hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r681591 - in /incubator/hama/trunk/src/java/org/apache/hama: Constants.java algebra/ algebra/AdditionMap.java algebra/AdditionReduce.java mapred/MatrixInputFormat.java mapred/MatrixMap.java
Date Fri, 01 Aug 2008 02:43:11 GMT
Author: edwardyoon
Date: Thu Jul 31 19:43:10 2008
New Revision: 681591

URL: http://svn.apache.org/viewvc?rev=681591&view=rev
Log: (empty)

Added:
    incubator/hama/trunk/src/java/org/apache/hama/algebra/
    incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
Modified:
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=681591&r1=681590&r2=681591&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Thu Jul 31 19:43:10 2008
@@ -40,7 +40,7 @@
   public final static String MINUS = "-";
 
   /** Default columnfamily name */
-  public final static Text COLUMN = new Text("column:");
+  public final static String COLUMN = "column:";
   /** The numerator version of the fraction matrix */
   public final static Text NUMERATOR = new Text("numerator:");
   /** The denominator version of the fration matrix */

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java?rev=681591&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionMap.java Thu Jul 31 19:43:10
2008
@@ -0,0 +1,22 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Vector;
+import org.apache.hama.io.VectorDatum;
+import org.apache.hama.mapred.MatrixMap;
+
+public class AdditionMap extends MatrixMap<ImmutableBytesWritable, VectorDatum> {
+
+  public void map(ImmutableBytesWritable key, VectorDatum value,
+      OutputCollector<ImmutableBytesWritable, VectorDatum> output,
+      Reporter reporter) throws IOException {
+
+    Vector v1 = new Vector(B.getRowResult(key.get()));
+    output.collect(key, v1.addition(key.get(), value.getVector()));
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java?rev=681591&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/AdditionReduce.java Thu Jul 31 19:43:10
2008
@@ -0,0 +1,32 @@
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorDatum;
+import org.apache.hama.mapred.MatrixReduce;
+
+public class AdditionReduce extends
+    MatrixReduce<ImmutableBytesWritable, VectorDatum> {
+
+  @Override
+  public void reduce(ImmutableBytesWritable key, Iterator<VectorDatum> values,
+      OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
+      Reporter reporter) throws IOException {
+
+    BatchUpdate b = new BatchUpdate(key.get());
+    VectorDatum vector = values.next();
+    for (Map.Entry<byte[], Cell> f : vector.entrySet()) {
+      b.put(f.getKey(), f.getValue().getValue());
+    }
+
+    output.collect(key, b);
+  }
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java?rev=681591&r1=681590&r2=681591&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixInputFormat.java Thu Jul 31
19:43:10 2008
@@ -13,46 +13,46 @@
 import org.apache.hadoop.mapred.JobConfigurable;
 
 public class MatrixInputFormat extends MatrixInputFormatBase implements
-JobConfigurable {
-private final Log LOG = LogFactory.getLog(MatrixInputFormat.class);
+    JobConfigurable {
+  private final Log LOG = LogFactory.getLog(MatrixInputFormat.class);
 
-/**
-* space delimited list of columns
-*
-* @see org.apache.hadoop.hbase.regionserver.HAbstractScanner for column name
-*      wildcards
-*/
-public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
+  /**
+   * space delimited list of columns
+   * 
+   * @see org.apache.hadoop.hbase.regionserver.HAbstractScanner for column name
+   *      wildcards
+   */
+  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";
 
-/** {@inheritDoc} */
-public void configure(JobConf job) {
-Path[] tableNames = FileInputFormat.getInputPaths(job);
-String colArg = job.get(COLUMN_LIST);
-String[] colNames = colArg.split(" ");
-byte [][] m_cols = new byte[colNames.length][];
-for (int i = 0; i < m_cols.length; i++) {
-  m_cols[i] = Bytes.toBytes(colNames[i]);
-}
-setInputColums(m_cols);
-try {
-  setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
-} catch (Exception e) {
-  LOG.error(e);
-}
-}
+  /** {@inheritDoc} */
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    byte[][] m_cols = new byte[colNames.length][];
+    for (int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColums(m_cols);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+  }
 
-/** {@inheritDoc} */
-public void validateInput(JobConf job) throws IOException {
-// expecting exactly one path
-Path [] tableNames = FileInputFormat.getInputPaths(job);
-if (tableNames == null || tableNames.length > 1) {
-  throw new IOException("expecting one table name");
-}
+  /** {@inheritDoc} */
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length > 1) {
+      throw new IOException("expecting one table name");
+    }
 
-// expecting at least one column
-String colArg = job.get(COLUMN_LIST);
-if (colArg == null || colArg.length() == 0) {
-  throw new IOException("expecting at least one column");
-}
-}
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java?rev=681591&r1=681590&r2=681591&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/MatrixMap.java Thu Jul 31 19:43:10
2008
@@ -2,8 +2,9 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapred.TableInputFormat;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -12,43 +13,31 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.Matrix;
 import org.apache.hama.io.VectorDatum;
 
 @SuppressWarnings("unchecked")
 public abstract class MatrixMap<K extends WritableComparable, V extends Writable>
-    extends MapReduceBase implements Mapper<ImmutableBytesWritable, VectorDatum, K, V>
{
-  /**
-   * Use this before submitting a TableMap job. It will
-   * appropriately set up the JobConf.
-   * 
-   * @param table table name
-   * @param columns columns to scan
-   * @param mapper mapper class
-   * @param job job configuration
-   */
-  public static void initJob(String table, String columns,
-    Class<? extends MatrixMap> mapper, 
-    Class<? extends WritableComparable> outputKeyClass, 
-    Class<? extends Writable> outputValueClass, JobConf job) {
-      
+    extends MapReduceBase implements
+    Mapper<ImmutableBytesWritable, VectorDatum, K, V> {
+  protected static Matrix B;
+
+  public static void initJob(String matrixA, String matrixB,
+      Class<? extends MatrixMap> mapper,
+      Class<? extends WritableComparable> outputKeyClass,
+      Class<? extends Writable> outputValueClass, JobConf job) {
+
     job.setInputFormat(MatrixInputFormat.class);
     job.setMapOutputValueClass(outputValueClass);
     job.setMapOutputKeyClass(outputKeyClass);
     job.setMapperClass(mapper);
-    FileInputFormat.addInputPaths(job, table);
-    job.set(TableInputFormat.COLUMN_LIST, columns);
+    FileInputFormat.addInputPaths(job, matrixA);
+
+    B = new Matrix(new HBaseConfiguration(), new Text(matrixB));
+    job.set(MatrixInputFormat.COLUMN_LIST, Constants.COLUMN);
   }
 
-  /**
-   * Call a user defined function on a single HBase record, represented
-   * by a key and its associated record value.
-   * 
-   * @param key
-   * @param value
-   * @param output
-   * @param reporter
-   * @throws IOException
-   */
   public abstract void map(ImmutableBytesWritable key, VectorDatum value,
       OutputCollector<K, V> output, Reporter reporter) throws IOException;
 }



Mime
View raw message