hama-commits mailing list archives

From: edwardy...@apache.org
Subject: svn commit: r726660 - in /incubator/hama/trunk/src: java/org/apache/hama/ java/org/apache/hama/algebra/ java/org/apache/hama/io/ java/org/apache/hama/mapred/ test/org/apache/hama/ test/org/apache/hama/mapred/
Date: Mon, 15 Dec 2008 09:41:45 GMT
Author: edwardyoon
Date: Mon Dec 15 01:41:44 2008
New Revision: 726660

URL: http://svn.apache.org/viewvc?rev=726660&view=rev
Log:
To reduce disk I/O operations, remove the 'reduce phase' from blocking_mapred
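
In outline: the old blocking_mapred job mapped each matrix row into per-block sub-vectors and needed a reduce phase to assemble them into SubMatrix blocks, which meant spilling and shuffling all of the map output. Blocking is now a single map-only pass; each map() call receives a BlockID plus that block's row/column boundaries (the new BlockPosition) and writes the block back directly via setBlock(). A condensed sketch of the new job wiring, taken from BlockingMapRed.initJob below:

    // blocking is now map-only: no setReducerClass(), no shuffle, no reduce-side disk I/O
    job.setMapperClass(BlockingMapper.class);
    job.setInputFormat(BlockInputFormat.class);                        // was VectorInputFormat
    job.set(VectorInputFormat.COLUMN_LIST, Constants.BLOCK_POSITION);  // scan only block boundaries
    job.setOutputFormat(NullOutputFormat.class);                       // blocks are written via matrix.setBlock()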

Added:
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockPositionMapWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/io/IntegerEntry.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java
Modified:
    incubator/hama/trunk/src/java/org/apache/hama/Constants.java
    incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
    incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
    incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
    incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java
    incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
    incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java

Modified: incubator/hama/trunk/src/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/Constants.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/Constants.java Mon Dec 15 01:41:44 2008
@@ -99,6 +99,11 @@
   /** end column of block */
   public static final String BLOCK_ENDCOLUMN = "attribute:endColumn";
 
+
+  public static final String BLOCK_POSITION = Constants.BLOCK_STARTROW 
+  + " " + Constants.BLOCK_ENDROW + " " + Constants.BLOCK_STARTCOLUMN 
+  + " " + Constants.BLOCK_ENDCOLUMN;
+  
   /** block dimension */
   public static final String BLOCK = "block:";
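
The new BLOCK_POSITION constant is just the four block-boundary attribute columns joined with spaces, the form the table input formats take for their COLUMN_LIST parameter (presumably split on whitespace by the input format; that behaviour is not defined in this file). It is wired up in the job setup further down, e.g.:

    // scan only the block-boundary columns instead of the full block data
    jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK_POSITION);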
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/DenseMatrix.java Mon Dec 15 01:41:44 2008
@@ -59,7 +59,6 @@
 import org.apache.hama.util.RandomVariable;
 
 public class DenseMatrix extends AbstractMatrix implements Matrix {
-
   static int tryPathLength = Constants.DEFAULT_PATH_LENGTH;
   static final String TABLE_PREFIX = DenseMatrix.class.getSimpleName() + "_";
   static private final Path TMP_DIR = new Path(DenseMatrix.class
@@ -448,15 +447,16 @@
     return this.getClass().getSimpleName();
   }
 
+  // TODO: Scanner should be used. -- Edward J. Yoon
   public SubMatrix subMatrix(int i0, int i1, int j0, int j1) throws IOException {
     int columnSize = (j1 - j0) + 1;
     SubMatrix result = new SubMatrix((i1 - i0) + 1, columnSize);
 
+    RowResult rs = null;
     for (int i = i0, ii = 0; i <= i1; i++, ii++) {
+      rs = table.getRow(BytesUtil.intToBytes(i));
       for (int j = j0, jj = 0; j <= j1; j++, jj++) {
-        Cell c = table
-            .get(BytesUtil.intToBytes(i), BytesUtil.getColumnIndex(j));
-        result.set(ii, jj, BytesUtil.bytesToDouble(c.getValue()));
+        result.set(ii, jj, rs.get(BytesUtil.getColumnIndex(j)).getValue());
       }
     }
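
The rewritten subMatrix() fetches each matrix row once with table.getRow() and picks the needed cells out of the returned RowResult, instead of issuing one table.get() per element, so reading an m x n block costs m row fetches rather than m*n cell fetches. A usage sketch (assuming a DenseMatrix m and a SubMatrix#get(int, int) accessor):

    // copy rows 0..2 and columns 0..2 into a 3x3 SubMatrix, one getRow() per row
    SubMatrix block = m.subMatrix(0, 2, 0, 2);
    double topLeft = block.get(0, 0);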
 

Modified: incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/SubMatrix.java Mon Dec 15 01:41:44 2008
@@ -25,6 +25,7 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
+import org.apache.hama.util.BytesUtil;
 import org.apache.log4j.Logger;
 
 /**
@@ -82,6 +83,17 @@
   }
 
   /**
+   * Sets the value at (row, column) from an HBase-encoded double
+   * 
+   * @param row the row index
+   * @param column the column index
+   * @param value the cell value as a byte array, decoded with BytesUtil.bytesToDouble
+   */
+  public void set(int row, int column, byte[] value) {
+    matrix[row][column] = BytesUtil.bytesToDouble(value);    
+  }
+  
+  /**
    * Gets the value
    * 
    * @param i
@@ -183,4 +195,5 @@
     byte[] data = bos.toByteArray();
     return data;
   }
+
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/algebra/BlockCyclicMultiplyMap.java Mon Dec 15 01:41:44 2008
@@ -32,18 +32,23 @@
 import org.apache.hama.DenseMatrix;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.SubMatrix;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockPosition;
 import org.apache.hama.io.BlockWritable;
 import org.apache.hama.mapred.BlockInputFormat;
 import org.apache.log4j.Logger;
 
 public class BlockCyclicMultiplyMap extends MapReduceBase implements
-    Mapper<IntWritable, BlockWritable, IntWritable, BlockWritable> {
+    Mapper<BlockID, BlockPosition, IntWritable, BlockWritable> {
   static final Logger LOG = Logger.getLogger(BlockCyclicMultiplyMap.class);
+  protected DenseMatrix matrix_a;
+  public static final String MATRIX_A = "hama.multiplication.matrix.a";
   protected DenseMatrix matrix_b;
   public static final String MATRIX_B = "hama.multiplication.matrix.b";
 
   public void configure(JobConf job) {
     try {
+      matrix_a = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_A, ""));
       matrix_b = new DenseMatrix(new HamaConfiguration(), job.get(MATRIX_B, ""));
     } catch (IOException e) {
       LOG.warn("Load matrix_b failed : " + e.getMessage());
@@ -57,25 +62,26 @@
     jobConf.setMapOutputValueClass(outputValueClass);
     jobConf.setMapOutputKeyClass(outputKeyClass);
     jobConf.setMapperClass(map);
+    jobConf.set(MATRIX_A, matrix_a);
     jobConf.set(MATRIX_B, matrix_b);
 
     jobConf.setInputFormat(BlockInputFormat.class);
     FileInputFormat.addInputPaths(jobConf, matrix_a);
 
-    jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK);
+    jobConf.set(BlockInputFormat.COLUMN_LIST, Constants.BLOCK_POSITION);
   }
 
   @Override
-  public void map(IntWritable key, BlockWritable value,
+  public void map(BlockID key, @SuppressWarnings("unused") BlockPosition value,
       OutputCollector<IntWritable, BlockWritable> output, Reporter reporter)
       throws IOException {
-    for (int i = 0; i < value.size(); i++) {
-      SubMatrix a = value.get(i);
-      for (int j = 0; j < matrix_b.getBlockSize(); j++) {
-        SubMatrix b = matrix_b.getBlock(i, j);
-        SubMatrix c = a.mult(b);
-        output.collect(key, new BlockWritable(key.get(), j, c));
-      }
+    int blockSize = matrix_b.getBlockSize();
+    SubMatrix a = matrix_a.getBlock(key.getRow(), key.getColumn());
+    for (int j = 0; j < blockSize; j++) {
+      SubMatrix b = matrix_b.getBlock(key.getColumn(), j);
+      SubMatrix c = a.mult(b);
+      output.collect(new IntWritable(key.getRow()), 
+          new BlockWritable(key.getRow(), j, c));
     }
   }
 }
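
For reference, the block product each map() call now contributes to is (block indices zero-based):

    C_{i,j} = \sum_{k} A_{i,k} \, B_{k,j}

One call handles a single block A_{i,k}, fetched with matrix_a.getBlock(key.getRow(), key.getColumn()), and emits the partial products A_{i,k} B_{k,j} for every column block j of B, keyed by the output block row i. Combining the partial products into C happens downstream and is not part of this diff.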

Added: incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java?rev=726660&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockPosition.java Mon Dec 15 01:41:44 2008
@@ -0,0 +1,148 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+public class BlockPosition implements Writable, Map<Text, IntegerEntry> {
+
+  public BlockID row;
+  public BlockPositionMapWritable<Text, IntegerEntry> entries;
+
+  public BlockPosition() {
+    this(new BlockPositionMapWritable<Text, IntegerEntry>());
+  }
+
+  public BlockPosition(BlockPositionMapWritable<Text, IntegerEntry> entries) {
+    this.entries = entries;
+  }
+
+  public int getIndex(String key) {
+    return this.entries.get(new Text(key)).getValue();
+  }
+  
+  public int size() {
+    return this.entries.size();
+  }
+  
+  public IntegerEntry put(Text key, IntegerEntry value) {
+    throw new UnsupportedOperationException("BlockPosition is read-only!");
+  }
+
+  public IntegerEntry get(Object key) {
+    return this.entries.get(key);
+  }
+
+  public IntegerEntry remove(Object key) {
+    throw new UnsupportedOperationException("BlockPosition is read-only!");
+  }
+
+  public boolean containsKey(Object key) {
+    return entries.containsKey(key);
+  }
+
+  public boolean containsValue(Object value) {
+    throw new UnsupportedOperationException("Don't support containsValue!");
+  }
+
+  public boolean isEmpty() {
+    return entries.isEmpty();
+  }
+
+  public void clear() {
+    throw new UnsupportedOperationException("BlockPosition is read-only!");
+  }
+
+  public Set<Text> keySet() {
+    Set<Text> result = new TreeSet<Text>();
+    for (Text w : entries.keySet()) {
+      result.add(w);
+    }
+    return result;
+  }
+
+  public Set<Map.Entry<Text, IntegerEntry>> entrySet() {
+    return Collections.unmodifiableSet(this.entries.entrySet());
+  }
+
+  public Collection<IntegerEntry> values() {
+    ArrayList<IntegerEntry> result = new ArrayList<IntegerEntry>();
+    for (Writable w : entries.values()) {
+      result.add((IntegerEntry) w);
+    }
+    return result;
+  }
+
+  public void readFields(final DataInput in) throws IOException {
+    this.row = new BlockID(Bytes.readByteArray(in));
+    this.entries.readFields(in);
+  }
+
+  public void write(final DataOutput out) throws IOException {
+    Bytes.writeByteArray(out, this.row.getBytes());
+    this.entries.write(out);
+  }
+
+  public void putAll(Map<? extends Text, ? extends IntegerEntry> m) {
+    throw new UnsupportedOperationException("Not implemented yet");
+  }
+
+  /**
+   * 
+   * The inner class for an entry of row.
+   * 
+   */
+  public static class Entries implements Map.Entry<byte[], IntegerEntry> {
+
+    private final byte[] column;
+    private final IntegerEntry entry;
+
+    Entries(byte[] column, IntegerEntry entry) {
+      this.column = column;
+      this.entry = entry;
+    }
+
+    public IntegerEntry setValue(IntegerEntry c) {
+      throw new UnsupportedOperationException("BlockPosition.Entries is read-only!");
+    }
+
+    public byte[] getKey() {
+      byte[] key = column;
+      return key;
+    }
+
+    public IntegerEntry getValue() {
+      return entry;
+    }
+  }
+
+}
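
BlockPosition is the per-block record handed to the mappers: a read-only map from the boundary attribute columns to IntegerEntry values. Inside a mapper the boundaries are read back by column qualifier, exactly as BlockingMapRed.BlockingMapper does further down:

    int startRow    = value.getIndex(Constants.BLOCK_STARTROW);
    int endRow      = value.getIndex(Constants.BLOCK_ENDROW);
    int startColumn = value.getIndex(Constants.BLOCK_STARTCOLUMN);
    int endColumn   = value.getIndex(Constants.BLOCK_ENDCOLUMN);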

Added: incubator/hama/trunk/src/java/org/apache/hama/io/BlockPositionMapWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockPositionMapWritable.java?rev=726660&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockPositionMapWritable.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockPositionMapWritable.java Mon Dec 15 01:41:44 2008
@@ -0,0 +1,189 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class BlockPositionMapWritable <K, V> implements Map<Text, V>, Writable,
+    Configurable {
+  private AtomicReference<Configuration> conf = new AtomicReference<Configuration>();
+
+  // Static maps of code to class and vice versa. Includes types used in hama
+  // only.
+  static final Map<Byte, Class<?>> CODE_TO_CLASS = new HashMap<Byte, Class<?>>();
+  static final Map<Class<?>, Byte> CLASS_TO_CODE = new HashMap<Class<?>, Byte>();
+
+  static {
+    byte code = 0;
+    addToMap(HStoreKey.class, code++);
+    addToMap(ImmutableBytesWritable.class, code++);
+    addToMap(Text.class, code++);
+    addToMap(IntegerEntry.class, code++);
+    addToMap(byte[].class, code++);
+  }
+
+  @SuppressWarnings("boxing")
+  private static void addToMap(final Class<?> clazz, final byte code) {
+    CLASS_TO_CODE.put(clazz, code);
+    CODE_TO_CLASS.put(code, clazz);
+  }
+
+  private Map<Text, V> instance = new TreeMap<Text, V>();
+
+  /** @return the conf */
+  public Configuration getConf() {
+    return conf.get();
+  }
+
+  /** @param conf the conf to set */
+  public void setConf(Configuration conf) {
+    this.conf.set(conf);
+  }
+
+  /** {@inheritDoc} */
+  public void clear() {
+    instance.clear();
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsKey(Object key) {
+    return instance.containsKey(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean containsValue(Object value) {
+    return instance.containsValue(value);
+  }
+
+  /** {@inheritDoc} */
+  public Set<Entry<Text, V>> entrySet() {
+    return instance.entrySet();
+  }
+
+  /** {@inheritDoc} */
+  public V get(Object key) {
+    return instance.get(key);
+  }
+
+  /** {@inheritDoc} */
+  public boolean isEmpty() {
+    return instance.isEmpty();
+  }
+
+  /** {@inheritDoc} */
+  public Set<Text> keySet() {
+    return instance.keySet();
+  }
+
+  /** {@inheritDoc} */
+  public int size() {
+    return instance.size();
+  }
+
+  /** {@inheritDoc} */
+  public Collection<V> values() {
+    return instance.values();
+  }
+
+  // Writable
+
+  /** @return the Class class for the specified id */
+  protected Class<?> getClass(byte id) {
+    return CODE_TO_CLASS.get(id);
+  }
+
+  /** @return the id for the specified Class */
+  protected byte getId(Class<?> clazz) {
+    Byte b = CLASS_TO_CODE.get(clazz);
+    if (b == null) {
+      throw new NullPointerException("Nothing for : " + clazz);
+    }
+    return b;
+  }
+
+  @Override
+  public String toString() {
+    return this.instance.toString();
+  }
+
+  /** {@inheritDoc} */
+  public void write(DataOutput out) throws IOException {
+    // Write out the number of entries in the map
+    out.writeInt(this.instance.size());
+
+    // Then write out each key/value pair
+    for (Map.Entry<Text, V> e : instance.entrySet()) {
+      //Bytes.writeByteArray(out, BytesUtil.getBlockIndex(e.getKey()));
+      Bytes.writeByteArray(out, e.getKey().getBytes());
+      out.writeByte(getId(e.getValue().getClass()));
+      ((Writable) e.getValue()).write(out);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    // First clear the map. Otherwise we will just accumulate
+    // entries every time this method is called.
+    this.instance.clear();
+
+    // Read the number of entries in the map
+    int entries = in.readInt();
+
+    // Then read each key/value pair
+    for (int i = 0; i < entries; i++) {
+      byte[] key = Bytes.readByteArray(in);
+      Writable value = (Writable) ReflectionUtils.newInstance(getClass(in
+          .readByte()), getConf());
+      value.readFields(in);
+      V v = (V) value;
+      this.instance.put(new Text(key), v);
+    }
+  }
+
+  public void putAll(Map<? extends Text, ? extends V> m) {
+    this.instance.putAll(m);
+  }
+
+  public V remove(Object key) {
+    return this.instance.remove(key);
+  }
+
+  public V put(Text key, V value) {
+    return this.instance.put(key, value);
+  }
+}
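
BlockPositionMapWritable follows the usual Hadoop pattern of prefixing each serialized value with a one-byte type code so that readFields() can reinstantiate the right Writable class. A minimal round-trip sketch using Hadoop's DataOutputBuffer/DataInputBuffer (test-style usage, not taken from this commit):

    BlockPositionMapWritable<Text, IntegerEntry> map =
        new BlockPositionMapWritable<Text, IntegerEntry>();
    map.put(new Text(Constants.BLOCK_STARTROW), new IntegerEntry(0));

    DataOutputBuffer out = new DataOutputBuffer();
    map.write(out);                       // entry count, then (key bytes, type code, value)

    BlockPositionMapWritable<Text, IntegerEntry> copy =
        new BlockPositionMapWritable<Text, IntegerEntry>();
    copy.setConf(new Configuration());    // readFields instantiates values via ReflectionUtils
    DataInputBuffer in = new DataInputBuffer();
    in.reset(out.getData(), out.getLength());
    copy.readFields(in);                  // type code 3 maps back to IntegerEntry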

Modified: incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/BlockWritable.java Mon Dec 15 01:41:44 2008
@@ -1,3 +1,22 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.io;
 
 import java.io.DataInput;

Added: incubator/hama/trunk/src/java/org/apache/hama/io/IntegerEntry.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/io/IntegerEntry.java?rev=726660&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/io/IntegerEntry.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/io/IntegerEntry.java Mon Dec 15 01:41:44 2008
@@ -0,0 +1,143 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.io;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.util.BytesUtil;
+import org.apache.log4j.Logger;
+
+public class IntegerEntry implements Writable, Iterable<IntegerEntry> {
+  static final Logger LOG = Logger.getLogger(IntegerEntry.class);
+  protected byte[][] values;
+  
+  // We don't need this.
+  @Deprecated
+  protected long[] timestamps;
+  
+  /** For Writable compatibility */
+  public IntegerEntry() {
+    values = null;
+    timestamps = null;
+  }
+
+  public IntegerEntry(Cell c) {
+    this.values = new byte[1][];
+    this.values[0] = c.getValue();
+    this.timestamps = new long[1];
+    this.timestamps[0] = c.getTimestamp();
+  }
+
+  public IntegerEntry(int value) {
+    this.values = new byte[1][];
+    this.values[0] = BytesUtil.intToBytes(value);
+    this.timestamps = new long[1];
+    this.timestamps[0] = System.currentTimeMillis();
+  }
+  
+  /** @return the current IntegerEntry's value */
+  public int getValue() {
+    return BytesUtil.bytesToInt(this.values[0]);
+  }
+
+  /** @return the current IntegerEntry's timestamp */
+  public long getTimestamp() {
+    return timestamps[0];
+  }
+
+  /**
+   * Create a new IntegerEntry with a given value and timestamp. Used by HStore.
+   * 
+   * @param value
+   * @param timestamp
+   */
+  public IntegerEntry(byte[] value, long timestamp) {
+    this.values = new byte[1][];
+    this.values[0] = value;
+    this.timestamps = new long[1];
+    this.timestamps[0] = timestamp;
+  }
+
+  //
+  // Writable
+  //
+
+  /** {@inheritDoc} */
+  public void readFields(final DataInput in) throws IOException {
+    int nvalues = in.readInt();
+    this.timestamps = new long[nvalues];
+    this.values = new byte[nvalues][];
+    for (int i = 0; i < nvalues; i++) {
+      this.timestamps[i] = in.readLong();
+    }
+    for (int i = 0; i < nvalues; i++) {
+      this.values[i] = Bytes.readByteArray(in);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void write(final DataOutput out) throws IOException {
+    out.writeInt(this.values.length);
+    for (int i = 0; i < this.timestamps.length; i++) {
+      out.writeLong(this.timestamps[i]);
+    }
+    for (int i = 0; i < this.values.length; i++) {
+      Bytes.writeByteArray(out, this.values[i]);
+    }
+  }
+
+  //
+  // Iterable
+  //
+
+  /** {@inheritDoc} */
+  public Iterator<IntegerEntry> iterator() {
+    return new IntegerEntryIterator();
+  }
+
+  private class IntegerEntryIterator implements Iterator<IntegerEntry> {
+    private int currentValue = -1;
+
+    IntegerEntryIterator() {
+    }
+
+    /** {@inheritDoc} */
+    public boolean hasNext() {
+      return currentValue + 1 < values.length;
+    }
+
+    /** {@inheritDoc} */
+    public IntegerEntry next() {
+      currentValue += 1;
+      return new IntegerEntry(values[currentValue], timestamps[currentValue]);
+    }
+
+    /** {@inheritDoc} */
+    public void remove() throws UnsupportedOperationException {
+      throw new UnsupportedOperationException("remove is not supported");
+    }
+  }
+}
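
IntegerEntry is a small Writable wrapper around a single int, stored as the byte encoding produced by BytesUtil.intToBytes plus a timestamp. A usage sketch:

    IntegerEntry entry = new IntegerEntry(42);
    int value = entry.getValue();        // 42, decoded with BytesUtil.bytesToInt
    long when = entry.getTimestamp();    // creation time when the int constructor is used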

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockInputFormat.java Mon Dec 15 01:41:44 2008
@@ -21,36 +21,38 @@
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.RowResult;
 import org.apache.hadoop.hbase.mapred.TableSplit;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hama.io.BlockWritable;
-import org.apache.hama.util.BytesUtil;
+import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockPosition;
 
 public class BlockInputFormat extends TableInputFormatBase implements
-    InputFormat<IntWritable, BlockWritable>, JobConfigurable {
+    InputFormat<BlockID, BlockPosition>, JobConfigurable {
+  static final Log LOG = LogFactory.getLog(BlockInputFormat.class);
   private TableRecordReader tableRecordReader;
   
   /**
-   * Iterate over an HBase table data, return (IntWritable, BlockWritable) pairs
+   * Iterate over HBase table data, returning (BlockID, BlockPosition) pairs
    */
   protected static class TableRecordReader extends TableRecordReaderBase
-      implements RecordReader<IntWritable, BlockWritable> {
+      implements RecordReader<BlockID, BlockPosition> {
 
     /**
      * @return IntWritable
      * 
      * @see org.apache.hadoop.mapred.RecordReader#createKey()
      */
-    public IntWritable createKey() {
-      return new IntWritable();
+    public BlockID createKey() {
+      return new BlockID();
     }
 
     /**
@@ -58,25 +60,27 @@
      * 
      * @see org.apache.hadoop.mapred.RecordReader#createValue()
      */
-    public BlockWritable createValue() {
-      return new BlockWritable();
+    public BlockPosition createValue() {
+      return new BlockPosition();
     }
 
     /**
-     * @param key IntWritable as input key.
+     * @param key BlockID as input key.
      * @param value BlockWritable as input value
      * 
-     * Converts Scanner.next() to IntWritable, BlockWritable
+     * Converts Scanner.next() to BlockID, BlockPosition
      * 
      * @return true if there was more data
      * @throws IOException
      */
-    public boolean next(IntWritable key, BlockWritable value)
+    public boolean next(BlockID key, BlockPosition value)
         throws IOException {
       RowResult result = this.scanner.next();
       boolean hasMore = result != null && result.size() > 0;
       if (hasMore) {
-        key.set(BytesUtil.bytesToInt(result.getRow()));
+        byte[] row = result.getRow();
+        BlockID bID = new BlockID(row);
+        key.set(bID.getRow(), bID.getColumn());
         Writables.copyWritable(result, value);
       }
       return hasMore;
@@ -90,7 +94,7 @@
    * @see org.apache.hadoop.mapred.InputFormat#getRecordReader(InputSplit,
    *      JobConf, Reporter)
    */
-  public RecordReader<IntWritable, BlockWritable> getRecordReader(
+  public RecordReader<BlockID, BlockPosition> getRecordReader(
       InputSplit split, JobConf job, Reporter reporter) throws IOException {
     TableSplit tSplit = (TableSplit) split;
     TableRecordReader trr = this.tableRecordReader;

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/BlockingMapRed.java Mon Dec 15 01:41:44 2008
@@ -17,32 +17,27 @@
 package org.apache.hama.mapred;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hama.Constants;
 import org.apache.hama.DenseMatrix;
-import org.apache.hama.DenseVector;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.SubMatrix;
 import org.apache.hama.io.BlockID;
+import org.apache.hama.io.BlockPosition;
 import org.apache.hama.io.VectorWritable;
 
 /**
  * A Map/Reduce help class for blocking a DenseMatrix to a block-formated matrix
  */
 public class BlockingMapRed {
-
   static final Log LOG = LogFactory.getLog(BlockingMapRed.class);
   /** Parameter of the path of the matrix to be blocked * */
   public static final String BLOCKING_MATRIX = "hama.blocking.matrix";
@@ -55,16 +50,15 @@
    */
   public static void initJob(String matrixPath, JobConf job) {
     job.setMapperClass(BlockingMapper.class);
-    job.setReducerClass(BlockingReducer.class);
     FileInputFormat.addInputPaths(job, matrixPath);
 
-    job.setInputFormat(VectorInputFormat.class);
+    job.setInputFormat(BlockInputFormat.class);
     job.setMapOutputKeyClass(BlockID.class);
     job.setMapOutputValueClass(VectorWritable.class);
     job.setOutputFormat(NullOutputFormat.class);
 
     job.set(BLOCKING_MATRIX, matrixPath);
-    job.set(VectorInputFormat.COLUMN_LIST, Constants.COLUMN);
+    job.set(VectorInputFormat.COLUMN_LIST, Constants.BLOCK_POSITION);
   }
 
   /**
@@ -73,108 +67,35 @@
   public static abstract class BlockingMapRedBase extends MapReduceBase {
 
     protected DenseMatrix matrix;
-    protected int mBlockNum;
-    protected int mBlockRowSize;
-    protected int mBlockColSize;
     
-    protected int mRows;
-    protected int mColumns;
-
     @Override
     public void configure(JobConf job) {
       try {
         matrix = new DenseMatrix(new HamaConfiguration(), job.get(
             BLOCKING_MATRIX, ""));
-        mBlockNum = matrix.getBlockSize();
-        mBlockRowSize = matrix.getRows() / mBlockNum;
-        mBlockColSize = matrix.getColumns() / mBlockNum;
-        
-        mRows = matrix.getRows();
-        mColumns = matrix.getColumns();
       } catch (IOException e) {
         LOG.warn("Load matrix_blocking failed : " + e.getMessage());
       }
     }
-
   }
 
   /**
    * Mapper Class
    */
   public static class BlockingMapper extends BlockingMapRedBase implements
-      Mapper<IntWritable, VectorWritable, BlockID, VectorWritable> {
+      Mapper<BlockID, BlockPosition, BlockID, VectorWritable> {
 
     @Override
-    public void map(IntWritable key, VectorWritable value,
+    public void map(BlockID key, BlockPosition value,
         OutputCollector<BlockID, VectorWritable> output, Reporter reporter)
         throws IOException {
-      int startColumn;
-      int endColumn;
-      int blkRow = key.get() / mBlockRowSize;
-      DenseVector dv = value.getDenseVector();
-      
-      int i = 0;
-      do {
-        startColumn = i * mBlockColSize;
-        endColumn = startColumn + mBlockColSize - 1;
-        if(endColumn >= mColumns) // the last sub vector
-          endColumn = mColumns - 1;
-        output.collect(new BlockID(blkRow, i), new VectorWritable(key.get(),
-            dv.subVector(startColumn, endColumn)));
-        
-        i++;
-      } while(endColumn < (mColumns-1));
-    }
-
-  }
-
-  /**
-   * Reducer Class
-   */
-  public static class BlockingReducer extends BlockingMapRedBase implements
-      Reducer<BlockID, VectorWritable, BlockID, SubMatrix> {
-
-    @Override
-    public void reduce(BlockID key, Iterator<VectorWritable> values,
-        OutputCollector<BlockID, SubMatrix> output, Reporter reporter)
-        throws IOException {
-      // Note: all the sub-vectors are grouped by {@link
-      // org.apache.hama.io.BlockID}
-      
-      // the block's base offset in the original matrix
-      int colBase = key.getColumn() * mBlockColSize;
-      int rowBase = key.getRow() * mBlockRowSize;
-      
-      // the block's size : rows & columns
-      int smRows = mBlockRowSize;
-      if((rowBase + mBlockRowSize - 1) >= mRows)
-        smRows = mRows - rowBase;
-      int smCols = mBlockColSize;
-      if((colBase + mBlockColSize - 1) >= mColumns)  
-        smCols = mColumns - colBase;
-      
-      // construct the matrix
-      SubMatrix subMatrix = new SubMatrix(smRows, smCols);
-      
-      // i, j is the current offset in the sub-matrix
-      int i = 0, j = 0;
-      while (values.hasNext()) {
-        VectorWritable vw = values.next();
-        // check the size is suitable
-        if (vw.size() != smCols)
-          throw new IOException("Block Column Size dismatched.");
-        i = vw.row - rowBase;
-        if (i >= smRows || i < 0)
-          throw new IOException("Block Row Size dismatched.");
-
-        // put the subVector to the subMatrix
-        for (j = 0; j < smCols; j++) {
-          subMatrix.set(i, j, vw.get(colBase + j));
-        }
-      }
+      int startRow = value.getIndex(Constants.BLOCK_STARTROW);
+      int endRow = value.getIndex(Constants.BLOCK_ENDROW);
+      int startColumn = value.getIndex(Constants.BLOCK_STARTCOLUMN);
+      int endColumn = value.getIndex(Constants.BLOCK_ENDCOLUMN);
 
-      matrix.setBlock(key.getRow(), key.getColumn(), subMatrix);
+      matrix.setBlock(key.getRow(), key.getColumn(), 
+          matrix.subMatrix(startRow, endRow, startColumn, endColumn));
     }
   }
-
 }

Modified: incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/mapred/TableRecordReaderBase.java Mon Dec 15 01:41:44 2008
@@ -28,7 +28,7 @@
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.filter.RowFilterSet;
 import org.apache.hadoop.hbase.filter.StopRowFilter;
-import org.apache.hama.io.BlockWritable;
+import org.apache.hama.io.BlockPosition;
 
 public abstract class TableRecordReaderBase {
   protected byte[] startRow;
@@ -70,7 +70,7 @@
   }
 
   /**
-   * @param inputColumns the columns to be placed in {@link BlockWritable}.
+   * @param inputColumns the columns to be placed in {@link BlockPosition}.
    */
   public void setInputColumns(final byte[][] inputColumns) {
     byte[][] columns = inputColumns;

Modified: incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/TestDenseMatrix.java Mon Dec 15 01:41:44 2008
@@ -349,15 +349,4 @@
       }
     }
   }
-  
-  public void testRandomMatrixMapReduce() throws IOException {
-    DenseMatrix rand = DenseMatrix.random_mapred(conf, 20, 20);
-    assertEquals(rand.getRows(), 20);
-    assertEquals(rand.getColumns(), 20);
-    for(int i = 0; i < rand.getRows(); i++) {
-      for(int j = 0; j < rand.getColumns(); j++) {
-        assertTrue(rand.get(i, j) > -1);
-      }
-    }
-  }
 }

Modified: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java?rev=726660&r1=726659&r2=726660&view=diff
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java (original)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestBlockMatrixMapReduce.java Mon Dec 15 01:41:44 2008
@@ -1,3 +1,22 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hama.mapred;
 
 import java.io.IOException;

Added: incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java?rev=726660&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/mapred/TestRandomMatrixMapReduce.java Mon Dec 15 01:41:44 2008
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.mapred;
+
+import java.io.IOException;
+
+import org.apache.hama.DenseMatrix;
+import org.apache.hama.HCluster;
+import org.apache.log4j.Logger;
+
+public class TestRandomMatrixMapReduce extends HCluster {
+  static final Logger LOG = Logger.getLogger(TestRandomMatrixMapReduce.class);
+  
+  public void testRandomMatrixMapReduce() throws IOException {
+    DenseMatrix rand = DenseMatrix.random_mapred(conf, 20, 20);
+    assertEquals(20, rand.getRows());
+    assertEquals(20, rand.getColumns());
+    
+    for(int i = 0; i < 20; i++) {
+      for(int j = 0; j < 20; j++) {
+        assertTrue(rand.get(i, j) > -1);
+      }
+    }
+    
+    rand.close();
+  }
+}


