servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lh...@apache.org
Subject svn commit: r719558 - in /servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail: MailPollerEndpoint.java MailSenderEndpoint.java
Date Fri, 21 Nov 2008 11:21:10 GMT
Author: lhein
Date: Fri Nov 21 03:21:00 2008
New Revision: 719558

URL: http://svn.apache.org/viewvc?rev=719558&view=rev
Log:
fixed SM-1692

Modified:
    servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailPollerEndpoint.java
    servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailSenderEndpoint.java

Modified: servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailPollerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailPollerEndpoint.java?rev=719558&r1=719557&r2=719558&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailPollerEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailPollerEndpoint.java Fri Nov 21 03:21:00 2008
@@ -16,9 +16,13 @@
  */
 package org.apache.servicemix.mail;
 
+import com.sun.mail.pop3.POP3Folder;
+
+import java.io.IOException;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import javax.jbi.JBIException;
@@ -43,8 +47,6 @@
 import org.apache.servicemix.mail.utils.MailConnectionConfiguration;
 import org.apache.servicemix.mail.utils.MailUtils;
 
-import com.sun.mail.pop3.POP3Folder;
-
 /**
  * This is the polling endpoint for the mail component.
  * 
@@ -56,8 +58,8 @@
 
     private AbstractMailMarshaler marshaler = new DefaultMailMarshaler();
 
-    private List<String> seenMessages;
-    
+    private List<String> seenMessages = Collections.synchronizedList(new LinkedList<String>());
+
     private String customTrustManagers;
 
     private MailConnectionConfiguration config;
@@ -71,11 +73,17 @@
     private boolean deleteProcessedMessages;
 
     private boolean debugMode;
-    
+
     private boolean forgetTopHeaders;
-    
+
     private boolean disableTop;
 
+    private Map<String, String> customProperties;
+
+    private List<String> foundMessagesInFolder = Collections.synchronizedList(new LinkedList<String>());
+
+    private org.apache.servicemix.store.Store storage;
+
     /**
      * default constructor
      */
