hama-commits mailing list archives

From edwardy...@apache.org
Subject svn commit: r725904 [2/2] - in /incubator/hama/trunk/src: java/org/apache/hama/ java/org/apache/hama/algebra/ java/org/apache/hama/mapred/ test/org/apache/hama/mapred/
Date Fri, 12 Dec 2008 05:10:58 GMT
Added: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java?rev=725904&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java Thu Dec 11 21:10:58 2008
@@ -0,0 +1,119 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.filter.RowFilterSet;
+import org.apache.hadoop.hbase.filter.StopRowFilter;
+import org.apache.hama.io.BlockWritable;
+
+public abstract class TableRecordReaderBase {
+  protected byte[] startRow;
+  protected byte[] endRow;
+  protected RowFilterInterface trrRowFilter;
+  protected Scanner scanner;
+  protected HTable htable;
+  protected byte[][] trrInputColumns;
+
+  /**
+   * Build the scanner. Not done in constructor to allow for extension.
+   * 
+   * @throws IOException
+   */
+  public void init() throws IOException {
+    if ((endRow != null) && (endRow.length > 0)) {
+      if (trrRowFilter != null) {
+        final Set<RowFilterInterface> rowFiltersSet = new HashSet<RowFilterInterface>();
+        rowFiltersSet.add(new StopRowFilter(endRow));
+        rowFiltersSet.add(trrRowFilter);
+        this.scanner = this.htable
+            .getScanner(trrInputColumns, startRow, new RowFilterSet(
+                RowFilterSet.Operator.MUST_PASS_ALL, rowFiltersSet));
+      } else {
+        this.scanner = this.htable
+            .getScanner(trrInputColumns, startRow, endRow);
+      }
+    } else {
+      this.scanner = this.htable.getScanner(trrInputColumns, startRow,
+          trrRowFilter);
+    }
+  }
+
+  /**
+   * @param htable the {@link HTable} to scan.
+   */
+  public void setHTable(HTable htable) {
+    this.htable = htable;
+  }
+
+  /**
+   * @param inputColumns the columns to be placed in {@link BlockWritable}.
+   */
+  public void setInputColumns(final byte[][] inputColumns) {
+    byte[][] columns = inputColumns;
+    this.trrInputColumns = columns;
+  }
+
+  /**
+   * @param startRow the first row in the split
+   */
+  public void setStartRow(final byte[] startRow) {
+    byte[] sRow = startRow;
+    this.startRow = sRow;
+  }
+
+  /**
+   * 
+   * @param endRow the last row in the split
+   */
+  public void setEndRow(final byte[] endRow) {
+    byte[] eRow = endRow;
+    this.endRow = eRow;
+  }
+
+  /**
+   * @param rowFilter the {@link RowFilterInterface} to be used.
+   */
+  public void setRowFilter(RowFilterInterface rowFilter) {
+    this.trrRowFilter = rowFilter;
+  }
+
+  public void close() throws IOException {
+    this.scanner.close();
+  }
+
+  public long getPos() {
+    // This should be the ordinal tuple in the range;
+    // not clear how to calculate...
+    return 0;
+  }
+
+  public float getProgress() {
+    // Depends on the total number of tuples and getPos
+    return 0;
+  }
+
+}
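
For orientation, a minimal sketch of how a caller drives this base class, assuming the pre-0.20 HBase client API the commit compiles against (HTable, Scanner, RowFilterInterface). The table name, column family, and string row keys are hypothetical; Hama's real row keys are int-encoded, as the BytesUtil.bytesToInt call in the next file shows.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ReaderWiringSketch {
      // Performs the same setter/init() sequence that getRecordReader() runs below.
      public static void wire(TableRecordReaderBase reader) throws Exception {
        HTable table = new HTable(new HBaseConfiguration(), "matrix_a"); // hypothetical table
        reader.setHTable(table);
        reader.setInputColumns(new byte[][] { Bytes.toBytes("column:") }); // hypothetical family
        reader.setStartRow(Bytes.toBytes("row-000")); // normally taken from a TableSplit
        reader.setEndRow(Bytes.toBytes("row-100"));
        reader.setRowFilter(null); // end row set, no filter: init() scans [startRow, endRow)
        reader.init();             // builds the Scanner; must run before any next() call
      }
    }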

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/VectorInputFormat.java Thu Dec 11 21:10:58 2008
@@ -1,74 +1,119 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-
-public class VectorInputFormat extends VectorInputFormatBase implements
-    JobConfigurable {
-  private static final Log LOG = LogFactory.getLog(VectorInputFormat.class);
-
-  /**
-   * space delimited list of columns
-   */
-  public static final String COLUMN_LIST = "hama.mapred.tablecolumns";
-
-  /** {@inheritDoc} */
-  public void configure(JobConf job) {
-    Path[] tableNames = FileInputFormat.getInputPaths(job);
-    String colArg = job.get(COLUMN_LIST);
-    String[] colNames = colArg.split(" ");
-    byte[][] m_cols = new byte[colNames.length][];
-    for (int i = 0; i < m_cols.length; i++) {
-      m_cols[i] = Bytes.toBytes(colNames[i]);
-    }
-    setInputColums(m_cols);
-    try {
-      setHTable(new HTable(new HBaseConfiguration(job), tableNames[0].getName()));
-    } catch (Exception e) {
-      LOG.error(e);
-    }
-  }
-
-  /** {@inheritDoc} */
-  public void validateInput(JobConf job) throws IOException {
-    // expecting exactly one path
-    Path[] tableNames = FileInputFormat.getInputPaths(job);
-    if (tableNames == null || tableNames.length > 1) {
-      throw new IOException("expecting one table name");
-    }
-
-    // expecting at least one column
-    String colArg = job.get(COLUMN_LIST);
-    if (colArg == null || colArg.length() == 0) {
-      throw new IOException("expecting at least one column");
-    }
-  }
-}
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hama.io.VectorWritable;
+import org.apache.hama.util.BytesUtil;
+
+public class VectorInputFormat extends TableInputFormatBase implements
+    InputFormat<IntWritable, VectorWritable>, JobConfigurable {
+  private TableRecordReader tableRecordReader;
+  
+  /**
+   * Iterates over HBase table data, returning (IntWritable, VectorWritable) pairs
+   */
+  protected static class TableRecordReader extends TableRecordReaderBase
+      implements RecordReader<IntWritable, VectorWritable> {
+
+    /**
+     * @return IntWritable
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createKey()
+     */
+    public IntWritable createKey() {
+      return new IntWritable();
+    }
+
+    /**
+     * @return VectorWritable
+     * 
+     * @see org.apache.hadoop.mapred.RecordReader#createValue()
+     */
+    public VectorWritable createValue() {
+      return new VectorWritable();
+    }
+
+    /**
+     * Converts Scanner.next() to an (IntWritable, VectorWritable) pair.
+     * 
+     * @param key IntWritable as input key
+     * @param value VectorWritable as input value
+     * 
+     * @return true if there was more data
+     * @throws IOException
+     */
+    public boolean next(IntWritable key, VectorWritable value)
+        throws IOException {
+      RowResult result = this.scanner.next();
+      boolean hasMore = result != null && result.size() > 0;
+      if (hasMore) {
+        key.set(BytesUtil.bytesToInt(result.getRow()));
+        Writables.copyWritable(result, value);
+      }
+      return hasMore;
+    }
+  }
+  
+  /**
+   * Builds a TableRecordReader. If no TableRecordReader was provided, uses the
+   * default.
+   * 
+   * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
+   *      JobConf, Reporter)
+   */
+  public RecordReader<IntWritable, VectorWritable> getRecordReader(
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
+    TableSplit tSplit = (TableSplit) split;
+    TableRecordReader trr = this.tableRecordReader;
+    // if no table record reader was provided use default
+    if (trr == null) {
+      trr = new TableRecordReader();
+    }
+    trr.setStartRow(tSplit.getStartRow());
+    trr.setEndRow(tSplit.getEndRow());
+    trr.setHTable(this.table);
+    trr.setInputColumns(this.inputColumns);
+    trr.setRowFilter(this.rowFilter);
+    trr.init();
+    return trr;
+  }
+  
+  /**
+   * Allows subclasses to set the {@link TableRecordReader}.
+   * 
+   * @param tableRecordReader an alternative {@link TableRecordReader}
+   *                implementation to use in place of the default.
+   */
+  protected void setTableRecordReader(TableRecordReader tableRecordReader) {
+    this.tableRecordReader = tableRecordReader;
+  }
+}
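
A hedged sketch of wiring this input format into a job. Column handling lived in the old configure() above; in the new code it presumably moves to TableInputFormatBase (added in part 1/2 of this commit, not shown here), so the config key, table name, and overall structure below are assumptions for illustration only.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    public class VectorJobSketch {
      public static JobConf configure() {
        JobConf job = new JobConf();
        job.setInputFormat(VectorInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("matrix_a")); // table name passed as the input path
        job.set("hama.mapred.tablecolumns", "column:"); // space-delimited columns; key taken from the old configure()
        // Map tasks then receive (IntWritable row index, VectorWritable row) pairs.
        return job;
      }
    }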

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Thu Dec 11 21:10:58 2008
@@ -1,66 +1,66 @@
-package org.apache.hama.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hama.DenseMatrix;
-import org.apache.hama.HCluster;
-import org.apache.hama.Matrix;
-import org.apache.hama.algebra.BlockCyclicMultiplyMap;
-import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
-import org.apache.hama.io.BlockWritable;
-import org.apache.log4j.Logger;
-
-public class TestBlockMatrixMapReduce extends HCluster {
-  static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
-  static Matrix c;
-  static final int SIZE = 20;
-  /** constructor */
-  public TestBlockMatrixMapReduce() {
-    super();
-  }
-
-  public void testBlockMatrixMapReduce() throws IOException, ClassNotFoundException {
-    Matrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
-    Matrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
-    ((DenseMatrix) m1).blocking_mapred(4);
-    ((DenseMatrix) m2).blocking_mapred(4);
-
-    miniMRJob(m1.getPath(), m2.getPath());
-    
-    double[][] C = new double[SIZE][SIZE];
-    for (int i = 0; i < SIZE; i++) {
-      for (int j = 0; j < SIZE; j++) {
-        for (int k = 0; k < SIZE; k++) {
-          C[i][k] += m1.get(i, j) * m2.get(j, k);
-        }
-      }
-    }
-
-    for (int i = 0; i < SIZE; i++) {
-      for (int j = 0; j < SIZE; j++) {
-        assertEquals(String.valueOf(C[i][j]).substring(0, 5), 
-            String.valueOf(c.get(i, j)).substring(0, 5));
-      }
-    }
-  }
-
-  private void miniMRJob(String string, String string2) throws IOException {
-    c = new DenseMatrix(conf);
-    String output = c.getPath();
-    
-    JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
-    jobConf.setJobName("test MR job");
-
-    BlockCyclicMultiplyMap.initJob(string, string2, BlockCyclicMultiplyMap.class, IntWritable.class,
-        BlockWritable.class, jobConf);
-    BlockCyclicReduce.initJob(output, BlockCyclicMultiplyReduce.class, jobConf);
-
-    jobConf.setNumMapTasks(2);
-    jobConf.setNumReduceTasks(2);
-
-    JobClient.runJob(jobConf);
-  }
-}
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.HCluster;
+import org.apache.hama.Matrix;
+import org.apache.hama.algebra.BlockCyclicMultiplyMap;
+import org.apache.hama.algebra.BlockCyclicMultiplyReduce;
+import org.apache.hama.io.BlockWritable;
+import org.apache.log4j.Logger;
+
+public class TestBlockMatrixMapReduce extends HCluster {
+  static final Logger LOG = Logger.getLogger(TestBlockMatrixMapReduce.class);
+  static Matrix c;
+  static final int SIZE = 20;
+  /** constructor */
+  public TestBlockMatrixMapReduce() {
+    super();
+  }
+
+  public void testBlockMatrixMapReduce() throws IOException, ClassNotFoundException {
+    Matrix m1 = DenseMatrix.random(conf, SIZE, SIZE);
+    Matrix m2 = DenseMatrix.random(conf, SIZE, SIZE);
+    ((DenseMatrix) m1).blocking_mapred(4);
+    ((DenseMatrix) m2).blocking_mapred(4);
+
+    miniMRJob(m1.getPath(), m2.getPath());
+    
+    double[][] C = new double[SIZE][SIZE];
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        for (int k = 0; k < SIZE; k++) {
+          C[i][k] += m1.get(i, j) * m2.get(j, k);
+        }
+      }
+    }
+
+    for (int i = 0; i < SIZE; i++) {
+      for (int j = 0; j < SIZE; j++) {
+        assertEquals(String.valueOf(C[i][j]).substring(0, 5), 
+            String.valueOf(c.get(i, j)).substring(0, 5));
+      }
+    }
+  }
+
+  private void miniMRJob(String string, String string2) throws IOException {
+    c = new DenseMatrix(conf);
+    String output = c.getPath();
+    
+    JobConf jobConf = new JobConf(conf, TestBlockMatrixMapReduce.class);
+    jobConf.setJobName("test MR job");
+
+    BlockCyclicMultiplyMap.initJob(string, string2, BlockCyclicMultiplyMap.class, IntWritable.class,
+        BlockWritable.class, jobConf);
+    BlockCyclicMultiplyReduce.initJob(output, BlockCyclicMultiplyReduce.class, jobConf);
+
+    jobConf.setNumMapTasks(2);
+    jobConf.setNumReduceTasks(2);
+
+    JobClient.runJob(jobConf);
+  }
+}
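
The test checks the MapReduce product against a naive O(n^3) multiply, then compares only the first five characters of each value's decimal string, a rough tolerance for floating-point drift. A delta-based assertion states the same intent directly; the 1e-2 tolerance below is an arbitrary illustration, not taken from the commit.

    // Inside the same verification loops, using JUnit's double-with-delta overload:
    assertEquals(C[i][j], c.get(i, j), 1e-2);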

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java?rev=725904&r1=725903&r2=725904&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestMatrixMapReduce.java Thu Dec 11 21:10:58 2008
@@ -66,7 +66,7 @@
 
     RowCyclicAdditionMap.initJob(string, string2, RowCyclicAdditionMap.class, IntWritable.class,
         VectorWritable.class, jobConf);
-    RowCyclicReduce.initJob(output, RowCyclicAdditionReduce.class, jobConf);
+    RowCyclicAdditionReduce.initJob(output, RowCyclicAdditionReduce.class, jobConf);
 
     jobConf.setNumMapTasks(2);
     jobConf.setNumReduceTasks(2);
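
The fix routes reduce-side setup through RowCyclicAdditionReduce.initJob so the class whose helper runs matches the reducer being registered, mirroring the BlockCyclicMultiplyReduce call in the previous file. Judging only from the call sites in this commit, such a helper plausibly has the shape below; the output format, value class, and config key are assumptions, not the actual Hama code.

    public static void initJob(String outputTable,
        Class<? extends RowCyclicAdditionReduce> reducer, JobConf jobConf) {
      jobConf.setReducerClass(reducer);
      jobConf.setOutputKeyClass(IntWritable.class);
      jobConf.setOutputValueClass(VectorWritable.class); // assumed value type
      jobConf.setOutputFormat(VectorOutputFormat.class); // hypothetical output format
      jobConf.set("hama.mapred.output.table", outputTable); // hypothetical config key
    }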


