hama-commits mailing list archives

From edwardy...@apache.org
Subject svn commit: r725904 [1/2] - in /incubator/hama/trunk/src: java/org/apache/hama/ java/org/apache/hama/algebra/ java/org/apache/hama/mapred/ test/org/apache/hama/mapred/
Date Fri, 12 Dec 2008 05:10:58 GMT
Author: edwardyoon
Date: Thu Dec 11 21:10:58 2008
New Revision: 725904

URL: http://svn.apache.org/viewvc?rev=725904&view=rev
Log:
Remove duplicate code

Added:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java
Removed:
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockCyclicReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormatBase.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicMap.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/RowCyclicReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormatBase.java
Modified:
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Thu Dec 11 21:10:58 2008
@@ -1,598 +1,600 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Scanner;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.Cell;
-import org.apache.hadoop.hbase.io.RowResult;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.lib.NullOutputFormat;
-import org.apache.hama.algebra.BlockCyclicMultiplyMap;
-import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
-import org.apache.hama.algebra.RowCyclicAdditionMap;
-import org.apache.hama.algebra.RowCyclicAdditionReduce;
-import org.apache.hama.algebra.SIMDMultiplyMap;
-import org.apache.hama.algebra.SIMDMultiplyReduce;
-import org.apache.hama.io.BlockID;
-import org.apache.hama.io.BlockWritable;
-import org.apache.hama.io.DoubleEntry;
-import org.apache.hama.io.MapWritable;
-import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.BlockCyclicReduce;
-import org.apache.hama.mapred.BlockingMapRed;
-import org.apache.hama.mapred.RandomMatrixMap;
-import org.apache.hama.mapred.RowCyclicReduce;
-import org.apache.hama.util.BytesUtil;
-import org.apache.hama.util.JobManager;
-import org.apache.hama.util.RandomVariable;
-
-public class DenseMatrix extends AbstractMatrix implements Matrix {
-
-  static int tryPathLength = Constants.DEFAULT_PATH_LENGTH;
-  static final String TABLE_PREFIX = DenseMatrix.class.getSimpleName() + "_";
-  static private final Path TMP_DIR = new Path(DenseMatrix.class
-      .getSimpleName()
-      + "_TMP_dir");
-
-  /**
-   * Construct a raw matrix. Just create a table in HBase, but didn't lay any
-   * schema ( such as dimensions: i, j ) on it.
-   * 
-   * @param conf configuration object
-   * @throws IOException throw the exception to let the user know what happend,
-   *                 if we didn't create the matrix successfully.
-   */
-  public DenseMatrix(HamaConfiguration conf) throws IOException {
-    setConfiguration(conf);
-
-    tryToCreateTable();
-
-    closed = false;
-  }
-
-  /**
-   * Create/load a matrix aliased as 'matrixName'.
-   * 
-   * @param conf configuration object
-   * @param matrixName the name of the matrix
-   * @param force if force is true, a new matrix will be created no matter
-   *                'matrixName' has aliased to an existed matrix; otherwise,
-   *                just try to load an existed matrix alised 'matrixName'.
-   * @throws IOException
-   */
-  public DenseMatrix(HamaConfiguration conf, String matrixName, boolean force)
-      throws IOException {
-    setConfiguration(conf);
-    // if force is set to true:
-    // 1) if this matrixName has aliase to other matrix, we will remove
-    // the old aliase, create a new matrix table, and aliase to it.
-    // 2) if this matrixName has no aliase to other matrix, we will create
-    // a new matrix table, and alise to it.
-    //
-    // if force is set to false, we just try to load an existed matrix alised
-    // as 'matrixname'.
-
-    boolean existed = hamaAdmin.matrixExists(matrixName);
-
-    if (force) {
-      if (existed) {
-        // remove the old aliase
-        hamaAdmin.delete(matrixName);
-      }
-      // create a new matrix table.
-      tryToCreateTable();
-      // save the new aliase relationship
-      save(matrixName);
-    } else {
-      if (existed) {
-        // try to get the actual path of the table
-        matrixPath = hamaAdmin.getPath(matrixName);
-        // load the matrix
-        table = new HTable(conf, matrixPath);
-        // increment the reference
-        incrementAndGetRef();
-      } else {
-        throw new IOException("Try to load non-existed matrix alised as "
-            + matrixName);
-      }
-    }
-
-    closed = false;
-  }
-
-  /**
-   * Load a matrix from an existed matrix table whose tablename is 'matrixpath' !!
-   * It is an internal used for map/reduce.
-   * 
-   * @param conf configuration object
-   * @param matrixpath
-   * @throws IOException
-   * @throws IOException
-   */
-  public DenseMatrix(HamaConfiguration conf, String matrixpath)
-      throws IOException {
-    setConfiguration(conf);
-    matrixPath = matrixpath;
-    // load the matrix
-    table = new HTable(conf, matrixPath);
-    // TODO: now we don't increment the reference of the table
-    // for it's an internal use for map/reduce.
-    // if we want to increment the reference of the table,
-    // we don't know where to call Matrix.close in Add & Mul map/reduce
-    // process to decrement the reference. It seems difficulty.
-  }
-
-  /**
-   * Create an m-by-n constant matrix.
-   * 
-   * @param conf configuration object
-   * @param m the number of rows.
-   * @param n the number of columns.
-   * @param s fill the matrix with this scalar value.
-   * @throws IOException throw the exception to let the user know what happend,
-   *                 if we didn't create the matrix successfully.
-   */
-  public DenseMatrix(HamaConfiguration conf, int m, int n, double s)
-      throws IOException {
-    setConfiguration(conf);
-
-    tryToCreateTable();
-
-    closed = false;
-
-    for (int i = 0; i < m; i++) {
-      for (int j = 0; j < n; j++) {
-        set(i, j, s);
-      }
-    }
-
-    setDimension(m, n);
-  }
-
-  /**
-   * try to create a new matrix with a new random name. try times will be
-   * (Integer.MAX_VALUE - 4) * DEFAULT_TRY_TIMES;
-   * 
-   * @throws IOException
-   */
-  private void tryToCreateTable() throws IOException {
-    int tryTimes = Constants.DEFAULT_TRY_TIMES;
-    do {
-      matrixPath = TABLE_PREFIX + RandomVariable.randMatrixPath(tryPathLength);
-
-      if (!admin.tableExists(matrixPath)) { // no table 'matrixPath' in hbase.
-        tableDesc = new HTableDescriptor(matrixPath);
-        create();
-        return;
-      }
-
-      tryTimes--;
-      if (tryTimes <= 0) { // this loop has exhausted DEFAULT_TRY_TIMES.
-        tryPathLength++;
-        tryTimes = Constants.DEFAULT_TRY_TIMES;
-      }
-
-    } while (tryPathLength <= Constants.DEFAULT_MAXPATHLEN);
-    // exhaustes the try times.
-    // throw out an IOException to let the user know what happened.
-    throw new IOException("Try too many times to create a table in hbase.");
-  }
-
-  /**
-   * Generate matrix with random elements
-   * 
-   * @param conf configuration object
-   * @param m the number of rows.
-   * @param n the number of columns.
-   * @return an m-by-n matrix with uniformly distributed random elements.
-   * @throws IOException
-   */
-  public static DenseMatrix random(HamaConfiguration conf, int m, int n)
-      throws IOException {
-    DenseMatrix rand = new DenseMatrix(conf);
-    DenseVector vector = new DenseVector();
-    LOG.info("Create the " + m + " * " + n + " random matrix : "
-        + rand.getPath());
-
-    for (int i = 0; i < m; i++) {
-      vector.clear();
-      for (int j = 0; j < n; j++) {
-        vector.set(j, RandomVariable.rand());
-      }
-      rand.setRow(i, vector);
-    }
-
-    rand.setDimension(m, n);
-    return rand;
-  }
-
-  /**
-   * Generate matrix with random elements using Map/Reduce
-   * 
-   * @param conf configuration object
-   * @param m the number of rows.
-   * @param n the number of columns.
-   * @return an m-by-n matrix with uniformly distributed random elements.
-   * @throws IOException
-   */
-  public static DenseMatrix random_mapred(HamaConfiguration conf, int m, int n)
-      throws IOException {
-    DenseMatrix rand = new DenseMatrix(conf);
-    LOG.info("Create the " + m + " * " + n + " random matrix : "
-        + rand.getPath());
-
-    JobConf jobConf = new JobConf(conf);
-    jobConf.setJobName("random matrix MR job : " + rand.getPath());
-
-    jobConf.setNumMapTasks(conf.getNumMapTasks());
-    jobConf.setNumReduceTasks(conf.getNumReduceTasks());
-
-    final Path inDir = new Path(TMP_DIR, "in");
-    FileInputFormat.setInputPaths(jobConf, inDir);
-    jobConf.setMapperClass(RandomMatrixMap.class);
-
-    jobConf.setOutputKeyClass(BooleanWritable.class);
-    jobConf.setOutputValueClass(LongWritable.class);
-    jobConf.setOutputFormat(NullOutputFormat.class);
-    jobConf.setSpeculativeExecution(false);
-    jobConf.set("matrix.column", String.valueOf(n));
-    jobConf.set("matrix.path", rand.getPath());
-
-    jobConf.setInputFormat(SequenceFileInputFormat.class);
-    final FileSystem fs = FileSystem.get(jobConf);
-    int interval = m / conf.getNumMapTasks();
-
-    // generate an input file for each map task
-    for (int i = 0; i < conf.getNumMapTasks(); ++i) {
-      final Path file = new Path(inDir, "part" + i);
-      final IntWritable start = new IntWritable(i * interval);
-      IntWritable end = null;
-      if ((i + 1) != conf.getNumMapTasks()) {
-        end = new IntWritable(((i * interval) + interval) - 1);
-      } else {
-        end = new IntWritable(m - 1);
-      }
-      final SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf,
-          file, IntWritable.class, IntWritable.class, CompressionType.NONE);
-      try {
-        writer.append(start, end);
-      } finally {
-        writer.close();
-      }
-      System.out.println("Wrote input for Map #" + i);
-    }
-
-    JobClient.runJob(jobConf);
-    rand.setDimension(m, n);
-    fs.delete(TMP_DIR, true);
-    return rand;
-  }
-
-  /**
-   * Generate identity matrix
-   * 
-   * @param conf configuration object
-   * @param m the number of rows.
-   * @param n the number of columns.
-   * @return an m-by-n matrix with ones on the diagonal and zeros elsewhere.
-   * @throws IOException
-   */
-  public static Matrix identity(HamaConfiguration conf, int m, int n)
-      throws IOException {
-    Matrix identity = new DenseMatrix(conf);
-    LOG.info("Create the " + m + " * " + n + " identity matrix : "
-        + identity.getPath());
-
-    for (int i = 0; i < m; i++) {
-      DenseVector vector = new DenseVector();
-      for (int j = 0; j < n; j++) {
-        vector.set(j, (i == j ? 1.0 : 0.0));
-      }
-      identity.setRow(i, vector);
-    }
-
-    identity.setDimension(m, n);
-    return identity;
-  }
-
-  /**
-   * Gets the double value of (i, j)
-   * 
-   * @param i ith row of the matrix
-   * @param j jth column of the matrix
-   * @return the value of entry, or zero If entry is null
-   * @throws IOException
-   */
-  public double get(int i, int j) throws IOException {
-    Cell c = table.get(BytesUtil.intToBytes(i), BytesUtil.getColumnIndex(j));
-    return (c != null) ? BytesUtil.bytesToDouble(c.getValue()) : 0;
-  }
-
-  public Matrix add(Matrix B) throws IOException {
-    Matrix result = new DenseMatrix(config);
-
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("addition MR job" + result.getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
-
-    RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(),
-        RowCyclicAdditionMap.class, IntWritable.class, VectorWritable.class,
-        jobConf);
-    RowCyclicReduce.initJob(result.getPath(), RowCyclicAdditionReduce.class,
-        jobConf);
-
-    JobManager.execute(jobConf, result);
-    return result;
-  }
-
-  public Matrix add(double alpha, Matrix B) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  public DenseVector getRow(int row) throws IOException {
-    return new DenseVector(table.getRow(BytesUtil.intToBytes(row)));
-  }
-
-  public DenseVector getColumn(int column) throws IOException {
-    byte[] columnKey = BytesUtil.getColumnIndex(column);
-    byte[][] c = { columnKey };
-    Scanner scan = table.getScanner(c, HConstants.EMPTY_START_ROW);
-
-    MapWritable<Integer, DoubleEntry> trunk = new MapWritable<Integer, DoubleEntry>();
-
-    for (RowResult row : scan) {
-      trunk.put(BytesUtil.bytesToInt(row.getRow()), new DoubleEntry(row
-          .get(columnKey)));
-    }
-
-    return new DenseVector(trunk);
-  }
-
-  public Matrix mult(Matrix B) throws IOException {
-    Matrix result = new DenseMatrix(config);
-
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("multiplication MR job : " + result.getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
-
-    if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
-      BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
-          BlockCyclicMultiplyMap.class, IntWritable.class, BlockWritable.class,
-          jobConf);
-      BlockCyclicReduce.initJob(result.getPath(),
-          BlockCyclicMultiplyReduce.class, jobConf);
-    } else {
-      SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
-          SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
-          jobConf);
-      RowCyclicReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
-          jobConf);
-    }
-
-    JobManager.execute(jobConf, result);
-    return result;
-  }
-
-  public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  public double norm(Norm type) throws IOException {
-    // TODO Auto-generated method stub
-    return 0;
-  }
-
-  public Matrix set(double alpha, Matrix B) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  public Matrix set(Matrix B) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  public void setRow(int row, Vector vector) throws IOException {
-    VectorUpdate update = new VectorUpdate(row);
-    update.putAll(((DenseVector) vector).getEntries().entrySet());
-    table.commit(update.getBatchUpdate());
-  }
-
-  public void setColumn(int column, Vector vector) throws IOException {
-    for (int i = 0; i < vector.size(); i++) {
-      VectorUpdate update = new VectorUpdate(i);
-      update.put(column, vector.get(i));
-      table.commit(update.getBatchUpdate());
-    }
-  }
-
-  public String getType() {
-    return this.getClass().getSimpleName();
-  }
-
-  public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
-    int columnSize = (j1 - j0) + 1;
-    SubMatrix result = new SubMatrix((i1 - i0) + 1, columnSize);
-
-    for (int i = i0, ii = 0; i <= i1; i++, ii++) {
-      for (int j = j0, jj = 0; j <= j1; j++, jj++) {
-        Cell c = table
-            .get(BytesUtil.intToBytes(i), BytesUtil.getColumnIndex(j));
-        result.set(ii, jj, BytesUtil.bytesToDouble(c.getValue()));
-      }
-    }
-
-    return result;
-  }
-
-  public boolean isBlocked() throws IOException {
-    return (table.get(Constants.METADATA, Constants.BLOCK_SIZE) == null) ? false
-        : true;
-  }
-
-  public SubMatrix getBlock(int i, int j) throws IOException {
-    return new SubMatrix(table.get(String.valueOf(i), Constants.BLOCK + j)
-        .getValue());
-  }
-
-  /**
-   * @return the size of block
-   * @throws IOException
-   */
-  public int getBlockSize() throws IOException {
-    return (isBlocked()) ? BytesUtil.bytesToInt(table.get(Constants.METADATA,
-        Constants.BLOCK_SIZE).getValue()) : -1;
-  }
-
-  protected void setBlockSize(int blockNum) throws IOException {
-    BatchUpdate update = new BatchUpdate(Constants.METADATA);
-    update.put(Constants.BLOCK_SIZE, BytesUtil.intToBytes(blockNum));
-    table.commit(update);
-  }
-
-  public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
-    BatchUpdate update = new BatchUpdate(String.valueOf(i));
-    update.put(Constants.BLOCK + j, matrix.getBytes());
-    table.commit(update);
-  }
-
-  protected void setBlockPosition(int blockNum) throws IOException {
-    int block_row_size = this.getRows() / blockNum;
-    int block_column_size = this.getColumns() / blockNum;
-
-    int startRow, endRow, startColumn, endColumn;
-    int i = 0, j = 0;
-    do {
-      startRow = i * block_row_size;
-      endRow = (startRow + block_row_size) - 1;
-      if (endRow >= this.getRows())
-        endRow = this.getRows() - 1;
-
-      j = 0;
-      do {
-        startColumn = j * block_column_size;
-        endColumn = (startColumn + block_column_size) - 1;
-        if (endColumn >= this.getColumns())
-          endColumn = this.getColumns() - 1;
-
-        BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
-        update.put(Constants.BLOCK_STARTROW, BytesUtil.intToBytes(startRow));
-        update.put(Constants.BLOCK_ENDROW, BytesUtil.intToBytes(endRow));
-        update.put(Constants.BLOCK_STARTCOLUMN, BytesUtil
-            .intToBytes(startColumn));
-        update.put(Constants.BLOCK_ENDCOLUMN, BytesUtil.intToBytes(endColumn));
-        table.commit(update);
-
-        j++;
-      } while (endColumn < (this.getColumns() - 1));
-
-      i++;
-    } while (endRow < (this.getRows() - 1));
-  }
-
-  protected int[] getBlockPosition(int i, int j) throws IOException {
-    RowResult rs = table.getRow(new BlockID(i, j).getBytes());
-    int[] result = new int[4];
-    
-    result[0] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_STARTROW).getValue());
-    result[1] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_ENDROW).getValue());
-    result[2] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_STARTCOLUMN).getValue());
-    result[3] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_ENDCOLUMN).getValue());
-    
-    return result;
-  }
-
-  /**
-   * Using a map/reduce job to block a dense matrix.
-   * 
-   * @param blockNum
-   * @throws IOException
-   */
-  public void blocking_mapred(int blockNum) throws IOException {
-    this.checkBlockNum(blockNum);
-
-    JobConf jobConf = new JobConf(config);
-    jobConf.setJobName("Blocking MR job" + getPath());
-
-    jobConf.setNumMapTasks(config.getNumMapTasks());
-    jobConf.setNumReduceTasks(config.getNumReduceTasks());
-
-    BlockingMapRed.initJob(getPath(), jobConf);
-
-    JobManager.execute(jobConf);
-  }
-
-  /**
-   * Using a scanner to block a dense matrix. If the matrix is large, use the
-   * blocking_mapred()
-   * 
-   * @param blockNum
-   * @throws IOException
-   */
-  public void blocking(int blockNum) throws IOException {
-    this.checkBlockNum(blockNum);
-
-    String[] columns = new String[] { Constants.BLOCK_STARTROW,
-        Constants.BLOCK_ENDROW, Constants.BLOCK_STARTCOLUMN,
-        Constants.BLOCK_ENDCOLUMN };
-    Scanner scan = table.getScanner(columns);
-
-    for (RowResult row : scan) {
-      BlockID bID = new BlockID(row.getRow());
-      int[] pos = getBlockPosition(bID.getRow(), bID.getColumn());
-      setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos[0], pos[1], pos[2], pos[3]));
-    }
-  }
-
-  private void checkBlockNum(int blockNum) throws IOException {
-    double blocks = Math.pow(blockNum, 0.5);
-    // TODO: Check also it is validation with matrix.
-    if (!String.valueOf(blocks).endsWith(".0"))
-      throw new IOException("can't divide.");
-
-    int block_size = (int) blocks;
-    setBlockPosition(block_size);
-    setBlockSize(block_size);
-    LOG.info("Create " + block_size + " * " + block_size + " blocked matrix");
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hama.algebra.BlockCyclicMultiplyMap;
+import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
+import org.apache.hama.algebra.RowCyclicAdditionMap;
+import org.apache.hama.algebra.RowCyclicAdditionReduce;
+import org.apache.hama.algebra.SIMDMultiplyMap;
+import org.apache.hama.algebra.SIMDMultiplyReduce;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.DoubleEntry;
+import org.apache.hama.io.MapWritable;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.io.VectorWritable;
+import org.apache.hama.mapred.BlockingMapRed;
+import org.apache.hama.mapred.RandomMatrixMap;
+import org.apache.hama.util.BytesUtil;
+import org.apache.hama.util.JobManager;
+import org.apache.hama.util.RandomVariable;
+
+public class DenseMatrix extends AbstractMatrix implements Matrix {
+
+  static int tryPathLength = Constants.DEFAULT_PATH_LENGTH;
+  static final String TABLE_PREFIX = DenseMatrix.class.getSimpleName() + "_";
+  static private final Path TMP_DIR = new Path(DenseMatrix.class
+      .getSimpleName()
+      + "_TMP_dir");
+
+  /**
+   * Construct a raw matrix. Just create a table in HBase, but didn't lay any
+   * schema ( such as dimensions: i, j ) on it.
+   * 
+   * @param conf configuration object
+   * @throws IOException throw the exception to let the user know what happend,
+   *                 if we didn't create the matrix successfully.
+   */
+  public DenseMatrix(HamaConfiguration conf) throws IOException {
+    setConfiguration(conf);
+
+    tryToCreateTable();
+
+    closed = false;
+  }
+
+  /**
+   * Create/load a matrix aliased as 'matrixName'.
+   * 
+   * @param conf configuration object
+   * @param matrixName the name of the matrix
+   * @param force if force is true, a new matrix will be created no matter
+   *                'matrixName' has aliased to an existed matrix; otherwise,
+   *                just try to load an existed matrix alised 'matrixName'.
+   * @throws IOException
+   */
+  public DenseMatrix(HamaConfiguration conf, String matrixName, boolean force)
+      throws IOException {
+    setConfiguration(conf);
+    // if force is set to true:
+    // 1) if this matrixName has aliase to other matrix, we will remove
+    // the old aliase, create a new matrix table, and aliase to it.
+    // 2) if this matrixName has no aliase to other matrix, we will create
+    // a new matrix table, and alise to it.
+    //
+    // if force is set to false, we just try to load an existed matrix alised
+    // as 'matrixname'.
+
+    boolean existed = hamaAdmin.matrixExists(matrixName);
+
+    if (force) {
+      if (existed) {
+        // remove the old aliase
+        hamaAdmin.delete(matrixName);
+      }
+      // create a new matrix table.
+      tryToCreateTable();
+      // save the new aliase relationship
+      save(matrixName);
+    } else {
+      if (existed) {
+        // try to get the actual path of the table
+        matrixPath = hamaAdmin.getPath(matrixName);
+        // load the matrix
+        table = new HTable(conf, matrixPath);
+        // increment the reference
+        incrementAndGetRef();
+      } else {
+        throw new IOException("Try to load non-existed matrix alised as "
+            + matrixName);
+      }
+    }
+
+    closed = false;
+  }
+
+  /**
+   * Load a matrix from an existed matrix table whose tablename is 'matrixpath' !!
+   * It is an internal used for map/reduce.
+   * 
+   * @param conf configuration object
+   * @param matrixpath
+   * @throws IOException
+   * @throws IOException
+   */
+  public DenseMatrix(HamaConfiguration conf, String matrixpath)
+      throws IOException {
+    setConfiguration(conf);
+    matrixPath = matrixpath;
+    // load the matrix
+    table = new HTable(conf, matrixPath);
+    // TODO: now we don't increment the reference of the table
+    // for it's an internal use for map/reduce.
+    // if we want to increment the reference of the table,
+    // we don't know where to call Matrix.close in Add & Mul map/reduce
+    // process to decrement the reference. It seems difficulty.
+  }
+
+  /**
+   * Create an m-by-n constant matrix.
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @param s fill the matrix with this scalar value.
+   * @throws IOException throw the exception to let the user know what happend,
+   *                 if we didn't create the matrix successfully.
+   */
+  public DenseMatrix(HamaConfiguration conf, int m, int n, double s)
+      throws IOException {
+    setConfiguration(conf);
+
+    tryToCreateTable();
+
+    closed = false;
+
+    for (int i = 0; i < m; i++) {
+      for (int j = 0; j < n; j++) {
+        set(i, j, s);
+      }
+    }
+
+    setDimension(m, n);
+  }
+
+  /**
+   * try to create a new matrix with a new random name. try times will be
+   * (Integer.MAX_VALUE - 4) * DEFAULT_TRY_TIMES;
+   * 
+   * @throws IOException
+   */
+  private void tryToCreateTable() throws IOException {
+    int tryTimes = Constants.DEFAULT_TRY_TIMES;
+    do {
+      matrixPath = TABLE_PREFIX + RandomVariable.randMatrixPath(tryPathLength);
+
+      if (!admin.tableExists(matrixPath)) { // no table 'matrixPath' in hbase.
+        tableDesc = new HTableDescriptor(matrixPath);
+        create();
+        return;
+      }
+
+      tryTimes--;
+      if (tryTimes <= 0) { // this loop has exhausted DEFAULT_TRY_TIMES.
+        tryPathLength++;
+        tryTimes = Constants.DEFAULT_TRY_TIMES;
+      }
+
+    } while (tryPathLength <= Constants.DEFAULT_MAXPATHLEN);
+    // exhaustes the try times.
+    // throw out an IOException to let the user know what happened.
+    throw new IOException("Try too many times to create a table in hbase.");
+  }
+
+  /**
+   * Generate matrix with random elements
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @return an m-by-n matrix with uniformly distributed random elements.
+   * @throws IOException
+   */
+  public static DenseMatrix random(HamaConfiguration conf, int m, int n)
+      throws IOException {
+    DenseMatrix rand = new DenseMatrix(conf);
+    DenseVector vector = new DenseVector();
+    LOG.info("Create the " + m + " * " + n + " random matrix : "
+        + rand.getPath());
+
+    for (int i = 0; i < m; i++) {
+      vector.clear();
+      for (int j = 0; j < n; j++) {
+        vector.set(j, RandomVariable.rand());
+      }
+      rand.setRow(i, vector);
+    }
+
+    rand.setDimension(m, n);
+    return rand;
+  }
+
+  /**
+   * Generate matrix with random elements using Map/Reduce
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @return an m-by-n matrix with uniformly distributed random elements.
+   * @throws IOException
+   */
+  public static DenseMatrix random_mapred(HamaConfiguration conf, int m, int n)
+      throws IOException {
+    DenseMatrix rand = new DenseMatrix(conf);
+    LOG.info("Create the " + m + " * " + n + " random matrix : "
+        + rand.getPath());
+
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setJobName("random matrix MR job : " + rand.getPath());
+
+    jobConf.setNumMapTasks(conf.getNumMapTasks());
+    jobConf.setNumReduceTasks(conf.getNumReduceTasks());
+
+    final Path inDir = new Path(TMP_DIR, "in");
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    jobConf.setMapperClass(RandomMatrixMap.class);
+
+    jobConf.setOutputKeyClass(BooleanWritable.class);
+    jobConf.setOutputValueClass(LongWritable.class);
+    jobConf.setOutputFormat(NullOutputFormat.class);
+    jobConf.setSpeculativeExecution(false);
+    jobConf.set("matrix.column", String.valueOf(n));
+    jobConf.set("matrix.path", rand.getPath());
+
+    jobConf.setInputFormat(SequenceFileInputFormat.class);
+    final FileSystem fs = FileSystem.get(jobConf);
+    int interval = m / conf.getNumMapTasks();
+
+    // generate an input file for each map task
+    for (int i = 0; i < conf.getNumMapTasks(); ++i) {
+      final Path file = new Path(inDir, "part" + i);
+      final IntWritable start = new IntWritable(i * interval);
+      IntWritable end = null;
+      if ((i + 1) != conf.getNumMapTasks()) {
+        end = new IntWritable(((i * interval) + interval) - 1);
+      } else {
+        end = new IntWritable(m - 1);
+      }
+      final SequenceFile.Writer writer = SequenceFile.createWriter(fs, jobConf,
+          file, IntWritable.class, IntWritable.class, CompressionType.NONE);
+      try {
+        writer.append(start, end);
+      } finally {
+        writer.close();
+      }
+      System.out.println("Wrote input for Map #" + i);
+    }
+
+    JobClient.runJob(jobConf);
+    rand.setDimension(m, n);
+    fs.delete(TMP_DIR, true);
+    return rand;
+  }
+
+  /**
+   * Generate identity matrix
+   * 
+   * @param conf configuration object
+   * @param m the number of rows.
+   * @param n the number of columns.
+   * @return an m-by-n matrix with ones on the diagonal and zeros elsewhere.
+   * @throws IOException
+   */
+  public static Matrix identity(HamaConfiguration conf, int m, int n)
+      throws IOException {
+    Matrix identity = new DenseMatrix(conf);
+    LOG.info("Create the " + m + " * " + n + " identity matrix : "
+        + identity.getPath());
+
+    for (int i = 0; i < m; i++) {
+      DenseVector vector = new DenseVector();
+      for (int j = 0; j < n; j++) {
+        vector.set(j, (i == j ? 1.0 : 0.0));
+      }
+      identity.setRow(i, vector);
+    }
+
+    identity.setDimension(m, n);
+    return identity;
+  }
+
+  /**
+   * Gets the double value of (i, j)
+   * 
+   * @param i ith row of the matrix
+   * @param j jth column of the matrix
+   * @return the value of entry, or zero If entry is null
+   * @throws IOException
+   */
+  public double get(int i, int j) throws IOException {
+    Cell c = table.get(BytesUtil.intToBytes(i), BytesUtil.getColumnIndex(j));
+    return (c != null) ? BytesUtil.bytesToDouble(c.getValue()) : 0;
+  }
+
+  public Matrix add(Matrix B) throws IOException {
+    Matrix result = new DenseMatrix(config);
+
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("addition MR job" + result.getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+    RowCyclicAdditionMap.initJob(this.getPath(), B.getPath(),
+        RowCyclicAdditionMap.class, IntWritable.class, VectorWritable.class,
+        jobConf);
+    RowCyclicAdditionReduce.initJob(result.getPath(),
+        RowCyclicAdditionReduce.class, jobConf);
+
+    JobManager.execute(jobConf, result);
+    return result;
+  }
+
+  public Matrix add(double alpha, Matrix B) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public DenseVector getRow(int row) throws IOException {
+    return new DenseVector(table.getRow(BytesUtil.intToBytes(row)));
+  }
+
+  public DenseVector getColumn(int column) throws IOException {
+    byte[] columnKey = BytesUtil.getColumnIndex(column);
+    byte[][] c = { columnKey };
+    Scanner scan = table.getScanner(c, HConstants.EMPTY_START_ROW);
+
+    MapWritable<Integer, DoubleEntry> trunk = new MapWritable<Integer, DoubleEntry>();
+
+    for (RowResult row : scan) {
+      trunk.put(BytesUtil.bytesToInt(row.getRow()), new DoubleEntry(row
+          .get(columnKey)));
+    }
+
+    return new DenseVector(trunk);
+  }
+
+  public Matrix mult(Matrix B) throws IOException {
+    Matrix result = new DenseMatrix(config);
+
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("multiplication MR job : " + result.getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+    if (this.isBlocked() && ((DenseMatrix) B).isBlocked()) {
+      BlockCyclicMultiplyMap.initJob(this.getPath(), B.getPath(),
+          BlockCyclicMultiplyMap.class, IntWritable.class, BlockWritable.class,
+          jobConf);
+      BlockCyclicMultiplyReduce.initJob(result.getPath(),
+          BlockCyclicMultiplyReduce.class, jobConf);
+    } else {
+      SIMDMultiplyMap.initJob(this.getPath(), B.getPath(),
+          SIMDMultiplyMap.class, IntWritable.class, VectorWritable.class,
+          jobConf);
+      SIMDMultiplyReduce.initJob(result.getPath(), SIMDMultiplyReduce.class,
+          jobConf);
+    }
+
+    JobManager.execute(jobConf, result);
+    return result;
+  }
+
+  public Matrix multAdd(double alpha, Matrix B, Matrix C) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public double norm(Norm type) throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  public Matrix set(double alpha, Matrix B) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public Matrix set(Matrix B) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  public void setRow(int row, Vector vector) throws IOException {
+    VectorUpdate update = new VectorUpdate(row);
+    update.putAll(((DenseVector) vector).getEntries().entrySet());
+    table.commit(update.getBatchUpdate());
+  }
+
+  public void setColumn(int column, Vector vector) throws IOException {
+    for (int i = 0; i < vector.size(); i++) {
+      VectorUpdate update = new VectorUpdate(i);
+      update.put(column, vector.get(i));
+      table.commit(update.getBatchUpdate());
+    }
+  }
+
+  public String getType() {
+    return this.getClass().getSimpleName();
+  }
+
+  public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
+    int columnSize = (j1 - j0) + 1;
+    SubMatrix result = new SubMatrix((i1 - i0) + 1, columnSize);
+
+    for (int i = i0, ii = 0; i <= i1; i++, ii++) {
+      for (int j = j0, jj = 0; j <= j1; j++, jj++) {
+        Cell c = table
+            .get(BytesUtil.intToBytes(i), BytesUtil.getColumnIndex(j));
+        result.set(ii, jj, BytesUtil.bytesToDouble(c.getValue()));
+      }
+    }
+
+    return result;
+  }
+
+  public boolean isBlocked() throws IOException {
+    return (table.get(Constants.METADATA, Constants.BLOCK_SIZE) == null) ? false
+        : true;
+  }
+
+  public SubMatrix getBlock(int i, int j) throws IOException {
+    return new SubMatrix(table.get(String.valueOf(i), Constants.BLOCK + j)
+        .getValue());
+  }
+
+  /**
+   * @return the size of block
+   * @throws IOException
+   */
+  public int getBlockSize() throws IOException {
+    return (isBlocked()) ? BytesUtil.bytesToInt(table.get(Constants.METADATA,
+        Constants.BLOCK_SIZE).getValue()) : -1;
+  }
+
+  protected void setBlockSize(int blockNum) throws IOException {
+    BatchUpdate update = new BatchUpdate(Constants.METADATA);
+    update.put(Constants.BLOCK_SIZE, BytesUtil.intToBytes(blockNum));
+    table.commit(update);
+  }
+
+  public void setBlock(int i, int j, SubMatrix matrix) throws IOException {
+    BatchUpdate update = new BatchUpdate(String.valueOf(i));
+    update.put(Constants.BLOCK + j, matrix.getBytes());
+    table.commit(update);
+  }
+
+  protected void setBlockPosition(int blockNum) throws IOException {
+    int block_row_size = this.getRows() / blockNum;
+    int block_column_size = this.getColumns() / blockNum;
+
+    int startRow, endRow, startColumn, endColumn;
+    int i = 0, j = 0;
+    do {
+      startRow = i * block_row_size;
+      endRow = (startRow + block_row_size) - 1;
+      if (endRow >= this.getRows())
+        endRow = this.getRows() - 1;
+
+      j = 0;
+      do {
+        startColumn = j * block_column_size;
+        endColumn = (startColumn + block_column_size) - 1;
+        if (endColumn >= this.getColumns())
+          endColumn = this.getColumns() - 1;
+
+        BatchUpdate update = new BatchUpdate(new BlockID(i, j).getBytes());
+        update.put(Constants.BLOCK_STARTROW, BytesUtil.intToBytes(startRow));
+        update.put(Constants.BLOCK_ENDROW, BytesUtil.intToBytes(endRow));
+        update.put(Constants.BLOCK_STARTCOLUMN, BytesUtil
+            .intToBytes(startColumn));
+        update.put(Constants.BLOCK_ENDCOLUMN, BytesUtil.intToBytes(endColumn));
+        table.commit(update);
+
+        j++;
+      } while (endColumn < (this.getColumns() - 1));
+
+      i++;
+    } while (endRow < (this.getRows() - 1));
+  }
+
+  protected int[] getBlockPosition(int i, int j) throws IOException {
+    RowResult rs = table.getRow(new BlockID(i, j).getBytes());
+    int[] result = new int[4];
+
+    result[0] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_STARTROW)
+        .getValue());
+    result[1] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_ENDROW).getValue());
+    result[2] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_STARTCOLUMN)
+        .getValue());
+    result[3] = BytesUtil.bytesToInt(rs.get(Constants.BLOCK_ENDCOLUMN)
+        .getValue());
+
+    return result;
+  }
+
+  /**
+   * Using a map/reduce job to block a dense matrix.
+   * 
+   * @param blockNum
+   * @throws IOException
+   */
+  public void blocking_mapred(int blockNum) throws IOException {
+    this.checkBlockNum(blockNum);
+
+    JobConf jobConf = new JobConf(config);
+    jobConf.setJobName("Blocking MR job" + getPath());
+
+    jobConf.setNumMapTasks(config.getNumMapTasks());
+    jobConf.setNumReduceTasks(config.getNumReduceTasks());
+
+    BlockingMapRed.initJob(getPath(), jobConf);
+
+    JobManager.execute(jobConf);
+  }
+
+  /**
+   * Using a scanner to block a dense matrix. If the matrix is large, use the
+   * blocking_mapred()
+   * 
+   * @param blockNum
+   * @throws IOException
+   */
+  public void blocking(int blockNum) throws IOException {
+    this.checkBlockNum(blockNum);
+
+    String[] columns = new String[] { Constants.BLOCK_STARTROW,
+        Constants.BLOCK_ENDROW, Constants.BLOCK_STARTCOLUMN,
+        Constants.BLOCK_ENDCOLUMN };
+    Scanner scan = table.getScanner(columns);
+
+    for (RowResult row : scan) {
+      BlockID bID = new BlockID(row.getRow());
+      int[] pos = getBlockPosition(bID.getRow(), bID.getColumn());
+      setBlock(bID.getRow(), bID.getColumn(), subMatrix(pos[0], pos[1], pos[2],
+          pos[3]));
+    }
+  }
+
+  private void checkBlockNum(int blockNum) throws IOException {
+    double blocks = Math.pow(blockNum, 0.5);
+    // TODO: Check also it is validation with matrix.
+    if (!String.valueOf(blocks).endsWith(".0"))
+      throw new IOException("can't divide.");
+
+    int block_size = (int) blocks;
+    setBlockPosition(block_size);
+    setBlockSize(block_size);
+    LOG.info("Create " + block_size + " * " + block_size + " blocked matrix");
+  }
+}

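For orientation, here is a minimal sketch of how the DenseMatrix API above is typically driven, assuming a running HBase cluster reachable through HamaConfiguration. The class and method names come from the diff; the dimensions, block count, and the wrapper class itself are illustrative only (blocking requires a perfect-square block count, so 16 yields a 4 * 4 block grid):

    import java.io.IOException;

    import org.apache.hama.DenseMatrix;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.Matrix;

    public class DenseMatrixUsageSketch {
      public static void main(String[] args) throws IOException {
        HamaConfiguration conf = new HamaConfiguration();

        // Two 100 x 100 matrices with uniformly distributed random entries.
        DenseMatrix a = DenseMatrix.random(conf, 100, 100);
        DenseMatrix b = DenseMatrix.random(conf, 100, 100);

        // mult() takes the block-cyclic path only when both operands are
        // blocked; otherwise it falls back to the SIMD row-based path.
        a.blocking_mapred(16);
        b.blocking_mapred(16);

        Matrix sum = a.add(b);
        Matrix product = a.mult(b);
      }
    }
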
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Thu Dec 11 21:10:58 2008
@@ -1,74 +1,81 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.algebra;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.SubMatrix;
-import org.apache.hama.io.BlockWritable;
-import org.apache.hama.mapred.BlockCyclicMap;
-import org.apache.log4j.Logger;
-
-public class BlockCyclicMultiplyMap extends
-    BlockCyclicMap<IntWritable, BlockWritable> {
-  static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
-  protected DenseMatrix matrix_b;
-  public static final String MATRIX_B = "hama.multiplication.matrix.b";
-
-  public void configure(JobConf job) {
-    try {
-      matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
-    } catch (IOException e) {
-      LOG.warn("Load matrix_b failed : " + e.getMessage());
-    }
-  }
-
-  public static void initJob(String matrix_a, String matrix_b,
-      Class<BlockCyclicMultiplyMap> map, Class<IntWritable> outputKeyClass,
-      Class<BlockWritable> outputValueClass, JobConf jobConf) {
-
-    jobConf.setMapOutputValueClass(outputValueClass);
-    jobConf.setMapOutputKeyClass(outputKeyClass);
-    jobConf.setMapperClass(map);
-    jobConf.set(MATRIX_B, matrix_b);
-
-    initJob(matrix_a, map, jobConf);
-  }
-
-  @Override
-  public void map(IntWritable key, BlockWritable value,
-      OutputCollector<IntWritable, BlockWritable> output, Reporter reporter)
-      throws IOException {
-    for (int i = 0; i < value.size(); i++) {
-      SubMatrix a = value.get(i);
-      for (int j = 0; j < matrix_b.getBlockSize(); j++) {
-        SubMatrix b = matrix_b.getBlock(i, j);
-        SubMatrix c = a.mult(b);
-        output.collect(key, new BlockWritable(key.get(), j, c));
-      }
-    }
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.mapred.BlockInputFormat;
+import org.apache.log4j.Logger;
+
+public class BlockCyclicMultiplyMap extends MapReduceBase implements
+    Mapper<IntWritable, BlockWritable, IntWritable, BlockWritable> {
+  static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
+  protected DenseMatrix matrix_b;
+  public static final String MATRIX_B = "hama.multiplication.matrix.b";
+
+  public void configure(JobConf job) {
+    try {
+      matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
+    } catch (IOException e) {
+      LOG.warn("Load matrix_b failed : " + e.getMessage());
+    }
+  }
+
+  public static void initJob(String matrix_a, String matrix_b,
+      Class<BlockCyclicMultiplyMap> map, Class<IntWritable> outputKeyClass,
+      Class<BlockWritable> outputValueClass, JobConf jobConf) {
+
+    jobConf.setMapOutputValueClass(outputValueClass);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapperClass(map);
+    jobConf.set(MATRIX_B, matrix_b);
+
+    jobConf.setInputFormat(BlockInputFormat.class);
+    FileInputFormat.addInputPaths(jobConf, matrix_a);
+
+    jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+  }
+
+  @Override
+  public void map(IntWritable key, BlockWritable value,
+      OutputCollector<IntWritable, BlockWritable> output, Reporter reporter)
+      throws IOException {
+    for (int i = 0; i < value.size(); i++) {
+      SubMatrix a = value.get(i);
+      for (int j = 0; j < matrix_b.getBlockSize(); j++) {
+        SubMatrix b = matrix_b.getBlock(i, j);
+        SubMatrix c = a.mult(b);
+        output.collect(key, new BlockWritable(key.get(), j, c));
+      }
+    }
+  }
+}

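The map above implements one half of the standard blocked product C(row, j) = sum over i of A(row, i) * B(i, j): for each block A(row, i) arriving in the BlockWritable, it multiplies against every block in row i of matrix B and emits the partial products keyed by row; the reducer in the next file sums the partials that land on the same output block. A sequential sketch of the same computation, assuming in-memory SubMatrix grids in place of the HBase-backed matrices:

    import java.io.IOException;

    import org.apache.hama.SubMatrix;

    public class BlockMultSketch {
      /** Sequential equivalent of the map/reduce pair (illustration only). */
      public static SubMatrix[][] mult(SubMatrix[][] a, SubMatrix[][] b)
          throws IOException {
        int numBlocks = a.length;
        SubMatrix[][] c = new SubMatrix[numBlocks][numBlocks];
        for (int row = 0; row < numBlocks; row++) {    // the map input key
          for (int i = 0; i < numBlocks; i++) {        // blocks of A's row
            for (int j = 0; j < numBlocks; j++) {      // blocks of B's row i
              SubMatrix partial = a[row][i].mult(b[i][j]);
              // The reducer's role: accumulate partials per (row, j) block.
              c[row][j] = (c[row][j] == null) ? partial
                  : c[row][j].add(partial);
            }
          }
        }
        return c;
      }
    }
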
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyReduce.java Thu Dec 11 21:10:58 2008
@@ -1,77 +1,98 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.algebra;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.SubMatrix;
-import org.apache.hama.io.BlockEntry;
-import org.apache.hama.io.BlockWritable;
-import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.mapred.BlockCyclicReduce;
-import org.apache.log4j.Logger;
-
-public class BlockCyclicMultiplyReduce extends
-    BlockCyclicReduce<IntWritable, BlockWritable> {
-  static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyReduce.class);
-
-  @Override
-  public void reduce(IntWritable key, Iterator<BlockWritable> values,
-      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
-      throws IOException {
-    int row = key.get();
-    Map<Integer, SubMatrix> sum = new HashMap<Integer, SubMatrix>();
-
-    while (values.hasNext()) {
-      BlockWritable b = values.next();
-      for (Map.Entry<Integer, BlockEntry> e : b.entrySet()) {
-        int j = e.getKey();
-        SubMatrix value = e.getValue().getValue();
-        if (sum.containsKey(j)) {
-          sum.put(j, sum.get(j).add(value));
-        } else {
-          sum.put(j, value);
-        }
-      }
-    }
-
-    for (Map.Entry<Integer, SubMatrix> e : sum.entrySet()) {
-      int column = e.getKey();
-      SubMatrix mat = e.getValue();
-
-      int startRow = row * mat.getRows();
-      int startColumn = column * mat.getColumns();
-
-      for (int i = 0; i < mat.getRows(); i++) {
-        VectorUpdate update = new VectorUpdate(i + startRow);
-        for (int j = 0; j < mat.getColumns(); j++) {
-          update.put(j + startColumn, mat.get(i, j));
-        }
-        output.collect(key, update);
-      }
-    }
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockEntry;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.log4j.Logger;
+
+public class BlockCyclicMultiplyReduce extends MapReduceBase implements
+    Reducer<IntWritable, BlockWritable, IntWritable, VectorUpdate> {
+  static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyReduce.class);
+
+  /**
+   * Use this before submitting a BlockCyclicMultiplyReduce job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table
+   * @param reducer
+   * @param job
+   */
+  public static void initJob(String table,
+      Class<BlockCyclicMultiplyReduce> reducer, JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+
+  @Override
+  public void reduce(IntWritable key, Iterator<BlockWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+    int row = key.get();
+    Map<Integer, SubMatrix> sum = new HashMap<Integer, SubMatrix>();
+
+    while (values.hasNext()) {
+      BlockWritable b = values.next();
+      for (Map.Entry<Integer, BlockEntry> e : b.entrySet()) {
+        int j = e.getKey();
+        SubMatrix value = e.getValue().getValue();
+        if (sum.containsKey(j)) {
+          sum.put(j, sum.get(j).add(value));
+        } else {
+          sum.put(j, value);
+        }
+      }
+    }
+
+    for (Map.Entry<Integer, SubMatrix> e : sum.entrySet()) {
+      int column = e.getKey();
+      SubMatrix mat = e.getValue();
+
+      int startRow = row * mat.getRows();
+      int startColumn = column * mat.getColumns();
+
+      for (int i = 0; i < mat.getRows(); i++) {
+        VectorUpdate update = new VectorUpdate(i + startRow);
+        for (int j = 0; j < mat.getColumns(); j++) {
+          update.put(j + startColumn, mat.get(i, j));
+        }
+        output.collect(key, update);
+      }
+    }
+  }
+}

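A driver configures this reducer through initJob() before submitting the
job; the map side is wired up separately. A minimal sketch; the result
table name "matC" is an assumption for illustration:

    JobConf jobConf = new JobConf(new HamaConfiguration(),
        BlockCyclicMultiplyReduce.class);
    jobConf.setJobName("block cyclic multiplication");
    // map side (BlockCyclicMultiplyMap) is configured elsewhere
    BlockCyclicMultiplyReduce.initJob("matC", BlockCyclicMultiplyReduce.class,
        jobConf);
    JobClient.runJob(jobConf);
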
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionMap.java Thu Dec 11 21:10:58 2008
@@ -22,17 +22,21 @@
 import java.io.IOException;
 
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
 import org.apache.hama.DenseMatrix;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.RowCyclicMap;
+import org.apache.hama.mapred.VectorInputFormat;
 import org.apache.log4j.Logger;
 
-public class RowCyclicAdditionMap extends
-    RowCyclicMap<IntWritable, VectorWritable> {
+public class RowCyclicAdditionMap extends MapReduceBase implements
+    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
   static final Logger LOG = Logger.getLogger(RowCyclicAdditionMap.class);
   protected DenseMatrix matrix_b;
   public static final String MATRIX_B = "hama.addition.matrix.b";
@@ -54,7 +58,10 @@
     jobConf.setMapperClass(map);
     jobConf.set(MATRIX_B, matrix_b);
 
-    initJob(matrix_a, map, jobConf);
+
+    jobConf.setInputFormat(VectorInputFormat.class);
+    FileInputFormat.addInputPaths(jobConf, matrix_a);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
   }
 
   @Override
@@ -66,5 +73,4 @@
         matrix_b.getRow(key.get()).add(value.getDenseVector())));
 
   }
-
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/RowCyclicAdditionReduce.java Thu Dec 11 21:10:58 2008
@@ -1,45 +1,67 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.algebra;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.RowCyclicReduce;
-
-public class RowCyclicAdditionReduce extends RowCyclicReduce<IntWritable, VectorWritable> {
-
-  @Override
-  public void reduce(IntWritable key, Iterator<VectorWritable> values,
-      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
-      throws IOException {
-
-    VectorUpdate update = new VectorUpdate(key.get());
-    update.putAll(values.next().entrySet());
-
-    output.collect(key, update);
-  }
-
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.io.VectorWritable;
+import org.apache.hama.mapred.VectorOutputFormat;
+
+public class RowCyclicAdditionReduce extends MapReduceBase implements
+    Reducer<IntWritable, VectorWritable, IntWritable, VectorUpdate> {
+
+  /**
+   * Use this before submitting a RowCyclicAdditionReduce job. It will
+   * appropriately set up the JobConf.
+   * 
+   * @param table output matrix table name
+   * @param reducer reducer class
+   * @param job job configuration
+   */
+  public static void initJob(String table, Class<RowCyclicAdditionReduce> reducer,
+      JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+
+  @Override
+  public void reduce(IntWritable key, Iterator<VectorWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+
+    VectorUpdate update = new VectorUpdate(key.get());
+    update.putAll(values.next().entrySet());
+
+    output.collect(key, update);
+  }
+
+}

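The reduce above is the whole row-materialization step: the single summed
VectorWritable per row key is repackaged as a VectorUpdate and handed to
VectorOutputFormat. A toy sketch of the same VectorUpdate usage, assuming
row key 0 and two columns:

    VectorUpdate update = new VectorUpdate(0); // row key 0
    update.put(0, 1.0);                        // column 0
    update.put(1, 2.0);                        // column 1
    // collected by the reducer, this is written as one row of the
    // output table by VectorOutputFormat
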
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyMap.java Thu Dec 11 21:10:58 2008
@@ -1,77 +1,84 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.algebra;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.DenseVector;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hama.Matrix;
-import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.RowCyclicMap;
-import org.apache.log4j.Logger;
-
-/**
- * SIMD version
- */
-public class SIMDMultiplyMap extends RowCyclicMap<IntWritable, VectorWritable> {
-  static final Logger LOG = Logger.getLogger(SIMDMultiplyMap.class);
-  protected Matrix matrix_b;
-  public static final String MATRIX_B = "hama.multiplication.matrix.b";
-  public static final DenseVector sum = new DenseVector();;
-  
-  public void configure(JobConf job) {
-    try {
-      matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
-    } catch (IOException e) {
-      LOG.warn("Load matrix_b failed : " + e.getMessage());
-    }
-  }
-
-  public static void initJob(String matrix_a, String matrix_b,
-      Class<SIMDMultiplyMap> map, Class<IntWritable> outputKeyClass,
-      Class<VectorWritable> outputValueClass, JobConf jobConf) {
-
-    jobConf.setMapOutputValueClass(outputValueClass);
-    jobConf.setMapOutputKeyClass(outputKeyClass);
-    jobConf.setMapperClass(map);
-    jobConf.set(MATRIX_B, matrix_b);
-
-    initJob(matrix_a, map, jobConf);
-  }
-
-  @Override
-  public void map(IntWritable key, VectorWritable value,
-      OutputCollector<IntWritable, VectorWritable> output, Reporter reporter)
-      throws IOException {
-    sum.clear();
-
-    for(int i = 0; i < value.size(); i++) {
-      sum.add(matrix_b.getRow(i).scale(value.get(i)));
-    }
-    
-    output.collect(key, new VectorWritable(key.get(), sum));
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.Constants;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.DenseVector;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.Matrix;
+import org.apache.hama.io.VectorWritable;
+import org.apache.hama.mapred.VectorInputFormat;
+import org.apache.log4j.Logger;
+
+/**
+ * SIMD version
+ */
+public class SIMDMultiplyMap extends MapReduceBase implements
+    Mapper<IntWritable, VectorWritable, IntWritable, VectorWritable> {
+  static final Logger LOG = Logger.getLogger(SIMDMultiplyMap.class);
+  protected Matrix matrix_b;
+  public static final String MATRIX_B = "hama.multiplication.matrix.b";
+  public static final DenseVector sum = new DenseVector();
+  
+  public void configure(JobConf job) {
+    try {
+      matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
+    } catch (IOException e) {
+      LOG.warn("Failed to load matrix_b: " + e.getMessage());
+    }
+  }
+
+  public static void initJob(String matrix_a, String matrix_b,
+      Class<SIMDMultiplyMap> map, Class<IntWritable> outputKeyClass,
+      Class<VectorWritable> outputValueClass, JobConf jobConf) {
+
+    jobConf.setMapOutputValueClass(outputValueClass);
+    jobConf.setMapOutputKeyClass(outputKeyClass);
+    jobConf.setMapperClass(map);
+    jobConf.set(MATRIX_B, matrix_b);
+
+    jobConf.setInputFormat(VectorInputFormat.class);
+    FileInputFormat.addInputPaths(jobConf, matrix_a);
+    jobConf.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+  }
+
+  @Override
+  public void map(IntWritable key, VectorWritable value,
+      OutputCollector<IntWritable, VectorWritable> output, Reporter reporter)
+      throws IOException {
+    sum.clear();
+
+    for(int i = 0; i < value.size(); i++) {
+      sum.add(matrix_b.getRow(i).scale(value.get(i)));
+    }
+    
+    output.collect(key, new VectorWritable(key.get(), sum));
+  }
+}

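Each map() call above receives one row of matrix A and builds that row's
product with B as a linear combination of B's rows, i.e.
row_i(A*B) = sum_k A[i][k] * row_k(B). The same arithmetic on plain Java
arrays, with illustrative values:

    double[][] b = { { 1, 2 }, { 3, 4 } };  // matrix B
    double[] aRow = { 5, 6 };               // row i of A
    double[] sum = new double[2];
    for (int k = 0; k < aRow.length; k++) {
      for (int j = 0; j < b[k].length; j++) {
        sum[j] += aRow[k] * b[k][j];        // scale row k of B by A[i][k]
      }
    }
    // sum is now { 23.0, 34.0 }, i.e. row i of A*B
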
Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/SIMDMultiplyReduce.java Thu Dec 11 21:10:58 2008
@@ -1,48 +1,69 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.algebra;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.io.VectorUpdate;
-import org.apache.hama.io.VectorWritable;
-import org.apache.hama.mapred.RowCyclicReduce;
-import org.apache.log4j.Logger;
-
-public class SIMDMultiplyReduce extends
-    RowCyclicReduce<IntWritable, VectorWritable> {
-  static final Logger LOG = Logger.getLogger(SIMDMultiplyReduce.class);
-  
-  @Override
-  public void reduce(IntWritable key, Iterator<VectorWritable> values,
-      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
-      throws IOException {
-
-    VectorUpdate update = new VectorUpdate(key.get());
-    update.putAll(values.next().entrySet());
-
-    output.collect(key, update);
-  }
-
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.algebra;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorUpdate;
+import org.apache.hama.io.VectorWritable;
+import org.apache.hama.mapred.VectorOutputFormat;
+import org.apache.log4j.Logger;
+
+public class SIMDMultiplyReduce extends MapReduceBase implements
+    Reducer<IntWritable, VectorWritable, IntWritable, VectorUpdate> {
+  static final Logger LOG = Logger.getLogger(SIMDMultiplyReduce.class);
+
+  /**
+   * Use this before submitting a SIMDMultiplyReduce job. It will appropriately
+   * set up the JobConf.
+   * 
+   * @param table output matrix table name
+   * @param reducer reducer class
+   * @param job job configuration
+   */
+  public static void initJob(String table,
+      Class<SIMDMultiplyReduce> reducer, JobConf job) {
+    job.setOutputFormat(VectorOutputFormat.class);
+    job.setReducerClass(reducer);
+    job.set(VectorOutputFormat.OUTPUT_TABLE, table);
+    job.setOutputKeyClass(IntWritable.class);
+    job.setOutputValueClass(BatchUpdate.class);
+  }
+  
+  @Override
+  public void reduce(IntWritable key, Iterator<VectorWritable> values,
+      OutputCollector<IntWritable, VectorUpdate> output, Reporter reporter)
+      throws IOException {
+
+    VectorUpdate update = new VectorUpdate(key.get());
+    update.putAll(values.next().entrySet());
+
+    output.collect(key, update);
+  }
+
+}

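With both halves self-configuring, a multiplication driver reduces to the
two initJob() calls. A minimal sketch; the table names "matA", "matB" and
"matC" are assumptions for illustration:

    JobConf jobConf = new JobConf(new HamaConfiguration(),
        SIMDMultiplyMap.class);
    jobConf.setJobName("SIMD multiplication");
    // map side: scan "matA" row by row, load "matB" in configure()
    SIMDMultiplyMap.initJob("matA", "matB", SIMDMultiplyMap.class,
        IntWritable.class, VectorWritable.class, jobConf);
    // reduce side: write each product row into "matC"
    SIMDMultiplyReduce.initJob("matC", SIMDMultiplyReduce.class, jobConf);
    JobClient.runJob(jobConf);
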
Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Thu Dec 11 21:10:58 2008
@@ -1,74 +1,119 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-
-public class BlockInputFormat extends BlockInputFormatBase implements
-    JobConfigurable {
-  private static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
-
-  /**
-   * space delimited list of columns
-   */
-  public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
-
-  /** {@inheritDoc} */
-  public void configure(JobConf job) {
-    Path[] tableNames = FileInputFormat.getInputPaths(job);
-    String colArg = job.get(COLUMN_LIST);
-    String[] colNames = colArg.split(" ");
-    byte[][] m_cols = new byte[colNames.length][];
-    for (int i = 0; i < m_cols.length; i++) {
-      m_cols[i] = Bytes.toBytes(colNames[i]);
-    }
-    setInputColums(m_cols);
-    try {
-      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-  }
-
-  /** {@inheritDoc} */
-  public void validateInput(JobConf job) throws IOException {
-    // expecting exactly one path
-    Path[] tableNames = FileInputFormat.getInputPaths(job);
-    if (tableNames == null || tableNames.length > 1) {
-      throw new IOException("expecting one table name");
-    }
-
-    // expecting at least one column
-    String colArg = job.get(COLUMN_LIST);
-    if (colArg == null || colArg.length() == 0) {
-      throw new IOException("expecting at least one column");
-    }
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.BlockWritable;
+import org.apache.hama.util.BytesUtil;
+
+public class BlockInputFormat extends TableInputFormatBase implements
+    InputFormat<IntWritable, BlockWritable>, JobConfigurable {
+  private TableRecordReader tableRecordReader;
+  
+  /**
+   * Iterates over HBase table data, returning (IntWritable, BlockWritable) pairs.
+   */
+  protected static class TableRecordReader extends TableRecordReaderBase
+      implements RecordReader<IntWritable, BlockWritable> {
+
+    /**
+     * @return IntWritable
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public IntWritable createKey() {
+      return new IntWritable();
+    }
+
+    /**
+     * @return BlockWritable
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public BlockWritable createValue() {
+      return new BlockWritable();
+    }
+
+    /**
+     * Converts Scanner.next() into an (IntWritable, BlockWritable) pair:
+     * the row key becomes the key, the row contents the value.
+     * 
+     * @param key IntWritable as input key
+     * @param value BlockWritable as input value
+     * @return true if there was more data
+     * @throws IOException
+     */
+    public boolean next(IntWritable key, BlockWritable value)
+        throws IOException {
+      RowResult result = this.scanner.next();
+      boolean hasMore = result != null && result.size() > 0;
+      if (hasMore) {
+        key.set(BytesUtil.bytesToInt(result.getRow()));
+        Writables.copyWritable(result, value);
+      }
+      return hasMore;
+    }
+  }
+
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+   * default.
+   * 
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<IntWritable, BlockWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+  
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   * 
+   * @param tableRecordReader to provide other {@link TableRecordReader}
+   *                implementations.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+}

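Since the configure()/validateInput()/getSplits() plumbing now lives in
TableInputFormatBase (added below), a job only names the table and the
columns to scan. A hedged sketch; the table name and "block:" column
family are assumptions for illustration:

    jobConf.setInputFormat(BlockInputFormat.class);
    // the input path doubles as the HBase table name holding the blocks
    FileInputFormat.addInputPaths(jobConf, "matA_block");
    // space-delimited column list (assumed column family)
    jobConf.set(BlockInputFormat.COLUMN_LIST, "block:");
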
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java?rev=725904&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableInputFormatBase.java Thu Dec 11 21:10:58 2008
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hama.Constants;
+import org.apache.hama.util.BytesUtil;
+
+public abstract class TableInputFormatBase {
+  private static final Log LOG = LogFactory.getLog(TableInputFormatBase.class);
+  protected byte[][] inputColumns;
+  protected HTable table;
+  protected RowFilterInterface rowFilter;
+
+  /**
+   * space delimited list of columns
+   */
+  public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
+
+  public void configure(JobConf job) {
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    String colArg = job.get(COLUMN_LIST);
+    String[] colNames = colArg.split(" ");
+    byte[][] m_cols = new byte[colNames.length][];
+    for (int i = 0; i < m_cols.length; i++) {
+      m_cols[i] = Bytes.toBytes(colNames[i]);
+    }
+    setInputColumns(m_cols);
+    try {
+      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
+    } catch (Exception e) {
+      LOG.error(e);
+    }
+  }
+
+  public void validateInput(JobConf job) throws IOException {
+    // expecting exactly one path
+    Path[] tableNames = FileInputFormat.getInputPaths(job);
+    if (tableNames == null || tableNames.length > 1) {
+      throw new IOException("expecting one table name");
+    }
+
+    // expecting at least one column
+    String colArg = job.get(COLUMN_LIST);
+    if (colArg == null || colArg.length() == 0) {
+      throw new IOException("expecting at least one column");
+    }
+  }
+
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    Cell meta = this.table.get(Constants.METADATA, Constants.METADATA_ROWS);
+
+    if (BytesUtil.bytesToInt(meta.getValue()) < numSplits) {
+      numSplits = BytesUtil.bytesToInt(meta.getValue());
+    }
+
+    int[] startKeys = new int[numSplits];
+    int interval = BytesUtil.bytesToInt(meta.getValue()) / numSplits;
+
+    for (int i = 0; i < numSplits; i++) {
+      startKeys[i] = (i * interval);
+    }
+
+    InputSplit[] splits = new InputSplit[startKeys.length];
+    for (int i = 0; i < startKeys.length; i++) {
+      splits[i] = new TableSplit(this.table.getTableName(), BytesUtil
+          .intToBytes(startKeys[i]), ((i + 1) < startKeys.length) ? BytesUtil
+          .intToBytes(startKeys[i + 1]) : HConstants.EMPTY_START_ROW);
+    }
+    return splits;
+  }
+
+  /**
+   * @param inputColumns columns to be passed to the map task.
+   */
+  protected void setInputColumns(byte[][] inputColumns) {
+    this.inputColumns = inputColumns;
+  }
+
+  /**
+   * Allows subclasses to set the {@link HTable}.
+   * 
+   * @param table to get the data from
+   */
+  protected void setHTable(HTable table) {
+    this.table = table;
+  }
+
+  /**
+   * Allows subclasses to set the {@link RowFilterInterface} to be used.
+   * 
+   * @param rowFilter row filter to apply to table scans
+   */
+  protected void setRowFilter(RowFilterInterface rowFilter) {
+    this.rowFilter = rowFilter;
+  }
+}

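getSplits() above partitions the table by integer row key rather than by
region: with 100 rows recorded in the metadata cell and numSplits = 4,
interval is 25 and the splits cover [0,25), [25,50), [50,75) and
[75, end of table). The same arithmetic as a stand-alone snippet:

    int rows = 100, numSplits = 4; // rows would come from METADATA_ROWS
    int interval = rows / numSplits;
    for (int i = 0; i < numSplits; i++) {
      int start = i * interval;
      boolean last = (i + 1) == numSplits;
      System.out.println("split " + i + ": start=" + start
          + (last ? ", runs to end of table" : ", end=" + (start + interval)));
    }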