hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1387533 [5/10] - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ graph/ graph/src/main/java/org/apache/hama/graph/ jdbm/ jdbm/src/ jdbm/src/main/ jdbm/src/main/java/ jdbm/src/main/java/org/ jdbm/src/main/java/org/apache/ j...
Date Wed, 19 Sep 2012 11:52:24 GMT
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageIo.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageIo.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageIo.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageIo.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.crypto.Cipher;
+
+/**
+ * Wraps a page-sized ByteBuffer for reading and writing.
+ * <p>
+ * ByteBuffer may be subview of a larger buffer (ie large buffer mapped over a
+ * file). In this case ByteBuffer will have set limit, mark and other variables
+ * to limit its size.
+ * <p>
+ * For reading, buffers may be shared. For example StoreMemory just returns its
+ * pages without copying. In this case buffer is marked as 'readonly' and needs
+ * to be copied before write (Copy On Write - COW). COW is not necessary if
+ * transactions are disabled and changes can not be rolled back.
+ * <p>
+ */
+public final class PageIo {
+
+  private long pageId;
+
+  private ByteBuffer data; // work area
+
+  /** buffers contains changes which were not written to disk yet. */
+  private boolean dirty = false;
+
+  private int transactionCount = 0;
+
+  /**
+   * Default constructor for serialization
+   */
+  public PageIo() {
+    // empty
+  }
+
+  /**
+   * Constructs a new PageIo instance working on the indicated buffer.
+   */
+  PageIo(long pageId, byte[] data) {
+    this.pageId = pageId;
+    this.data = ByteBuffer.wrap(data);
+  }
+
+  public PageIo(long pageId, ByteBuffer data) {
+    this.pageId = pageId;
+    this.data = data;
+  }
+
+  /**
+   * Frequent reads on direct buffer may be slower then on heap buffer. This
+   * method converts native direct to heap buffer
+   */
+  void ensureHeapBuffer() {
+    if (data.isDirect()) {
+      final byte[] bb = new byte[Storage.PAGE_SIZE];
+      data.get(bb, 0, Storage.PAGE_SIZE);
+      data = ByteBuffer.wrap(bb);
+      if (data.isReadOnly())
+        throw new InternalError();
+    }
+
+  }
+
+  /**
+   * Returns the underlying array
+   */
+  ByteBuffer getData() {
+    return data;
+  }
+
+  /**
+   * Returns the page number.
+   */
+  long getPageId() {
+    return pageId;
+  }
+
+  /**
+   * Sets the dirty flag
+   */
+  void setDirty() {
+    dirty = true;
+
+    if (data.isReadOnly()) {
+      // make copy if needed, so we can write into buffer
+      byte[] buf = new byte[Storage.PAGE_SIZE];
+      data.get(buf, 0, Storage.PAGE_SIZE);
+      data = ByteBuffer.wrap(buf);
+    }
+  }
+
+  /**
+   * Clears the dirty flag
+   */
+  void setClean() {
+    dirty = false;
+  }
+
+  /**
+   * Returns true if the dirty flag is set.
+   */
+  boolean isDirty() {
+    return dirty;
+  }
+
+  /**
+   * Returns true if the block is still dirty with respect to the transaction
+   * log.
+   */
+  boolean isInTransaction() {
+    return transactionCount != 0;
+  }
+
+  /**
+   * Increments transaction count for this block, to signal that this block is
+   * in the log but not yet in the data file. The method also takes a snapshot
+   * so that the data may be modified in new transactions.
+   */
+  void incrementTransactionCount() {
+    transactionCount++;
+  }
+
+  /**
+   * Decrements transaction count for this block, to signal that this block has
+   * been written from the log to the data file.
+   */
+  void decrementTransactionCount() {
+    transactionCount--;
+    if (transactionCount < 0)
+      throw new Error("transaction count on page " + getPageId()
+          + " below zero!");
+
+  }
+
+  /**
+   * Reads a byte from the indicated position
+   */
+  public byte readByte(int pos) {
+    return data.get(pos);
+  }
+
+  /**
+   * Writes a byte to the indicated position
+   */
+  public void writeByte(int pos, byte value) {
+    setDirty();
+    data.put(pos, value);
+  }
+
+  /**
+   * Reads a short from the indicated position
+   */
+  public short readShort(int pos) {
+    return data.getShort(pos);
+  }
+
+  /**
+   * Writes a short to the indicated position
+   */
+  public void writeShort(int pos, short value) {
+    setDirty();
+    data.putShort(pos, value);
+  }
+
+  /**
+   * Reads an int from the indicated position
+   */
+  public int readInt(int pos) {
+    return data.getInt(pos);
+  }
+
+  /**
+   * Writes an int to the indicated position
+   */
+  public void writeInt(int pos, int value) {
+    setDirty();
+    data.putInt(pos, value);
+  }
+
+  /**
+   * Reads a long from the indicated position
+   */
+  public long readLong(int pos) {
+    return data.getLong(pos);
+  }
+
+  /**
+   * Writes a long to the indicated position
+   */
+  public void writeLong(int pos, long value) {
+    setDirty();
+    data.putLong(pos, value);
+  }
+
+  /**
+   * Reads a long from the indicated position
+   */
+  public long readSixByteLong(int pos) {
+    long ret = ((long) (data.get(pos + 0) & 0x7f) << 40)
+        | ((long) (data.get(pos + 1) & 0xff) << 32)
+        | ((long) (data.get(pos + 2) & 0xff) << 24)
+        | ((long) (data.get(pos + 3) & 0xff) << 16)
+        | ((long) (data.get(pos + 4) & 0xff) << 8)
+        | ((long) (data.get(pos + 5) & 0xff) << 0);
+    if ((data.get(pos + 0) & 0x80) != 0)
+      return -ret;
+    else
+      return ret;
+
+  }
+
+  /**
+   * Writes a long to the indicated position
+   */
+  public void writeSixByteLong(int pos, long value) {
+    // if(value<0) throw new IllegalArgumentException();
+    // if(value >> (6*8)!=0)
+    // throw new IllegalArgumentException("does not fit");
+    int negativeBit = 0;
+    if (value < 0) {
+      value = -value;
+      negativeBit = 0x80;
+    }
+
+    setDirty();
+    data.put(pos + 0, (byte) ((0x7f & (value >> 40)) | negativeBit));
+    data.put(pos + 1, (byte) (0xff & (value >> 32)));
+    data.put(pos + 2, (byte) (0xff & (value >> 24)));
+    data.put(pos + 3, (byte) (0xff & (value >> 16)));
+    data.put(pos + 4, (byte) (0xff & (value >> 8)));
+    data.put(pos + 5, (byte) (0xff & (value >> 0)));
+
+  }
+
+  // overrides java.lang.Object
+
+  public String toString() {
+    return "PageIo(" + pageId + "," + dirty + ")";
+  }
+
+  public void readExternal(DataInputStream in, Cipher cipherOut)
+      throws IOException {
+    pageId = in.readLong();
+    byte[] data2 = new byte[Storage.PAGE_SIZE];
+    in.readFully(data2);
+    if (cipherOut == null || JDBMUtils.allZeros(data2))
+      data = ByteBuffer.wrap(data2);
+    else
+      try {
+        data = ByteBuffer.wrap(cipherOut.doFinal(data2));
+      } catch (Exception e) {
+        throw new IOError(e);
+      }
+  }
+
+  public void writeExternal(DataOutput out, Cipher cipherIn) throws IOException {
+    out.writeLong(pageId);
+    out.write(JDBMUtils.encrypt(cipherIn, data.array()));
+  }
+
+  public byte[] getByteArray() {
+    if (data.hasArray())
+      return data.array();
+    byte[] d = new byte[Storage.PAGE_SIZE];
+    data.rewind();
+    data.get(d, 0, Storage.PAGE_SIZE);
+    return d;
+  }
+
+  public void writeByteArray(byte[] buf, int srcOffset, int offset, int length) {
+    setDirty();
+    data.rewind();
+    data.position(offset);
+    data.put(buf, srcOffset, length);
+  }
+
+  public void fileHeaderCheckHead(boolean isNew) {
+    if (isNew)
+      writeShort(Magic.FILE_HEADER_O_MAGIC, Magic.FILE_HEADER);
+    else {
+      short magic = readShort(Magic.FILE_HEADER_O_MAGIC);
+      if (magic != Magic.FILE_HEADER)
+        throw new Error("CRITICAL: file header magic not OK " + magic);
+    }
+  }
+
+  /**
+   * Returns the first page of the indicated list
+   */
+  long fileHeaderGetFirstOf(int list) {
+    return readLong(fileHeaderOffsetOfFirst(list));
+  }
+
+  /**
+   * Sets the first page of the indicated list
+   */
+  void fileHeaderSetFirstOf(int list, long value) {
+    writeLong(fileHeaderOffsetOfFirst(list), value);
+  }
+
+  /**
+   * Returns the last page of the indicated list
+   */
+  long fileHeaderGetLastOf(int list) {
+    return readLong(fileHeaderOffsetOfLast(list));
+  }
+
+  /**
+   * Sets the last page of the indicated list
+   */
+  void fileHeaderSetLastOf(int list, long value) {
+    writeLong(fileHeaderOffsetOfLast(list), value);
+  }
+
+  /**
+   * Returns the offset of the "first" page of the indicated list
+   */
+  private short fileHeaderOffsetOfFirst(int list) {
+    return (short) (Magic.FILE_HEADER_O_LISTS + (2 * Magic.SZ_LONG * list));
+  }
+
+  /**
+   * Returns the offset of the "last" page of the indicated list
+   */
+  private short fileHeaderOffsetOfLast(int list) {
+    return (short) (fileHeaderOffsetOfFirst(list) + Magic.SZ_LONG);
+  }
+
+  /**
+   * Returns the indicated root rowid. A root rowid is a special rowid that
+   * needs to be kept between sessions. It could conceivably be stored in a
+   * special file, but as a large amount of space in the page header is wasted
+   * anyway, it's more useful to store it where it belongs.
+   * 
+   */
+  long fileHeaderGetRoot(final int root) {
+    final short offset = (short) (Magic.FILE_HEADER_O_ROOTS + (root * Magic.SZ_LONG));
+    return readLong(offset);
+  }
+
+  /**
+   * Sets the indicated root rowid.
+   * 
+   */
+  void fileHeaderSetRoot(final int root, final long rowid) {
+    final short offset = (short) (Magic.FILE_HEADER_O_ROOTS + (root * Magic.SZ_LONG));
+    writeLong(offset, rowid);
+  }
+
+  /**
+   * Returns true if the magic corresponds with the fileHeader magic.
+   */
+  boolean pageHeaderMagicOk() {
+    int magic = pageHeaderGetMagic();
+    return magic >= Magic.PAGE_MAGIC
+        && magic <= (Magic.PAGE_MAGIC + Magic.FREEPHYSIDS_ROOT_PAGE);
+  }
+
+  /**
+   * For paranoia mode
+   */
+  protected void pageHeaderParanoiaMagicOk() {
+    if (!pageHeaderMagicOk())
+      throw new Error("CRITICAL: page header magic not OK "
+          + pageHeaderGetMagic());
+  }
+
+  short pageHeaderGetMagic() {
+    return readShort(Magic.PAGE_HEADER_O_MAGIC);
+  }
+
+  long pageHeaderGetNext() {
+    pageHeaderParanoiaMagicOk();
+    return readSixByteLong(Magic.PAGE_HEADER_O_NEXT);
+  }
+
+  void pageHeaderSetNext(long next) {
+    pageHeaderParanoiaMagicOk();
+    writeSixByteLong(Magic.PAGE_HEADER_O_NEXT, next);
+  }
+
+  long pageHeaderGetPrev() {
+    pageHeaderParanoiaMagicOk();
+    return readSixByteLong(Magic.PAGE_HEADER_O_PREV);
+  }
+
+  void pageHeaderSetPrev(long prev) {
+    pageHeaderParanoiaMagicOk();
+    writeSixByteLong(Magic.PAGE_HEADER_O_PREV, prev);
+  }
+
+  void pageHeaderSetType(short type) {
+    writeShort(Magic.PAGE_HEADER_O_MAGIC, (short) (Magic.PAGE_MAGIC + type));
+  }
+
+  long pageHeaderGetLocation(final short pos) {
+    return readSixByteLong(pos + Magic.PhysicalRowId_O_LOCATION);
+  }
+
+  void pageHeaderSetLocation(short pos, long value) {
+    writeSixByteLong(pos + Magic.PhysicalRowId_O_LOCATION, value);
+  }
+
+  short dataPageGetFirst() {
+    return readShort(Magic.DATA_PAGE_O_FIRST);
+  }
+
+  void dataPageSetFirst(short value) {
+    pageHeaderParanoiaMagicOk();
+    if (value > 0 && value < Magic.DATA_PAGE_O_DATA)
+      throw new Error("DataPage.setFirst: offset " + value + " too small");
+    writeShort(Magic.DATA_PAGE_O_FIRST, value);
+  }
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageManager.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageManager.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageManager.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * This class manages the linked lists of pages that make up a file.
+ */
public final class PageManager {
  // our record file
  final PageFile file;

  // the file header page (page 0); fetched in the constructor and held for
  // the lifetime of this manager (re-fetched after commit/rollback)
  private PageIo headerBuf;

  /**
   * Creates a new page manager using the indicated record file.
   *
   * @param file the PageFile backing this manager
   * @throws IOException if the header page cannot be read
   */
  PageManager(PageFile file) throws IOException {
    this.file = file;

    // check the file headerBuf.fileHeader If the magic is 0, we assume a new
    // file. Note that we hold on to the file header node.
    headerBuf = file.get(0);
    headerBuf.ensureHeapBuffer();
    headerBuf.fileHeaderCheckHead(headerBuf.readShort(0) == 0);
  }

  /**
   * Allocates a page of the indicated type. Returns recid of the page.
   *
   * @param type page type to allocate (must not be FREE_PAGE)
   * @throws IOException if underlying page reads/writes fail
   * @throws Error if asked to allocate a free page
   */
  long allocate(short type) throws IOException {

    if (type == Magic.FREE_PAGE)
      throw new Error("allocate of free page?");

    // do we have something on the free list?
    long retval = headerBuf.fileHeaderGetFirstOf(Magic.FREE_PAGE);
    boolean isNew = false;

    if (type != Magic.TRANSLATION_PAGE) {

      if (retval != 0) {
        // yes. Point to it and make the next of that page the
        // new first free page.
        headerBuf.fileHeaderSetFirstOf(Magic.FREE_PAGE, getNext(retval));
      } else {
        // nope. make a new record
        retval = headerBuf.fileHeaderGetLastOf(Magic.FREE_PAGE);
        if (retval == 0)
          // very new file - allocate record #1
          retval = 1;
        // NOTE(review): "last of FREE_PAGE" appears to double as the
        // next-unallocated-page counter; bumping it grows the file — confirm
        // against PageFile
        headerBuf.fileHeaderSetLastOf(Magic.FREE_PAGE, retval + 1);
        isNew = true;
      }
    } else {
      // translation pages have different allocation scheme
      // and also have negative address
      retval = headerBuf.fileHeaderGetLastOf(Magic.TRANSLATION_PAGE) - 1;
      isNew = true;
    }

    // Cool. We have a record, add it to the correct list
    PageIo pageHdr = file.get(retval);
    if (isNew) {
      pageHdr.pageHeaderSetType(type);
    } else {
      // a page reused from the free list must still carry a valid magic
      if (!pageHdr.pageHeaderMagicOk())
        throw new Error("CRITICAL: page header magic for page "
            + pageHdr.getPageId() + " not OK " + pageHdr.pageHeaderGetMagic());
    }
    long oldLast = headerBuf.fileHeaderGetLastOf(type);

    // Clean data.
    pageHdr.writeByteArray(PageFile.CLEAN_DATA, 0, 0, Storage.PAGE_SIZE);

    // link the page onto the tail of the list for 'type'
    pageHdr.pageHeaderSetType(type);
    pageHdr.pageHeaderSetPrev(oldLast);
    pageHdr.pageHeaderSetNext(0);

    if (oldLast == 0)
      // This was the first one of this type
      headerBuf.fileHeaderSetFirstOf(type, retval);
    headerBuf.fileHeaderSetLastOf(type, retval);
    file.release(retval, true);

    // If there's a previous, fix up its pointer
    if (oldLast != 0) {
      pageHdr = file.get(oldLast);
      pageHdr.pageHeaderSetNext(retval);
      file.release(oldLast, true);
    }

    return retval;
  }

  /**
   * Frees a page of the indicated type: unlinks it from its typed list and
   * pushes it onto the front of the free list.
   *
   * @param type the type list the page currently belongs to
   * @param recid the page to free
   * @throws IOException if underlying page reads/writes fail
   * @throws Error if asked to free the header page, a free page, or a
   *         translation page
   */
  void free(short type, long recid) throws IOException {
    if (type == Magic.FREE_PAGE)
      throw new Error("free free page?");
    if (type == Magic.TRANSLATION_PAGE)
      throw new Error("Translation page can not be dealocated");

    if (recid == 0)
      throw new Error("free header page?");

    // get the page and read next and previous pointers
    PageIo pageHdr = file.get(recid);
    long prev = pageHdr.pageHeaderGetPrev();
    long next = pageHdr.pageHeaderGetNext();

    // put the page at the front of the free list.
    pageHdr.pageHeaderSetType(Magic.FREE_PAGE);
    pageHdr.pageHeaderSetNext(headerBuf.fileHeaderGetFirstOf(Magic.FREE_PAGE));
    pageHdr.pageHeaderSetPrev(0);

    headerBuf.fileHeaderSetFirstOf(Magic.FREE_PAGE, recid);
    file.release(recid, true);

    // remove the page from its old list
    if (prev != 0) {
      pageHdr = file.get(prev);
      pageHdr.pageHeaderSetNext(next);
      file.release(prev, true);
    } else {
      // freed page was the head of its list
      headerBuf.fileHeaderSetFirstOf(type, next);
    }
    if (next != 0) {
      pageHdr = file.get(next);
      pageHdr.pageHeaderSetPrev(prev);
      file.release(next, true);
    } else {
      // freed page was the tail of its list
      headerBuf.fileHeaderSetLastOf(type, prev);
    }

  }

  /**
   * Returns the page following the indicated page.
   */
  long getNext(long page) throws IOException {
    try {
      return file.get(page).pageHeaderGetNext();
    } finally {
      // release even if the read throws
      file.release(page, false);
    }
  }

  /**
   * Returns the page before the indicated page.
   */
  long getPrev(long page) throws IOException {
    try {
      return file.get(page).pageHeaderGetPrev();
    } finally {
      // release even if the read throws
      file.release(page, false);
    }
  }

  /**
   * Returns the first page on the indicated list.
   */
  long getFirst(short type) throws IOException {
    return headerBuf.fileHeaderGetFirstOf(type);
  }

  /**
   * Returns the last page on the indicated list.
   */
  long getLast(short type) throws IOException {
    return headerBuf.fileHeaderGetLastOf(type);
  }

  /**
   * Commit all pending (in-memory) data by flushing the page manager. This
   * forces a flush of all outstanding pages (this it's an implicit
   * {@link PageFile#commit} as well).
   */
  void commit() throws IOException {
    // write the header out
    file.release(headerBuf);
    file.commit();

    // and obtain it again
    headerBuf = file.get(0);
    headerBuf.ensureHeapBuffer();
    headerBuf.fileHeaderCheckHead(headerBuf.readShort(0) == 0);
  }

  /**
   * Flushes the page manager. This forces a flush of all outstanding pages
   * (this it's an implicit {@link PageFile#commit} as well).
   */
  void rollback() throws IOException {
    // release header
    file.discard(headerBuf);
    file.rollback();
    // and obtain it again
    headerBuf = file.get(0);
    headerBuf.ensureHeapBuffer();
    headerBuf.fileHeaderCheckHead(headerBuf.readShort(0) == 0);
  }

  /**
   * Closes the page manager. This flushes the page manager and releases the
   * lock on the headerBuf.fileHeader
   */
  void close() throws IOException {
    file.release(headerBuf);
    file.commit();
    headerBuf = null;
  }

  /**
   * PageManager permanently locks zero page, and we need this for backups
   */
  ByteBuffer getHeaderBufData() {
    return headerBuf.getData();
  }

  /** Returns the pinned file-header page (page 0). */
  public PageIo getFileHeader() {
    return headerBuf;
  }
}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageTransactionManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageTransactionManager.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageTransactionManager.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageTransactionManager.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,314 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.TreeSet;
+
+import javax.crypto.Cipher;
+
+/**
+ * This class manages the transaction log that belongs to every {@link PageFile}
+ * . The transaction log is either clean, or in progress. In the latter case,
+ * the transaction manager takes care of a roll forward.
+ */
+// TODO: Handle the case where we are recovering lg9 and lg0, where we
+// should start with lg9 instead of lg0!
+
+public final class PageTransactionManager {
+  private PageFile owner;
+
+  // streams for transaction log.
+  private DataOutputStream oos;
+
+  /**
+   * In-core copy of transactions. We could read everything back from the log
+   * file, but the PageFile needs to keep the dirty pages in core anyway, so we
+   * might as well point to them and spare us a lot of hassle.
+   */
+  private ArrayList<PageIo> txn = new ArrayList<PageIo>();
+  private int curTxn = -1;
+
+  private Storage storage;
+  private Cipher cipherIn;
+  private Cipher cipherOut;
+
+  /**
+   * Instantiates a transaction manager instance. If recovery needs to be
+   * performed, it is done.
+   * 
+   * @param owner the PageFile instance that owns this transaction mgr.
+   * @param storage
+   * @param cipherIn
+   * @param cipherOut
+   */
+  PageTransactionManager(PageFile owner, Storage storage, Cipher cipherIn,
+      Cipher cipherOut) throws IOException {
+    this.owner = owner;
+    this.storage = storage;
+    this.cipherIn = cipherIn;
+    this.cipherOut = cipherOut;
+    recover();
+    open();
+  }
+
+  /**
+   * Synchronize log file data with the main database file.
+   * <p/>
+   * After this call, the main database file is guaranteed to be consistent and
+   * guaranteed to be the only file needed for backup purposes.
+   */
+  public void synchronizeLog() throws IOException {
+    synchronizeLogFromMemory();
+  }
+
+  /**
+   * Synchs in-core transactions to data file and opens a fresh log
+   */
+  private void synchronizeLogFromMemory() throws IOException {
+    close();
+
+    TreeSet<PageIo> pageList = new TreeSet<PageIo>(PAGE_IO_COMPARTOR);
+
+    int numPages = 0;
+    int writtenPages = 0;
+
+    if (txn != null) {
+      // Add each page to the pageList, replacing the old copy of this
+      // page if necessary, thus avoiding writing the same page twice
+      for (Iterator<PageIo> k = txn.iterator(); k.hasNext();) {
+        PageIo page = k.next();
+        if (pageList.contains(page)) {
+          page.decrementTransactionCount();
+        } else {
+          writtenPages++;
+          boolean result = pageList.add(page);
+        }
+        numPages++;
+      }
+
+      txn = null;
+    }
+
+    // Write the page from the pageList to disk
+    synchronizePages(pageList, true);
+
+    owner.sync();
+    open();
+  }
+
+  /**
+   * Opens the log file
+   */
+  private void open() throws IOException {
+
+    oos = storage.openTransactionLog();
+    oos.writeShort(Magic.LOGFILE_HEADER);
+    oos.flush();
+    curTxn = -1;
+  }
+
+  /**
+   * Startup recovery on all files
+   */
+  private void recover() throws IOException {
+
+    DataInputStream ois = storage.readTransactionLog();
+
+    // if transaction log is empty, or does not exist
+    if (ois == null)
+      return;
+
+    while (true) {
+      ArrayList<PageIo> pages = null;
+      try {
+        int size = LongPacker.unpackInt(ois);
+        pages = new ArrayList<PageIo>(size);
+        for (int i = 0; i < size; i++) {
+          PageIo b = new PageIo();
+          b.readExternal(ois, cipherOut);
+          pages.add(b);
+        }
+      } catch (IOException e) {
+        // corrupted logfile, ignore rest of transactions
+        break;
+      }
+      synchronizePages(pages, false);
+
+    }
+    owner.sync();
+    ois.close();
+    storage.deleteTransactionLog();
+  }
+
+  /**
+   * Synchronizes the indicated pages with the owner.
+   */
+  private void synchronizePages(Iterable<PageIo> pages, boolean fromCore)
+      throws IOException {
+    // write pages vector elements to the data file.
+    for (PageIo cur : pages) {
+      owner.synch(cur);
+      if (fromCore) {
+        cur.decrementTransactionCount();
+        if (!cur.isInTransaction()) {
+          owner.releaseFromTransaction(cur);
+        }
+      }
+    }
+  }
+
+  /**
+   * Set clean flag on the pages.
+   */
+  private void setClean(ArrayList<PageIo> pages) throws IOException {
+    for (PageIo cur : pages) {
+      cur.setClean();
+    }
+  }
+
+  /**
+   * Discards the indicated pages and notify the owner.
+   */
+  private void discardPages(ArrayList<PageIo> pages) throws IOException {
+    for (PageIo cur : pages) {
+
+      cur.decrementTransactionCount();
+      if (!cur.isInTransaction()) {
+        owner.releaseFromTransaction(cur);
+      }
+    }
+  }
+
+  /**
+   * Starts a transaction. This can pages if all slots have been filled with
+   * full transactions, waiting for the synchronization thread to clean out
+   * slots.
+   */
+  void start() throws IOException {
+    curTxn++;
+    if (curTxn == 1) {
+      synchronizeLogFromMemory();
+      curTxn = 0;
+    }
+    txn = new ArrayList();
+  }
+
+  /**
+   * Indicates the page is part of the transaction.
+   */
+  void add(PageIo page) throws IOException {
+    page.incrementTransactionCount();
+    txn.add(page);
+  }
+
+  /**
+   * Commits the transaction to the log file.
+   */
+  void commit() throws IOException {
+    LongPacker.packInt(oos, txn.size());
+    for (PageIo page : txn) {
+      page.writeExternal(oos, cipherIn);
+    }
+
+    sync();
+
+    // set clean flag to indicate pages have been written to log
+    setClean(txn);
+
+    // open a new ObjectOutputStream in order to store
+    // newer states of PageIo
+    // oos = new DataOutputStream(new BufferedOutputStream(fos));
+  }
+
+  /**
+   * Flushes and syncs
+   */
+  private void sync() throws IOException {
+    oos.flush();
+  }
+
+  /**
+   * Shutdowns the transaction manager. Resynchronizes outstanding logs.
+   */
+  void shutdown() throws IOException {
+    synchronizeLogFromMemory();
+    close();
+  }
+
+  /**
+   * Closes open files.
+   */
+  private void close() throws IOException {
+    sync();
+    oos.close();
+    oos = null;
+  }
+
+  /**
+   * Force closing the file without synchronizing pending transaction data. Used
+   * for testing purposes only.
+   */
+  void forceClose() throws IOException {
+    oos.close();
+    oos = null;
+  }
+
+  /**
+   * Use the disk-based transaction log to synchronize the data file.
+   * Outstanding memory logs are discarded because they are believed to be
+   * inconsistent.
+   */
+  void synchronizeLogFromDisk() throws IOException {
+    close();
+
+    if (txn != null) {
+      discardPages(txn);
+      txn = null;
+    }
+
+    recover();
+    open();
+  }
+
+  /**
+   * INNER CLASS. Comparator class for use by the tree set used to store the
+   * pages to write for this transaction. The PageIo objects are ordered by
+   * their page ids.
+   */
+  private static final Comparator<PageIo> PAGE_IO_COMPARTOR = new Comparator<PageIo>() {
+
+    public int compare(PageIo page1, PageIo page2) {
+
+      if (page1.getPageId() == page2.getPageId()) {
+        return 0;
+      } else if (page1.getPageId() < page2.getPageId()) {
+        return -1;
+      } else {
+        return 1;
+      }
+    }
+
+  };
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalFreeRowIdManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalFreeRowIdManager.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalFreeRowIdManager.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalFreeRowIdManager.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * This class manages free physical rowid pages and provides methods to free and
+ * allocate physical rowids on a high level.
+ * <p>
+ * Free records are grouped by size into "slots". The root page holds one
+ * six-byte slot page pointer per size slot; each slot page stores a counter
+ * and up to MAX_RECIDS_PER_PAGE six-byte recids, and links to the previous
+ * slot page of the same slot.
+ */
+public final class PhysicalFreeRowIdManager {
+
+  /**
+   * maximal record size which can be hold. If record crosses multiple pages, it
+   * is trimmed before added to free list
+   */
+  static final int MAX_REC_SIZE = Storage.PAGE_SIZE * 2;
+
+  /** where data on root page starts, there are no extra data in page header */
+  static final int ROOT_HEADER_SIZE = Magic.PAGE_HEADER_SIZE;
+
+  /** page header size for slot page */
+  static final int SLOT_PAGE_HEADER_SIZE = Magic.PAGE_HEADER_SIZE
+      + Magic.SZ_SHORT + Magic.SZ_SIX_BYTE_LONG;
+
+  /** offset of the recid counter within a slot page */
+  static final int OFFSET_SLOT_PAGE_REC_COUNT = Magic.PAGE_HEADER_SIZE;
+
+  /** number of recids that fit on one slot page (6 bytes per recid) */
+  static final int SLOT_PAGE_REC_NUM = (Storage.PAGE_SIZE - SLOT_PAGE_HEADER_SIZE) / 6;
+
+  /** offset of the pointer to the next slot page in the slot page header */
+  static final int OFFSET_SLOT_PAGE_NEXT = Magic.PAGE_HEADER_SIZE
+      + Magic.SZ_SHORT;
+
+  /** number of size slots held in root page; 6 is the size of a page pointer */
+  static final int MAX_RECIDS_PER_PAGE = (Storage.PAGE_SIZE - ROOT_HEADER_SIZE - 6) / 6;
+
+  /**
+   * free records are grouped into slots by record size. Here is max diff in
+   * record size per group
+   */
+  static final int ROOT_SLOT_SIZE = 1 + MAX_REC_SIZE / MAX_RECIDS_PER_PAGE;
+
+  protected final PageFile file;
+
+  protected final PageManager pageman;
+
+  /**
+   * List of free phys slots in the current transaction. The top two bytes of
+   * each entry are the record size, the low six bytes are the recid.
+   */
+  private long[] inTrans = new long[8];
+  private int inTransSize = 0;
+
+  /**
+   * Creates a new instance using the indicated record file and page manager.
+   */
+  PhysicalFreeRowIdManager(PageFile file, PageManager pageman)
+      throws IOException {
+    this.file = file;
+    this.pageman = pageman;
+  }
+
+  /**
+   * Returns the recid of a free record large enough to hold {@code size}
+   * bytes, removing it from the free list, or 0 when none is available.
+   */
+  long getFreeRecord(final int size) throws IOException {
+    if (size >= MAX_REC_SIZE)
+      return 0;
+
+    final PageIo root = getRootPage();
+    // look one size slot up so any record found there is guaranteed to fit
+    final int rootPageOffset = sizeToRootOffset(size + ROOT_SLOT_SIZE);
+    final long slotPageId = root.readSixByteLong(rootPageOffset);
+
+    if (slotPageId == 0) {
+      // no slot page for this size slot -> no free record
+      file.release(root);
+      return 0;
+    }
+
+    PageIo slotPage = file.get(slotPageId);
+    if (slotPage.readShort(Magic.PAGE_HEADER_O_MAGIC) != Magic.PAGE_MAGIC
+        + Magic.FREEPHYSIDS_PAGE)
+      throw new InternalError();
+
+    short recidCount = slotPage.readShort(OFFSET_SLOT_PAGE_REC_COUNT);
+    if (recidCount <= 0) {
+      // a linked slot page must never be empty
+      throw new InternalError();
+    }
+
+    // take the last recid stored on the slot page
+    final int offset = (recidCount - 1) * 6 + SLOT_PAGE_HEADER_SIZE;
+    final long recid = slotPage.readSixByteLong(offset);
+
+    recidCount--;
+    if (recidCount > 0) {
+      // decrease counter and zero out old record
+      slotPage.writeSixByteLong(offset, 0);
+      slotPage.writeShort(OFFSET_SLOT_PAGE_REC_COUNT, recidCount);
+      file.release(root);
+      file.release(slotPage);
+    } else {
+      // slot page is now empty: unlink it from the root and release it
+      long prevSlotPageId = slotPage.readSixByteLong(OFFSET_SLOT_PAGE_NEXT);
+      root.writeSixByteLong(rootPageOffset, prevSlotPageId);
+      file.release(root);
+      file.release(slotPage);
+      pageman.free(Magic.FREEPHYSIDS_PAGE, slotPageId);
+
+    }
+
+    return recid;
+  }
+
+  /** Maps a record size to the offset of its size slot on the root page. */
+  static final int sizeToRootOffset(int size) {
+    return ROOT_HEADER_SIZE + 6 * (size / ROOT_SLOT_SIZE);
+  }
+
+  /**
+   * Puts the indicated rowid on the free list, which awaits for commit
+   */
+  void putFreeRecord(final long rowid, final int size) throws IOException {
+    // ensure capacity
+    if (inTransSize == inTrans.length) {
+      inTrans = Arrays.copyOf(inTrans, inTrans.length * 2);
+    }
+    // pack size into the top two bytes, recid into the low six bytes
+    inTrans[inTransSize] = rowid + (((long) size) << 48);
+    inTransSize++;
+  }
+
+  /**
+   * Writes all free records collected in this transaction to the on-disk
+   * free lists and clears the in-memory buffer.
+   */
+  public void commit() throws IOException {
+
+    if (inTransSize == 0)
+      return;
+
+    // Sort the packed entries; the size lives in the high 16 bits, so this
+    // groups records by size slot and minimizes slot page switches below.
+    // NOTE: Arrays.sort takes an EXCLUSIVE toIndex, so the whole range
+    // [0, inTransSize) must be passed; the previous "inTransSize - 1" left
+    // the last pending record unsorted.
+    Arrays.sort(inTrans, 0, inTransSize);
+
+    // write all uncommited free records
+    final PageIo root = getRootPage();
+    PageIo slotPage = null;
+    for (int rowIdPos = 0; rowIdPos < inTransSize; rowIdPos++) {
+      final int size = (int) (inTrans[rowIdPos] >>> 48);
+
+      final long rowid = inTrans[rowIdPos] & 0x0000FFFFFFFFFFFFL;
+      final int rootPageOffset = sizeToRootOffset(size);
+
+      long slotPageId = root.readSixByteLong(rootPageOffset);
+      if (slotPageId == 0) {
+        if (slotPage != null)
+          file.release(slotPage);
+        // create new page for this slot
+        slotPageId = pageman.allocate(Magic.FREEPHYSIDS_PAGE);
+        root.writeSixByteLong(rootPageOffset, slotPageId);
+      }
+
+      if (slotPage == null || slotPage.getPageId() != slotPageId) {
+        if (slotPage != null)
+          file.release(slotPage);
+        slotPage = file.get(slotPageId);
+      }
+      if (slotPage.readShort(Magic.PAGE_HEADER_O_MAGIC) != Magic.PAGE_MAGIC
+          + Magic.FREEPHYSIDS_PAGE)
+        throw new InternalError();
+
+      short recidCount = slotPage.readShort(OFFSET_SLOT_PAGE_REC_COUNT);
+      if (recidCount == MAX_RECIDS_PER_PAGE) {
+        file.release(slotPage);
+        // allocate new slot page and update links
+        final long newSlotPageId = pageman.allocate(Magic.FREEPHYSIDS_PAGE);
+        slotPage = file.get(newSlotPageId);
+        slotPage.writeSixByteLong(OFFSET_SLOT_PAGE_NEXT, slotPageId);
+        slotPage.writeShort(OFFSET_SLOT_PAGE_REC_COUNT, (short) 0);
+        recidCount = 0;
+        slotPageId = newSlotPageId;
+        root.writeSixByteLong(rootPageOffset, newSlotPageId);
+      }
+
+      // write new recid
+      slotPage.writeSixByteLong(recidCount * 6 + SLOT_PAGE_HEADER_SIZE, rowid);
+
+      // and increase count
+      recidCount++;
+      slotPage.writeShort(OFFSET_SLOT_PAGE_REC_COUNT, recidCount);
+
+    }
+    if (slotPage != null)
+      file.release(slotPage);
+
+    file.release(root);
+    clearFreeInTrans();
+  }
+
+  /** Discards all free records collected in this transaction. */
+  public void rollback() {
+    clearFreeInTrans();
+  }
+
+  private void clearFreeInTrans() {
+    // shrink the buffer back if a big transaction grew it
+    if (inTrans.length > 128)
+      inTrans = new long[8];
+    inTransSize = 0;
+  }
+
+  /** return free phys row page. If not found create it */
+  final PageIo getRootPage() throws IOException {
+    long pageId = pageman.getFirst(Magic.FREEPHYSIDS_ROOT_PAGE);
+    if (pageId == 0) {
+      pageId = pageman.allocate(Magic.FREEPHYSIDS_ROOT_PAGE);
+    }
+    return file.get(pageId);
+  }
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalRowIdManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalRowIdManager.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalRowIdManager.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalRowIdManager.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+
+/**
+ * This class manages physical row ids, and their data.
+ * <p>
+ * A physical rowid packs a page id and an in-page offset into one long: the
+ * page id occupies the bits above Storage.PAGE_SIZE_SHIFT, the offset is
+ * recovered with Storage.OFFSET_MASK. Records may span multiple pages; free
+ * space reuse is delegated to {@link PhysicalFreeRowIdManager}.
+ */
+public final class PhysicalRowIdManager {
+
+  // The file we're talking to and the associated page manager.
+  final private PageFile file;
+  final private PageManager pageman;
+  final PhysicalFreeRowIdManager freeman;
+  // usable data bytes per page (page size minus data page header)
+  static final private short DATA_PER_PAGE = (short) (Storage.PAGE_SIZE - Magic.DATA_PAGE_O_DATA);
+  // caches offset after last allocation. So we dont have to iterate throw page
+  // every allocation
+  private long cachedLastAllocatedRecordPage = Long.MIN_VALUE;
+  private short cachedLastAllocatedRecordOffset = Short.MIN_VALUE;
+
+  /**
+   * Creates a new rowid manager using the indicated record file. and page
+   * manager.
+   */
+  PhysicalRowIdManager(PageFile file, PageManager pageManager)
+      throws IOException {
+    this.file = file;
+    this.pageman = pageManager;
+    this.freeman = new PhysicalFreeRowIdManager(file, pageManager);
+
+  }
+
+  /**
+   * Inserts a new record. Returns the new physical rowid.
+   *
+   * @throws IllegalArgumentException if length < 1 or start is negative
+   */
+  long insert(final byte[] data, final int start, final int length)
+      throws IOException {
+    if (length < 1)
+      throw new IllegalArgumentException("Length is <1");
+    if (start < 0)
+      throw new IllegalArgumentException("negative start");
+
+    long retval = alloc(length);
+    write(retval, data, start, length);
+    return retval;
+  }
+
+  /**
+   * Updates an existing record. Returns the possibly changed physical rowid
+   * (the record is relocated when it no longer fits, or when shrinking would
+   * waste more than RecordHeader.MAX_SIZE_SPACE bytes).
+   */
+  long update(long rowid, final byte[] data, final int start, final int length)
+      throws IOException {
+    // fetch the record header
+    PageIo page = file.get(rowid >>> Storage.PAGE_SIZE_SHIFT);
+    short head = (short) (rowid & Storage.OFFSET_MASK);
+    int availSize = RecordHeader.getAvailableSize(page, head);
+    if (length > availSize ||
+    // difference between free and available space can be only 254.
+    // if bigger, need to realocate and free page
+        availSize - length > RecordHeader.MAX_SIZE_SPACE) {
+      // not enough space - we need to copy to a new rowid.
+      file.release(page);
+      free(rowid);
+      rowid = alloc(length);
+    } else {
+      file.release(page);
+    }
+
+    // 'nuff space, write it in and return the rowid.
+    write(rowid, data, start, length);
+    return rowid;
+  }
+
+  /**
+   * Reads the record stored under the given rowid and streams its bytes into
+   * {@code out}, following page links for records that span multiple pages.
+   * Writes nothing when the record's current size is zero.
+   */
+  void fetch(final DataInputOutput out, final long rowid) throws IOException {
+    // fetch the record header
+    long current = rowid >>> Storage.PAGE_SIZE_SHIFT;
+    PageIo page = file.get(current);
+    final short head = (short) (rowid & Storage.OFFSET_MASK);
+
+    // allocate a return buffer
+    // byte[] retval = new byte[ head.getCurrentSize() ];
+    final int size = RecordHeader.getCurrentSize(page, head);
+    if (size == 0) {
+      file.release(current, false);
+      return;
+    }
+
+    // copy bytes in
+    int leftToRead = size;
+    short dataOffset = (short) (head + RecordHeader.SIZE);
+    while (leftToRead > 0) {
+      // copy current page's data to return buffer
+      int toCopy = Storage.PAGE_SIZE - dataOffset;
+      if (leftToRead < toCopy) {
+        toCopy = leftToRead;
+      }
+
+      out.writeFromByteBuffer(page.getData(), dataOffset, toCopy);
+
+      // Go to the next page
+      leftToRead -= toCopy;
+      // out.flush();
+      file.release(page);
+
+      if (leftToRead > 0) {
+        // continuation pages hold data right after the page header
+        current = pageman.getNext(current);
+        page = file.get(current);
+        dataOffset = Magic.DATA_PAGE_O_DATA;
+      }
+
+    }
+
+    // return retval;
+  }
+
+  /**
+   * Allocate a new rowid with the indicated size. Tries the free list first,
+   * then appends to the last used page.
+   */
+  private long alloc(int size) throws IOException {
+    size = RecordHeader.roundAvailableSize(size);
+    long retval = freeman.getFreeRecord(size);
+    if (retval == 0) {
+      retval = allocNew(size, pageman.getLast(Magic.USED_PAGE));
+    }
+    return retval;
+  }
+
+  /**
+   * Allocates a new rowid. The second parameter is there to allow for a
+   * recursive call - it indicates where the search should start.
+   */
+  private long allocNew(int size, long start) throws IOException {
+    PageIo curPage;
+    if (start == 0
+        ||
+        // last page was completely filled?
+        cachedLastAllocatedRecordPage == start
+        && cachedLastAllocatedRecordOffset == Storage.PAGE_SIZE) {
+      // we need to create a new page.
+      start = pageman.allocate(Magic.USED_PAGE);
+      curPage = file.get(start);
+      curPage.dataPageSetFirst(Magic.DATA_PAGE_O_DATA);
+      cachedLastAllocatedRecordOffset = Magic.DATA_PAGE_O_DATA;
+      cachedLastAllocatedRecordPage = curPage.getPageId();
+      RecordHeader.setAvailableSize(curPage, Magic.DATA_PAGE_O_DATA, 0);
+      RecordHeader.setCurrentSize(curPage, Magic.DATA_PAGE_O_DATA, 0);
+
+    } else {
+      curPage = file.get(start);
+    }
+
+    // follow the rowids on this page to get to the last one. We don't
+    // fall off, because this is the last page, remember?
+    short pos = curPage.dataPageGetFirst();
+    if (pos == 0) {
+      // page is exactly filled by the last page of a record
+      file.release(curPage);
+      return allocNew(size, 0);
+    }
+
+    short hdr = pos;
+
+    if (cachedLastAllocatedRecordPage != curPage.getPageId()) {
+      // position was not cached, have to find it again
+      int availSize = RecordHeader.getAvailableSize(curPage, hdr);
+      while (availSize != 0 && pos < Storage.PAGE_SIZE) {
+        pos += availSize + RecordHeader.SIZE;
+        if (pos == Storage.PAGE_SIZE) {
+          // Again, a filled page.
+          file.release(curPage);
+          return allocNew(size, 0);
+        }
+        hdr = pos;
+        availSize = RecordHeader.getAvailableSize(curPage, hdr);
+      }
+    } else {
+      hdr = cachedLastAllocatedRecordOffset;
+      pos = cachedLastAllocatedRecordOffset;
+    }
+
+    if (pos == RecordHeader.SIZE) { // TODO why is this here?
+      // the last record exactly filled the page. Restart forcing
+      // a new page.
+      file.release(curPage);
+      // NOTE(review): curPage is released here but execution falls through
+      // and keeps using it below - looks suspicious, confirm intent
+    }
+
+    if (hdr > Storage.PAGE_SIZE - 16) {
+      file.release(curPage);
+      // there is not enought space on current page, so force new page
+      return allocNew(size, 0);
+    }
+
+    // we have the position, now tack on extra pages until we've got
+    // enough space.
+    long retval = (start << Storage.PAGE_SIZE_SHIFT) + pos;
+    int freeHere = Storage.PAGE_SIZE - pos - RecordHeader.SIZE;
+    if (freeHere < size) {
+      // check whether the last page would have only a small bit left.
+      // if yes, increase the allocation. A small bit is a record
+      // header plus 16 bytes.
+      int lastSize = (size - freeHere) % DATA_PER_PAGE;
+      if (size < DATA_PER_PAGE
+          && (DATA_PER_PAGE - lastSize) < (RecordHeader.SIZE + 16)) {
+        size += (DATA_PER_PAGE - lastSize);
+        size = RecordHeader.roundAvailableSize(size);
+      }
+
+      // write out the header now so we don't have to come back.
+      RecordHeader.setAvailableSize(curPage, hdr, size);
+      file.release(start, true);
+
+      int neededLeft = size - freeHere;
+      // Refactor these two pages!
+      while (neededLeft >= DATA_PER_PAGE) {
+        // whole continuation pages carry data only, no rowids
+        start = pageman.allocate(Magic.USED_PAGE);
+        curPage = file.get(start);
+        curPage.dataPageSetFirst((short) 0); // no rowids, just data
+        file.release(start, true);
+        neededLeft -= DATA_PER_PAGE;
+      }
+      if (neededLeft > 0) {
+        // done with whole chunks, allocate last fragment.
+        start = pageman.allocate(Magic.USED_PAGE);
+        curPage = file.get(start);
+        curPage.dataPageSetFirst((short) (Magic.DATA_PAGE_O_DATA + neededLeft));
+        file.release(start, true);
+        cachedLastAllocatedRecordOffset = (short) (Magic.DATA_PAGE_O_DATA + neededLeft);
+        cachedLastAllocatedRecordPage = curPage.getPageId();
+
+      }
+    } else {
+      // just update the current page. If there's less than 16 bytes
+      // left, we increase the allocation (16 bytes is an arbitrary
+      // number).
+      if (freeHere - size <= (16 + RecordHeader.SIZE)) {
+        size = freeHere;
+      }
+      RecordHeader.setAvailableSize(curPage, hdr, size);
+      file.release(start, true);
+      cachedLastAllocatedRecordOffset = (short) (hdr + RecordHeader.SIZE + size);
+      cachedLastAllocatedRecordPage = curPage.getPageId();
+
+    }
+    return retval;
+
+  }
+
+  /**
+   * Frees the record: zeroes its current size, trims the available size when
+   * the record spans more than one extra page (releasing the fully skipped
+   * pages), then puts the rowid on the free list (effective at commit).
+   */
+  void free(final long id) throws IOException {
+    // get the rowid, and write a zero current size into it.
+    final long curPageId = id >>> Storage.PAGE_SIZE_SHIFT;
+    final PageIo curPage = file.get(curPageId);
+    final short offset = (short) (id & Storage.OFFSET_MASK);
+    RecordHeader.setCurrentSize(curPage, offset, 0);
+    int size = RecordHeader.getAvailableSize(curPage, offset);
+
+    // trim size if spreads across multiple pages
+    if (offset + RecordHeader.SIZE + size > Storage.PAGE_SIZE
+        + (Storage.PAGE_SIZE - Magic.DATA_PAGE_O_DATA)) {
+      // minus data remaining on this page
+      int numOfPagesToSkip = (size - (Storage.PAGE_SIZE - (offset - RecordHeader.SIZE)))
+          / (Storage.PAGE_SIZE - Magic.DATA_PAGE_O_DATA);
+      size = size - numOfPagesToSkip
+          * (Storage.PAGE_SIZE - Magic.DATA_PAGE_O_DATA);
+      RecordHeader.setAvailableSize(curPage, offset, size);
+
+      // get next page
+      long nextPage = curPage.pageHeaderGetNext();
+      file.release(curPage);
+
+      // release pages
+      for (int i = 0; i < numOfPagesToSkip; i++) {
+        PageIo page = file.get(nextPage);
+        long nextPage2 = page.pageHeaderGetNext();
+        file.release(page);
+        pageman.free(Magic.USED_PAGE, nextPage);
+        nextPage = nextPage2;
+      }
+
+    } else {
+      file.release(curPage);
+    }
+
+    // write the rowid to the free list
+    freeman.putFreeRecord(id, size);
+  }
+
+  /**
+   * Writes out data to a rowid. Assumes that any resizing has been done.
+   */
+  private void write(final long rowid, final byte[] data, final int start,
+      final int length) throws IOException {
+    long current = rowid >>> Storage.PAGE_SIZE_SHIFT;
+    PageIo page = file.get(current);
+    final short hdr = (short) (rowid & Storage.OFFSET_MASK);
+    RecordHeader.setCurrentSize(page, hdr, length);
+    if (length == 0) {
+      file.release(current, true);
+      return;
+    }
+
+    // copy bytes in
+    int offsetInBuffer = start;
+    int leftToWrite = length;
+    short dataOffset = (short) (hdr + RecordHeader.SIZE);
+    while (leftToWrite > 0) {
+      // copy current page's data to return buffer
+      int toCopy = Storage.PAGE_SIZE - dataOffset;
+
+      if (leftToWrite < toCopy) {
+        toCopy = leftToWrite;
+      }
+      page.writeByteArray(data, offsetInBuffer, dataOffset, toCopy);
+
+      // Go to the next page
+      leftToWrite -= toCopy;
+      offsetInBuffer += toCopy;
+
+      file.release(current, true);
+
+      if (leftToWrite > 0) {
+        // continuation pages hold data right after the page header
+        current = pageman.getNext(current);
+        page = file.get(current);
+        dataOffset = Magic.DATA_PAGE_O_DATA;
+      }
+    }
+  }
+
+  /** Resets the allocation cache and discards pending free records. */
+  void rollback() throws IOException {
+    cachedLastAllocatedRecordPage = Long.MIN_VALUE;
+    cachedLastAllocatedRecordOffset = Short.MIN_VALUE;
+    freeman.rollback();
+  }
+
+  /** Commits pending free records to the on-disk free list. */
+  void commit() throws IOException {
+    freeman.commit();
+  }
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordHeader.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordHeader.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordHeader.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordHeader.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+/**
+ * The data that comes at the start of a record of data. It stores both the
+ * current size and the avaliable size for the record - the latter can be bigger
+ * than the former, which allows the record to grow without needing to be moved
+ * and which allows the system to put small records in larger free spots.
+ * <p/>
+ * In JDBM 1.0 both values were stored as four-byte integers. This was very
+ * wastefull. Now available size is stored in two bytes, it is compressed, so
+ * maximal value is up to 120 MB (not sure with exact number) Current size is
+ * stored as two-byte-unsigned-short difference from Available Size.
+ */
+public final class RecordHeader {
+  // offsets
+  private static final short O_CURRENTSIZE = 0; // int currentSize
+  private static final short O_AVAILABLESIZE = Magic.SZ_BYTE; // int
+                                                              // availableSize
+  static final int MAX_RECORD_SIZE = 8355839;
+  static final int SIZE = O_AVAILABLESIZE + Magic.SZ_SHORT;
+  /**
+   * Maximal difference between current and available size, Maximal value is
+   * reserved for currentSize 0, so use -1
+   */
+  static final int MAX_SIZE_SPACE = 255 - 1;
+
+  /**
+   * Returns the current size
+   */
+  static int getCurrentSize(final PageIo page, final short pos) {
+    int s = page.readByte(pos + O_CURRENTSIZE) & 0xFF;
+    if (s == MAX_SIZE_SPACE + 1)
+      return 0;
+    return getAvailableSize(page, pos) - s;
+  }
+
+  /**
+   * Sets the current size
+   */
+  static void setCurrentSize(final PageIo page, final short pos, int value) {
+    if (value == 0) {
+      page.writeByte(pos + O_CURRENTSIZE, (byte) (MAX_SIZE_SPACE + 1));
+      return;
+    }
+    int availSize = getAvailableSize(page, pos);
+    if (value < (availSize - MAX_SIZE_SPACE) || value > availSize)
+      throw new IllegalArgumentException(
+          "currentSize out of bounds, need to realocate " + value + " - "
+              + availSize);
+    page.writeByte(pos + O_CURRENTSIZE, (byte) (availSize - value));
+  }
+
+  /**
+   * Returns the available size
+   */
+  static int getAvailableSize(final PageIo page, final short pos) {
+    return deconvertAvailSize(page.readShort(pos + O_AVAILABLESIZE));
+  }
+
+  /**
+   * Sets the available size
+   */
+  static void setAvailableSize(final PageIo page, final short pos, int value) {
+    if (value != roundAvailableSize(value))
+      throw new IllegalArgumentException("value is not rounded");
+    int oldCurrSize = getCurrentSize(page, pos);
+
+    page.writeShort(pos + O_AVAILABLESIZE, convertAvailSize(value));
+    setCurrentSize(page, pos, oldCurrSize);
+  }
+
+  static short convertAvailSize(final int recordSize) {
+    if (recordSize <= Short.MAX_VALUE)
+      return (short) recordSize;
+    else {
+      int shift = recordSize - Short.MAX_VALUE;
+      if (shift % MAX_SIZE_SPACE == 0)
+        shift = shift / MAX_SIZE_SPACE;
+      else
+        shift = 1 + shift / MAX_SIZE_SPACE;
+      shift = -shift;
+      return (short) (shift);
+    }
+
+  }
+
+  static int deconvertAvailSize(final short converted) {
+    if (converted >= 0)
+      return converted;
+    else {
+      int shifted = -converted;
+      shifted = shifted * MAX_SIZE_SPACE;
+      return Short.MAX_VALUE + shifted;
+    }
+
+  }
+
+  static int roundAvailableSize(int value) {
+    if (value > MAX_RECORD_SIZE)
+      new InternalError("Maximal record size (" + MAX_RECORD_SIZE
+          + ") exceeded: " + value);
+    return deconvertAvailSize(convertAvailSize(value));
+  }
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordListener.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordListener.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordListener.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordListener.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+
+/**
+ * A listener notified when a record is inserted, updated or removed.
+ * <p/>
+ * NOTE: this class was used in JDBM2 to support secondary indexes. JDBM3 does
+ * not have secondary indexes, so this class is not publicly exposed.
+ * 
+ * @param <K> key type
+ * @param <V> value type
+ */
+public interface RecordListener<K, V> {
+
+  /** Called when a record was inserted under {@code key}. */
+  void recordInserted(K key, V value) throws IOException;
+
+  /** Called when the record under {@code key} changed from oldValue to newValue. */
+  void recordUpdated(K key, V oldValue, V newValue) throws IOException;
+
+  /** Called when the record under {@code key} was removed. */
+  void recordRemoved(K key, V value) throws IOException;
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerialClassInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerialClassInfo.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerialClassInfo.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerialClassInfo.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,569 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InvalidClassException;
+import java.io.NotSerializableException;
+import java.io.ObjectStreamClass;
+import java.io.ObjectStreamField;
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hama.jdbm.Serialization.FastArrayList;
+
+/**
+ * This class stores information about serialized classes and fields.
+ */
+public abstract class SerialClassInfo {
+
+  private static final Class<?>[] EMPTY_ARRAY = new Class[] {};
+
+  /**
+   * Cache of constructors for each class. Pins the classes so they can't be
+   * garbage collected until ReflectionUtils can be collected.
+   */
+  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = new ConcurrentHashMap<Class<?>, Constructor<?>>();
+
+  static final Serializer<ArrayList<ClassInfo>> serializer = new Serializer<ArrayList<ClassInfo>>() {
+
+    @Override
+    public void serialize(DataOutput out, ArrayList<ClassInfo> obj)
+        throws IOException {
+      LongPacker.packInt(out, obj.size());
+      for (ClassInfo ci : obj) {
+        out.writeUTF(ci.getName());
+        out.writeBoolean(ci.isEnum);
+        out.writeBoolean(ci.isExternalizable);
+        if (ci.isExternalizable)
+          continue; // no fields
+
+        LongPacker.packInt(out, ci.fields.size());
+        for (FieldInfo fi : ci.fields) {
+          out.writeUTF(fi.getName());
+          out.writeBoolean(fi.isPrimitive());
+          out.writeUTF(fi.getType());
+        }
+      }
+    }
+
+    @Override
+    public ArrayList<ClassInfo> deserialize(DataInput in) throws IOException,
+        ClassNotFoundException {
+
+      int size = LongPacker.unpackInt(in);
+      ArrayList<ClassInfo> ret = new ArrayList<ClassInfo>(size);
+
+      for (int i = 0; i < size; i++) {
+        String className = in.readUTF();
+        boolean isEnum = in.readBoolean();
+        boolean isExternalizable = in.readBoolean();
+
+        int fieldsNum = isExternalizable ? 0 : LongPacker.unpackInt(in);
+        FieldInfo[] fields = new FieldInfo[fieldsNum];
+        for (int j = 0; j < fieldsNum; j++) {
+          fields[j] = new FieldInfo(in.readUTF(), in.readBoolean(),
+              in.readUTF(), Class.forName(className));
+        }
+        ret.add(new ClassInfo(className, fields, isEnum, isExternalizable));
+      }
+      return ret;
+    }
+  };
+
+  long serialClassInfoRecid;
+
+  public SerialClassInfo(DBAbstract db, long serialClassInfoRecid,
+      ArrayList<ClassInfo> registered) {
+    this.db = db;
+    this.serialClassInfoRecid = serialClassInfoRecid;
+    this.registered = registered;
+  }
+
+  /**
+   * Stores info about single class stored in JDBM. Roughly corresponds to
+   * 'java.io.ObjectStreamClass'
+   */
+  static class ClassInfo {
+
+    private final String name;
+    private final List<FieldInfo> fields = new ArrayList<FieldInfo>();
+    private final Map<String, FieldInfo> name2fieldInfo = new HashMap<String, FieldInfo>();
+    private final Map<String, Integer> name2fieldId = new HashMap<String, Integer>();
+    private ObjectStreamField[] objectStreamFields;
+
+    final boolean isEnum;
+
+    final boolean isExternalizable;
+
+    ClassInfo(final String name, final FieldInfo[] fields,
+        final boolean isEnum, final boolean isExternalizable) {
+      this.name = name;
+      this.isEnum = isEnum;
+      this.isExternalizable = isExternalizable;
+
+      for (FieldInfo f : fields) {
+        this.name2fieldId.put(f.getName(), this.fields.size());
+        this.fields.add(f);
+        this.name2fieldInfo.put(f.getName(), f);
+      }
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public FieldInfo[] getFields() {
+      return (FieldInfo[]) fields.toArray();
+    }
+
+    public FieldInfo getField(String name) {
+      return name2fieldInfo.get(name);
+    }
+
+    public int getFieldId(String name) {
+      Integer fieldId = name2fieldId.get(name);
+      if (fieldId != null)
+        return fieldId;
+      return -1;
+    }
+
+    public FieldInfo getField(int serialId) {
+      return fields.get(serialId);
+    }
+
+    public int addFieldInfo(FieldInfo field) {
+      name2fieldId.put(field.getName(), fields.size());
+      name2fieldInfo.put(field.getName(), field);
+      fields.add(field);
+      return fields.size() - 1;
+    }
+
+    public ObjectStreamField[] getObjectStreamFields() {
+      return objectStreamFields;
+    }
+
+    public void setObjectStreamFields(ObjectStreamField[] objectStreamFields) {
+      this.objectStreamFields = objectStreamFields;
+    }
+
+  }
+
+  /**
+   * Stores info about single field stored in JDBM. Roughly corresponds to
+   * 'java.io.ObjectFieldClass'
+   */
+  static class FieldInfo {
+    private final String name;
+    private final boolean primitive;
+    private final String type;
+    private Class typeClass;
+    // Class containing this field
+    private final Class clazz;
+    private Object setter;
+    private int setterIndex;
+    private Object getter;
+    private int getterIndex;
+
+    public FieldInfo(String name, boolean primitive, String type, Class clazz) {
+      this.name = name;
+      this.primitive = primitive;
+      this.type = type;
+      this.clazz = clazz;
+      try {
+        this.typeClass = Class.forName(type);
+      } catch (ClassNotFoundException e) {
+        this.typeClass = null;
+      }
+      initSetter();
+      initGetter();
+    }
+
+    private void initSetter() {
+      // Set setter
+      String setterName = "set" + firstCharCap(name);
+      String fieldSetterName = clazz.getName() + "#" + setterName;
+
+      Class aClazz = clazz;
+
+      // iterate over class hierarchy, until root class
+      while (aClazz != Object.class) {
+        // check if there is getMethod
+        try {
+          Method m = aClazz.getMethod(setterName, typeClass);
+          if (m != null) {
+            setter = m;
+            return;
+          }
+        } catch (Exception e) {
+          // e.printStackTrace();
+        }
+
+        // no get method, access field directly
+        try {
+          Field f = aClazz.getDeclaredField(name);
+          // security manager may not be happy about this
+          if (!f.isAccessible())
+            f.setAccessible(true);
+          setter = f;
+          return;
+        } catch (Exception e) {
+          // e.printStackTrace();
+        }
+        // move to superclass
+        aClazz = aClazz.getSuperclass();
+      }
+    }
+
+    private void initGetter() {
+      // Set setter
+      String getterName = "get" + firstCharCap(name);
+      String fieldSetterName = clazz.getName() + "#" + getterName;
+
+      Class aClazz = clazz;
+
+      // iterate over class hierarchy, until root class
+      while (aClazz != Object.class) {
+        // check if there is getMethod
+        try {
+          Method m = aClazz.getMethod(getterName);
+          if (m != null) {
+            getter = m;
+            return;
+          }
+        } catch (Exception e) {
+          // e.printStackTrace();
+        }
+
+        // no get method, access field directly
+        try {
+          Field f = aClazz.getDeclaredField(name);
+          // security manager may not be happy about this
+          if (!f.isAccessible())
+            f.setAccessible(true);
+          getter = f;
+          return;
+        } catch (Exception e) {
+          // e.printStackTrace();
+        }
+        // move to superclass
+        aClazz = aClazz.getSuperclass();
+      }
+    }
+
+    public FieldInfo(ObjectStreamField sf, Class clazz) {
+      this(sf.getName(), sf.isPrimitive(), sf.getType().getName(), clazz);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public boolean isPrimitive() {
+      return primitive;
+    }
+
+    public String getType() {
+      return type;
+    }
+
+    private String firstCharCap(String s) {
+      return Character.toUpperCase(s.charAt(0)) + s.substring(1);
+    }
+  }
+
+  ArrayList<ClassInfo> registered;
+  Map<Class, Integer> class2classId = new HashMap<Class, Integer>();
+  Map<Integer, Class> classId2class = new HashMap<Integer, Class>();
+
+  final DBAbstract db;
+
+  public void registerClass(Class clazz) throws IOException {
+    if (clazz != Object.class)
+      assertClassSerializable(clazz);
+
+    if (containsClass(clazz))
+      return;
+
+    ObjectStreamField[] streamFields = getFields(clazz);
+    FieldInfo[] fields = new FieldInfo[streamFields.length];
+    for (int i = 0; i < fields.length; i++) {
+      ObjectStreamField sf = streamFields[i];
+      fields[i] = new FieldInfo(sf, clazz);
+    }
+
+    ClassInfo i = new ClassInfo(clazz.getName(), fields, clazz.isEnum(),
+        Externalizable.class.isAssignableFrom(clazz));
+    class2classId.put(clazz, registered.size());
+    classId2class.put(registered.size(), clazz);
+    registered.add(i);
+
+    if (db != null)
+      db.update(serialClassInfoRecid, (Serialization) this,
+          db.defaultSerializationSerializer);
+
+  }
+
+  private ObjectStreamField[] getFields(Class clazz) {
+    ObjectStreamField[] fields = null;
+    ClassInfo classInfo = null;
+    Integer classId = class2classId.get(clazz);
+    if (classId != null) {
+      classInfo = registered.get(classId);
+      fields = classInfo.getObjectStreamFields();
+    }
+    if (fields == null) {
+      ObjectStreamClass streamClass = ObjectStreamClass.lookup(clazz);
+      FastArrayList<ObjectStreamField> fieldsList = new FastArrayList<ObjectStreamField>();
+      while (streamClass != null) {
+        for (ObjectStreamField f : streamClass.getFields()) {
+          fieldsList.add(f);
+        }
+        clazz = clazz.getSuperclass();
+        streamClass = ObjectStreamClass.lookup(clazz);
+      }
+      fields = new ObjectStreamField[fieldsList.size()];
+      for (int i = 0; i < fields.length; i++) {
+        fields[i] = fieldsList.get(i);
+      }
+      if (classInfo != null)
+        classInfo.setObjectStreamFields(fields);
+    }
+    return fields;
+  }
+
+  private void assertClassSerializable(Class clazz)
+      throws NotSerializableException, InvalidClassException {
+    if (containsClass(clazz))
+      return;
+
+    if (!Serializable.class.isAssignableFrom(clazz))
+      throw new NotSerializableException(clazz.getName());
+  }
+
+  public Object getFieldValue(String fieldName, Object object) {
+    try {
+      registerClass(object.getClass());
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    ClassInfo classInfo = registered.get(class2classId.get(object.getClass()));
+    return getFieldValue(classInfo.getField(fieldName), object);
+  }
+
+  public Object getFieldValue(FieldInfo fieldInfo, Object object) {
+
+    Object fieldAccessor = fieldInfo.getter;
+    try {
+      if (fieldAccessor instanceof Method) {
+        Method m = (Method) fieldAccessor;
+        return m.invoke(object);
+      } else {
+        Field f = (Field) fieldAccessor;
+        return f.get(object);
+      }
+    } catch (Exception e) {
+
+    }
+
+    throw new NoSuchFieldError(object.getClass() + "." + fieldInfo.getName());
+  }
+
+  public void setFieldValue(String fieldName, Object object, Object value) {
+    try {
+      registerClass(object.getClass());
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    ClassInfo classInfo = registered.get(class2classId.get(object.getClass()));
+    setFieldValue(classInfo.getField(fieldName), object, value);
+  }
+
+  public void setFieldValue(FieldInfo fieldInfo, Object object, Object value) {
+
+    Object fieldAccessor = fieldInfo.setter;
+    try {
+      if (fieldAccessor instanceof Method) {
+        Method m = (Method) fieldAccessor;
+        m.invoke(object, value);
+      } else {
+        Field f = (Field) fieldAccessor;
+        f.set(object, value);
+      }
+      return;
+    } catch (Throwable e) {
+      e.printStackTrace();
+    }
+
+    throw new NoSuchFieldError(object.getClass() + "." + fieldInfo.getName());
+  }
+
+  public boolean containsClass(Class clazz) {
+    return (class2classId.get(clazz) != null);
+  }
+
+  public int getClassId(Class clazz) {
+    Integer classId = class2classId.get(clazz);
+    if (classId != null) {
+      return classId;
+    }
+    throw new Error("Class is not registered: " + clazz);
+  }
+
+  public void writeObject(DataOutput out, Object obj, FastArrayList objectStack)
+      throws IOException {
+    registerClass(obj.getClass());
+
+    // write class header
+    int classId = getClassId(obj.getClass());
+    LongPacker.packInt(out, classId);
+    ClassInfo classInfo = registered.get(classId);
+
+    if (classInfo.isExternalizable) {
+      Externalizable o = (Externalizable) obj;
+      DataInputOutput out2 = (DataInputOutput) out;
+      try {
+        out2.serializer = this;
+        out2.objectStack = objectStack;
+        o.writeExternal(out2);
+      } finally {
+        out2.serializer = null;
+        out2.objectStack = null;
+      }
+      return;
+    }
+
+    if (classInfo.isEnum) {
+      int ordinal = ((Enum) obj).ordinal();
+      LongPacker.packInt(out, ordinal);
+    }
+
+    ObjectStreamField[] fields = getFields(obj.getClass());
+    LongPacker.packInt(out, fields.length);
+
+    for (ObjectStreamField f : fields) {
+      // write field ID
+      int fieldId = classInfo.getFieldId(f.getName());
+      if (fieldId == -1) {
+        // field does not exists in class definition stored in db,
+        // propably new field was added so add field descriptor
+        fieldId = classInfo.addFieldInfo(new FieldInfo(f, obj.getClass()));
+        db.update(serialClassInfoRecid, (Serialization) this,
+            db.defaultSerializationSerializer);
+      }
+      LongPacker.packInt(out, fieldId);
+      // and write value
+      Object fieldValue = getFieldValue(classInfo.getField(fieldId), obj);
+      serialize(out, fieldValue, objectStack);
+    }
+  }
+
+  public Object readObject(DataInput in, FastArrayList objectStack)
+      throws IOException {
+    // read class header
+    try {
+      int classId = LongPacker.unpackInt(in);
+      ClassInfo classInfo = registered.get(classId);
+      // Class clazz = Class.forName(classInfo.getName());
+      Class clazz = classId2class.get(classId);
+      if (clazz == null)
+        clazz = Class.forName(classInfo.getName());
+      assertClassSerializable(clazz);
+
+      Object o;
+
+      if (classInfo.isEnum) {
+        int ordinal = LongPacker.unpackInt(in);
+        o = clazz.getEnumConstants()[ordinal];
+      } else {
+        o = createInstance(clazz, Object.class);
+      }
+
+      objectStack.add(o);
+
+      if (classInfo.isExternalizable) {
+        Externalizable oo = (Externalizable) o;
+        DataInputOutput in2 = (DataInputOutput) in;
+        try {
+          in2.serializer = this;
+          in2.objectStack = objectStack;
+          oo.readExternal(in2);
+        } finally {
+          in2.serializer = null;
+          in2.objectStack = null;
+        }
+
+      } else {
+        int fieldCount = LongPacker.unpackInt(in);
+        for (int i = 0; i < fieldCount; i++) {
+          int fieldId = LongPacker.unpackInt(in);
+          FieldInfo f = classInfo.getField(fieldId);
+          Object fieldValue = deserialize(in, objectStack);
+          setFieldValue(f, o, fieldValue);
+        }
+      }
+      return o;
+    } catch (Exception e) {
+      throw new Error("Could not instanciate class", e);
+    }
+  }
+
+  private static Map<Class, Constructor> class2constuctor = new HashMap<Class, Constructor>();
+
+  /**
+   * Little trick to create new instance without using constructor. Taken from
+   * http://www.javaspecialists.eu/archive/Issue175.html
+   */
+  private static <T> T createInstance(Class<T> clazz, Class<? super T> parent) {
+    return newInstance(clazz);
+  }
+
+  public static <T> T newInstance(Class<T> theClass) {
+    T result;
+    try {
+      @SuppressWarnings("unchecked")
+      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
+      if (meth == null) {
+        meth = theClass.getDeclaredConstructor(EMPTY_ARRAY);
+        meth.setAccessible(true);
+        CONSTRUCTOR_CACHE.put(theClass, meth);
+      }
+      result = meth.newInstance();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    return result;
+  }
+
+  protected abstract Object deserialize(DataInput in, FastArrayList objectStack)
+      throws IOException, ClassNotFoundException;
+
+  protected abstract void serialize(DataOutput out, Object fieldValue,
+      FastArrayList objectStack) throws IOException;
+
+}



Mime
View raw message