tomee-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andygumbre...@apache.org
Subject svn commit: r1358092 - in /openejb/trunk/openejb/server: openejb-client/src/main/java/org/apache/openejb/client/ openejb-multicast/src/main/java/org/apache/openejb/server/discovery/ openejb-multicast/src/test/java/org/apache/openejb/server/discovery/
Date Fri, 06 Jul 2012 09:33:25 GMT
Author: andygumbrecht
Date: Fri Jul  6 09:33:24 2012
New Revision: 1358092

URL: http://svn.apache.org/viewvc?rev=1358092&view=rev
Log:
Trying a latch - Also makes sense for client and agent.

Modified:
    openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
    openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
    openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java

Modified: openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1358092&r1=1358091&r2=1358092&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java (original)
+++ openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java Fri Jul  6 09:33:24 2012
@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.net.*;
 import java.nio.charset.Charset;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -174,6 +175,7 @@ public class MulticastPulseClient extend
         //Start threads that listen for multicast packets on our channel.
         //These need to start 'before' we pulse a request.
         final ArrayList<Future> futures = new ArrayList<Future>();
+        final CountDownLatch latch = new CountDownLatch(clientSockets.length);
 
         for (final MulticastSocket socket : clientSockets) {
 
@@ -181,6 +183,7 @@ public class MulticastPulseClient extend
                 @Override
                 public void run() {
 
+                    latch.countDown();
                     final DatagramPacket response = new DatagramPacket(new byte[2048], 2048);
 
                     while (running.get()) {
@@ -273,13 +276,21 @@ public class MulticastPulseClient extend
             }));
         }
 
-        //Pulse the server - It is thread safe to use same sockets as send/receive synchronization is only on the packet
-        for (final MulticastSocket socket : clientSockets) {
-            try {
-                socket.send(request);
-            } catch (Throwable e) {
-                //Ignore
+        try {
+            latch.await();
+
+            //Pulse the server - It is thread safe to use same sockets as send/receive synchronization is only on the packet
+            for (final MulticastSocket socket : clientSockets) {
+                try {
+                    socket.send(request);
+                } catch (Throwable e) {
+                    //Ignore
+                }
             }
+
+        } catch (InterruptedException e) {
+            //Terminate as quickly as possible
+            timeout = 1;
         }
 
         //Kill the threads after timeout

Modified: openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1358092&r1=1358091&r2=1358092&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java (original)
+++ openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java Fri Jul  6 09:33:24 2012
@@ -20,10 +20,12 @@ import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -194,6 +196,8 @@ public class MulticastPulseAgent impleme
                 throw new ServiceException("Failed to get Multicast sockets", e);
             }
 
+            final CountDownLatch latch = new CountDownLatch(this.sockets.length);
+
             for (final MulticastSocket socket : this.sockets) {
 
                 this.futures.add(executor.submit(new Runnable() {
@@ -201,6 +205,7 @@ public class MulticastPulseAgent impleme
                     public void run() {
 
                         final DatagramPacket request = new DatagramPacket(new byte[2048], 2048);
+                        latch.countDown();
 
                         while (MulticastPulseAgent.this.running.get()) {
 
@@ -250,6 +255,12 @@ public class MulticastPulseAgent impleme
                     }
                 }));
             }
+
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                this.stop();
+            }
         }
     }
 

Modified: openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1358092&r1=1358091&r2=1358092&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java (original)
+++ openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java Fri Jul  6 09:33:24 2012
@@ -39,6 +39,7 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -66,7 +67,7 @@ public class MulticastPulseAgentTest {
 
         final Properties p = new Properties();
         p.setProperty("bind", host);
-        p.setProperty("port", "" + 6142);
+        p.setProperty("port", "" + port);
 
         agent = new MulticastPulseAgent();
         agent.init(p);
@@ -144,6 +145,7 @@ public class MulticastPulseAgentTest {
         //Start threads that listen for multicast packets on our channel.
         //These need to start 'before' we pulse a request.
         final ArrayList<Future> futures = new ArrayList<Future>();
+        final CountDownLatch latch = new CountDownLatch(clientSockets.length);
 
         for (final MulticastSocket socket : clientSockets) {
 
@@ -161,6 +163,8 @@ public class MulticastPulseAgentTest {
 
                     final DatagramPacket response = new DatagramPacket(new byte[2048], 2048);
 
+                    latch.countDown();
+
                     while (running.get()) {
                         try {
 
@@ -257,13 +261,25 @@ public class MulticastPulseAgentTest {
             }));
         }
 
-        //Pulse the server - It is thread safe to use same sockets as send/receive synchronization is only on the packet
-        for (final MulticastSocket socket : clientSockets) {
-            try {
-                socket.send(request);
-            } catch (Throwable e) {
-                //Ignore
+        //Allow slow thread starts
+        System.out.println("Wait for threads to start");
+        int timeout = 5000;
+        try {
+
+            latch.await();
+            System.out.println("Threads have started");
+
+            //Pulse the server - It is thread safe to use same sockets as send/receive synchronization is only on the packet
+            for (final MulticastSocket socket : clientSockets) {
+                try {
+                    socket.send(request);
+                } catch (Throwable e) {
+                    //Ignore
+                }
             }
+
+        } catch (InterruptedException e) {
+            timeout = 1;
         }
 
         //Kill the threads after timeout
@@ -295,7 +311,7 @@ public class MulticastPulseAgentTest {
                     }
                 }
             }
-        }, 10000);
+        }, timeout);
 
         //Wait for threads to complete
         for (final Future future : futures) {
@@ -332,8 +348,6 @@ public class MulticastPulseAgentTest {
         private final String id;
 
         public MyDiscoveryListener(String id) {
-            id += "        ";
-            id = id.substring(0, 8);
             this.id = id;
         }
 



Mime
View raw message