@@ -89,8 +97,8 @@
 
     /*
      * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
+     * @see
+     * org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
      */
     @Override
     public String getLocationURI() {
@@ -100,8 +108,55 @@
 
     /*
      * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
+     * @see org.apache.servicemix.common.endpoints.PollingEndpoint#start()
+     */
+    @Override
+    public synchronized void start() throws Exception {
+        super.start();
+
+        if (this.storage != null) {
+            String id = config.getUsername() + " @ " + config.getHost();
+            try {
+                // load the list of seen messages
+                List<String> loadedMsg = (List<String>)this.storage.load(id);
+                if (loadedMsg != null && !loadedMsg.isEmpty()) {
+                    for (String uid : loadedMsg) {
+                        if (!this.seenMessages.contains(uid)) {
+                            this.seenMessages.add(uid);
+                        }
+                    }
+                    loadedMsg.clear();
+                }
+            } catch (IOException ioex) {
+                logger.error("Error loading seen messages for: " + id, ioex);
+            }
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.servicemix.common.endpoints.PollingEndpoint#stop()
+     */
+    @Override
+    public synchronized void stop() throws Exception {
+        if (this.storage != null) {
+            String id = config.getUsername() + " @ " + config.getHost();
+            try {
+                // save the list of seen messages
+                this.storage.store(id, this.seenMessages);
+            } catch (IOException ioex) {
+                logger.error("Error saving list of seen messages for: " + id, ioex);
+            }
+        }
+
+        super.stop();
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see
+     * org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging
+     * .MessageExchange)
      */
     public void process(MessageExchange arg0) throws Exception {
         // Do nothing. In our case, this method should never be called
@@ -110,7 +165,6 @@
 
     /*
      * (non-Javadoc)
-     * 
      * @see org.apache.servicemix.components.util.PollingComponentSupport#poll()
      */
     public void poll() throws Exception {
@@ -121,21 +175,22 @@
             return;
         }
 
-        boolean isPopProtocol = this.config.getProtocol().toLowerCase().indexOf("pop") > -1 ;
-        
-        // check if protocol is POP like and setup seen messages storage properly
-        if (isPopProtocol && this.seenMessages == null) {
-        	this.seenMessages = Collections.synchronizedList(new LinkedList<String>());
-        }
-        
+        boolean isPopProtocol = this.config.getProtocol().toLowerCase().indexOf("pop") > -1;
+
+        // clear the list each run
+        this.foundMessagesInFolder.clear();
+
         Store store = null;
         Folder folder = null;
         Session session = null;
         try {
             Properties props = MailUtils.getPropertiesForProtocol(this.config, this.customTrustManagers);
             props.put("mail.debug", isDebugMode() ? "true" : "false");
-           	props.put("mail.pop3.forgettopheaders", isForgetTopHeaders() ? "true" : "false");
-           	props.put("mail.pop3.disabletop", isDisableTop() ? "true" : "false");
+            props.put("mail.pop3.forgettopheaders", isForgetTopHeaders() ? "true" : "false");
+            props.put("mail.pop3.disabletop", isDisableTop() ? "true" : "false");
+
+            // apply the custom properties
+            applyCustomProperties(props);
 
             // Get session
             session = Session.getInstance(props, config.getAuthenticator());
@@ -159,65 +214,83 @@
             }
 
             String uid = null;
-            
+
             int fetchSize = getMaxFetchSize() == -1 ? messages.length : Math.min(getMaxFetchSize(),
                                                                                  messages.length);
-            for (int cnt = 0; cnt < fetchSize; cnt++) {
+            int fetchedMessages = 0;
+
+            for (int cnt = 0; cnt < messages.length; cnt++) {
                 // get the message
                 MimeMessage mailMsg = (MimeMessage)messages[cnt];
 
                 if (isProcessOnlyUnseenMessages() && isPopProtocol) {
-                	// POP3 doesn't support flags, so we need to check manually if message is new or not
-                	if (folder instanceof POP3Folder) {
-                	    POP3Folder pf = (POP3Folder)folder;
-                	    uid = pf.getUID(mailMsg);
-                	    if (uid != null && this.seenMessages.contains(uid)) {
-                	    	// this message was already processed
-                	    	uid = null;
-                	    	continue;
-                	    }
-                	}
-                }
-                
-                // create a inOnly exchange
-                InOnly io = getExchangeFactory().createInOnlyExchange();
-
-                // configure the exchange target
-                configureExchangeTarget(io);
-
-                // create the in message
-                NormalizedMessage normalizedMessage = io.createMessage();
-
-                // now let the marshaller convert the mail into a normalized
-                // message to send to jbi bus
-                marshaler.convertMailToJBI(io, normalizedMessage, mailMsg);
-
-                // then put the in message into the inOnly exchange
-                io.setInMessage(normalizedMessage);
-
-                // and use sendSync to deliver it
-                sendSync(io);
-
-                // now check if delivery succeeded or went wrong
-                if (io.getStatus() == ExchangeStatus.ERROR) {
-                    Exception e = io.getError();
-                    if (e == null) {
-                        e = new JBIException("Unexpected error occured...");
+                    // POP3 doesn't support flags, so we need to check manually
+                    // if message is new or not
+                    if (folder instanceof POP3Folder) {
+                        POP3Folder pf = (POP3Folder)folder;
+                        uid = pf.getUID(mailMsg);
+
+                        // remember each found message
+                        if (uid != null) {
+                            foundMessagesInFolder.add(uid);
+                        }
+
+                        // check if we already processed the message
+                        if (uid != null && this.seenMessages.contains(uid)) {
+                            // this message was already processed
+                            uid = null;
+                            continue;
+                        }
                     }
-                    throw e;
-                } else {
-                    // then mark the mail as processed (only if no errors)
-                    if (deleteProcessedMessages) {
-                        // processed messages have to be marked as deleted
-                        mailMsg.setFlag(Flags.Flag.DELETED, true);
+                }
+
+                // only process a message if the max message fetch size isn't
+                // exceeded then
+                if (fetchedMessages < fetchSize) {
+                    // create a inOnly exchange
+                    InOnly io = getExchangeFactory().createInOnlyExchange();
+
+                    // configure the exchange target
+                    configureExchangeTarget(io);
+
+                    // create the in message
+                    NormalizedMessage normalizedMessage = io.createMessage();
+
+                    // now let the marshaller convert the mail into a normalized
+                    // message to send to jbi bus
+                    marshaler.convertMailToJBI(io, normalizedMessage, mailMsg);
+
+                    // then put the in message into the inOnly exchange
+                    io.setInMessage(normalizedMessage);
+
+                    // and use sendSync to deliver it
+                    sendSync(io);
+
+                    // increment the fetched messages counter
+                    fetchedMessages++;
+
+                    // now check if delivery succeeded or went wrong
+                    if (io.getStatus() == ExchangeStatus.ERROR) {
+                        Exception e = io.getError();
+                        if (e == null) {
+                            e = new JBIException("Unexpected error occured...");
+                        }
+                        throw e;
                     } else {
-                        // processed messages have to be marked as seen
-                        mailMsg.setFlag(Flags.Flag.SEEN, true);
-                    }
-                    // remember the processed mail if needed
-                    if (isProcessOnlyUnseenMessages() && isPopProtocol && uid != null) {
-                    	// POP3 doesn't support flags, so we need to remember processed mails
-                    	this.seenMessages.add(uid);
+                        // then mark the mail as processed (only if no errors)
+                        if (deleteProcessedMessages) {
+                            // processed messages have to be marked as deleted
+                            mailMsg.setFlag(Flags.Flag.DELETED, true);
+                        } else {
+                            // processed messages have to be marked as seen
+                            mailMsg.setFlag(Flags.Flag.SEEN, true);
+                        }
+                        // remember the processed mail if needed
+                        if (isProcessOnlyUnseenMessages() && isPopProtocol && uid != null) {
+                            // POP3 doesn't support flags, so we need to
+                            // remember processed mails
+                            this.seenMessages.add(uid);
+                        }
                     }
                 }
             }
@@ -230,6 +303,11 @@
                 if (store != null) {
                     store.close();
                 }
+                // clean up the seen messages list because of maybe deleted
+                // messages
+                if (isProcessOnlyUnseenMessages() && isPopProtocol) {
+                    cleanUpSeenMessages();
+                }
             } catch (Exception ignored) {
                 logger.debug(ignored);
             }
@@ -237,6 +315,39 @@
     }
 
     /**
+     * this method will check if a seen message was deleted from mail folder and
+     * remove this from the list of messages already seen
+     */
+    private synchronized void cleanUpSeenMessages() {
+        List<String> uidsToRemove = new LinkedList<String>();
+        // first collect all uid's to remove
+        for (String uid : seenMessages) {
+            if (!foundMessagesInFolder.contains(uid)) {
+                // the message was deleted from the mail folder, so delete it
+                // also from the seen messages list as well
+                uidsToRemove.add(uid);
+            }
+        }
+        // now remove them
+        for (String uid : uidsToRemove) {
+            seenMessages.remove(uid);
+        }
+    }
+
+    /**
+     * applies custom properties to the used properties for mail server
+     * connection
+     * 
+     * @param props the properties to apply to
+     */
+    private void applyCustomProperties(Properties props) {
+        // allow custom properties
+        if (customProperties != null) {
+            props.putAll(customProperties);
+        }
+    }
+
+    /**
      * @return the deleteProcessedMessages
      */
     public boolean isDeleteProcessedMessages() {
@@ -244,8 +355,7 @@
     }
 
     /**
-     * @param deleteProcessedMessages
-     *            the deleteProcessedMessages to set
+     * @param deleteProcessedMessages the deleteProcessedMessages to set
      */
     public void setDeleteProcessedMessages(boolean deleteProcessedMessages) {
         this.deleteProcessedMessages = deleteProcessedMessages;
@@ -259,8 +369,7 @@
     }
 
     /**
-     * @param marshaler
-     *            the marshaler to set
+     * @param marshaler the marshaler to set
      */
     public void setMarshaler(AbstractMailMarshaler marshaler) {
         this.marshaler = marshaler;
@@ -274,8 +383,7 @@
     }
 
     /**
-     * @param maxFetchSize
-     *            the maxFetchSize to set
+     * @param maxFetchSize the maxFetchSize to set
      */
     public void setMaxFetchSize(int maxFetchSize) {
         this.maxFetchSize = maxFetchSize;
@@ -289,8 +397,7 @@
     }
 
     /**
-     * @param processOnlyUnseenMessages
-     *            the processOnlyUnseenMessages to set
+     * @param processOnlyUnseenMessages the processOnlyUnseenMessages to set
      */
     public void setProcessOnlyUnseenMessages(boolean processOnlyUnseenMessages) {
         this.processOnlyUnseenMessages = processOnlyUnseenMessages;
@@ -308,8 +415,7 @@
     /**
      * sets the connection uri
      * 
-     * @param connection
-     *            The connection to set.
+     * @param connection The connection to set.
      */
     public void setConnection(String connection) {
         this.connection = connection;
@@ -328,8 +434,7 @@
     }
 
     /**
-     * @param debugMode
-     *            the debugMode to set
+     * @param debugMode the debugMode to set
      */
     public void setDebugMode(boolean debugMode) {
         this.debugMode = debugMode;
@@ -343,38 +448,65 @@
     }
 
     /**
-     * @param customTrustManagers
-     *            the customTrustManagers to set
+     * @param customTrustManagers the customTrustManagers to set
      */
     public void setCustomTrustManagers(String customTrustManagers) {
         this.customTrustManagers = customTrustManagers;
     }
 
-	/**
-	 * @return the forgetTopHeaders
-	 */
-	public boolean isForgetTopHeaders() {
-		return this.forgetTopHeaders;
-	}
-
-	/**
-	 * @param forgetTopHeaders the forgetTopHeaders to set
-	 */
-	public void setForgetTopHeaders(boolean forgetTopHeaders) {
-		this.forgetTopHeaders = forgetTopHeaders;
-	}
-
-	/**
-	 * @return the disableTop
-	 */
-	public boolean isDisableTop() {
-		return this.disableTop;
-	}
-
-	/**
-	 * @param disableTop the disableTop to set
-	 */
-	public void setDisableTop(boolean disableTop) {
-		this.disableTop = disableTop;
-	}
+    /**
+     * @return the forgetTopHeaders
+     */
+    public boolean isForgetTopHeaders() {
+        return this.forgetTopHeaders;
+    }
+
+    /**
+     * @param forgetTopHeaders the forgetTopHeaders to set
+     */
+    public void setForgetTopHeaders(boolean forgetTopHeaders) {
+        this.forgetTopHeaders = forgetTopHeaders;
+    }
+
+    /**
+     * @return the disableTop
+     */
+    public boolean isDisableTop() {
+        return this.disableTop;
+    }
+
+    /**
+     * @param disableTop the disableTop to set
+     */
+    public void setDisableTop(boolean disableTop) {
+        this.disableTop = disableTop;
+    }
+
+    /**
+     * * @return Returns the customProperties.
+     */
+    public Map<String, String> getCustomProperties() {
+        return this.customProperties;
+    }
+
+    /**
+     * @param customProperties The customProperties to set.
+     */
+    public void setCustomProperties(Map<String, String> customProperties) {
+        this.customProperties = customProperties;
+    }
+
+    /**
+     * * @return Returns the storage.
+     */
+    public org.apache.servicemix.store.Store getStorage() {
+        return this.storage;
+    }
+
+    /**
+     * @param storage The storage to set.
+     */
+    public void setStorage(org.apache.servicemix.store.Store storage) {
+        this.storage = storage;
+    }
 }

Modified: servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailSenderEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailSenderEndpoint.java?rev=719558&r1=719557&r2=719558&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailSenderEndpoint.java (original)
+++ servicemix/components/bindings/servicemix-mail/trunk/src/main/java/org/apache/servicemix/mail/MailSenderEndpoint.java Fri Nov 21 03:21:00 2008
@@ -16,6 +16,7 @@
  */
 package org.apache.servicemix.mail;
 
+import java.util.Map;
 import java.util.Properties;
 
 import javax.jbi.management.DeploymentException;
@@ -53,10 +54,10 @@
     private String sender;
     private String receiver;
     private boolean debugMode;
+    private Map<String, String> customProperties;
 
     /*
      * (non-Javadoc)
-     * 
      * @see org.apache.servicemix.common.Endpoint#validate()
      */
     public void validate() throws DeploymentException {
@@ -73,9 +74,10 @@
 
     /*
      * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#processInOnly(javax.jbi.messaging.MessageExchange,
-     *      javax.jbi.messaging.NormalizedMessage)
+     * @see
+     * org.apache.servicemix.common.endpoints.ProviderEndpoint#processInOnly
+     * (javax.jbi.messaging.MessageExchange,
+     * javax.jbi.messaging.NormalizedMessage)
      */
     @Override
     protected void processInOnly(MessageExchange exchange, NormalizedMessage in) throws Exception {
@@ -95,6 +97,9 @@
                 Properties props = MailUtils.getPropertiesForProtocol(this.config, this.customTrustManagers);
                 props.put("mail.debug", isDebugMode() ? "true" : "false");
 
+                // apply the custom properties
+                applyCustomProperties(props);
+
                 // Get session
                 session = Session.getInstance(props, config.getAuthenticator());
 
@@ -107,8 +112,9 @@
                 // Connect only once here
                 // Transport.send() disconnects after each send
                 // Usually, no username and password is required for SMTP
-                transport.connect(config.getHost(), config.getPort(), config.getUsername(), config.getPassword());
-                
+                transport.connect(config.getHost(), config.getPort(), config.getUsername(), config
+                    .getPassword());
+
                 // Define message
                 MimeMessage msg = new MimeMessage(session);
 
@@ -117,7 +123,7 @@
 
                 // Send message
                 transport.sendMessage(msg, msg.getAllRecipients());
-                
+
                 // close transport
                 transport.close();
             } catch (MessagingException mex) {
@@ -132,9 +138,11 @@
 
     /*
      * (non-Javadoc)
-     * 
-     * @see org.apache.servicemix.common.endpoints.ProviderEndpoint#processInOut(javax.jbi.messaging.MessageExchange,
-     *      javax.jbi.messaging.NormalizedMessage, javax.jbi.messaging.NormalizedMessage)
+     * @see
+     * org.apache.servicemix.common.endpoints.ProviderEndpoint#processInOut(
+     * javax.jbi.messaging.MessageExchange,
+     * javax.jbi.messaging.NormalizedMessage,
+     * javax.jbi.messaging.NormalizedMessage)
      */
     @Override
     protected void processInOut(MessageExchange exchange, NormalizedMessage in, NormalizedMessage out)
@@ -155,6 +163,9 @@
                 Properties props = MailUtils.getPropertiesForProtocol(this.config, this.customTrustManagers);
                 props.put("mail.debug", isDebugMode() ? "true" : "false");
 
+                // apply the custom properties
+                applyCustomProperties(props);
+
                 // Get session
                 session = Session.getInstance(props, config.getAuthenticator());
 
@@ -167,8 +178,9 @@
                 // Connect only once here
                 // Transport.send() disconnects after each send
                 // Usually, no username and password is required for SMTP
-                transport.connect(config.getHost(), config.getPort(), config.getUsername(), config.getPassword());
-                                
+                transport.connect(config.getHost(), config.getPort(), config.getUsername(), config
+                    .getPassword());
+
                 // Define message
                 MimeMessage msg = new MimeMessage(session);
 
@@ -177,7 +189,7 @@
 
                 // Send message
                 transport.sendMessage(msg, msg.getAllRecipients());
-                
+
                 // close transport
                 transport.close();
 
@@ -194,6 +206,19 @@
     }
 
     /**
+     * this will apply the custom properties to the properties map used for
+     * connection to mail server
+     * 
+     * @param props the properties to apply to
+     */
+    private void applyCustomProperties(Properties props) {
+        // allow custom properties
+        if (customProperties != null) {
+            props.putAll(customProperties);
+        }
+    }
+
+    /**
      * @return the marshaler
      */
     public AbstractMailMarshaler getMarshaler() {
@@ -272,7 +297,7 @@
         this.customTrustManagers = customTrustManagers;
     }
 
-    /** 
+    /**
      * @return Returns the receiver.
      */
     public String getReceiver() {
@@ -285,4 +310,18 @@
     public void setReceiver(String receiver) {
         this.receiver = receiver;
     }
+
+    /**
+     * * @return Returns the customProperties.
+     */
+    public Map<String, String> getCustomProperties() {
+        return this.customProperties;
+    }
+
+    /**
+     * @param customProperties The customProperties to set.
+     */
+    public void setCustomProperties(Map<String, String> customProperties) {
+        this.customProperties = customProperties;
+    }
 }



Mime
View raw message