servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ge...@apache.org
Subject svn commit: r572807 - /incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
Date Tue, 04 Sep 2007 20:09:43 GMT
Author: gertv
Date: Tue Sep  4 13:09:42 2007
New Revision: 572807

URL: http://svn.apache.org/viewvc?rev=572807&view=rev
Log:
Fix for SM-1043: Poller sends invalid MessageExchange when file has been deleted

Modified:
    incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java

Modified: incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java?rev=572807&r1=572806&r2=572807&view=diff
==============================================================================
--- incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
(original)
+++ incubator/servicemix/trunk/deployables/bindingcomponents/servicemix-ftp/src/main/java/org/apache/servicemix/ftp/FtpPollerEndpoint.java
Tue Sep  4 13:09:42 2007
@@ -1,334 +1,339 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.servicemix.ftp;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.concurrent.locks.Lock;
-
-import javax.jbi.JBIException;
-import javax.jbi.management.DeploymentException;
-import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOnly;
-import javax.jbi.messaging.MessageExchange;
-import javax.jbi.messaging.NormalizedMessage;
-import javax.jbi.servicedesc.ServiceEndpoint;
-import javax.xml.namespace.QName;
-
-import org.apache.commons.net.ftp.FTPClient;
-import org.apache.commons.net.ftp.FTPFile;
-import org.apache.servicemix.common.DefaultComponent;
-import org.apache.servicemix.common.ServiceUnit;
-import org.apache.servicemix.common.endpoints.PollingEndpoint;
-import org.apache.servicemix.components.util.DefaultFileMarshaler;
-import org.apache.servicemix.components.util.FileMarshaler;
-import org.apache.servicemix.locks.LockManager;
-import org.apache.servicemix.locks.impl.SimpleLockManager;
-
-/**
- * A polling endpoint which looks for a file or files in a directory
- * and sends the files into the JBI bus as messages, deleting the files
- * by default when they are processed.
- *
- * @org.apache.xbean.XBean element="poller"
- *
- * @version $Revision: 468487 $
- */
-public class FtpPollerEndpoint extends PollingEndpoint implements FtpEndpointType {
-
-    private FTPClientPool clientPool;
-    private FileFilter filter;  
-    private boolean deleteFile = true;
-    private boolean recursive = true;
-    private FileMarshaler marshaler = new DefaultFileMarshaler();
-    private LockManager lockManager;
-    private URI uri;
-
-    public FtpPollerEndpoint() {
-    }
-
-    public FtpPollerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
-        super(serviceUnit, service, endpoint);
-    }
-
-    public FtpPollerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
-        super(component, endpoint);
-    }
-
-    public void poll() throws Exception {
-        pollFileOrDirectory(getWorkingPath());
-    }
-
-    public void validate() throws DeploymentException {
-        super.validate();
-        if (uri == null && (getClientPool() == null || getClientPool().getHost()
== null)) {
-            throw new DeploymentException("Property uri or clientPool.host must be configured");
-        }
-        if (uri != null && getClientPool() != null && getClientPool().getHost()
!= null) {
-            throw new DeploymentException("Properties uri and clientPool.host can not be
configured at the same time");
-        }
-    }
-    
-    public void start() throws Exception {
-        if (lockManager == null) {
-            lockManager = createLockManager();
-        }
-        if (clientPool == null) {
-            clientPool = createClientPool();
-        }
-        if (uri != null) {
-            clientPool.setHost(uri.getHost());
-            clientPool.setPort(uri.getPort());
-            if (uri.getUserInfo() != null) {
-                String[] infos = uri.getUserInfo().split(":");
-                clientPool.setUsername(infos[0]);
-                if (infos.length > 1) {
-                    clientPool.setPassword(infos[1]);
-                }
-            }
-        } else {
-            String str = "ftp://" + clientPool.getHost();
-            if (clientPool.getPort() >= 0) {
-                str += ":" + clientPool.getPort();
-            }
-            str += "/";
-            uri = new URI(str);
-        }
-        super.start();
-    }
-    
-    protected LockManager createLockManager() {
-        return new SimpleLockManager();
-    }
-
-    private String getWorkingPath() {
-        return (uri != null && uri.getPath() != null) ? uri.getPath() : ".";
-    }
-
-    // Properties
-    //-------------------------------------------------------------------------
-    /**
-     * @return the clientPool
-     */
-    public FTPClientPool getClientPool() {
-        return clientPool;
-    }
-
-    /**
-     * @param clientPool the clientPool to set
-     */
-    public void setClientPool(FTPClientPool clientPool) {
-        this.clientPool = clientPool;
-    }
-
-    /**
-     * @return the uri
-     */
-    public URI getUri() {
-        return uri;
-    }
-
-    /**
-     * @param uri the uri to set
-     */
-    public void setUri(URI uri) {
-        this.uri = uri;
-    }
-
-    public FileFilter getFilter() {
-        return filter;
-    }
-
-    /**
-     * Sets the optional filter to choose which files to process
-     */
-    public void setFilter(FileFilter filter) {
-        this.filter = filter;
-    }
-
-    /**
-     * Returns whether or not we should delete the file when its processed
-     */
-    public boolean isDeleteFile() {
-        return deleteFile;
-    }
-
-    public void setDeleteFile(boolean deleteFile) {
-        this.deleteFile = deleteFile;
-    }
-
-    public boolean isRecursive() {
-        return recursive;
-    }
-
-    public void setRecursive(boolean recursive) {
-        this.recursive = recursive;
-    }
-
-    public FileMarshaler getMarshaler() {
-        return marshaler;
-    }
-
-    public void setMarshaler(FileMarshaler marshaler) {
-        this.marshaler = marshaler;
-    }
-
-    // Implementation methods
-    //-------------------------------------------------------------------------
-
-
-    protected void pollFileOrDirectory(String fileOrDirectory) throws Exception {
-        FTPClient ftp = borrowClient();
-        try {
-            logger.debug("Polling directory " + fileOrDirectory);
-            pollFileOrDirectory(ftp, fileOrDirectory, isRecursive());
-        } finally {
-            returnClient(ftp);
-        }
-    }
-
-    protected void pollFileOrDirectory(FTPClient ftp, String fileOrDirectory, boolean processDir)
throws Exception {
-        FTPFile[] files = ftp.listFiles(fileOrDirectory);
-        for (int i = 0; i < files.length; i++) {
-            String name = files[i].getName();
-            if (".".equals(name) || "..".equals(name)) {
-                continue; // ignore "." and ".."
-            }
-            String file = fileOrDirectory + "/" + name;
-            // This is a file, process it
-            if (!files[i].isDirectory()) {
-                if (getFilter() == null || getFilter().accept(new File(file))) {
-                    pollFile(file); // process the file
-                }
-            // Only process directories if processDir is true
-            } else if (processDir) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Polling directory " + file);
-                }
-                pollFileOrDirectory(ftp, file, isRecursive());
-            } else {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Skipping directory " + file);
-                }
-            }
-        }
-    }
-
-    protected void pollFile(final String file) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Scheduling file " + file + " for processing");
-        }
-        getExecutor().execute(new Runnable() {
-            public void run() {
-                final Lock lock = lockManager.getLock(file);
-                if (lock.tryLock()) {
-                    boolean unlock = true;
-                    try {
-                        unlock = processFileAndDelete(file);
-                    } finally {
-                        if (unlock) {
-                            lock.unlock();
-                        }
-                    }
-                }
-            }
-        });
-    }
-
-    protected boolean processFileAndDelete(String file) {
-        FTPClient ftp = null;
-        boolean unlock = true;
-        try {
-            ftp = borrowClient();
-            if (logger.isDebugEnabled()) {
-                logger.debug("Processing file " + file);
-            }
-            // Process the file. If processing fails, an exception should be thrown.
-            processFile(ftp, file);
-            // Processing is succesfull
-            // We should not unlock until the file has been deleted
-            unlock = false;
-            if (isDeleteFile()) {
-                if (!ftp.deleteFile(file)) {
-                    throw new IOException("Could not delete file " + file);
-                }
-                unlock = true;
-            }
-        } catch (Exception e) {
-            logger.error("Failed to process file: " + file + ". Reason: " + e, e);
-        } finally {
-            returnClient(ftp);
-        }
-        return unlock;
-    }
-
-    protected void processFile(FTPClient ftp, String file) throws Exception {
-        InputStream in = ftp.retrieveFileStream(file);
-        InOnly exchange = getExchangeFactory().createInOnlyExchange();
-        configureExchangeTarget(exchange);
-        NormalizedMessage message = exchange.createMessage();
-        exchange.setInMessage(message);
-        marshaler.readMessage(exchange, message, in, file);
-        sendSync(exchange);
-        in.close();
-        ftp.completePendingCommand();
-        if (exchange.getStatus() == ExchangeStatus.ERROR) {
-            Exception e = exchange.getError();
-            if (e == null) {
-                e = new JBIException("Unkown error");
-            }
-            throw e;
-        }
-    }
-
-    public String getLocationURI() {
-        return uri.toString();
-    }
-
-    public void process(MessageExchange exchange) throws Exception {
-        // Do nothing. In our case, this method should never be called
-        // as we only send synchronous InOnly exchange
-    }
-
-    protected FTPClientPool createClientPool() throws Exception {
-        FTPClientPool pool = new FTPClientPool();
-        pool.afterPropertiesSet();
-        return pool;
-    }
-
-    protected FTPClient borrowClient() throws JBIException {
-        try {
-            return (FTPClient) getClientPool().borrowClient();
-        } catch (Exception e) {
-            throw new JBIException(e);
-        }
-    }
-
-    protected void returnClient(FTPClient client) {
-        if (client != null) {
-            try {
-                getClientPool().returnClient(client);
-            } catch (Exception e) {
-                logger.error("Failed to return client to pool: " + e, e);
-            }
-        }
-    }
-    
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.ftp;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.concurrent.locks.Lock;
+
+import javax.jbi.JBIException;
+import javax.jbi.management.DeploymentException;
+import javax.jbi.messaging.ExchangeStatus;
+import javax.jbi.messaging.InOnly;
+import javax.jbi.messaging.MessageExchange;
+import javax.jbi.messaging.NormalizedMessage;
+import javax.jbi.servicedesc.ServiceEndpoint;
+import javax.xml.namespace.QName;
+
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.servicemix.common.DefaultComponent;
+import org.apache.servicemix.common.ServiceUnit;
+import org.apache.servicemix.common.endpoints.PollingEndpoint;
+import org.apache.servicemix.components.util.DefaultFileMarshaler;
+import org.apache.servicemix.components.util.FileMarshaler;
+import org.apache.servicemix.locks.LockManager;
+import org.apache.servicemix.locks.impl.SimpleLockManager;
+
+/**
+ * A polling endpoint which looks for a file or files in a directory
+ * and sends the files into the JBI bus as messages, deleting the files
+ * by default when they are processed.
+ *
+ * @org.apache.xbean.XBean element="poller"
+ *
+ * @version $Revision: 468487 $
+ */
+public class FtpPollerEndpoint extends PollingEndpoint implements FtpEndpointType {
+
+    private FTPClientPool clientPool;
+    private FileFilter filter;  
+    private boolean deleteFile = true;
+    private boolean recursive = true;
+    private FileMarshaler marshaler = new DefaultFileMarshaler();
+    private LockManager lockManager;
+    private URI uri;
+
+    public FtpPollerEndpoint() {
+    }
+
+    public FtpPollerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
+        super(serviceUnit, service, endpoint);
+    }
+
+    public FtpPollerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
+        super(component, endpoint);
+    }
+
+    public void poll() throws Exception {
+        pollFileOrDirectory(getWorkingPath());
+    }
+
+    public void validate() throws DeploymentException {
+        super.validate();
+        if (uri == null && (getClientPool() == null || getClientPool().getHost()
== null)) {
+            throw new DeploymentException("Property uri or clientPool.host must be configured");
+        }
+        if (uri != null && getClientPool() != null && getClientPool().getHost()
!= null) {
+            throw new DeploymentException("Properties uri and clientPool.host can not be
configured at the same time");
+        }
+    }
+    
+    public void start() throws Exception {
+        if (lockManager == null) {
+            lockManager = createLockManager();
+        }
+        if (clientPool == null) {
+            clientPool = createClientPool();
+        }
+        if (uri != null) {
+            clientPool.setHost(uri.getHost());
+            clientPool.setPort(uri.getPort());
+            if (uri.getUserInfo() != null) {
+                String[] infos = uri.getUserInfo().split(":");
+                clientPool.setUsername(infos[0]);
+                if (infos.length > 1) {
+                    clientPool.setPassword(infos[1]);
+                }
+            }
+        } else {
+            String str = "ftp://" + clientPool.getHost();
+            if (clientPool.getPort() >= 0) {
+                str += ":" + clientPool.getPort();
+            }
+            str += "/";
+            uri = new URI(str);
+        }
+        super.start();
+    }
+    
+    protected LockManager createLockManager() {
+        return new SimpleLockManager();
+    }
+
+    private String getWorkingPath() {
+        return (uri != null && uri.getPath() != null) ? uri.getPath() : ".";
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    /**
+     * @return the clientPool
+     */
+    public FTPClientPool getClientPool() {
+        return clientPool;
+    }
+
+    /**
+     * @param clientPool the clientPool to set
+     */
+    public void setClientPool(FTPClientPool clientPool) {
+        this.clientPool = clientPool;
+    }
+
+    /**
+     * @return the uri
+     */
+    public URI getUri() {
+        return uri;
+    }
+
+    /**
+     * @param uri the uri to set
+     */
+    public void setUri(URI uri) {
+        this.uri = uri;
+    }
+
+    public FileFilter getFilter() {
+        return filter;
+    }
+
+    /**
+     * Sets the optional filter to choose which files to process
+     */
+    public void setFilter(FileFilter filter) {
+        this.filter = filter;
+    }
+
+    /**
+     * Returns whether or not we should delete the file when its processed
+     */
+    public boolean isDeleteFile() {
+        return deleteFile;
+    }
+
+    public void setDeleteFile(boolean deleteFile) {
+        this.deleteFile = deleteFile;
+    }
+
+    public boolean isRecursive() {
+        return recursive;
+    }
+
+    public void setRecursive(boolean recursive) {
+        this.recursive = recursive;
+    }
+
+    public FileMarshaler getMarshaler() {
+        return marshaler;
+    }
+
+    public void setMarshaler(FileMarshaler marshaler) {
+        this.marshaler = marshaler;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+
+    protected void pollFileOrDirectory(String fileOrDirectory) throws Exception {
+        FTPClient ftp = borrowClient();
+        try {
+            logger.debug("Polling directory " + fileOrDirectory);
+            pollFileOrDirectory(ftp, fileOrDirectory, isRecursive());
+        } finally {
+            returnClient(ftp);
+        }
+    }
+
+    protected void pollFileOrDirectory(FTPClient ftp, String fileOrDirectory, boolean processDir)
throws Exception {
+        FTPFile[] files = ftp.listFiles(fileOrDirectory);
+        for (int i = 0; i < files.length; i++) {
+            String name = files[i].getName();
+            if (".".equals(name) || "..".equals(name)) {
+                continue; // ignore "." and ".."
+            }
+            String file = fileOrDirectory + "/" + name;
+            // This is a file, process it
+            if (!files[i].isDirectory()) {
+                if (getFilter() == null || getFilter().accept(new File(file))) {
+                    pollFile(file); // process the file
+                }
+            // Only process directories if processDir is true
+            } else if (processDir) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Polling directory " + file);
+                }
+                pollFileOrDirectory(ftp, file, isRecursive());
+            } else {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Skipping directory " + file);
+                }
+            }
+        }
+    }
+
+    protected void pollFile(final String file) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Scheduling file " + file + " for processing");
+        }
+        getExecutor().execute(new Runnable() {
+            public void run() {
+                final Lock lock = lockManager.getLock(file);
+                if (lock.tryLock()) {
+                    boolean unlock = true;
+                    try {
+                        unlock = processFileAndDelete(file);
+                    } finally {
+                        if (unlock) {
+                            lock.unlock();
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    protected boolean processFileAndDelete(String file) {
+        FTPClient ftp = null;
+        boolean unlock = true;
+        try {
+            ftp = borrowClient();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Processing file " + file);
+            }
+            if (ftp.listFiles(file).length > 0) {
+                // Process the file. If processing fails, an exception should be thrown.
+                processFile(ftp, file);
+                // Processing is successful
+                // We should not unlock until the file has been deleted
+                unlock = false;
+                if (isDeleteFile()) {
+                    if (!ftp.deleteFile(file)) {
+                        throw new IOException("Could not delete file " + file);
+                    }
+                    unlock = true;
+                }
+            } else {
+                //avoid processing files that have been deleted on the server
+                logger.debug("Skipping " + file + ": the file no longer exists on the server");
+            }
+        } catch (Exception e) {
+            logger.error("Failed to process file: " + file + ". Reason: " + e, e);
+        } finally {
+            returnClient(ftp);
+        }
+        return unlock;
+    }
+
+    protected void processFile(FTPClient ftp, String file) throws Exception {
+        InputStream in = ftp.retrieveFileStream(file);
+        InOnly exchange = getExchangeFactory().createInOnlyExchange();
+        configureExchangeTarget(exchange);
+        NormalizedMessage message = exchange.createMessage();
+        exchange.setInMessage(message);
+        marshaler.readMessage(exchange, message, in, file);
+        sendSync(exchange);
+        in.close();
+        ftp.completePendingCommand();
+        if (exchange.getStatus() == ExchangeStatus.ERROR) {
+            Exception e = exchange.getError();
+            if (e == null) {
+                e = new JBIException("Unkown error");
+            }
+            throw e;
+        }
+    }
+
+    public String getLocationURI() {
+        return uri.toString();
+    }
+
+    public void process(MessageExchange exchange) throws Exception {
+        // Do nothing. In our case, this method should never be called
+        // as we only send synchronous InOnly exchange
+    }
+
+    protected FTPClientPool createClientPool() throws Exception {
+        FTPClientPool pool = new FTPClientPool();
+        pool.afterPropertiesSet();
+        return pool;
+    }
+
+    protected FTPClient borrowClient() throws JBIException {
+        try {
+            return (FTPClient) getClientPool().borrowClient();
+        } catch (Exception e) {
+            throw new JBIException(e);
+        }
+    }
+
+    protected void returnClient(FTPClient client) {
+        if (client != null) {
+            try {
+                getClientPool().returnClient(client);
+            } catch (Exception e) {
+                logger.error("Failed to return client to pool: " + e, e);
+            }
+        }
+    }
+    
+}



Mime
View raw message