servicemix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lh...@apache.org
Subject svn commit: r674020 - /servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
Date Fri, 04 Jul 2008 11:58:18 GMT
Author: lhein
Date: Fri Jul  4 04:58:18 2008
New Revision: 674020

URL: http://svn.apache.org/viewvc?rev=674020&view=rev
Log:
reworked endpoint to use async send only (SM-1441)

Modified:
    servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java

Modified: servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
URL: http://svn.apache.org/viewvc/servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java?rev=674020&r1=674019&r2=674020&view=diff
==============================================================================
--- servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
(original)
+++ servicemix/components/bindings/servicemix-file/trunk/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
Fri Jul  4 04:58:18 2008
@@ -22,7 +22,8 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.Lock;
 
 import javax.jbi.JBIException;
@@ -61,6 +62,7 @@
     private File archive;
     private FileMarshaler marshaler = new DefaultFileMarshaler();
     private LockManager lockManager;
+    private ConcurrentMap<String, File> openExchanges;
 
     public FilePollerEndpoint() {
     }
@@ -73,6 +75,17 @@
         super(component, endpoint);
     }
 
+    /**
+     * Starts the endpoint with a fresh, empty map of open (in-flight) exchanges.
+     * @see org.apache.servicemix.common.endpoints.PollingEndpoint#start()
+     */
+    @Override
+    public synchronized void start() throws Exception {
+        // create the openExchanges map first: processFile() and process() dereference it, so it
+        // must exist before super.start() runs (NOTE(review): assumes start() may begin polling)
+        this.openExchanges = new ConcurrentHashMap<String, File>();
+        super.start();
+    }
+    
     public void poll() throws Exception {
         pollFileOrDirectory(file);
     }
@@ -193,7 +206,7 @@
     public void setArchive(File archive) {
         this.archive = archive;
     }
-
+    
     // Implementation methods
     //-------------------------------------------------------------------------
 
@@ -225,14 +238,7 @@
                 String uri = file.toURI().relativize(aFile.toURI()).toString();
                 Lock lock = lockManager.getLock(uri);
                 if (lock.tryLock()) {
-                    boolean unlock = true;
-                    try {
-                        unlock = processFileAndDelete(aFile);
-                    } finally {
-                        if (unlock) {
-                            lock.unlock();
-                        }
-                    }
+                    processFileNow(aFile);
                 } else {
                     if (logger.isDebugEnabled()) {
                         logger.debug("Unable to acquire lock on " + aFile);
@@ -242,55 +248,30 @@
         });
     }
 
-    protected boolean processFileAndDelete(File aFile) {
-        boolean unlock = true;
+    protected void processFileNow(File aFile) {
         try {
             if (logger.isDebugEnabled()) {
                 logger.debug("Processing file " + aFile);
             }
             if (aFile.exists()) {
                 processFile(aFile);
-                unlock = false;
-                if (isDeleteFile()) {
-                    if (archive != null) {
-                        moveFile(aFile, archive);
-                    } else {
-                        if (!aFile.delete()) {
-                            throw new IOException("Could not delete file " + aFile);
-                        }
-                    }
-                    unlock = true;
-                }
             }
         } catch (Exception e) {
             logger.error("Failed to process file: " + aFile + ". Reason: " + e, e);
         }
-        return unlock;
     }
 
     protected void processFile(File aFile) throws Exception {
         InputStream in = null;
-        try {
-            String name = aFile.getCanonicalPath();
-            in = new BufferedInputStream(new FileInputStream(aFile));
-            InOnly exchange = getExchangeFactory().createInOnlyExchange();
-            configureExchangeTarget(exchange);
-            NormalizedMessage message = exchange.createMessage();
-            exchange.setInMessage(message);
-            marshaler.readMessage(exchange, message, in, name);
-            sendSync(exchange);
-            if (exchange.getStatus() == ExchangeStatus.ERROR) {
-                Exception e = exchange.getError();
-                if (e == null) {
-                    e = new JBIException("Unkown error");
-                }
-                throw e;
-            }
-        } finally {
-            if (in != null) {
-                in.close();
-            }
-        }
+        String name = aFile.getCanonicalPath();
+        in = new BufferedInputStream(new FileInputStream(aFile));
+        InOnly exchange = getExchangeFactory().createInOnlyExchange();
+        configureExchangeTarget(exchange);
+        NormalizedMessage message = exchange.createMessage();
+        exchange.setInMessage(message);
+        marshaler.readMessage(exchange, message, in, name);
+        send(exchange);
+        this.openExchanges.put(exchange.getExchangeId(), aFile);
     }
 
     public String getLocationURI() {
@@ -298,8 +279,61 @@
     }
 
     public void process(MessageExchange exchange) throws Exception {
-        // Do nothing. In our case, this method should never be called
-        // as we only send synchronous InOnly exchange
+        // check for done or error
+        if (this.openExchanges.containsKey(exchange.getExchangeId())) {
+            File aFile = this.openExchanges.get(exchange.getExchangeId());
+
+            logger.debug("Releasing " + aFile.getAbsolutePath());
+            try {
+                // check for state
+                if (exchange.getStatus() == ExchangeStatus.DONE) {
+                    if (isDeleteFile()) {
+                        if (archive != null) {
+                            moveFile(aFile, archive);
+                        } else {
+                            if (!aFile.delete()) {
+                                throw new IOException("Could not delete file " + aFile);
+                            }
+                        }
+                    } 
+                } else {
+                    Exception e = exchange.getError();
+                    if (e == null) {
+                        e = new JBIException("Unkown error");
+                    }
+                    throw e;
+                }
+            } finally {
+                // remove the open exchange
+                openExchanges.remove(exchange.getExchangeId());
+                // unlock the file
+                unlockAsyncFile(aFile);
+            }
+            
+        } else {
+            // strange, we don't know this exchange
+            logger.debug("Received unknown exchange. Will be ignored...");
+            return;
+        }            
+    }
+    
+    /**
+     * Releases the lock held for a file whose exchange has been answered.
+     * The lock key is the file's URI relative to the polled root, as used when locking.
+     * @param file      the file to unlock
+     */
+    private void unlockAsyncFile(File file) {
+        // the parameter shadows the polled root, so relativize against this.file explicitly
+        String uri = this.file.toURI().relativize(file.toURI()).toString();
+        Lock lock = lockManager.getLock(uri);
+        if (lock != null) {
+            try {
+                lock.unlock();
+            } catch (Exception ex) {
+                // can't release the lock; log which file is affected and carry on
+                logger.error("Unable to release lock on " + file, ex);
+            }
+        }
     }
 
     /**
@@ -314,5 +348,4 @@
             throw new IOException("Failed to move " + src + " to " + targetDirectory);
         }
     }
-
 }



Mime
View raw message