tomee-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andygumbre...@apache.org
Subject svn commit: r1357572 - in /openejb/trunk/openejb/server: openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
Date Thu, 05 Jul 2012 12:18:35 GMT
Author: andygumbrecht
Date: Thu Jul  5 12:18:35 2012
New Revision: 1357572

URL: http://svn.apache.org/viewvc?rev=1357572&view=rev
Log:
Make multipulse work on multihomed client and server, out of the box.

Background:
If the network interface metrics are configured automatically on multihomed systems, then an
arbitrary interface is chosen for multicast broadcast. Manually changing the OS metric and/or
programmatically defining the MulticastSocket interface will alleviate the issue, but still
requires some kind of user intervention and/or configuration. It is virtually impossible to
'guess' which interfaces both client and server will broadcast on, so the only solution is
to broadcast on all valid interfaces. If both client 'and' server are multihomed, then the
issue is compounded.

Modified:
    openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
    openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java

Modified: openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java?rev=1357572&r1=1357571&r2=1357572&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
(original)
+++ openejb/trunk/openejb/server/openejb-client/src/main/java/org/apache/openejb/client/MulticastPulseClient.java
Thu Jul  5 12:18:35 2012
@@ -3,15 +3,10 @@ package org.apache.openejb.client;
 import java.io.IOException;
 import java.net.*;
 import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -35,8 +30,11 @@ public class MulticastPulseClient extend
     private static final String SERVER = "OpenEJB.MCP.Server:";
     private static final String CLIENT = "OpenEJB.MCP.Client:";
     private static final String EMPTY = "NoService";
-
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+    private static final int TTL = Integer.parseInt(System.getProperty("org.apache.openejb.multipulse.ttl",
"32"));
     private static final Set<URI> badUri = new HashSet<URI>();
+    private static final NetworkInterface[] interfaces = getNetworkInterfaces();
+    private static final ExecutorService executor = Executors.newFixedThreadPool(interfaces.length
+ 1);
 
     /**
      * @param uri Connection URI
@@ -73,7 +71,7 @@ public class MulticastPulseClient extend
             try {
                 //Strip serverhost and group and try to connect
                 return ConnectionManager.getConnection(URI.create(URI.create(serviceURI.getSchemeSpecificPart()).getSchemeSpecificPart()));
-            } catch (IOException e) {
+            } catch (Throwable e) {
                 badUri.add(serviceURI);
             }
         }
@@ -102,7 +100,7 @@ public class MulticastPulseClient extend
      * @return A URI set, possibly empty
      * @throws Exception On error
      */
