james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ieu...@apache.org
Subject svn commit: r1164981 [2/5] - in /james/mailbox/trunk: ./ hbase/ hbase/src/ hbase/src/main/ hbase/src/main/config/ hbase/src/main/java/ hbase/src/main/java/org/ hbase/src/main/java/org/apache/ hbase/src/main/java/org/apache/james/ hbase/src/main/java/or...
Date Sun, 04 Sep 2011 09:49:16 GMT
Added: james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMailboxMapper.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMailboxMapper.java?rev=1164981&view=auto
==============================================================================
--- james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMailboxMapper.java (added)
+++ james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMailboxMapper.java Sun Sep  4 09:49:14 2011
@@ -0,0 +1,374 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.hbase.mail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.BinaryPrefixComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.SubstringComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.james.mailbox.MailboxException;
+import org.apache.james.mailbox.MailboxNotFoundException;
+import org.apache.james.mailbox.MailboxPath;
+import org.apache.james.mailbox.hbase.HBaseNonTransactionalMapper;
+import org.apache.james.mailbox.hbase.mail.model.HBaseMailbox;
+import org.apache.james.mailbox.store.mail.MailboxMapper;
+import org.apache.james.mailbox.store.mail.model.Mailbox;
+import static org.apache.james.mailbox.hbase.HBaseUtils.*;
+import static org.apache.james.mailbox.hbase.HBaseNames.*;
+
+/**
+ * Data access management for mailbox.
+ * 
+ */
+public class HBaseMailboxMapper extends HBaseNonTransactionalMapper implements MailboxMapper<UUID> {
+
+    /** Link to the HBase Configuration object and specific mailbox names */
+    private final Configuration conf;
+
+    public HBaseMailboxMapper(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public Mailbox<UUID> findMailboxByPath(MailboxPath mailboxPath) throws MailboxException, MailboxNotFoundException {
+        HTable mailboxes = null;
+        ResultScanner scanner = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+
+            Scan scan = new Scan();
+            scan.addFamily(MAILBOX_CF);
+            scan.setCaching(mailboxes.getScannerCaching() * 2);
+            scan.setMaxVersions(1);
+
+            /* 
+             * Filters is ORDERED. Passing the parameters in the right order might improve performance:
+             * passing the user first means that the other filters will not be tested if the mailbox
+             * does not belong to the passed user.
+             */
+            FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+
+            if (mailboxPath.getUser() != null) {
+                SingleColumnValueFilter userFilter = new SingleColumnValueFilter(MAILBOX_CF, MAILBOX_USER, CompareOp.EQUAL, Bytes.toBytes(mailboxPath.getUser()));
+                filters.addFilter(userFilter);
+            }
+            SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(MAILBOX_CF, MAILBOX_NAME, CompareOp.EQUAL, Bytes.toBytes(mailboxPath.getName()));
+            filters.addFilter(nameFilter);
+            SingleColumnValueFilter namespaceFilter = new SingleColumnValueFilter(MAILBOX_CF, MAILBOX_NAMESPACE, CompareOp.EQUAL, Bytes.toBytes(mailboxPath.getNamespace()));
+            filters.addFilter(namespaceFilter);
+
+            scan.setFilter(filters);
+            scanner = mailboxes.getScanner(scan);
+            Result result = scanner.next();
+
+            if (result == null) {
+                throw new MailboxNotFoundException(mailboxPath);
+            }
+            return mailboxFromResult(result);
+        } catch (IOException e) {
+            throw new MailboxException("Search of mailbox " + mailboxPath + " failed", e);
+        } finally {
+            scanner.close();
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public List<Mailbox<UUID>> findMailboxWithPathLike(MailboxPath mailboxPath) throws MailboxException {
+        HTable mailboxes = null;
+        ResultScanner scanner = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+
+            Scan scan = new Scan();
+            scan.addFamily(MAILBOX_CF);
+            scan.setCaching(mailboxes.getScannerCaching() * 2);
+            scan.setMaxVersions(1);
+
+            FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+
+            if (mailboxPath.getUser() != null) {
+                SingleColumnValueFilter userFilter = new SingleColumnValueFilter(MAILBOX_CF, MAILBOX_USER, CompareOp.EQUAL, Bytes.toBytes(mailboxPath.getUser()));
+                filters.addFilter(userFilter);
+            }
+            SubstringComparator pathComparator;
+            String mboxName = mailboxPath.getName();
+            /* TODO: use a RegExFiler */
+            if (mboxName.length() >= 1) {
+                if (mboxName.charAt(mboxName.length() - 1) == '%') {
+                    mboxName = mboxName.substring(0, mboxName.length() - 1);
+                }
+            }
+            if (mboxName.length() >= 1) {
+                if (mboxName.charAt(0) == '%') {
+                    mboxName = mboxName.substring(1);
+                }
+            }
+            pathComparator = new SubstringComparator(mboxName);
+            SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(MAILBOX_CF, MAILBOX_NAME, CompareOp.EQUAL, pathComparator);
+            filters.addFilter(nameFilter);
+            SingleColumnValueFilter namespaceFilter = new SingleColumnValueFilter(MAILBOX_CF, MAILBOX_NAMESPACE, CompareOp.EQUAL, Bytes.toBytes(mailboxPath.getNamespace()));
+            filters.addFilter(namespaceFilter);
+
+            scan.setFilter(filters);
+            scanner = mailboxes.getScanner(scan);
+
+            List<Mailbox<UUID>> mailboxList = new ArrayList<Mailbox<UUID>>();
+
+            for (Result result : scanner) {
+                mailboxList.add(mailboxFromResult(result));
+            }
+            return mailboxList;
+        } catch (IOException e) {
+            throw new MailboxException("Search of mailbox " + mailboxPath + " failed", e);
+        } finally {
+            scanner.close();
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public List<Mailbox<UUID>> list() throws MailboxException {
+        HTable mailboxes = null;
+        ResultScanner scanner = null;
+        //TODO: possible performance isssues, we are creating an object from all the rows in HBase mailbox table
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            Scan scan = new Scan();
+            scan.addFamily(MAILBOX_CF);
+            scan.setCaching(mailboxes.getScannerCaching() * 2);
+            scan.setMaxVersions(1);
+            scanner = mailboxes.getScanner(scan);
+            List<Mailbox<UUID>> mailboxList = new ArrayList<Mailbox<UUID>>();
+
+            Result result;
+            while ((result = scanner.next()) != null) {
+                Mailbox<UUID> mlbx = mailboxFromResult(result);
+                mailboxList.add(mlbx);
+            }
+            return mailboxList;
+        } catch (IOException ex) {
+            throw new MailboxException("HBase IOException in list()", ex);
+        } finally {
+            scanner.close();
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void endRequest() {
+    }
+
+    @Override
+    public void save(Mailbox<UUID> mlbx) throws MailboxException {
+        //TODO: maybe switch to checkAndPut for transactions
+        HTable mailboxes = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            /* cast to HBaseMailbox to access lastuid and ModSeq*/
+            Put put = toPut((HBaseMailbox) mlbx);
+            mailboxes.put(put);
+        } catch (IOException ex) {
+            throw new MailboxException("IOExeption", ex);
+        } finally {
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void delete(Mailbox<UUID> mlbx) throws MailboxException {
+        //TODO: maybe switch to checkAndDelete
+        HTable mailboxes = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            //TODO: delete all maessages from this mailbox
+            Delete delete = new Delete(mailboxRowKey(mlbx.getMailboxId()));
+            mailboxes.delete(delete);
+        } catch (IOException ex) {
+            throw new MailboxException("IOException in HBase cluster during delete()", ex);
+        } finally {
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean hasChildren(final Mailbox<UUID> mailbox, final char c) throws MailboxException, MailboxNotFoundException {
+        HTable mailboxes = null;
+        ResultScanner scanner = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+
+            Scan scan = new Scan();
+            scan.addFamily(MAILBOX_CF);
+            scan.setCaching(mailboxes.getScannerCaching() * 2);
+            scan.setMaxVersions(1);
+
+            FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
+
+            if (mailbox.getUser() != null) {
+                SingleColumnValueFilter userFilter = new SingleColumnValueFilter(MAILBOX_CF, MAILBOX_USER, CompareOp.EQUAL, Bytes.toBytes(mailbox.getUser()));
+                filters.addFilter(userFilter);
+            }
+            SingleColumnValueFilter nameFilter = new SingleColumnValueFilter(MAILBOX_CF,
+                    MAILBOX_NAME,
+                    CompareOp.EQUAL,
+                    new BinaryPrefixComparator(Bytes.toBytes(mailbox.getName() + c)));
+            filters.addFilter(nameFilter);
+            SingleColumnValueFilter namespaceFilter = new SingleColumnValueFilter(MAILBOX_CF, MAILBOX_NAMESPACE, CompareOp.EQUAL, Bytes.toBytes(mailbox.getNamespace()));
+            filters.addFilter(namespaceFilter);
+
+            scan.setFilter(filters);
+            scanner = mailboxes.getScanner(scan);
+            try {
+                if (scanner.next() != null) {
+                    return true;
+                }
+            } catch (IOException e) {
+                throw new MailboxNotFoundException("hasChildren() " + mailbox.getName());
+            }
+            return false;
+        } catch (IOException e) {
+            throw new MailboxException("Search of mailbox " + mailbox + " failed", e);
+        } finally {
+            scanner.close();
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    public void deleteAllMemberships() {
+        HTable messages = null;
+        HTable mailboxes = null;
+        ResultScanner scanner = null;
+        try {
+            messages = new HTable(conf, MESSAGES_TABLE);
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            Scan scan = new Scan();
+            scan.setMaxVersions(1);
+            scan.addColumn(MESSAGES_META_CF, MESSAGE_INTERNALDATE);
+            scanner = messages.getScanner(scan);
+            Result result;
+            List<Delete> deletes = new ArrayList<Delete>();
+            while ((result = scanner.next()) != null) {
+                deletes.add(new Delete(result.getRow()));
+            }
+            long totalDeletes = deletes.size();
+            messages.delete(deletes);
+            if (deletes.size() > 0) {
+                //TODO: what shoul we do if not all messages are deleted?
+                System.out.println("Just " + deletes.size() + " out of " + totalDeletes + " messages have been deleted");
+                //throw new RuntimeException("Just " + deletes.size() + " out of " + totalDeletes + " messages have been deleted");
+            }
+            List<Put> puts = new ArrayList<Put>();
+            scan = new Scan();
+            scan.setMaxVersions(1);
+            scan.addColumn(MAILBOX_CF, MAILBOX_MESSAGE_COUNT);
+            scanner = mailboxes.getScanner(scan);
+            Put put = null;
+            while ((result = scanner.next()) != null) {
+                put = new Put(result.getRow());
+                put.add(MAILBOX_CF, MAILBOX_MESSAGE_COUNT, Bytes.toBytes(0L));
+                puts.add(new Put());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Error deleting MESSAGES table ", e);
+        } finally {
+            scanner.close();
+            if (messages != null) {
+                try {
+                    messages.close();
+                } catch (IOException ex) {
+                    throw new RuntimeException("Error closing table " + messages, ex);
+                }
+            }
+        }
+    }
+
+    public void deleteAllMailboxes() {
+        HTable mailboxes = null;
+        ResultScanner scanner = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            Scan scan = new Scan();
+            scan.setMaxVersions(1);
+            scan.addColumn(MAILBOX_CF, MAILBOX_NAME);
+            scanner = mailboxes.getScanner(scan);
+            Result result;
+            List<Delete> deletes = new ArrayList<Delete>();
+            while ((result = scanner.next()) != null) {
+                deletes.add(new Delete(result.getRow()));
+            }
+            long totalDeletes = deletes.size();
+            mailboxes.delete(deletes);
+            if (deletes.size() > 0) {
+                //throw new RuntimeException("Just " + deletes.size() + " out of " + totalDeletes + " mailboxes have been deleted");
+            }
+        } catch (IOException ex) {
+            throw new RuntimeException("IOException deleting mailboxes", ex);
+        }
+    }
+}

Added: james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMessage.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMessage.java?rev=1164981&view=auto
==============================================================================
--- james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMessage.java (added)
+++ james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMessage.java Sun Sep  4 09:49:14 2011
@@ -0,0 +1,397 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.hbase.mail;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.UUID;
+
+import javax.mail.Flags;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.james.mailbox.MailboxException;
+import org.apache.james.mailbox.hbase.io.ChunkInputStream;
+import org.apache.james.mailbox.store.mail.model.AbstractMessage;
+import org.apache.james.mailbox.store.mail.model.Message;
+import org.apache.james.mailbox.store.mail.model.Property;
+import org.apache.james.mailbox.store.mail.model.impl.PropertyBuilder;
+
+import static org.apache.james.mailbox.hbase.HBaseUtils.*;
+import static org.apache.james.mailbox.hbase.HBaseNames.*;
+
+/**
+ * Concrete HBaseMessage implementation. This implementation does not store any
+ * message content. The message content is retrieved using a ChunkedInputStream 
+ * directly from HBase.
+ */
+public class HBaseMessage extends AbstractMessage<UUID> {
+
+    private static final String TOSTRING_SEPARATOR = " ";
+    /** Configuration for the HBase cluster */
+    private final Configuration conf;
+    /** The value for the mailboxId field */
+    private UUID mailboxId;
+    /** The value for the uid field */
+    private long uid;
+    /** The value for the modSeq field */
+    private long modSeq;
+    /** The value for the internalDate field */
+    private Date internalDate;
+    /** The value for the answered field */
+    private boolean answered = false;
+    /** The value for the deleted field */
+    private boolean deleted = false;
+    /** The value for the draft field */
+    private boolean draft = false;
+    /** The value for the flagged field */
+    private boolean flagged = false;
+    /** The value for the recent field */
+    private boolean recent = false;
+    /** The value for the seen field */
+    private boolean seen = false;
+    /** The first body octet */
+    private int bodyStartOctet;
+    /** Number of octets in the full document content */
+    private long contentOctets;
+    /** MIME media type */
+    private String mediaType;
+    /** MIME sub type */
+    private String subType;
+    /** THE CRFL count when this document is textual, null otherwise */
+    private Long textualLineCount;
+    /** Meta data for this message */
+    private List<Property> properties;
+    private List<String> userFlags;
+
+    /**
+     * Create a copy of the given message.
+     * All properties are cloned except mailbox and UID.
+     * @param mailboxId
+     * @param uid
+     * @param modSeq
+     * @param original
+     * @throws MailboxException 
+     */
+    public HBaseMessage(Configuration conf, UUID mailboxId, long uid, long modSeq, Message<?> original) throws MailboxException {
+        super();
+        this.conf = conf;
+        this.mailboxId = mailboxId;
+        this.uid = uid;
+        this.modSeq = modSeq;
+        this.userFlags = new ArrayList<String>();
+        setFlags(original.createFlags());
+
+        // A copy of a message is recent 
+        // See MAILBOX-85
+        this.recent = true;
+
+        this.contentOctets = original.getFullContentOctets();
+        this.bodyStartOctet = (int) (original.getFullContentOctets() - original.getBodyOctets());
+        this.internalDate = original.getInternalDate();
+
+        this.textualLineCount = original.getTextualLineCount();
+        this.mediaType = original.getMediaType();
+        this.subType = original.getSubType();
+        this.properties = original.getProperties();
+    }
+
+    /**
+     * Create a copy of the given message.
+     * @param mailboxId
+     * @param internalDate
+     * @param flags
+     * @param contentOctets
+     * @param bodyStartOctet
+     * @param propertyBuilder 
+     */
+    public HBaseMessage(Configuration conf, UUID mailboxId, Date internalDate, Flags flags, long contentOctets, int bodyStartOctet, PropertyBuilder propertyBuilder) {
+        super();
+        this.conf = conf;
+        this.mailboxId = mailboxId;
+        this.internalDate = internalDate;
+        userFlags = new ArrayList<String>();
+
+        setFlags(flags);
+        this.contentOctets = contentOctets;
+        this.bodyStartOctet = bodyStartOctet;
+        this.textualLineCount = propertyBuilder.getTextualLineCount();
+        this.mediaType = propertyBuilder.getMediaType();
+        this.subType = propertyBuilder.getSubType();
+        this.properties = propertyBuilder.toProperties();
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.model.Message#getBodyContent()
+     */
+    @Override
+    public InputStream getBodyContent() throws IOException {
+        return new ChunkInputStream(conf, MESSAGES_TABLE, MESSAGE_DATA_BODY, messageRowKey(this));
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.model.Message#getHeaderContent()
+     */
+    @Override
+    public InputStream getHeaderContent() throws IOException {
+        return new ChunkInputStream(conf, MESSAGES_TABLE, MESSAGE_DATA_HEADERS, messageRowKey(this));
+    }
+
+    @Override
+    public int hashCode() {
+        final int PRIME = 31;
+        int result = 1;
+        result = PRIME * result + (int) (getMailboxId().getMostSignificantBits() ^ (getMailboxId().getMostSignificantBits() >>> 32));
+        result = PRIME * result + (int) (uid ^ (uid >>> 32));
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final HBaseMessage other = (HBaseMessage) obj;
+        if (getMailboxId() != null) {
+            if (!getMailboxId().equals(other.getMailboxId())) {
+                return false;
+            }
+        } else {
+            if (other.getMailboxId() != null) {
+                return false;
+            }
+        }
+        if (uid != other.uid) {
+            return false;
+        }
+        return true;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.model.Message#getModSeq()
+     */
+    @Override
+    public long getModSeq() {
+        return modSeq;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.model.Message#setModSeq(long)
+     */
+    @Override
+    public void setModSeq(long modSeq) {
+        this.modSeq = modSeq;
+    }
+
+    /**
+     * Gets the top level MIME content media type.
+     * 
+     * @return top level MIME content media type, or null if default
+     */
+    @Override
+    public String getMediaType() {
+        return mediaType;
+    }
+
+    /**
+     * Gets the MIME content subtype.
+     * 
+     * @return the MIME content subtype, or null if default
+     */
+    @Override
+    public String getSubType() {
+        return subType;
+    }
+
+    /**
+     * Gets a read-only list of meta-data properties.
+     * For properties with multiple values, this list will contain
+     * several enteries with the same namespace and local name.
+     * @return unmodifiable list of meta-data, not null
+     */
+    @Override
+    public List<Property> getProperties() {
+        return new ArrayList<Property>(properties);
+    }
+
+    /**
+     * Gets the number of CRLF in a textual document.
+     * @return CRLF count when document is textual,
+     * null otherwise
+     */
+    @Override
+    public Long getTextualLineCount() {
+        return textualLineCount;
+    }
+
+    public void setTextualLineCount(Long textualLineCount) {
+        this.textualLineCount = textualLineCount;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.model.Document#getFullContentOctets()
+     */
+    @Override
+    public long getFullContentOctets() {
+        return contentOctets;
+    }
+
+    @Override
+    protected int getBodyStartOctet() {
+        return bodyStartOctet;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#getInternalDate()
+     */
+    @Override
+    public Date getInternalDate() {
+        return internalDate;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#getMailboxId()
+     */
+    @Override
+    public UUID getMailboxId() {
+        return mailboxId;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#getUid()
+     */
+    @Override
+    public long getUid() {
+        return uid;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#isAnswered()
+     */
+    @Override
+    public boolean isAnswered() {
+        return answered;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#isDeleted()
+     */
+    @Override
+    public boolean isDeleted() {
+        return deleted;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#isDraft()
+     */
+    @Override
+    public boolean isDraft() {
+        return draft;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#isFlagged()
+     */
+    @Override
+    public boolean isFlagged() {
+        return flagged;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#isRecent()
+     */
+    @Override
+    public boolean isRecent() {
+        return recent;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#isSeen()
+     */
+    @Override
+    public boolean isSeen() {
+        return seen;
+    }
+
+    @Override
+    public void setUid(long uid) {
+        this.uid = uid;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.MailboxMembership#setFlags(javax.mail.Flags)
+     */
+    @Override
+    public final void setFlags(Flags flags) {
+        answered = flags.contains(Flags.Flag.ANSWERED);
+        deleted = flags.contains(Flags.Flag.DELETED);
+        draft = flags.contains(Flags.Flag.DRAFT);
+        flagged = flags.contains(Flags.Flag.FLAGGED);
+        recent = flags.contains(Flags.Flag.RECENT);
+        seen = flags.contains(Flags.Flag.SEEN);
+        String[] userflags = flags.getUserFlags();
+        userFlags.clear();
+        userFlags.addAll(Arrays.asList(userflags));
+    }
+
+    /**
+     * This implementation supports user flags
+     * 
+     * 
+     */
+    @Override
+    public String[] createUserFlags() {
+        String[] flags = new String[userFlags.size()];
+        for (int i = 0; i < userFlags.size(); i++) {
+            flags[i] = userFlags.get(i);
+        }
+        return flags;
+    }
+
+    @Override
+    public String toString() {
+        final String retValue =
+                "message("
+                + "mailboxId = " + this.getMailboxId() + TOSTRING_SEPARATOR
+                + "uid = " + this.uid + TOSTRING_SEPARATOR
+                + "internalDate = " + this.internalDate + TOSTRING_SEPARATOR
+                + "answered = " + this.answered + TOSTRING_SEPARATOR
+                + "deleted = " + this.deleted + TOSTRING_SEPARATOR
+                + "draft = " + this.draft + TOSTRING_SEPARATOR
+                + "flagged = " + this.flagged + TOSTRING_SEPARATOR
+                + "recent = " + this.recent + TOSTRING_SEPARATOR
+                + "seen = " + this.seen + TOSTRING_SEPARATOR
+                + " )";
+        return retValue;
+    }
+}

Added: james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMessageMapper.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMessageMapper.java?rev=1164981&view=auto
==============================================================================
--- james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMessageMapper.java (added)
+++ james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseMessageMapper.java Sun Sep  4 09:49:14 2011
@@ -0,0 +1,734 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.hbase.mail;
+
+import org.apache.hadoop.conf.Configuration;
+import java.io.BufferedInputStream;
+import org.apache.hadoop.hbase.client.Put;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import javax.mail.Flags;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.james.mailbox.MailboxException;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageMetaData;
+import org.apache.james.mailbox.MessageRange;
+import org.apache.james.mailbox.MessageRange.Type;
+import org.apache.james.mailbox.UpdatedFlags;
+import org.apache.james.mailbox.hbase.io.ChunkOutputStream;
+import org.apache.james.mailbox.store.SimpleMessageMetaData;
+import org.apache.james.mailbox.store.transaction.NonTransactionalMapper;
+import org.apache.james.mailbox.store.mail.MessageMapper;
+import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
+import org.apache.james.mailbox.store.mail.ModSeqProvider;
+import org.apache.james.mailbox.store.mail.UidProvider;
+import org.apache.james.mailbox.store.mail.model.Mailbox;
+import org.apache.james.mailbox.store.mail.model.Message;
+
+import static org.apache.james.mailbox.hbase.HBaseUtils.*;
+import static org.apache.james.mailbox.hbase.HBaseNames.*;
+import static org.apache.james.mailbox.hbase.FlagConvertor.*;
+
+/**
+ * HBase implementation of a {@link MessageMapper}. 
+ * I don't know if this class is thread-safe! Asume it is not!
+ * 
+ */
+public class HBaseMessageMapper extends NonTransactionalMapper implements MessageMapper<UUID> {
+
+    private final Configuration conf;
+    private final MailboxSession mailboxSession;
+    private final UidProvider<UUID> uidProvider;
+    private final ModSeqProvider<UUID> modSeqProvider;
+
+    public HBaseMessageMapper(final MailboxSession session,
+            final UidProvider<UUID> uidProvider,
+            ModSeqProvider<UUID> modSeqProvider, Configuration conf) {
+        this.mailboxSession = session;
+        this.modSeqProvider = modSeqProvider;
+        this.uidProvider = uidProvider;
+        this.conf = conf;
+    }
+
+    @Override
+    public void endRequest() {
+    }
+
+    @Override
+    public Iterator<Message<UUID>> findInMailbox(Mailbox<UUID> mailbox, MessageRange set, FetchType fType, int max) throws MailboxException {
+        try {
+            List<Message<UUID>> results;
+            long from = set.getUidFrom();
+            final long to = set.getUidTo();
+            final Type type = set.getType();
+
+            switch (type) {
+                default:
+                case ALL:
+                    results = findMessagesInMailbox(mailbox, max, false);
+                    break;
+                case FROM:
+                    results = findMessagesInMailboxAfterUID(mailbox, from, max, false);
+                    break;
+                case ONE:
+                    results = findMessagesInMailboxWithUID(mailbox, from, false);
+                    break;
+                case RANGE:
+                    results = findMessagesInMailboxBetweenUIDs(mailbox, from, to, max, false);
+                    break;
+            }
+            return results.iterator();
+
+        } catch (IOException e) {
+            throw new MailboxException("Search of MessageRange " + set + " failed in mailbox " + mailbox, e);
+        }
+    }
+
+    private List<Message<UUID>> findMessagesInMailbox(Mailbox<UUID> mailbox, int batchSize, boolean flaggedForDelete) throws IOException {
+        List<Message<UUID>> messageList = new ArrayList<Message<UUID>>();
+        HTable messages = new HTable(conf, MESSAGES_TABLE);
+        Scan scan = new Scan(customMessageRowKey(mailbox.getMailboxId(), 0L),
+                new PrefixFilter(Bytes.add(Bytes.toBytes(mailbox.getMailboxId().getMostSignificantBits()),
+                Bytes.toBytes(mailbox.getMailboxId().getLeastSignificantBits()))));
+        if (flaggedForDelete) {
+            SingleColumnValueFilter filter = new SingleColumnValueFilter(MESSAGES_META_CF, FLAGS_DELETED, CompareOp.EQUAL, MARKER_PRESENT);
+            filter.setFilterIfMissing(true);
+            scan.setFilter(filter);
+        }
+        scan.setMaxVersions(1);
+        /* we exclude the message content column family because it could be too large.
+         * the content will be pulled from HBase on demand by using a a ChunkedInputStream implementation.
+         */
+        scan.addFamily(MESSAGES_META_CF);
+        ResultScanner scanner = messages.getScanner(scan);
+        Result result;
+        long count = batchSize > 0 ? batchSize : Long.MAX_VALUE;
+        while (((result = scanner.next()) != null) && (count > 0)) {
+            messageList.add(messageMetaFromResult(conf, result));
+            count--;
+        }
+        scanner.close();
+        messages.close();
+        // we store uids in reverse order, we send them ascending
+        Collections.reverse(messageList);
+        return messageList;
+    }
+
+    private List<Message<UUID>> findMessagesInMailboxWithUID(Mailbox<UUID> mailbox, final long messageUid, final boolean flaggedForDelete) throws IOException {
+        List<Message<UUID>> messageList = new ArrayList<Message<UUID>>();
+        HTable messages = new HTable(conf, MESSAGES_TABLE);
+        Get get = new Get(messageRowKey(mailbox.getMailboxId(), messageUid));
+        get.setMaxVersions(1);
+        /* we exclude the message content column family because it could be too large.
+         * the content will be pulled from HBase on demand by using a a ChunkedInputStream implementation.
+         */
+        if (flaggedForDelete) {
+            SingleColumnValueFilter filter = new SingleColumnValueFilter(MESSAGES_META_CF, FLAGS_DELETED, CompareOp.EQUAL, MARKER_PRESENT);
+            filter.setFilterIfMissing(true);
+            get.setFilter(filter);
+        }
+        get.addFamily(MESSAGES_META_CF);
+        Result result = messages.get(get);
+        Message<UUID> message = null;
+        if (!result.isEmpty()) {
+            message = messageMetaFromResult(conf, result);
+            messageList.add(message);
+        }
+        messages.close();
+        return messageList;
+    }
+
+    private List<Message<UUID>> findMessagesInMailboxAfterUID(Mailbox<UUID> mailbox, final long from, final int batchSize, final boolean flaggedForDelete) throws IOException {
+        List<Message<UUID>> messageList = new ArrayList<Message<UUID>>();
+        HTable messages = new HTable(conf, MESSAGES_TABLE);
+        // uids are stored in reverse so we need to search 
+        Scan scan = new Scan(messageRowKey(mailbox.getMailboxId(), Long.MAX_VALUE),
+                messageRowKey(mailbox.getMailboxId(), from - 1));
+        if (flaggedForDelete) {
+            SingleColumnValueFilter filter = new SingleColumnValueFilter(MESSAGES_META_CF, FLAGS_DELETED, CompareOp.EQUAL, MARKER_PRESENT);
+            filter.setFilterIfMissing(true);
+            scan.setFilter(filter);
+        }
+        scan.setMaxVersions(1);
+        /* we exclude the message content column family because it could be too large.
+         * the content will be pulled from HBase on demand by using a a ChunkedInputStream implementation.
+         */
+        scan.addFamily(MESSAGES_META_CF);
+        ResultScanner scanner = messages.getScanner(scan);
+        Result result;
+        long count = batchSize > 0 ? batchSize : Long.MAX_VALUE;
+        while (((result = scanner.next()) != null) && (count > 0)) {
+            messageList.add(messageMetaFromResult(conf, result));
+            count--;
+        }
+        scanner.close();
+        messages.close();
+        // uids are stored in reverese so we change the list
+        Collections.reverse(messageList);
+        return messageList;
+    }
+
+    private List<Message<UUID>> findMessagesInMailboxBetweenUIDs(Mailbox<UUID> mailbox, final long from, final long to, final int batchSize, final boolean flaggedForDelete) throws IOException {
+        List<Message<UUID>> messageList = new ArrayList<Message<UUID>>();
+        if (from > to) {
+            return messageList;
+        }
+        HTable messages = new HTable(conf, MESSAGES_TABLE);
+        /*TODO: check if Between should be inclusive or exclusive regarding limits.
+         * HBase scan operaion are exclusive to the upper bound when providing stop row key. 
+         */
+        Scan scan = new Scan(messageRowKey(mailbox.getMailboxId(), to), messageRowKey(mailbox.getMailboxId(), from - 1));
+        if (flaggedForDelete) {
+            SingleColumnValueFilter filter = new SingleColumnValueFilter(MESSAGES_META_CF, FLAGS_DELETED, CompareOp.EQUAL, MARKER_PRESENT);
+            filter.setFilterIfMissing(true);
+            scan.setFilter(filter);
+        }
+        scan.setMaxVersions(1);
+        /* we exclude the message content column family because it could be too large.
+         * the content will be pulled from HBase on demand by using a a ChunkedInputStream implementation.
+         */
+        scan.addFamily(MESSAGES_META_CF);
+        ResultScanner scanner = messages.getScanner(scan);
+        Result result;
+
+        long count = batchSize > 0 ? batchSize : Long.MAX_VALUE;
+        while (((result = scanner.next()) != null)) {
+            if (count == 0) {
+                break;
+            }
+            Message<UUID> message = messageMetaFromResult(conf, result);
+            messageList.add(message);
+            count--;
+        }
+        scanner.close();
+        messages.close();
+        // uids are stored in reverse order
+        Collections.reverse(messageList);
+        return messageList;
+    }
+
+    @Override
+    public Map<Long, MessageMetaData> expungeMarkedForDeletionInMailbox(Mailbox<UUID> mailbox, MessageRange set) throws MailboxException {
+        try {
+            final Map<Long, MessageMetaData> data;
+            final List<Message<UUID>> results;
+            final long from = set.getUidFrom();
+            final long to = set.getUidTo();
+
+            switch (set.getType()) {
+                case ONE:
+                    results = findMessagesInMailboxWithUID(mailbox, from, true);
+                    data = createMetaData(results);
+                    deleteDeletedMessagesInMailboxWithUID(mailbox, from);
+                    break;
+                case RANGE:
+                    results = findMessagesInMailboxBetweenUIDs(mailbox, from, to, -1, true);
+                    data = createMetaData(results);
+                    deleteDeletedMessagesInMailboxBetweenUIDs(mailbox, from, to);
+                    break;
+                case FROM:
+                    results = findMessagesInMailboxAfterUID(mailbox, from, -1, true);
+                    data = createMetaData(results);
+                    deleteDeletedMessagesInMailboxAfterUID(mailbox, from);
+                    break;
+                default:
+                case ALL:
+                    results = findMessagesInMailbox(mailbox, -1, true);
+                    data = createMetaData(results);
+                    deleteDeletedMessagesInMailbox(mailbox);
+                    break;
+            }
+
+            return data;
+        } catch (IOException e) {
+            throw new MailboxException("Search of MessageRange " + set + " failed in mailbox " + mailbox, e);
+        }
+    }
+
+    @Override
+    public long countMessagesInMailbox(Mailbox<UUID> mailbox) throws MailboxException {
+        HTable mailboxes = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            Get get = new Get(mailboxRowKey(mailbox.getMailboxId()));
+            get.addColumn(MAILBOX_CF, MAILBOX_MESSAGE_COUNT);
+            get.setMaxVersions(1);
+            Result result = mailboxes.get(get);
+            long count = Bytes.toLong(result.getValue(MAILBOX_CF, MAILBOX_MESSAGE_COUNT));
+            return count;
+        } catch (IOException e) {
+            throw new MailboxException("Count of messages failed in mailbox " + mailbox, e);
+        } finally {
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public long countUnseenMessagesInMailbox(Mailbox<UUID> mailbox) throws MailboxException {
+        /* TODO: see if it is possible to store the number of unseen messages in the mailbox table
+         * and just return that value with a Get and kepp it up to date.
+         */
+        HTable messages = null;
+        ResultScanner scanner = null;
+        try {
+            messages = new HTable(conf, MESSAGES_TABLE);
+            /* Limit the number of entries scanned to just the mails in this mailbox */
+            Scan scan = new Scan(messageRowKey(mailbox.getMailboxId(), Long.MAX_VALUE),
+                    messageRowKey(mailbox.getMailboxId(), 0));
+            scan.addFamily(MESSAGES_META_CF);
+            scan.setFilter(new SingleColumnValueExcludeFilter(MESSAGES_META_CF, FLAGS_SEEN, CompareOp.EQUAL, MARKER_MISSING));
+            scan.setCaching(messages.getScannerCaching() * 2);
+            scan.setMaxVersions(1);
+            scanner = messages.getScanner(scan);
+            long count = 0;
+            Result result;
+            while ((result = scanner.next()) != null) {
+                count++;
+            }
+            return count;
+        } catch (IOException e) {
+            throw new MailboxException("Search of first unseen message failed in mailbox " + mailbox, e);
+        } finally {
+            scanner.close();
+            if (messages != null) {
+                try {
+                    messages.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + messages, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void delete(Mailbox<UUID> mailbox, Message<UUID> message) throws MailboxException {
+        //TODO: maybe switch to checkAndDelete
+        HTable messages = null;
+        HTable mailboxes = null;
+        try {
+            messages = new HTable(conf, MESSAGES_TABLE);
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            /** TODO: also implement/update the message count for this mailbox
+             *  and implement countMessages with get.
+             */
+            Delete delete = new Delete(messageRowKey(message));
+            mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_MESSAGE_COUNT, -1);
+            messages.delete(delete);
+
+        } catch (IOException ex) {
+            throw new MailboxException("Delete of message " + message + " failed in mailbox " + mailbox, ex);
+        } finally {
+
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+            if (messages != null) {
+                try {
+                    messages.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + messages, ex);
+                }
+            }
+
+        }
+
+    }
+
+    @Override
+    public Long findFirstUnseenMessageUid(Mailbox<UUID> mailbox) throws MailboxException {
+        HTable messages = null;
+        ResultScanner scanner = null;
+        try {
+            messages = new HTable(conf, MESSAGES_TABLE);
+            /* Limit the number of entries scanned to just the mails in this mailbox */
+            Scan scan = new Scan(messageRowKey(mailbox.getMailboxId(), Long.MAX_VALUE), messageRowKey(mailbox.getMailboxId(), 0));
+            scan.addFamily(MESSAGES_META_CF);
+            // filter out all rows with FLAGS_SEEN qualifier
+            SingleColumnValueFilter filter = new SingleColumnValueFilter(MESSAGES_META_CF, FLAGS_SEEN, CompareOp.EQUAL, MARKER_MISSING);
+            scan.setFilter(filter);
+            scan.setCaching(messages.getScannerCaching() * 2);
+            scan.setMaxVersions(1);
+            scanner = messages.getScanner(scan);
+            Result result;
+            Long lastUnseen = null;
+            byte[] row = null;
+            while ((result = scanner.next()) != null) {
+                row = result.getRow();
+            }
+            if (row != null) {
+                lastUnseen = Long.MAX_VALUE - Bytes.toLong(row, 16, 8);
+            }
+            return lastUnseen;
+        } catch (IOException e) {
+            throw new MailboxException("Search of first unseen message failed in mailbox " + mailbox, e);
+        } finally {
+            scanner.close();
+            if (messages != null) {
+                try {
+                    messages.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + messages, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public List<Long> findRecentMessageUidsInMailbox(Mailbox<UUID> mailbox) throws MailboxException {
+        /** TODO: improve performance by implementing a last seen and last recent value per mailbox.
+         * maybe one more call to HBase is less expensive than iterating throgh all rows.
+         */
+        HTable messages = null;
+        ResultScanner scanner = null;
+        try {
+            messages = new HTable(conf, MESSAGES_TABLE);
+            /* Limit the number of entries scanned to just the mails in this mailbox */
+            Scan scan = new Scan(messageRowKey(mailbox.getMailboxId(), Long.MAX_VALUE),
+                    messageRowKey(mailbox.getMailboxId(), 0));
+            // we add the column, if it exists, the message is recent, else it is not
+            scan.addColumn(MESSAGES_META_CF, FLAGS_RECENT);
+            SingleColumnValueFilter filter = new SingleColumnValueFilter(MESSAGES_META_CF, FLAGS_RECENT, CompareOp.EQUAL, MARKER_PRESENT);
+            scan.setFilter(filter);
+            scan.setCaching(messages.getScannerCaching() * 2);
+            scan.setMaxVersions(1);
+
+            scanner = messages.getScanner(scan);
+            Result result;
+            List<Long> uids = new ArrayList<Long>();
+            while ((result = scanner.next()) != null) {
+                uids.add(Long.MAX_VALUE - Bytes.toLong(result.getRow(), 16, 8));
+            }
+            Collections.reverse(uids);
+            return uids;
+        } catch (IOException e) {
+            throw new MailboxException("Search of recent messages failed in mailbox " + mailbox, e);
+        } finally {
+            scanner.close();
+            if (messages != null) {
+                try {
+                    messages.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + messages, ex);
+                }
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.MessageMapper#add(org.apache.james.mailbox.store.mail.model.Mailbox, org.apache.james.mailbox.store.mail.model.Message)
+     */
+    @Override
+    public MessageMetaData add(Mailbox<UUID> mailbox, Message<UUID> message) throws MailboxException {
+        message.setUid(uidProvider.nextUid(mailboxSession, mailbox));
+        // if a mailbox does not support mod-sequences the provider may be null
+        if (modSeqProvider != null) {
+            message.setModSeq(modSeqProvider.nextModSeq(mailboxSession, mailbox));
+        }
+        MessageMetaData data = save(mailbox, message);
+
+        return data;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.MessageMapper#updateFlags(org.apache.james.mailbox.store.mail.model.Mailbox, javax.mail.Flags, boolean, boolean, org.apache.james.mailbox.MessageRange)
+     */
+    @Override
+    public Iterator<UpdatedFlags> updateFlags(final Mailbox<UUID> mailbox, final Flags flags, final boolean value, final boolean replace, MessageRange set) throws MailboxException {
+
+        final List<UpdatedFlags> updatedFlags = new ArrayList<UpdatedFlags>();
+        Iterator<Message<UUID>> messagesFound = findInMailbox(mailbox, set, FetchType.Metadata, -1);
+
+        HTable messages = null;
+        long modSeq = -1;
+        if (messagesFound.hasNext() == false) {
+            // if a mailbox does not support mod-sequences the provider may be null
+            if (modSeqProvider != null) {
+                modSeq = modSeqProvider.nextModSeq(mailboxSession, mailbox);
+            }
+        }
+
+        try {
+            messages = new HTable(conf, MESSAGES_TABLE);
+            while (messagesFound.hasNext()) {
+                Put put = null;
+                final Message<UUID> member = messagesFound.next();
+                Flags originalFlags = member.createFlags();
+
+                if (replace) {
+                    member.setFlags(flags);
+                } else {
+                    Flags current = member.createFlags();
+                    if (value) {
+                        current.add(flags);
+                    } else {
+                        current.remove(flags);
+                    }
+                    member.setFlags(current);
+                }
+                Flags newFlags = member.createFlags();
+                put = flagsToPut(member, newFlags);
+                if (UpdatedFlags.flagsChanged(originalFlags, newFlags)) {
+                    // increase the mod-seq as we changed the flags                    
+                    put.add(MESSAGES_META_CF, MESSAGE_MODSEQ, Bytes.toBytes(modSeq));
+                    // update put not to include the allready existing flags
+                    messages.put(put);
+                    messages.flushCommits();
+                }
+
+                UpdatedFlags uFlags = new UpdatedFlags(member.getUid(), member.getModSeq(), originalFlags, newFlags);
+                updatedFlags.add(uFlags);
+            }
+        } catch (IOException e) {
+            throw new MailboxException("Error setting flags for messages in " + mailbox, e);
+        } finally {
+            if (messages != null) {
+                try {
+                    messages.close();
+                } catch (IOException e) {
+                    throw new MailboxException("Error setting flags for messages in " + mailbox, e);
+                }
+            }
+        }   
+        
+        return updatedFlags.iterator();
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.MessageMapper#copy(org.apache.james.mailbox.store.mail.model.Mailbox, org.apache.james.mailbox.store.mail.model.Message)
+     */
+    @Override
+    public MessageMetaData copy(Mailbox<UUID> mailbox, Message<UUID> original) throws MailboxException {
+        long uid = uidProvider.nextUid(mailboxSession, mailbox);
+        long modSeq = -1;
+        if (modSeqProvider != null) {
+            modSeq = modSeqProvider.nextModSeq(mailboxSession, mailbox);
+        }
+        //TODO: check if creating a HBase message is the right thing to do
+        HBaseMessage message = new HBaseMessage(conf,
+                mailbox.getMailboxId(), uid, modSeq, original);
+        return save(mailbox, message);
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.MessageMapper#getLastUid(org.apache.james.mailbox.store.mail.model.Mailbox)
+     */
+    @Override
+    public long getLastUid(Mailbox<UUID> mailbox) throws MailboxException {
+        return uidProvider.lastUid(mailboxSession, mailbox);
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.MessageMapper#getHighestModSeq(org.apache.james.mailbox.store.mail.model.Mailbox)
+     */
+    @Override
+    public long getHighestModSeq(Mailbox<UUID> mailbox) throws MailboxException {
+        return modSeqProvider.highestModSeq(mailboxSession, mailbox);
+    }
+
+    /**
+     * Save the {@link Message} for the given {@link Mailbox} and return the {@link MessageMetaData} 
+     * 
+     * @param mailbox
+     * @param message
+     * @return metaData
+     * @throws MailboxException
+     */
+    protected MessageMetaData save(Mailbox<UUID> mailbox, Message<UUID> message) throws MailboxException {
+        HTable messages = null;
+        HTable mailboxes = null;
+        try {
+            //TODO: update the mailbox information about messages
+            messages = new HTable(conf, MESSAGES_TABLE);
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            //save the message metadata
+            Put put = metadataToPut(message);
+            messages.put(put);
+            //save the message content
+            //TODO: current implementation is crude.
+
+            ChunkOutputStream out = new ChunkOutputStream(conf,
+                    MESSAGES_TABLE, MESSAGE_DATA_BODY, messageRowKey(message), MAX_COLUMN_SIZE);
+            int b;
+            BufferedInputStream in = new BufferedInputStream(message.getBodyContent());
+            while ((b = in.read()) != -1) {
+                out.write(b);
+            }
+            out.close();
+            out = new ChunkOutputStream(conf,
+                    MESSAGES_TABLE, MESSAGE_DATA_HEADERS, messageRowKey(message), MAX_COLUMN_SIZE);
+            in = new BufferedInputStream(message.getHeaderContent());
+            while ((b = in.read()) != -1) {
+                out.write(b);
+            }
+            out.close();
+            // increase the message count for the current mailbox
+            mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_MESSAGE_COUNT, 1);
+            return new SimpleMessageMetaData(message);
+        } catch (IOException ex) {
+            throw new MailboxException("Error setting flags for messages in " + mailbox, ex);
+        } finally {
+            if (messages != null) {
+                try {
+                    messages.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + messages, ex);
+                }
+            }
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    private void deleteDeletedMessagesInMailboxWithUID(Mailbox<UUID> mailbox, long uid) throws IOException {
+        //TODO: do I have to check if the message is flagged for delete here?
+        HTable messages = new HTable(conf, MESSAGES_TABLE);
+        HTable mailboxes = new HTable(conf, MAILBOXES_TABLE);
+        Delete delete = new Delete(messageRowKey(mailbox.getMailboxId(), uid));
+        messages.delete(delete);
+        mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_MESSAGE_COUNT, -1);
+        mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_HIGHEST_MODSEQ, 1);
+        mailboxes.close();
+        messages.close();
+    }
+
+    private void deleteDeletedMessagesInMailboxBetweenUIDs(Mailbox<UUID> mailbox, long fromUid, long toUid) throws IOException {
+        HTable messages = new HTable(conf, MESSAGES_TABLE);
+        HTable mailboxes = new HTable(conf, MAILBOXES_TABLE);
+        List<Delete> deletes = new ArrayList<Delete>();
+        /*TODO: check if Between should be inclusive or exclusive regarding limits.
+         * HBase scan operaion are exclusive to the upper bound when providing stop row key. 
+         */
+        Scan scan = new Scan(messageRowKey(mailbox.getMailboxId(), fromUid), messageRowKey(mailbox.getMailboxId(), toUid));
+        scan.addColumn(MESSAGES_META_CF, FLAGS_DELETED);
+        SingleColumnValueFilter filter = new SingleColumnValueFilter(MESSAGES_META_CF, FLAGS_DELETED, CompareOp.EQUAL, MARKER_PRESENT);
+        scan.setFilter(filter);
+        scan.setMaxVersions(1);
+        ResultScanner scanner = messages.getScanner(scan);
+        Result result;
+        while ((result = scanner.next()) != null) {
+            deletes.add(new Delete(result.getRow()));
+        }
+        long totalDeletes = deletes.size();
+        scanner.close();
+        messages.delete(deletes);
+        mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_MESSAGE_COUNT, -(totalDeletes - deletes.size()));
+        mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_HIGHEST_MODSEQ, 1);
+        mailboxes.close();
+        messages.close();
+    }
+
+    private void deleteDeletedMessagesInMailboxAfterUID(Mailbox<UUID> mailbox, long fromUid) throws IOException {
+        HTable messages = new HTable(conf, MESSAGES_TABLE);
+        HTable mailboxes = new HTable(conf, MAILBOXES_TABLE);
+        List<Delete> deletes = new ArrayList<Delete>();
+        /*TODO: check if Between should be inclusive or exclusive regarding limits.
+         * HBase scan operaion are exclusive to the upper bound when providing stop row key. 
+         */
+        Scan scan = new Scan(messageRowKey(mailbox.getMailboxId(), fromUid));
+        scan.addColumn(MESSAGES_META_CF, FLAGS_DELETED);
+        SingleColumnValueFilter filter = new SingleColumnValueFilter(MESSAGES_META_CF, FLAGS_DELETED, CompareOp.EQUAL, MARKER_PRESENT);
+        scan.setFilter(filter);
+        scan.setMaxVersions(1);
+        ResultScanner scanner = messages.getScanner(scan);
+        Result result;
+        while ((result = scanner.next()) != null) {
+            deletes.add(new Delete(result.getRow()));
+        }
+        long totalDeletes = deletes.size();
+        scanner.close();
+        messages.delete(deletes);
+        mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_MESSAGE_COUNT, -(totalDeletes - deletes.size()));
+        mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_HIGHEST_MODSEQ, 1);
+        mailboxes.close();
+        messages.close();
+    }
+
+    private void deleteDeletedMessagesInMailbox(Mailbox<UUID> mailbox) throws IOException {
+        HTable messages = new HTable(conf, MESSAGES_TABLE);
+        HTable mailboxes = new HTable(conf, MAILBOXES_TABLE);
+        List<Delete> deletes = new ArrayList<Delete>();
+        /*TODO: check if Between should be inclusive or exclusive regarding limits.
+         * HBase scan operaion are exclusive to the upper bound when providing stop row key. 
+         */
+        Scan scan = new Scan(customMessageRowKey(mailbox.getMailboxId(), 0L),
+                new PrefixFilter(Bytes.add(Bytes.toBytes(mailbox.getMailboxId().getMostSignificantBits()),
+                Bytes.toBytes(mailbox.getMailboxId().getLeastSignificantBits()))));
+        scan.addColumn(MESSAGES_META_CF, FLAGS_DELETED);
+        SingleColumnValueFilter filter = new SingleColumnValueFilter(MESSAGES_META_CF, FLAGS_DELETED, CompareOp.EQUAL, MARKER_PRESENT);
+        scan.setFilter(filter);
+        scan.setMaxVersions(1);
+        ResultScanner scanner = messages.getScanner(scan);
+        Result result;
+        while ((result = scanner.next()) != null) {
+            deletes.add(new Delete(result.getRow()));
+        }
+        long totalDeletes = deletes.size();
+        scanner.close();
+        messages.delete(deletes);
+        mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_MESSAGE_COUNT, -(totalDeletes - deletes.size()));
+        mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_HIGHEST_MODSEQ, 1);
+        mailboxes.close();
+        messages.close();
+    }
+
+    private Map<Long, MessageMetaData> createMetaData(List<Message<UUID>> uids) {
+        final Map<Long, MessageMetaData> data = new HashMap<Long, MessageMetaData>();
+        for (int i = 0; i < uids.size(); i++) {
+            Message<UUID> m = uids.get(i);
+            data.put(m.getUid(), new SimpleMessageMetaData(m));
+        }
+        return data;
+    }
+}

Added: james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseModSeqProvider.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseModSeqProvider.java?rev=1164981&view=auto
==============================================================================
--- james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseModSeqProvider.java (added)
+++ james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseModSeqProvider.java Sun Sep  4 09:49:14 2011
@@ -0,0 +1,95 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.hbase.mail;
+
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.james.mailbox.MailboxException;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.store.mail.ModSeqProvider;
+import org.apache.james.mailbox.store.mail.model.Mailbox;
+
+import static org.apache.james.mailbox.hbase.HBaseUtils.*;
+import static org.apache.james.mailbox.hbase.HBaseNames.*;
+/**
+ * ModSeqProvider implementation for HBase.
+ * 
+ */
+public class HBaseModSeqProvider implements ModSeqProvider<UUID> {
+
+    /** Link to the HBase Configuration object and specific mailbox names */
+    private final Configuration conf;
+
+    public HBaseModSeqProvider(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public long highestModSeq(MailboxSession session, Mailbox<UUID> mailbox) throws MailboxException {
+        HTable mailboxes = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            Get get = new Get(mailboxRowKey(mailbox.getMailboxId()));
+            get.addColumn(MAILBOX_CF, MAILBOX_HIGHEST_MODSEQ);
+            get.setMaxVersions(1);
+            Result result = mailboxes.get(get);
+
+            if (result == null) {
+                throw new MailboxException("Row or column not found!");
+            }
+            long modSeq = Bytes.toLong(result.getValue(MAILBOX_CF, MAILBOX_HIGHEST_MODSEQ));
+            return modSeq;
+        } catch (IOException e) {
+            throw new MailboxException("highestModSeq", e);
+        } finally {
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    @Override
+    public long nextModSeq(MailboxSession session, Mailbox<UUID> mailbox) throws MailboxException {
+        HTable mailboxes = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            long newValue = mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_HIGHEST_MODSEQ, 1);
+            return newValue;
+        } catch (IOException e) {
+            throw new MailboxException("lastUid", e);
+        } finally {
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+}

Added: james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseUidProvider.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseUidProvider.java?rev=1164981&view=auto
==============================================================================
--- james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseUidProvider.java (added)
+++ james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/HBaseUidProvider.java Sun Sep  4 09:49:14 2011
@@ -0,0 +1,112 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.hbase.mail;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.james.mailbox.MailboxException;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.store.mail.UidProvider;
+import org.apache.james.mailbox.store.mail.model.Mailbox;
+
+import static org.apache.james.mailbox.hbase.HBaseUtils.*;
+import static org.apache.james.mailbox.hbase.HBaseNames.*;
+/**
+ * Message UidProvider for HBase.
+ * 
+ */
+public class HBaseUidProvider implements UidProvider<UUID> {
+
+    /** Link to the HBase Configuration object and specific mailbox names */
+    private final Configuration conf;
+
+    public HBaseUidProvider(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /**
+     * Returns the last message uid used in a mailbox.
+     * @param session the session
+     * @param mailbox the mailbox for which to get the last uid
+     * @return the last uid used
+     * @throws MailboxException 
+     */
+    @Override
+    public long lastUid(MailboxSession session, Mailbox<UUID> mailbox) throws MailboxException {
+        HTable mailboxes = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            Get get = new Get(mailboxRowKey(mailbox.getMailboxId()));
+            get.addColumn(MAILBOX_CF, MAILBOX_LASTUID);
+            get.setMaxVersions(1);
+            Result result = mailboxes.get(get);
+
+            if (result == null) {
+                throw new MailboxException("Row or column not found!");
+            }
+            long uid = Bytes.toLong(result.getValue(MAILBOX_CF, MAILBOX_LASTUID));
+            return uid;
+        } catch (IOException e) {
+            throw new MailboxException("lastUid", e);
+        } finally {
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+
+    /**
+     * Returns the next uid. Implemented using HTable.incrementColumnValue(row, family, qualifier, amount).
+     * 
+     * @param session the mailbox session
+     * @param mailbox the mailbox for which we are getting the next uid.
+     * @return the next uid to be used.
+     * @throws MailboxException 
+     */
+    @Override
+    public long nextUid(MailboxSession session, Mailbox<UUID> mailbox) throws MailboxException {
+        HTable mailboxes = null;
+        try {
+            mailboxes = new HTable(conf, MAILBOXES_TABLE);
+            long newValue = mailboxes.incrementColumnValue(mailboxRowKey(mailbox.getMailboxId()), MAILBOX_CF, MAILBOX_LASTUID, 1);
+            mailboxes.close();
+            return newValue;
+        } catch (IOException e) {
+            throw new MailboxException("lastUid", e);
+        } finally {
+            if (mailboxes != null) {
+                try {
+                    mailboxes.close();
+                } catch (IOException ex) {
+                    throw new MailboxException("Error closing table " + mailboxes, ex);
+                }
+            }
+        }
+    }
+}

Added: james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/model/HBaseMailbox.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/model/HBaseMailbox.java?rev=1164981&view=auto
==============================================================================
--- james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/model/HBaseMailbox.java (added)
+++ james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/mail/model/HBaseMailbox.java Sun Sep  4 09:49:14 2011
@@ -0,0 +1,196 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.hbase.mail.model;
+
+import java.util.UUID;
+import org.apache.james.mailbox.MailboxPath;
+import org.apache.james.mailbox.store.mail.model.Mailbox;
+
+/**
+ * This class implements a mailbox. Most of the code is done after mailbox-jpa 
+ * implementations.
+ * 
+ */
+public class HBaseMailbox implements Mailbox<UUID> {
+
+    private static final String TAB = " ";
+    /** The value for the mailboxId field */
+    private UUID mailboxId;
+    /** The value for the name field */
+    private String name;
+    /** The value for the uidValidity field */
+    private long uidValidity;
+    private String user;
+    private String namespace;
+    private long lastUid;
+    private long highestModSeq;
+    private long messageCount;
+
+    public HBaseMailbox(MailboxPath mailboxPath, long uidValidity) {
+        super();
+        this.name = mailboxPath.getName();
+        this.user = mailboxPath.getUser();
+        this.namespace = mailboxPath.getNamespace();
+        this.uidValidity = uidValidity;
+        //TODO: this has to change to something that can guarantee that mailboxId is unique
+        this.mailboxId = UUID.randomUUID();
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.Mailbox#getMailboxId()
+     */
+    @Override
+    public UUID getMailboxId() {
+        return mailboxId;
+    }
+
+    public void setMailboxId(UUID mailboxId) {
+        this.mailboxId = mailboxId;
+    }
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.model.Mailbox#getNamespace()
+     */
+
+    @Override
+    public String getNamespace() {
+        return namespace;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.model.Mailbox#setNamespace(java.lang.String)
+     */
+    @Override
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.model.Mailbox#getUser()
+     */
+    @Override
+    public String getUser() {
+        return user;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.mail.model.Mailbox#setUser(java.lang.String)
+     */
+    @Override
+    public void setUser(String user) {
+        this.user = user;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.Mailbox#getName()
+     */
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.Mailbox#setName(java.lang.String)
+     */
+    @Override
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @see org.apache.james.mailbox.store.mail.model.Mailbox#getUidValidity()
+     */
+    @Override
+    public long getUidValidity() {
+        return uidValidity;
+    }
+
+    @Override
+    public String toString() {
+        final String retValue = "Mailbox ( "
+                + "mailboxId = " + this.mailboxId + TAB
+//                + "namespace = " + this.namespace + TAB
+                + "name = " + this.name + TAB
+//                + "user = " + this.user + TAB
+                + "uidValidity = " + this.uidValidity + TAB
+                + " )";
+        return retValue;
+    }
+
+    @Override
+    public int hashCode() {
+        final int PRIME = 31;
+        int result = 1;
+        result = PRIME * result + (int) (mailboxId.getMostSignificantBits() ^ (mailboxId.getMostSignificantBits() >>> 32));
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final HBaseMailbox other = (HBaseMailbox) obj;
+        if (!mailboxId.equals(other.getMailboxId())) {
+            return false;
+        }
+        return true;
+    }
+
+    public long getLastUid() {
+        return lastUid;
+    }
+
+    public void setLastUid(long lastUid) {
+        this.lastUid = lastUid;
+    }
+
+    public long getHighestModSeq() {
+        return highestModSeq;
+    }
+
+    public void setHighestModSeq(long highestModSeq) {
+        this.highestModSeq = highestModSeq;
+    }
+
+    public long consumeUid() {
+        return ++lastUid;
+    }
+
+    public long consumeModSeq() {
+        return ++highestModSeq;
+    }
+
+    public long getMessageCount() {
+        return messageCount;
+    }
+
+    public void setMessageCount(long messageCount) {
+        this.messageCount = messageCount;
+    }
+}

Added: james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/user/HBaseSubscriptionMapper.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/user/HBaseSubscriptionMapper.java?rev=1164981&view=auto
==============================================================================
--- james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/user/HBaseSubscriptionMapper.java (added)
+++ james/mailbox/trunk/hbase/src/main/java/org/apache/james/mailbox/hbase/user/HBaseSubscriptionMapper.java Sun Sep  4 09:49:14 2011
@@ -0,0 +1,172 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.hbase.user;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.james.mailbox.SubscriptionException;
+import org.apache.james.mailbox.hbase.HBaseNonTransactionalMapper;
+import org.apache.james.mailbox.store.user.SubscriptionMapper;
+import org.apache.james.mailbox.store.user.model.Subscription;
+import org.apache.james.mailbox.store.user.model.impl.SimpleSubscription;
+
+import static org.apache.james.mailbox.hbase.HBaseUtils.*;
+import static org.apache.james.mailbox.hbase.HBaseNames.*;
+/**
+ * HBase implementation of a {@link SubscriptionMapper}. 
+ * I don't know if this class is thread-safe!
+ * 
+ */
+public class HBaseSubscriptionMapper extends HBaseNonTransactionalMapper implements SubscriptionMapper {
+
+    /** Link to the HBase Configuration object and specific mailbox names */
+    private final Configuration conf;
+
+    public HBaseSubscriptionMapper(Configuration conf) {
+        this.conf = conf;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.mailbox.store.user.SubscriptionMapper#findMailboxSubscriptionForUser(java.lang.String, java.lang.String)
+     */
+    @Override
+    public Subscription findMailboxSubscriptionForUser(String user, String mailbox) throws SubscriptionException {
+        HTable subscriptions = null;
+        try {
+            subscriptions = new HTable(conf, SUBSCRIPTIONS_TABLE);
+            Subscription subscription = null;
+            Get get = new Get(Bytes.toBytes(user));
+            get.addFamily(SUBSCRIPTION_CF);
+            Result result = subscriptions.get(get);
+
+            if (!result.isEmpty()) {
+                if (result.containsColumn(SUBSCRIPTION_CF, Bytes.toBytes(mailbox))) {
+                    subscription = new SimpleSubscription(user, mailbox);
+                    return subscription;
+                }
+            }
+            return null;
+        } catch (IOException e) {
+            throw new SubscriptionException(e);
+        } finally {
+            if (subscriptions != null) {
+                try {
+                    subscriptions.close();
+                } catch (IOException ex) {
+                    throw new SubscriptionException(ex);
+                }
+            }
+        }
+    }
+
+    /**
+     * @throws SubscriptionException 
+     * @see org.apache.james.mailbox.store.user.SubscriptionMapper#save(Subscription)
+     */
+    @Override
+    public void save(Subscription subscription) throws SubscriptionException {
+        //TODO: maybe switch to checkAndPut
+        HTable subscriptions = null;
+        try {
+            subscriptions = new HTable(conf, SUBSCRIPTIONS_TABLE);
+            Put put = toPut(subscription);
+            subscriptions.put(put);
+        } catch (IOException e) {
+            throw new SubscriptionException(e);
+        } finally {
+            if (subscriptions != null) {
+                try {
+                    subscriptions.close();
+                } catch (IOException ex) {
+                    throw new SubscriptionException(ex);
+                }
+            }
+        }
+    }
+
+    /**
+     * @throws SubscriptionException 
+     * @see org.apache.james.mailbox.store.user.SubscriptionMapper#findSubscriptionsForUser(java.lang.String)
+     */
+    @Override
+    public List<Subscription> findSubscriptionsForUser(String user) throws SubscriptionException {
+        HTable subscriptions = null;
+        try {
+            subscriptions = new HTable(conf, SUBSCRIPTIONS_TABLE);
+            List<Subscription> subscriptionList = new ArrayList<Subscription>();
+            Get get = new Get(Bytes.toBytes(user));
+            get.addFamily(SUBSCRIPTION_CF);
+            Result result = subscriptions.get(get);
+            if (!result.isEmpty()) {
+                List<KeyValue> columns = result.list();
+                for (KeyValue key : columns) {
+                    subscriptionList.add(new SimpleSubscription(user, Bytes.toString(key.getQualifier())));
+                }
+            }
+            return subscriptionList;
+        } catch (IOException e) {
+            throw new SubscriptionException(e);
+        } finally {
+            if (subscriptions != null) {
+                try {
+                    subscriptions.close();
+                } catch (IOException ex) {
+                    throw new SubscriptionException(ex);
+                }
+            }
+        }
+    }
+
+    /**
+     * @throws SubscriptionException 
+     * @see org.apache.james.mailbox.store.user.SubscriptionMapper#delete(Subscription)
+     */
+    @Override
+    public void delete(Subscription subscription) throws SubscriptionException {
+        //TODO: maybe switch to checkAndDelete
+        HTable subscriptions = null;
+        try {
+            subscriptions = new HTable(conf, SUBSCRIPTIONS_TABLE);
+            Delete delete = new Delete(Bytes.toBytes(subscription.getUser()));
+            delete.deleteColumns(SUBSCRIPTION_CF, Bytes.toBytes(subscription.getMailbox()));
+            subscriptions.delete(delete);
+            subscriptions.close();
+        } catch (IOException e) {
+            throw new SubscriptionException(e);
+        } finally {
+            if (subscriptions != null) {
+                try {
+                    subscriptions.close();
+                } catch (IOException ex) {
+                    throw new SubscriptionException(ex);
+                }
+            }
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message