james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From b...@apache.org
Subject svn commit: r385301 - in /james/server/trunk/src/java/org/apache/james/mailrepository: JDBCMailRepository.java MessageInputStream.java
Date Sun, 12 Mar 2006 15:49:49 GMT
Author: bago
Date: Sun Mar 12 07:49:48 2006
New Revision: 385301

URL: http://svn.apache.org/viewcvs?rev=385301&view=rev
Log:
Stream the message body in JDBCMailRepository.store(Mail) (JAMES-158 and, partially, JAMES-134).
The hardcoded behaviour is to use byte arrays for messages up to 4k and piped streams (with
a worker thread) for larger messages.
Please note that this patch needs the previous patch (about correctness of the message size)
to be applied, or it will not work (most dbs will refuse to store a stream of unknown size).
Tested with mysql, oracle and derby. Currently the Mysql driver uses an "in driver" buffer
to hold the full message. Only mysql 4.1+ with connector/j 5.0+ properly handles streaming
to the db.

Added:
    james/server/trunk/src/java/org/apache/james/mailrepository/MessageInputStream.java
Modified:
    james/server/trunk/src/java/org/apache/james/mailrepository/JDBCMailRepository.java

Modified: james/server/trunk/src/java/org/apache/james/mailrepository/JDBCMailRepository.java
URL: http://svn.apache.org/viewcvs/james/server/trunk/src/java/org/apache/james/mailrepository/JDBCMailRepository.java?rev=385301&r1=385300&r2=385301&view=diff
==============================================================================
--- james/server/trunk/src/java/org/apache/james/mailrepository/JDBCMailRepository.java (original)
+++ james/server/trunk/src/java/org/apache/james/mailrepository/JDBCMailRepository.java Sun Mar 12 07:49:48 2006
@@ -36,7 +36,6 @@
 import org.apache.james.context.AvalonContextUtilities;
 import org.apache.james.core.MailImpl;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
-import org.apache.james.core.MimeMessageUtil;
 import org.apache.james.core.MimeMessageWrapper;
 import org.apache.james.services.MailRepository;
 import org.apache.james.util.JDBCUtil;
@@ -47,13 +46,13 @@
 
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.ObjectInputStream;
-import java.io.OutputStream;
 import java.sql.Blob;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -661,40 +660,23 @@
                 } else {
                     saveBody = true;
                 }
-
+                
                 if (saveBody) {
+                    PreparedStatement updateMessageBody = 
+                        conn.prepareStatement(sqlQueries.getSqlString("updateMessageBodySQL", true));
                     try {
-                        updateMessage =
-                            conn.prepareStatement(sqlQueries.getSqlString("updateMessageBodySQL", true));
-                        ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
-                        OutputStream bodyOut = null;
-                        try {
-                            if (sr == null) {
-                                //If there is no filestore, use the byte array to store headers
-                                //  and the body
-                                bodyOut = headerOut;
-                            } else {
-                                //Store the body in the stream repository
-                                bodyOut = sr.put(mc.getName());
-                            }
-        
-                            //Write the message to the headerOut and bodyOut.  bodyOut goes straight to the file
-                            MimeMessageUtil.writeTo(mc.getMessage(), headerOut, bodyOut);
-        
-                            //Store the headers in the database
-                            ByteArrayInputStream headerInputStream =
-                                new ByteArrayInputStream(headerOut.toByteArray());
-                            updateMessage.setBinaryStream(1, headerInputStream, headerOut.size());
-                        } finally {
-                            closeOutputStreams(headerOut, bodyOut);
-                        }
-                        updateMessage.setString(2, mc.getName());
-                        updateMessage.setString(3, repositoryName);
-                        updateMessage.execute();
+                        MessageInputStream is = new MessageInputStream(mc,sr);
+                        updateMessageBody.setBinaryStream(1,is,(int) is.getSize());
+                        updateMessageBody.setString(2, mc.getName());
+                        updateMessageBody.setString(3, repositoryName);
+                        updateMessageBody.execute();
+                        
                     } finally {
-                        theJDBCUtil.closeJDBCStatement(updateMessage);
+                        theJDBCUtil.closeJDBCStatement(updateMessageBody);
                     }
                 }
