tomee-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dblev...@apache.org
Subject svn commit: r676240 - in /openejb/trunk/openejb3: container/openejb-core/src/main/java/org/apache/openejb/util/ server/openejb-client/src/main/java/org/apache/openejb/client/ server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/ server/open...
Date Sat, 12 Jul 2008 21:02:57 GMT
Author: dblevins
Date: Sat Jul 12 14:02:56 2008
New Revision: 676240

URL: http://svn.apache.org/viewvc?rev=676240&view=rev
Log:
Keep Client Connections Alive

Added:
    openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Modified:
    openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/LogCategory.java
    openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
    openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/EjbServer.java
    openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/MultithreadTest.java
    openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java

Modified: openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/LogCategory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/LogCategory.java?rev=676240&r1=676239&r2=676240&view=diff
==============================================================================
--- openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/LogCategory.java
(original)
+++ openejb/trunk/openejb3/container/openejb-core/src/main/java/org/apache/openejb/util/LogCategory.java
Sat Jul 12 14:02:56 2008
@@ -52,8 +52,8 @@
 	public static final LogCategory AXIS2 = new LogCategory( "axis");
 	public static final LogCategory CXF = new LogCategory( "cxf");
 	public static final LogCategory TIMER = new LogCategory( "Timer");
-	public static final LogCategory HTTPSERVER = new LogCategory( "HttpServer");
-	public static final LogCategory SERVICEPOOL = new LogCategory( "ServicePool");
+	public static final LogCategory HTTPSERVER = OPENEJB_SERVER.createChild("http");
+	public static final LogCategory SERVICEPOOL = OPENEJB_SERVER.createChild("pool");
 	private LogCategory(String name){
 		this.name = name;
 	}