-    public static synchronized Set<URI> discoverURIs(final String forGroup, final Set<String>
schemes, final String host, final int port, long timeout) throws Exception {
+    public static Set<URI> discoverURIs(final String forGroup, final Set<String>
schemes, final String host, final int port, long timeout) throws Exception {
 
         if (timeout < 50) {
             timeout = 50;
@@ -136,12 +134,12 @@ public class MulticastPulseClient extend
             throw new Exception(host + " is not a valid multicast address");
         }
 
-        final byte[] bytes = (MulticastPulseClient.CLIENT + forGroup).getBytes(Charset.forName("utf8"));
+        final byte[] bytes = (MulticastPulseClient.CLIENT + forGroup).getBytes(UTF8);
         final DatagramPacket request = new DatagramPacket(bytes, bytes.length, new InetSocketAddress(ia,
port));
 
 
         final AtomicBoolean running = new AtomicBoolean(true);
-        final MulticastSocket client = MulticastPulseClient.getSocket(ia, port);
+        final MulticastSocket[] clientSockets = MulticastPulseClient.getSockets(ia, port);
         final Timer timer = new Timer(true);
 
         final Set<URI> set = new TreeSet<URI>(new Comparator<URI>() {
@@ -174,132 +172,152 @@ public class MulticastPulseClient extend
             }
         });
 
-        //Start a thread that listens for multicast packets on our channel.
-        //This needs to start 'before' we pulse a request.
-        final Thread t = new Thread() {
-            @Override
-            public void run() {
+        //Start threads that listen for multicast packets on our channel.
+        //These need to start 'before' we pulse a request.
+        final ArrayList<Future> futures = new ArrayList<Future>();
 
-                final DatagramPacket response = new DatagramPacket(new byte[2048], 2048);
+        for (final MulticastSocket socket : clientSockets) {
 
-                while (running.get()) {
-                    try {
+            futures.add(executor.submit(new Runnable() {
+                @Override
+                public void run() {
 
-                        client.receive(response);
+                    final DatagramPacket response = new DatagramPacket(new byte[2048], 2048);
 
-                        final SocketAddress sa = response.getSocketAddress();
+                    while (running.get()) {
+                        try {
 
-                        if (null != sa && (sa instanceof InetSocketAddress)) {
+                            socket.receive(response);
 
-                            int len = response.getLength();
-                            if (len > 2048) {
-                                len = 2048;
-                            }
-
-                            String s = new String(response.getData(), 0, len);
+                            final SocketAddress sa = response.getSocketAddress();
 
-                            if (s.startsWith(MulticastPulseClient.SERVER)) {
+                            if (null != sa && (sa instanceof InetSocketAddress))
{
 
-                                s = (s.replace(MulticastPulseClient.SERVER, ""));
-                                final String group = s.substring(0, s.indexOf(':'));
-                                s = s.substring(group.length() + 1);
-
-                                if (!"*".equals(forGroup) && !forGroup.equals(group))
{
-                                    continue;
+                                int len = response.getLength();
+                                if (len > 2048) {
+                                    len = 2048;
                                 }
 
-                                final String services = s.substring(0, s.indexOf('|'));
-                                s = s.substring(services.length() + 1);
+                                String s = new String(response.getData(), 0, len);
 
-                                final String[] serviceList = services.split("\\|");
-                                final String[] hosts = s.split(",");
+                                if (s.startsWith(MulticastPulseClient.SERVER)) {
 
-                                for (String svc : serviceList) {
+                                    s = (s.replace(MulticastPulseClient.SERVER, ""));
+                                    final String group = s.substring(0, s.indexOf(':'));
+                                    s = s.substring(group.length() + 1);
 
-                                    if (EMPTY.equals(svc)) {
+                                    if (!"*".equals(forGroup) && !forGroup.equals(group))
{
                                         continue;
                                     }
 
-                                    final URI serviceUri;
-                                    try {
-                                        serviceUri = URI.create(svc);
-                                    } catch (Throwable e) {
-                                        continue;
-                                    }
+                                    final String services = s.substring(0, s.indexOf('|'));
+                                    s = s.substring(services.length() + 1);
 
-                                    if (schemes.contains(serviceUri.getScheme())) {
+                                    final String[] serviceList = services.split("\\|");
+                                    final String[] hosts = s.split(",");
 
-                                        //Just because multicast was received on this host
is does not mean the service is on the same
-                                        //We can however use this to identify an individual
machine and group
-                                        final String serverHost = ((InetSocketAddress) response.getSocketAddress()).getAddress().getHostAddress();
-
-                                        final String serviceHost = serviceUri.getHost();
-                                        if (MulticastPulseClient.isLocalAddress(serviceHost,
false)) {
-                                            if (!MulticastPulseClient.isLocalAddress(serverHost,
false)) {
-                                                //A local service is only available to a
local client
-                                                continue;
-                                            }
-                                        }
+                                    for (String svc : serviceList) {
 
-                                        svc = ("mp-" + serverHost + ":" + group + ":" + svc);
+                                        if (EMPTY.equals(svc)) {
+                                            continue;
+                                        }
 
+                                        final URI serviceUri;
                                         try {
-                                            if (svc.contains("0.0.0.0")) {
-                                                for (final String h : hosts) {
-                                                    set.add(URI.create(svc.replace("0.0.0.0",
ipFormat(h))));
+                                            serviceUri = URI.create(svc);
+                                        } catch (Throwable e) {
+                                            continue;
+                                        }
+
+                                        if (schemes.contains(serviceUri.getScheme())) {
+
+                                            //Just because multicast was received on this
host is does not mean the service is on the same
+                                            //We can however use this to identify an individual
machine and group
+                                            final String serverHost = ((InetSocketAddress)
response.getSocketAddress()).getAddress().getHostAddress();
+
+                                            final String serviceHost = serviceUri.getHost();
+                                            if (MulticastPulseClient.isLocalAddress(serviceHost,
false)) {
+                                                if (!MulticastPulseClient.isLocalAddress(serverHost,
false)) {
+                                                    //A local service is only available to
a local client
+                                                    continue;
                                                 }
-                                            } else if (svc.contains("[::]")) {
-                                                for (final String h : hosts) {
-                                                    set.add(URI.create(svc.replace("[::]",
ipFormat(h))));
+                                            }
+
+                                            svc = ("mp-" + serverHost + ":" + group + ":"
+ svc);
+
+                                            try {
+                                                if (svc.contains("0.0.0.0")) {
+                                                    for (final String h : hosts) {
+                                                        set.add(URI.create(svc.replace("0.0.0.0",
ipFormat(h))));
+                                                    }
+                                                } else if (svc.contains("[::]")) {
+                                                    for (final String h : hosts) {
+                                                        set.add(URI.create(svc.replace("[::]",
ipFormat(h))));
+                                                    }
+                                                } else {
+                                                    //Just add as is
+                                                    set.add(URI.create(svc));
                                                 }
-                                            } else {
-                                                //Just add as is
-                                                set.add(URI.create(svc));
+                                            } catch (Throwable e) {
+                                                //Ignore
                                             }
-                                        } catch (Throwable e) {
-                                            //Ignore
                                         }
                                     }
                                 }
                             }
-                        }
 
-                    } catch (Throwable e) {
-                        //Ignore
+                        } catch (Throwable e) {
+                            //Ignore
+                        }
                     }
                 }
+            }));
+        }
+
+        //Pulse the server - It is thread safe to use same sockets as send/receive synchronization
is only on the packet
+        for (final MulticastSocket socket : clientSockets) {
+            try {
+                socket.send(request);
+            } catch (Throwable e) {
+                //Ignore
             }
-        };
-        t.setDaemon(true);
-        t.start();
+        }
 
-        //Kill the thread after timeout
+        //Kill the threads after timeout
         timer.schedule(new TimerTask() {
             @Override
             public void run() {
 
                 running.set(false);
 
-                try {
-                    client.leaveGroup(ia);
-                } catch (Throwable e) {
-                    //Ignore
+                for (Future future : futures) {
+                    future.cancel(true);
                 }
-                try {
-                    client.close();
-                } catch (Throwable e) {
-                    //Ignore
+
+                for (final MulticastSocket socket : clientSockets) {
+
+                    try {
+                        socket.leaveGroup(ia);
+                    } catch (Throwable e) {
+                        //Ignore
+                    }
+                    try {
+                        socket.close();
+                    } catch (Throwable e) {
+                        //Ignore
+                    }
                 }
-                t.interrupt();
             }
         }, timeout);
 
-        //Pulse the server
-        final MulticastSocket ms = MulticastPulseClient.getSocket(ia, port);
-        ms.send(request);
-
-        //Wait for thread to complete
-        t.join();
+        //Wait for threads to complete
+        for (final Future future : futures) {
+            try {
+                future.get();
+            } catch (Throwable e) {
+                //Ignore
+            }
+        }
 
         return set;
     }
@@ -343,34 +361,69 @@ public class MulticastPulseClient extend
         }
     }
 
-    public static MulticastSocket getSocket(final InetAddress ia, final int port) throws
Exception {
+    public static MulticastSocket[] getSockets(final InetAddress ia, final int port) throws
Exception {
 
-        MulticastSocket ms = null;
+        final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>();
 
-        try {
-            ms = new MulticastSocket(port);
-            final NetworkInterface ni = NetworkInterface.getByInetAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()));
-            ms.setNetworkInterface(ni);
-            ms.setSoTimeout(0);
-            if (!ms.getBroadcast()) {
-                ms.setBroadcast(true);
-            }
-            ms.joinGroup(ia);
+        for (final NetworkInterface ni : interfaces) {
+
+            MulticastSocket ms = null;
+
+            try {
+
+                ms = new MulticastSocket(port);
+                ms.setNetworkInterface(ni);
+                ms.setSoTimeout(0);
+                ms.setTimeToLive(TTL);
+                if (!ms.getBroadcast()) {
+                    ms.setBroadcast(true);
+                }
+                ms.joinGroup(ia);
+
+                list.add(ms);
 
-        } catch (Throwable e) {
+            } catch (Throwable e) {
 
-            if (null != ms) {
-                try {
-                    ms.close();
-                } catch (Throwable t) {
-                    //Ignore
+                if (null != ms) {
+                    try {
+                        ms.close();
+                    } catch (Throwable t) {
+                        //Ignore
+                    }
                 }
+
             }
+        }
+
+        return list.toArray(new MulticastSocket[list.size()]);
+    }
 
-            throw new Exception("Failed to create a MultiPulse socket", e);
+    private static NetworkInterface[] getNetworkInterfaces() {
+
+        final HashSet<NetworkInterface> list = new HashSet<NetworkInterface>();
+
+        try {
+            final Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            while (interfaces.hasMoreElements()) {
+                final NetworkInterface next = interfaces.nextElement();
+
+                if (next.supportsMulticast() && next.isUp()) {
+
+                    final Enumeration<InetAddress> addresses = next.getInetAddresses();
+                    while (addresses.hasMoreElements()) {
+                        final InetAddress address = addresses.nextElement();
+                        if (address.isSiteLocalAddress()) {
+                            list.add(next);
+                            break;
+                        }
+                    }
+                }
+            }
+        } catch (SocketException e) {
+            //Ignore
         }
 
-        return ms;
+        return list.toArray(new NetworkInterface[list.size()]);
     }
 
     private static final CommandParser cmd = new CommandParser() {

Modified: openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java?rev=1357572&r1=1357571&r2=1357572&view=diff
==============================================================================
--- openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
(original)
+++ openejb/trunk/openejb/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastPulseAgent.java
Thu Jul  5 12:18:35 2012
@@ -15,11 +15,14 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.*;
 import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -41,7 +44,10 @@ import java.util.concurrent.atomic.Atomi
 public class MulticastPulseAgent implements DiscoveryAgent, ServerService, SelfManaging {
 
     private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery").createChild("multipulse"),
MulticastPulseAgent.class);
-    private static final Executor executor = Executors.newSingleThreadExecutor();
+    private static final NetworkInterface[] interfaces = getNetworkInterfaces();
+    private static final ExecutorService executor = Executors.newFixedThreadPool(interfaces.length
+ 1);
+    private static final Charset UTF8 = Charset.forName("UTF-8");
+    private static final int TTL = Integer.parseInt(System.getProperty("org.apache.openejb.multipulse.ttl",
"32"));
 
     public static final String SERVER = "OpenEJB.MCP.Server:";
     public static final String CLIENT = "OpenEJB.MCP.Client:";
@@ -49,8 +55,8 @@ public class MulticastPulseAgent impleme
 
     private final Set<URI> uriSet = new HashSet<URI>();
     private AtomicBoolean running = new AtomicBoolean(false);
-    private Thread listenerThread = null;
-    private MulticastSocket socket = null;
+    final ArrayList<Future> futures = new ArrayList<Future>();
+    private MulticastSocket[] sockets = null;
     private InetSocketAddress address = null;
 
     private String multicast = "239.255.3.2";
@@ -112,7 +118,7 @@ public class MulticastPulseAgent impleme
 
         sb.append(hosts);
 
-        final byte[] bytes = (sb.toString()).getBytes(Charset.forName("utf8"));
+        final byte[] bytes = (sb.toString()).getBytes(UTF8);
         this.response = new DatagramPacket(bytes, bytes.length, this.address);
 
         log.debug("MultiPulse packet is: " + sb);
@@ -182,64 +188,68 @@ public class MulticastPulseAgent impleme
     public void start() throws ServiceException {
         if (!this.running.getAndSet(true)) {
 
-            this.socket = getSocket(this.multicast, this.port);
-            final MulticastSocket ms = this.socket;
+            try {
+                this.sockets = getSockets(this.multicast, this.port);
+            } catch (Exception e) {
+                throw new ServiceException("Failed to get Multicast sockets", e);
+            }
 
-            this.listenerThread = new Thread(new Runnable() {
-                @Override
-                public void run() {
+            for (final MulticastSocket socket : this.sockets) {
 
-                    final DatagramPacket request = new DatagramPacket(new byte[2048], 2048);
+                this.futures.add(executor.submit(new Runnable() {
+                    @Override
+                    public void run() {
 
-                    while (MulticastPulseAgent.this.running.get()) {
+                        final DatagramPacket request = new DatagramPacket(new byte[2048],
2048);
 
-                        try {
-                            ms.receive(request);
-                            final SocketAddress sa = request.getSocketAddress();
+                        while (MulticastPulseAgent.this.running.get()) {
 
-                            if (null != sa) {
+                            try {
+                                socket.receive(request);
+                                final SocketAddress sa = request.getSocketAddress();
 
-                                String s = new String(request.getData(), 0, request.getLength());
+                                if (null != sa) {
 
-                                if (s.startsWith(CLIENT)) {
+                                    String s = new String(request.getData(), 0, request.getLength());
 
-                                    s = (s.replace(CLIENT, ""));
+                                    if (s.startsWith(CLIENT)) {
 
-                                    final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
+                                        s = (s.replace(CLIENT, ""));
 
-                                    if (MulticastPulseAgent.this.group.equals(s) || "*".equals(s))
{
+                                        final String client = ((InetSocketAddress) sa).getAddress().getHostAddress();
 
-                                        if (MulticastPulseAgent.this.loopbackOnly) {
-                                            //We only have local services, so make sure the
request is from a local source else ignore it
-                                            if (!MulticastPulseAgent.isLocalAddress(client,
false)) {
-                                                log.debug(String.format("Ignoring remote
client %1$s pulse request for group: %2$s - No remote services available", client, s));
-                                                continue;
+                                        if (MulticastPulseAgent.this.group.equals(s) || "*".equals(s))
{
+
+                                            if (MulticastPulseAgent.this.loopbackOnly) {
+                                                //We only have local services, so make sure
the request is from a local source else ignore it
+                                                if (!MulticastPulseAgent.isLocalAddress(client,
false)) {
+                                                    log.debug(String.format("Ignoring remote
client %1$s pulse request for group: %2$s - No remote services available"
+                                                            , client, s));
+                                                    continue;
+                                                }
                                             }
-                                        }
 
-                                        log.debug(String.format("Answering client %1$s pulse
request for group: %2$s", client, s));
-                                        ms.send(MulticastPulseAgent.this.response);
-                                    } else {
-                                        log.debug(String.format("Ignoring client %1$s pulse
request for group: %2$s", client, s));
+                                            log.debug(String.format("Answering client %1$s
pulse request for group: %2$s", client, s));
+                                            socket.send(MulticastPulseAgent.this.response);
+                                        } else {
+                                            log.debug(String.format("Ignoring client %1$s
pulse request for group: %2$s", client, s));
+                                        }
                                     }
                                 }
+
+                            } catch (Throwable e) {
+                                //Ignore
                             }
+                        }
 
+                        try {
+                            socket.close();
                         } catch (Throwable e) {
                             //Ignore
                         }
                     }
-
-                    try {
-                        ms.close();
-                    } catch (Throwable e) {
-                        //Ignore
-                    }
-                }
-            }, "MultiPulse Listener");
-
-            this.listenerThread.setDaemon(true);
-            this.listenerThread.start();
+                }));
+            }
         }
     }
 
@@ -247,24 +257,39 @@ public class MulticastPulseAgent impleme
     public void stop() throws ServiceException {
         if (this.running.getAndSet(false)) {
 
-            if (null != this.listenerThread) {
-
-                this.listenerThread.interrupt();
+            try {
+                //Interrupt threads
+                for (final Future future : this.futures) {
+                    try {
+                        future.cancel(true);
+                    } catch (Throwable e) {
+                        //Ignore
+                    }
+                }
 
-                try {
-                    listenerThread.join(5000);
-                } catch (InterruptedException e) {
-                    //Ignore
+                //Wait for threads to complete
+                for (final Future future : this.futures) {
+                    try {
+                        future.get();
+                    } catch (Throwable e) {
+                        //Ignore
+                    }
                 }
+            } finally {
+                this.futures.clear();
             }
 
-            if (null != this.socket) {
+            if (null != this.sockets) {
                 try {
-                    this.socket.close();
-                } catch (Throwable e) {
-                    //Ignore
+                    for (final MulticastSocket s : sockets) {
+                        try {
+                            s.close();
+                        } catch (Throwable e) {
+                            //Ignore
+                        }
+                    }
                 } finally {
-                    this.socket = null;
+                    this.sockets = null;
                 }
             }
         }
@@ -295,7 +320,7 @@ public class MulticastPulseAgent impleme
         return this.port;
     }
 
-    public static MulticastSocket getSocket(final String multicastAddress, final int port)
throws ServiceException {
+    public static MulticastSocket[] getSockets(final String multicastAddress, final int port)
throws Exception {
 
         final InetAddress ia;
 
@@ -309,35 +334,74 @@ public class MulticastPulseAgent impleme
             throw new ServiceException(multicastAddress + " is not a valid multicast address");
         }
 
-        MulticastSocket ms = null;
+        return getSockets(ia, port);
+    }
 
-        try {
-            ms = new MulticastSocket(port);
-            final NetworkInterface ni = NetworkInterface.getByInetAddress(InetAddress.getByName(InetAddress.getLocalHost().getHostName()));
-            ms.setNetworkInterface(ni);
-            ms.setSoTimeout(0);
-            if (!ms.getBroadcast()) {
-                ms.setBroadcast(true);
-            }
-            ms.joinGroup(ia);
+    private static MulticastSocket[] getSockets(final InetAddress ia, final int port) throws
Exception {
 
-            log.debug(String.format("Created MulticastSocket for '%1$s:%2$s' on network adapter:
%3$s", multicastAddress, port, ni));
+        final ArrayList<MulticastSocket> list = new ArrayList<MulticastSocket>();
 
-        } catch (Throwable e) {
-            log.error("getSocket", e);
+        for (final NetworkInterface ni : interfaces) {
 
-            if (null != ms) {
-                try {
-                    ms.close();
-                } catch (Throwable t) {
-                    //Ignore
+            MulticastSocket ms = null;
+
+            try {
+
+                ms = new MulticastSocket(port);
+                ms.setNetworkInterface(ni);
+                ms.setSoTimeout(0);
+                ms.setTimeToLive(TTL);
+                if (!ms.getBroadcast()) {
+                    ms.setBroadcast(true);
                 }
+                ms.joinGroup(ia);
+
+                list.add(ms);
+
+                log.debug(String.format("Created MulticastSocket for '%1$s:%2$s' on network
adapter: %3$s", ia.getHostName(), port, ni));
+
+            } catch (Throwable e) {
+
+                if (null != ms) {
+                    try {
+                        ms.close();
+                    } catch (Throwable t) {
+                        //Ignore
+                    }
+                }
+
             }
+        }
+
+        return list.toArray(new MulticastSocket[list.size()]);
+    }
+
+    private static NetworkInterface[] getNetworkInterfaces() {
 
-            throw new ServiceException("Failed to create a multicast socket", e);
+        final HashSet<NetworkInterface> list = new HashSet<NetworkInterface>();
+
+        try {
+            final Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            while (interfaces.hasMoreElements()) {
+                final NetworkInterface next = interfaces.nextElement();
+
+                if (next.supportsMulticast() && next.isUp()) {
+
+                    final Enumeration<InetAddress> addresses = next.getInetAddresses();
+                    while (addresses.hasMoreElements()) {
+                        final InetAddress address = addresses.nextElement();
+                        if (address.isSiteLocalAddress()) {
+                            list.add(next);
+                            break;
+                        }
+                    }
+                }
+            }
+        } catch (SocketException e) {
+            //Ignore
         }
 
-        return ms;
+        return list.toArray(new NetworkInterface[list.size()]);
     }
 
     public static boolean isLoopback(final String host) {



Mime
View raw message