+                
+
             } else {
                 //Insert the record into the database
                 PreparedStatement insertMessage = null;
@@ -723,31 +705,11 @@
                     insertMessage.setString(7, mc.getRemoteHost());
                     insertMessage.setString(8, mc.getRemoteAddr());
                     insertMessage.setTimestamp(9, new java.sql.Timestamp(mc.getLastUpdated().getTime()));
-                    MimeMessage messageBody = mc.getMessage();
-    
-                    ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
-                    OutputStream bodyOut = null;
-                    try {
-                        if (sr == null) {
-                            //If there is no sr, then use the same byte array to hold the headers
-                            //  and the body
-                            bodyOut = headerOut;
-                        } else {
-                            //Store the body in the file system.
-                            bodyOut = sr.put(mc.getName());
-                        }
-        
-                        //Write the message to the headerOut and bodyOut.  bodyOut goes straight to the file
-                        MimeMessageUtil.writeTo(messageBody, headerOut, bodyOut);
 
-                        ByteArrayInputStream headerInputStream =
-                            new ByteArrayInputStream(headerOut.toByteArray());
-                        insertMessage.setBinaryStream(10, headerInputStream, headerOut.size());
-                    } finally {
-                        closeOutputStreams(headerOut, bodyOut);
-                    }
-                    //Store the headers in the database
+                    MessageInputStream is = new MessageInputStream(mc,sr);
 
+                    insertMessage.setBinaryStream(10, is, (int) is.getSize());
+                    
                     //Store attributes
                     if (number_of_parameters > 10) {
                         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -784,11 +746,13 @@
                 }
             }
 