Modified: openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java?rev=676240&r1=676239&r2=676240&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
(original)
+++ openejb/trunk/openejb3/server/openejb-client/src/main/java/org/apache/openejb/client/SocketConnectionFactory.java
Sat Jul 12 14:02:56 2008
@@ -24,17 +24,49 @@
 import java.net.URI;
 import java.net.ConnectException;
 import java.util.Properties;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentHashMap;
 import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.SSLSocket;
 
 public class SocketConnectionFactory implements ConnectionFactory {
 
+    private static Map<URI, SocketConnection> connections = new ConcurrentHashMap<URI,
SocketConnection>();
+
     public void init(Properties props) {
     }
 
     public Connection getConnection(URI uri) throws java.io.IOException {
-        SocketConnection conn = new SocketConnection();
-        conn.open(uri);
+
+        SocketConnection conn = connections.get(uri);
+        if (conn == null) {
+            conn = new SocketConnection();
+            conn.open(uri);
+            SocketConnection old = connections.put(uri, conn);
+            if (old != null) {
+                try {
+                    old.lock.lock();
+                } catch (Exception e) {
+                } finally {
+                    try {
+                        old.socket.close();
+                    } catch (IOException e) {
+                    }
+                }
+            }
+        }
+        try {
+            conn.lock.tryLock(60, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            throw new IOException("Connection busy");
+        }
+
+        OutputStream ouputStream = conn.getOuputStream();
+        ouputStream.write(30);
+        ouputStream.flush();
         return conn;
     }
 
@@ -46,6 +78,8 @@
 
         InputStream socketIn = null;
 
+        private final Lock lock = new ReentrantLock();
+
         protected void open(URI uri) throws IOException {
 
             /*-----------------------*/
@@ -82,13 +116,14 @@
         }
 
         public void close() throws IOException {
+            lock.unlock();
             try {
                 if (socketOut != null)
                     socketOut.close();
                 if (socketIn != null)
                     socketIn.close();
-                if (socket != null)
-                    socket.close();
+//                if (socket != null)
+//                    socket.close();
             } catch (Throwable t) {
                 throw new IOException("Error closing connection with server: " + t.getMessage());
             }
@@ -99,7 +134,7 @@
             /* Open input streams */
             /*----------------------------------*/
             try {
-                socketIn = socket.getInputStream();
+                socketIn = new Input(socket.getInputStream());
             } catch (StreamCorruptedException e) {
                 throw new IOException("Cannot open input stream to server, the stream has
been corrupted: " + e.getClass().getName() + " : " + e.getMessage());
 
@@ -117,7 +152,7 @@
             /* Openning output streams */
             /*----------------------------------*/
             try {
-                socketOut = socket.getOutputStream();
+                socketOut = new Output(socket.getOutputStream());
             } catch (IOException e) {
                 throw new IOException("Cannot open output stream to server: " + e.getClass().getName()
+ " : " + e.getMessage());
 
@@ -128,4 +163,24 @@
         }
 
     }
+
+    public class Input extends java.io.FilterInputStream {
+
+        public Input(InputStream in) {
+            super(in);
+        }
+
+        public void close() throws IOException {
+        }
+    }
+
+    public class Output extends java.io.FilterOutputStream {
+        public Output(OutputStream out) {
+            super(out);
+        }
+
+        public void close() throws IOException {
+            flush();
+        }
+    }
 }

Modified: openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/EjbServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/EjbServer.java?rev=676240&r1=676239&r2=676240&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/EjbServer.java
(original)
+++ openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/EjbServer.java
Sat Jul 12 14:02:56 2008
@@ -34,7 +34,12 @@
 
 public class EjbServer implements org.apache.openejb.server.ServerService, org.apache.openejb.spi.ApplicationServer
{
 
-    EjbDaemon server;
+    private final KeepAliveServer keepAlive;
+    private EjbDaemon server;
+
+    public EjbServer() {
+        keepAlive = new KeepAliveServer(this);
+    }
 
     public void init(Properties props) throws Exception {
         server = EjbDaemon.getEjbDaemon();
@@ -56,8 +61,7 @@
     }
 
     public void service(Socket socket) throws ServiceException, IOException {
-        ServerFederation.setApplicationServer(server);
-        server.service(socket);
+        keepAlive.service(socket);
     }
 
     public void service(InputStream inputStream, OutputStream outputStream) throws ServiceException,
IOException {

Added: openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java?rev=676240&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
(added)
+++ openejb/trunk/openejb3/server/openejb-ejbd/src/main/java/org/apache/openejb/server/ejbd/KeepAliveServer.java
Sat Jul 12 14:02:56 2008
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server.ejbd;
+
+import org.apache.openejb.server.ServerService;
+import org.apache.openejb.server.ServiceException;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.Map;
+import java.util.TimerTask;
+import java.util.Timer;
+import java.util.Date;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.text.SimpleDateFormat;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class KeepAliveServer implements ServerService {
+    private final ServerService service;
+    private final long timeout = (1000 * 2);
+    private final KeepAliveTimer keepAliveTimer = new KeepAliveTimer(timeout);
+
+    public KeepAliveServer() {
+        this(new EjbServer());
+    }
+
+    public KeepAliveServer(ServerService service) {
+        this.service = service;
+
+
+        Timer timer = new Timer("KeepAliveTimer", true);
+        timer.scheduleAtFixedRate(keepAliveTimer, timeout, timeout / 2);
+
+    }
+
+
+
+    public static class KeepAliveTimer extends TimerTask {
+
+        private final Map<Thread, Status> statusMap = new ConcurrentHashMap<Thread,
Status>();
+
+        private final long timeout;
+
+        public KeepAliveTimer(long timeout) {
+            this.timeout = timeout;
+        }
+
+        public void run() {
+            long now = System.currentTimeMillis();
+
+            Collection<Status> statuses = statusMap.values();
+            for (Status status : statuses) {
+
+//                System.out.println(""+status);
+
+                if (status.isReading() && now - status.getTime() > timeout){
+//                    System.out.println("Thread Interrupt");
+                    try {
+                        status.in.close();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+//            System.out.println("exit");
+        }
+
+        public Status setStatus(Status status) {
+//            System.out.println("status = " + status);
+            return statusMap.put(status.getThread(), status);
+        }
+    }
+
+    public static class Status {
+        private final long time;
+        private final boolean reading;
+        private final Thread thread;
+        private static final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss.SSS");
+        private final InputStream in;
+
+        public boolean isReading() {
+            return reading;
+        }
+
+        public Thread getThread() {
+            return thread;
+        }
+
+        public long getTime() {
+            return time;
+        }
+
+        public Status(boolean reading, InputStream in) {
+            this.reading = reading;
+            this.thread = Thread.currentThread();
+            this.time = System.currentTimeMillis();
+            this.in = in;
+        }
+
+        public String toString() {
+            String msg = "";
+            if (reading)
+            msg += "READING";
+            else msg += "WORKING";
+            msg += " "+thread.getName();
+
+            msg += " since "+ format.format(new Date(time));
+            return msg;
+        }
+    }
+
+
+    public void service(Socket socket) throws ServiceException, IOException {
+        InputStream in = socket.getInputStream();
+        OutputStream out = socket.getOutputStream();
+
+        try {
+            while (true) {
+                keepAliveTimer.setStatus(new Status(true, in));
+                if (in.read() == 30){
+                    keepAliveTimer.setStatus(new Status(false, null));
+                    service.service(new Input(in), new Output(out));
+                    out.flush();
+                } else {
+                    keepAliveTimer.setStatus(new Status(false, null));
+                    break;
+                }
+            }
+        } catch (InterruptedIOException e) {
+            Thread.interrupted();
+        } catch (IOException e) {
+        } finally{
+            keepAliveTimer.setStatus(new Status(false, null));
+//            System.out.println("close socket");
+            socket.close();
+        }
+    }
+
+    public void service(InputStream in, OutputStream out) throws ServiceException, IOException
{
+    }
+
+    public String getIP() {
+        return service.getIP();
+    }
+
+    public String getName() {
+        return service.getName();
+    }
+
+    public int getPort() {
+        return service.getPort();
+    }
+
+    public void start() throws ServiceException {
+        service.start();
+    }
+
+    public void stop() throws ServiceException {
+        service.stop();
+    }
+
+    public void init(Properties props) throws Exception {
+        service.init(props);
+    }
+
+    public class Input extends java.io.FilterInputStream {
+
+        public Input(InputStream in) {
+            super(in);
+        }
+
+        public void close() throws IOException {
+        }
+    }
+
+    public class Output extends java.io.FilterOutputStream {
+        public Output(OutputStream out) {
+            super(out);
+        }
+
+        public void close() throws IOException {
+            flush();
+        }
+    }
+
+}

Modified: openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/MultithreadTest.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/MultithreadTest.java?rev=676240&r1=676239&r2=676240&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/MultithreadTest.java
(original)
+++ openejb/trunk/openejb3/server/openejb-ejbd/src/test/java/org/apache/openejb/server/ejbd/MultithreadTest.java
Sat Jul 12 14:02:56 2008
@@ -17,7 +17,6 @@
 package org.apache.openejb.server.ejbd;
 
 import org.apache.openejb.OpenEJB;
-import org.apache.openejb.util.Duration;
 import org.apache.openejb.jee.EjbJar;
 import org.apache.openejb.jee.StatelessBean;
 import org.apache.openejb.config.ConfigurationFactory;
@@ -29,6 +28,7 @@
 
 import javax.naming.Context;
 import javax.naming.InitialContext;
+import javax.naming.NamingException;
 import javax.ejb.Remote;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -44,6 +44,7 @@
 
     public void test() throws Exception {
         EjbServer ejbServer = new EjbServer();
+        KeepAliveServer keepAliveServer = new KeepAliveServer(ejbServer);
 
         Properties initProps = new Properties();
         initProps.setProperty("openejb.deployments.classpath.include", "");
@@ -51,7 +52,7 @@
         OpenEJB.init(initProps, new ServerFederation());
         ejbServer.init(new Properties());
 
-        ServicePool pool = new ServicePool(ejbServer, "ejbd", 10);
+        ServicePool pool = new ServicePool(keepAliveServer, "ejbd", 22);
         ServiceDaemon serviceDaemon = new ServiceDaemon(pool, 0, "localhost");
         serviceDaemon.start();
 
@@ -68,21 +69,16 @@
             assembler.createApplication(config.configureApplication(ejbJar));
 
             // good creds
-            Properties props = new Properties();
-            props.put("java.naming.factory.initial", "org.apache.openejb.client.RemoteInitialContextFactory");
-            props.put("java.naming.provider.url", "ejbd://127.0.0.1:" + port);
-            Context context = new InitialContext(props);
-            Echo echo = (Echo) context.lookup("EchoBeanRemote");
 
             int threads = 20;
             CountDownLatch latch = new CountDownLatch(threads);
-            Client client = new Client(echo, latch);
 
             for (int i = 0; i < threads; i++) {
+                Client client = new Client(latch, i, port);
                 thread(client, false);
             }
 
-            assertTrue(latch.await(60, TimeUnit.SECONDS));
+            assertTrue(latch.await(600, TimeUnit.SECONDS));
         } finally {
             serviceDaemon.stop();
             OpenEJB.destroy();
@@ -98,11 +94,19 @@
     public static class Client implements Runnable {
 
         private final Echo echo;
-        private CountDownLatch latch;
+        private final CountDownLatch latch;
+        private final int id;
 
-        public Client(Echo echo, CountDownLatch latch) {
-            this.echo = echo;
+        public Client(CountDownLatch latch, int i, int port) throws NamingException {
             this.latch = latch;
+            this.id = i;
+
+            Properties props = new Properties();
+            props.put("java.naming.factory.initial", "org.apache.openejb.client.RemoteInitialContextFactory");
+            props.put("java.naming.provider.url", "ejbd://127.0.0.1:" + port +"?"+id);
+            Context context = new InitialContext(props);
+
+            this.echo = (Echo) context.lookup("EchoBeanRemote");
         }
 
         public void run() {
@@ -111,6 +115,9 @@
                 int count = 250;
                 for (; count >= 0; count--){
                     String message = count + " bottles of beer on the wall";
+
+//                    Thread.currentThread().setName("client-"+id+": "+count);
+
                     String response = echo.echo(message);
                     Assert.assertEquals(message, reverse(response));
                 }
@@ -127,6 +134,7 @@
 
     public static class EchoBean implements Echo {
         public String echo(String s) {
+//            System.out.println(s);
             return new StringBuilder(s).reverse().toString();
         }
     }

Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java?rev=676240&r1=676239&r2=676240&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
(original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/ServicePool.java
Sat Jul 12 14:02:56 2008
@@ -39,10 +39,12 @@
     private final ServerService next;
     private final Executor executor;
 
-    public ServicePool(ServerService next, final String name, Properties properties) {
-        this.next = next;
+    public ServicePool(ServerService next, String name, Properties properties) {
+        this(next, name, getInt(properties, "threads", 100));
+    }
 
-        final int threads = getInt(properties, "threads", 100);
+    public ServicePool(ServerService next, final String name, int threads) {
+        this.next = next;
 
         final int keepAliveTime = (1000 * 60 * 5);
 



Mime
View raw message