tomee-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andygumbre...@apache.org
Subject svn commit: r1357726 - in /openejb/trunk/openejb/server: openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
Date Thu, 05 Jul 2012 16:32:05 GMT
Author: andygumbrecht
Date: Thu Jul  5 16:32:05 2012
New Revision: 1357726

URL: http://svn.apache.org/viewvc?rev=1357726&view=rev
Log:
Fix multipulse test.
Fix multipulse services substring - Thanks to the test ;-)

Modified:
    openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
    openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java

Modified: openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1357726&r1=1357725&r2=1357726&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
(original)
+++ openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
Thu Jul  5 16:32:05 2012
@@ -137,7 +137,6 @@ public class MulticastPulseClient extend
         final byte[] bytes = (MulticastPulseClient.CLIENT + forGroup).getBytes(UTF8);
         final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia,
port));
 
-
         final AtomicBoolean running = new AtomicBoolean(true);
         final MulticastSocket[] clientSockets = MulticastPulseClient.getSockets(ia, port);
         final Timer timer = new Timer(true);
@@ -210,7 +209,7 @@ public class MulticastPulseClient extend
                                         continue;
                                     }
 
-                                    final String services = s.substring(0, s.indexOf('|'));
+                                    final String services = s.substring(0, s.lastIndexOf('|'));
                                     s = s.substring(services.length() + 1);
 
                                     final String[] serviceList = services.split("\\|");
@@ -319,6 +318,8 @@ public class MulticastPulseClient extend
             }
         }
 
+        futures.clear();
+
         return set;
     }
 

Modified: openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java?rev=1357726&r1=1357725&r2=1357726&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
(original)
+++ openejb/trunk/openejb/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MulticastPulseAgentTest.java
Thu Jul  5 16:32:05 2012
@@ -17,6 +17,8 @@
 package org.apache.openejb.server.discovery;
 
 import org.apache.openejb.server.DiscoveryListener;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.net.DatagramPacket;
@@ -28,6 +30,7 @@ import java.net.SocketAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -36,6 +39,9 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -46,19 +52,23 @@ import java.util.concurrent.atomic.Atomi
 public class MulticastPulseAgentTest {
 
     private static final Set<String> schemes = new HashSet<String>(Arrays.asList("ejbd",
"ejbds", "http"));
+    private static ExecutorService executor;
+    private static final Charset utf8 = Charset.forName("UTF-8");
+    private static final String forGroup = "*";
+    private static final String host = "239.255.3.2";
+    private static final int port = 6142;
+    private static MulticastPulseAgent agent;
 
-    @Test
-    public void test() throws Exception {
+    @BeforeClass
+    public static void beforeClass() throws Exception {
 
-        final String group = "*";
-        final String host = "239.255.3.2";
-        final int port = 6142;
+        executor = Executors.newFixedThreadPool(10);
 
         final Properties p = new Properties();
         p.setProperty("bind", host);
         p.setProperty("port", "" + 6142);
 
-        MulticastPulseAgent agent = new MulticastPulseAgent();
+        agent = new MulticastPulseAgent();
         agent.init(p);
         agent.setDiscoveryListener(new MyDiscoveryListener("test"));
         agent.registerService(new URI("ejb:ejbd://[::]:4201"));
@@ -66,22 +76,57 @@ public class MulticastPulseAgentTest {
         agent.registerService(new URI("ejb:http://127.0.0.1:4201"));
         agent.registerService(new URI("ejb:https://0.0.0.1:4201"));
         agent.start();
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        agent.stop();
+        executor.shutdownNow();
+    }
+
+    @Test
+    public void test() throws Exception {
 
-        final byte[] bytes = (MulticastPulseAgent.CLIENT + group).getBytes(Charset.forName("utf8"));
-        final InetAddress ia = InetAddress.getByName(host);
-        DatagramPacket dp = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia,
port));
+        final InetAddress ia;
+
+        try {
+            ia = InetAddress.getByName(host);
+        } catch (UnknownHostException e) {
+            throw new Exception(host + " is not a valid address", e);
+        }
+
+        if (null == ia || !ia.isMulticastAddress()) {
+            throw new Exception(host + " is not a valid multicast address");
+        }
+
+        final byte[] bytes = (MulticastPulseAgent.CLIENT + forGroup).getBytes(utf8);
+        final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia,
port));
 
         final AtomicBoolean running = new AtomicBoolean(true);