+
             conn.commit();
             conn.setAutoCommit(true);
 
         } catch (Exception e) {
-            throw new MessagingException("Exception caught while storing mail Container: " + e);
+            getLogger().error("Exception caught while storing mail Container",e);
+            throw new MessagingException("Exception caught while storing mail Container: ",e);
         } finally {
             theJDBCUtil.closeJDBCConnection(conn);
             if (!wasLocked) {
@@ -860,7 +824,7 @@
                         try {
                             byte[] serialized_attr = null;
                             String getAttributesOption = sqlQueries.getDbOption("getAttributes");
-                            if (getAttributesOption != null && getAttributesOption.equalsIgnoreCase("useBlob")) {
+                            if (getAttributesOption != null && (getAttributesOption.equalsIgnoreCase("useBlob") || getAttributesOption.equalsIgnoreCase("useBinaryStream"))) {
                                 Blob b = rsMessageAttr.getBlob(1);
                                 serialized_attr = b.getBytes(1, (int)b.length());
                             } else {
@@ -1091,31 +1055,5 @@
             count += chars[i]=='?' ? 1 : 0;
         }
         return count;
-    }
-
-    /**
-     * Closes output streams used to update message
-     * 
-     * @param headerStream the stream containing header information - potentially the same
-     *               as the body stream
-     * @param bodyStream the stream containing body information
-     */
-    private void closeOutputStreams(OutputStream headerStream, OutputStream bodyStream) {
-        try {
-            // If the header stream is not the same as the body stream,
-            // close the header stream here.
-            if ((headerStream != null) && (headerStream != bodyStream)) {
-                headerStream.close();
-            }
-        } catch (IOException ioe) {
-            getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.");
-        }
-        try {
-            if (bodyStream != null) {
-                bodyStream.close();
-            }
-        } catch (IOException ioe) {
-            getLogger().debug("JDBCMailRepository: Unexpected exception while closing output stream.");
-        }
     }
 }

Added: james/server/trunk/src/java/org/apache/james/mailrepository/MessageInputStream.java
URL: http://svn.apache.org/viewcvs/james/server/trunk/src/java/org/apache/james/mailrepository/MessageInputStream.java?rev=385301&view=auto
==============================================================================
--- james/server/trunk/src/java/org/apache/james/mailrepository/MessageInputStream.java (added)
+++ james/server/trunk/src/java/org/apache/james/mailrepository/MessageInputStream.java Sun Mar 12 07:49:48 2006
@@ -0,0 +1,250 @@
+package org.apache.james.mailrepository;
+
+import org.apache.avalon.cornerstone.services.store.StreamRepository;
+import org.apache.james.core.MimeMessageUtil;
+import org.apache.mailet.Mail;
+
+import javax.mail.MessagingException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+
+/**
+ * This class provides an InputStream for a Mail object.
+ * If the Mail is larger than 4KB it uses piped streams and a worker thread.
+ * Otherwise it simply creates a temporary byte buffer and does not create
+ * the worker thread.
+ * 
+ * Note: Javamail (or the Activation Framework) already uses a worker thread when
+ * asked for an input stream.
+ */
+final class MessageInputStream extends InputStream {
+    
+    /**
+     * The size of the current message
+     */
+    private long size = -1;
+    /**
+     * The wrapped stream (Piped or Binary)
+     */
+    private InputStream wrapped;
+    /**
+     * If an exception happens in the worker thread it's stored here
+     */
+    private Exception caughtException;
+    /**
+     * Stream repository used for dbfiles (null otherwise)
+     */
+    private StreamRepository streamRep;
+    
+    /**
+     * Main constructor. If srep is not null then we are using dbfiles and we stream
+     * the body to file and only the header to db.
+     */
+    public MessageInputStream(Mail mc, StreamRepository srep) throws IOException, MessagingException {
+        super();
+        caughtException = null;
+        streamRep = srep;
+        size = mc.getMessageSize();
+        if (size > 4096) {
+            PipedOutputStream headerOut = new PipedOutputStream();
+            new Thread() {
+                private Mail mail;
+
+                private PipedOutputStream out;
+
+                public void run() {
+                    try {
+                        writeStream(mail,out);
+                    } catch (IOException e) {
+                        caughtException = e;
+                    } catch (MessagingException e) {
+                        caughtException = e;
+                    }
+                }
+
+                public Thread setParam(Mail mc, PipedOutputStream headerOut) {
+                    this.mail = mc;
+                    this.out = headerOut;
+                    return this;
+                }
+            }.setParam(mc,(PipedOutputStream) headerOut).start();
+            wrapped = new PipedInputStream(headerOut);
+        } else {
+            ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
+            writeStream(mc,headerOut);
+            wrapped = new ByteArrayInputStream(headerOut.toByteArray());
+        }
+    }
+    
+    /**
+     * Returns the size of the full message
+     */
+    public long getSize() {
+        return size;
+    }
+
+    /**
+     * write the full mail to the stream
+     * This can be used by this object or by the worker threads.
+     */
+    private void writeStream(Mail mail, OutputStream out) throws IOException, MessagingException {
+        OutputStream bodyOut = null;
+        try {
+            if (streamRep == null) {
+                //If there is no filestore, use the byte array to store headers
+                //  and the body
+                bodyOut = out;
+            } else {
+                //Store the body in the stream repository
+                bodyOut = streamRep.put(mail.getName());
+            }
+        
+            //Write the message to the headerOut and bodyOut.  bodyOut goes straight to the file
+            MimeMessageUtil.writeTo(mail.getMessage(), out, bodyOut);
+            out.flush();
+            bodyOut.flush();
+        
+        } finally {
+            closeOutputStreams(out, bodyOut);
+        }
+    }
+
+    private void throwException() throws IOException {
+        try {
+            if (wrapped == null) {
+                throw new IOException("wrapped stream does not exists anymore");
+            } else if (caughtException instanceof IOException) {
+                throw (IOException) caughtException;
+            } else {
+                throw new IOException("Exception caugth in worker thread "+caughtException.getMessage()) {
+                    /**
+                     * @see java.lang.Throwable#getCause()
+                     */
+                    public Throwable getCause() {
+                        return caughtException;
+                    }
+                };
+            }
+        } finally {
+            caughtException = null;
+            wrapped = null;
+        }
+    }
+
+
+    /**
+     * Closes output streams used to update message
+     * 
+     * @param headerStream the stream containing header information - potentially the same
+     *               as the body stream
+     * @param bodyStream the stream containing body information
+     * @throws IOException 
+     */
+    private void closeOutputStreams(OutputStream headerStream, OutputStream bodyStream) throws IOException {
+        try {
+            // If the header stream is not the same as the body stream,
+            // close the header stream here.
+            if ((headerStream != null) && (headerStream != bodyStream)) {
+                headerStream.close();
+            }
+        } finally {
+            if (bodyStream != null) {
+                bodyStream.close();
+            }
+        }
+    }
+
+    // wrapper methods
+
+    /**
+     * @see java.io.InputStream#available()
+     */
+    public int available() throws IOException {
+        if (caughtException != null || wrapped == null) {
+            throwException();
+        }
+        return wrapped.available();
+    }
+
+    /**
+     * @see java.io.Closeable#close()
+     */
+    public void close() throws IOException {
+        if (caughtException != null || wrapped == null) {
+            throwException();
+        }
+        wrapped.close();
+        wrapped = null;
+    }
+
+    /**
+     * @see java.io.InputStream#mark(int)
+     */
+    public synchronized void mark(int arg0) {
+        wrapped.mark(arg0);
+    }
+
+    /**
+     * @see java.io.InputStream#markSupported()
+     */
+    public boolean markSupported() {
+        return wrapped.markSupported();
+    }
+
+    /**
+     * @see java.io.InputStream#read(byte[], int, int)
+     */
+    public int read(byte[] arg0, int arg1, int arg2) throws IOException {
+        if (caughtException != null || wrapped == null) {
+            throwException();
+        }
+        return wrapped.read(arg0, arg1, arg2);
+    }
+
+    /**
+     * @see java.io.InputStream#read(byte[])
+     */
+    public int read(byte[] arg0) throws IOException {
+        if (caughtException != null || wrapped == null) {
+            throwException();
+        }
+        return wrapped.read(arg0);
+    }
+
+    /**
+     * @see java.io.InputStream#reset()
+     */
+    public synchronized void reset() throws IOException {
+        if (caughtException != null || wrapped == null) {
+            throwException();
+        }
+        wrapped.reset();
+    }
+
+    /**
+     * @see java.io.InputStream#skip(long)
+     */
+    public long skip(long arg0) throws IOException {
+        if (caughtException != null || wrapped == null) {
+            throwException();
+        }
+        return wrapped.skip(arg0);
+    }
+
+    /**
+     * @see java.io.InputStream#read()
+     */
+    public int read() throws IOException {
+        if (caughtException != null || wrapped == null) {
+            throwException();
+        }
+        return wrapped.read();
+    }
+
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message