-        final AtomicBoolean passed = new AtomicBoolean(false);
-        final MulticastSocket client = MulticastPulseAgent.getSockets(host, port)[0];
+
+        final MulticastSocket[] clientSockets = MulticastPulseAgent.getSockets(host, port);
         final Timer timer = new Timer(true);
+
         final Set<URI> set = new TreeSet<URI>(new Comparator<URI>() {
             @Override
             public int compare(URI u1, URI u2) {
 
+                //Ignore server hostname
+                final String serverHost = u1.getScheme();
+                u1 = URI.create(u1.getSchemeSpecificPart());
+                u2 = URI.create(u2.getSchemeSpecificPart());
+
+                //Ignore scheme (ejb,ejbs,etc.)
                 u1 = URI.create(u1.getSchemeSpecificPart());
                 u2 = URI.create(u2.getSchemeSpecificPart());
 
+                if (u1.getHost().equals(serverHost)) {
+                    //If the service host is the same as the server host
+                    //then keep it at the top of the list
+                    return -1;
+                }
+
+                //Compare URI hosts
                 int i = u1.getHost().compareTo(u2.getHost());
 
                 if (i == 0) {
@@ -92,117 +137,170 @@ public class MulticastPulseAgentTest {
             }
         });
 
-        //Start a thread that listens for multicast packets
-        final Thread t = new Thread() {
-            @Override
-            public void run() {
+        //Start threads that listen for multicast packets on our channel.
+        //These need to start 'before' we pulse a request.
+        final ArrayList<Future> futures = new ArrayList<Future>();
 
-                while (running.get()) {
-                    try {
+        for (final MulticastSocket socket : clientSockets) {
 
-                        final DatagramPacket dgp = new DatagramPacket(new byte[512], 512);
+            futures.add(executor.submit(new Runnable() {
+                @Override
+                public void run() {
 
-                        client.receive(dgp);
+                    final DatagramPacket response = new DatagramPacket(new byte[2048], 2048);
 
-                        final SocketAddress sa = dgp.getSocketAddress();
+                    while (running.get()) {
+                        try {
 
-                        if (null != sa) {
+                            socket.receive(response);
 
-                            String s = new String(dgp.getData()).trim();
-                            if (s.startsWith(MulticastPulseAgent.SERVER)) {
+                            final SocketAddress sa = response.getSocketAddress();
 
-                                s = (s.replace(MulticastPulseAgent.SERVER, ""));
-                                final String group = s.substring(0, s.indexOf(':'));
-                                s = s.substring(group.length() + 1);
+                            if (null != sa && (sa instanceof InetSocketAddress))
{
 
-                                final String services = s.substring(0, s.lastIndexOf('|'));
-                                s = s.substring(services.length() + 1);
+                                int len = response.getLength();
+                                if (len > 2048) {
+                                    len = 2048;
+                                }
 
-                                final String[] service = services.split("\\|");
-                                final String[] hosts = s.split(",");
+                                String s = new String(response.getData(), 0, len);
 
-                                System.out.println(String.format("Client received Server
pulse:\n\t%1$s\n\t%2$s\n\t%3$s\n", group, services, s));
+                                if (s.startsWith(MulticastPulseAgent.SERVER)) {
 
-                                for (String svc : service) {
+                                    s = (s.replace(MulticastPulseAgent.SERVER, ""));
+                                    final String group = s.substring(0, s.indexOf(':'));
+                                    s = s.substring(group.length() + 1);
 
-                                    if (MulticastPulseAgent.EMPTY.equals(svc)) {
+                                    if (!"*".equals(forGroup) && !forGroup.equals(group))
{
                                         continue;
                                     }
 
-                                    URI test = null;
-                                    try {
-                                        test = URI.create(svc);
-                                    } catch (Throwable e) {
-                                        continue;
-                                    }
+                                    final String services = s.substring(0, s.lastIndexOf('|'));
+                                    s = s.substring(services.length() + 1);
+
+                                    final String[] serviceList = services.split("\\|");
+                                    final String[] hosts = s.split(",");
 
-                                    if (schemes.contains(test.getScheme())) {
+                                    System.out.println(String.format("Client received Server
pulse:\n\t%1$s\n\t%2$s\n\t%3$s\n", group, services, s));
 
-                                        svc = (group + ":" + svc);
+                                    for (String svc : serviceList) {
+
+                                        if (MulticastPulseAgent.EMPTY.equals(svc)) {
+                                            continue;
+                                        }
 
+                                        final URI serviceUri;
                                         try {
-                                            if (svc.contains("0.0.0.0")) {
-                                                for (final String h : hosts) {
-                                                    set.add(URI.create(svc.replace("0.0.0.0",
ipFormat(h))));
+                                            serviceUri = URI.create(svc);
+                                        } catch (Throwable e) {
+                                            continue;
+                                        }
+
+                                        if (schemes.contains(serviceUri.getScheme())) {
+
+                                            //Just because multicast was received on this
host is does not mean the service is on the same
+                                            //We can however use this to identify an individual
machine and group
+                                            final String serverHost = ((InetSocketAddress)
response.getSocketAddress()).getAddress().getHostAddress();
+
+                                            final String serviceHost = serviceUri.getHost();
+                                            if (MulticastPulseAgent.isLocalAddress(serviceHost,
false)) {
+                                                if (!MulticastPulseAgent.isLocalAddress(serverHost,
false)) {
+                                                    //A local service is only available to
a local client
+                                                    continue;
                                                 }
-                                            } else if (svc.contains("[::]")) {
-                                                for (final String h : hosts) {
-                                                    set.add(URI.create(svc.replace("[::]",
ipFormat(h))));
+                                            }
+
+                                            svc = ("mp-" + serverHost + ":" + group + ":"
+ svc);
+
+                                            try {
+                                                if (svc.contains("0.0.0.0")) {
+                                                    for (final String h : hosts) {
+                                                        set.add(URI.create(svc.replace("0.0.0.0",
ipFormat(h))));
+                                                    }
+                                                } else if (svc.contains("[::]")) {
+                                                    for (final String h : hosts) {
+                                                        set.add(URI.create(svc.replace("[::]",
ipFormat(h))));
+                                                    }
+                                                } else {
+                                                    //Just add as is
+                                                    set.add(URI.create(svc));
                                                 }
-                                            } else {
-                                                //Just add as is
-                                                set.add(URI.create(svc));
+                                            } catch (Throwable e) {
+                                                //Ignore
                                             }
-                                        } catch (Throwable e) {
-                                            //Ignore
+                                        } else {
+                                            System.out.println("Reject service: " + serviceUri.toASCIIString());
                                         }
                                     }
                                 }
-
-                                running.set(false);
-                                timer.cancel();
-                                passed.set(true);
                             }
+
+                        } catch (Throwable e) {
+                            //Ignore
                         }
+                    }
+
+                    System.out.println("Exit MulticastPulse client thread");
+                }
+            }));
+        }
+
+        //Pulse the server - It is thread safe to use same sockets as send/receive synchronization
is only on the packet
+        for (final MulticastSocket socket : clientSockets) {
+            try {
+                socket.send(request);
+            } catch (Throwable e) {
+                //Ignore
+            }
+        }
+
+        //Kill the threads after timeout
+        timer.schedule(new TimerTask() {
+            @Override
+            public void run() {
+
+                running.set(false);
+
+                for (Future future : futures) {
+                    try {
+                        future.cancel(true);
+                    } catch (Throwable e) {
+                        //Ignore
+                    }
+                }
 
+                for (final MulticastSocket socket : clientSockets) {
+
+                    try {
+                        socket.leaveGroup(ia);
+                    } catch (Throwable e) {
+                        //Ignore
+                    }
+                    try {
+                        socket.close();
                     } catch (Throwable e) {
                         //Ignore
                     }
                 }
+            }
+        }, 1500);
 
-                System.out.println("Exit MulticastPulse client thread");
+        //Wait for threads to complete
+        for (final Future future : futures) {
+            try {
+                future.get();
+            } catch (Throwable e) {
+                //Ignore
             }
-        };
-        t.setDaemon(true);
-        t.start();
-
-        if (running.get()) {
-            //Kill the thread after timeout
-            timer.schedule(new TimerTask() {
-                @Override
-                public void run() {
-                    running.set(false);
-                    client.close();
-                    t.interrupt();
-                    System.out.println("Interrupted MultiPulse client");
-                }
-            }, 1000);
         }
 
-        //Pulse the server
-        final MulticastSocket ms = MulticastPulseAgent.getSockets(host, port)[0];
-        ms.send(dp);
-
-        //Wait for thread to die
-        t.join();
-
-        agent.stop();
+        futures.clear();
 
-        for (URI uri : set) {
+        for (final URI uri : set) {
             System.out.println(uri.toASCIIString());
         }
 
-        org.junit.Assert.assertTrue(passed.get());
+        org.junit.Assert.assertTrue(set.size() > 0);
     }
 
     private String ipFormat(final String h) throws UnknownHostException {



Mime
View raw message