openjpa-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From p..@apache.org
Subject svn commit: r640685 [5/14] - in /openjpa/trunk: ./ openjpa-all/ openjpa-jdbc-5/ openjpa-jdbc/ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/kernel/ openjpa-jdbc/src/main/java/org/apache/openjpa/jdbc/meta/ openjpa-jdbc/src/main/java/org/apache/open...
Date Tue, 25 Mar 2008 03:38:02 GMT
Modified: openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java?rev=640685&r1=640684&r2=640685&view=diff
==============================================================================
--- openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java (original)
+++ openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TCPRemoteCommitProvider.java Mon Mar 24 20:37:56 2008
@@ -1,948 +1,948 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.    
- */
-package org.apache.openjpa.event;
-
-import java.io.BufferedInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.Set;
-import java.util.List;
-import java.util.Collections;
-
-import org.apache.commons.pool.PoolableObjectFactory;
-import org.apache.commons.pool.impl.GenericObjectPool;
-import org.apache.openjpa.lib.conf.Configurable;
-import org.apache.openjpa.lib.log.Log;
-import org.apache.openjpa.lib.util.J2DoPrivHelper;
-import org.apache.openjpa.lib.util.Localizer;
-import org.apache.openjpa.util.GeneralException;
-import org.apache.openjpa.util.InternalException;
-import org.apache.openjpa.util.Serialization;
-import org.apache.openjpa.lib.util.concurrent.ReentrantLock;
-
-import serp.util.Strings;
-
-/**
- * TCP-based implementation of {@link RemoteCommitProvider} that
- * listens for object modifications and propagates those changes to
- * other RemoteCommitProviders over TCP sockets.
- *
- * @author Brian Leair
- * @author Patrick Linskey
- * @since 0.2.5.0
- */
-public class TCPRemoteCommitProvider
-    extends AbstractRemoteCommitProvider
-    implements Configurable {
-
-    private static final int DEFAULT_PORT = 5636;
-
-    private static final Localizer s_loc = Localizer.forPackage
-        (TCPRemoteCommitProvider.class);
-    private static long s_idSequence = System.currentTimeMillis();
-
-    //	A map of listen ports to listeners in this JVM. We might
-    //	want to look into allowing same port, different interface --
-    //	that is not currently possible in a single JVM.
-    private static final Map s_portListenerMap = new HashMap();
-
-    private long _id;
-    private byte[] _localhost;
-    private int _port = DEFAULT_PORT;
-    private int _maxActive = 2;
-    private int _maxIdle = 2;
-    private int _recoveryTimeMillis = 15000;
-    private TCPPortListener _listener;
-    private BroadcastQueue _broadcastQueue = new BroadcastQueue();
-    private final List _broadcastThreads = Collections.synchronizedList(
-        new LinkedList());
-
-    private ArrayList _addresses = new ArrayList();
-    private ReentrantLock _addressesLock;
-
-    public TCPRemoteCommitProvider()
-        throws UnknownHostException {
-        // obtain a unique ID.
-        synchronized (TCPRemoteCommitProvider.class) {
-            _id = s_idSequence++;
-        }
-
-        // cache the local IP address.
-        _localhost = InetAddress.getLocalHost().getAddress();
-        _addressesLock = new ReentrantLock();
-        setNumBroadcastThreads(2);
-    }
-
-    /**
-     * The port that this provider should listen on.
-     */
-    public int getPort() {
-        return _port;
-    }
-
-    /**
-     * The port that this provider should listen on. Set once only.
-     */
-    public void setPort(int port) {
-        _port = port;
-    }
-
-    /**
-     * The number of milliseconds to wait before retrying
-     * to reconnect to a peer after it becomes unreachable.
-     */
-    public void setRecoveryTimeMillis(int recoverytime) {
-        _recoveryTimeMillis = recoverytime;
-    }
-
-    /**
-     * The number of milliseconds to wait before retrying
-     * to reconnect to a peer after it becomes unreachable.
-     */
-    public int getRecoveryTimeMillis() {
-        return _recoveryTimeMillis;
-    }
-
-    /**
-     * The maximum number of sockets that this provider can
-     * simetaneously open to each peer in the cluster.
-     */
-    public void setMaxActive(int maxActive) {
-        _maxActive = maxActive;
-    }
-
-    /**
-     * The maximum number of sockets that this provider can
-     * simetaneously open to each peer in the cluster.
-     */
-    public int getMaxActive() {
-        return _maxActive;
-    }
-
-    /**
-     * The number of idle sockets that this provider can keep open
-     * to each peer in the cluster.
-     */
-    public void setMaxIdle(int maxIdle) {
-        _maxIdle = maxIdle;
-    }
-
-    /**
-     * The number of idle sockets that this provider can keep open
-     * to each peer in the cluster.
-     */
-    public int getMaxIdle() {
-        return _maxIdle;
-    }
-
-    /**
-     * The number of worker threads that are used for
-     * transmitting packets to peers in the cluster.
-     */
-    public void setNumBroadcastThreads(int numBroadcastThreads) {
-        synchronized (_broadcastThreads) {
-            int cur = _broadcastThreads.size();
-            if (cur > numBroadcastThreads) {
-                // Notify the extra worker threads so they stop themselves
-                // Threads will not end until they send another pk.
-                for (int i = numBroadcastThreads; i < cur; i++) {
-                    BroadcastWorkerThread worker = (BroadcastWorkerThread)
-                        _broadcastThreads.remove(0);
-                    worker.setRunning(false);
-                }
-            } else if (cur < numBroadcastThreads) {
-                // Create additional worker threads
-                for (int i = cur; i < numBroadcastThreads; i++) {
-                    BroadcastWorkerThread wt = new BroadcastWorkerThread();
-                    wt.setDaemon(true);
-                    wt.start();
-                    _broadcastThreads.add(wt);
-                }
-            }
-        }
-    }
-
-    /**
-     * The number of worker threads that are used for
-     * transmitting packets to peers in the cluster.
-     */
-    public int getNumBroadcastThreads() {
-        return _broadcastThreads.size();
-    }
-
-    /**
-     * Sets the list of addresses of peers to which this provider will
-     * send events to. The peers are semicolon-separated <code>names</code>
-     * list in the form of "myhost1:portA;myhost2:portB".
-     */
-    public void setAddresses(String names)
-        throws UnknownHostException {
-        // NYI. Could look for equivalence of addresses and avoid
-        // changing those that didn't change.
-
-        _addressesLock.lock();
-        try {
-            for (Iterator iter = _addresses.iterator(); iter.hasNext();) {
-                ((HostAddress) iter.next()).close();
-            }
-            String[] toks = Strings.split(names, ";", 0);
-            _addresses = new ArrayList(toks.length);
-
-            InetAddress localhost = InetAddress.getLocalHost();
-            String localhostName = localhost.getHostName();
-
-            for (int i = 0; i < toks.length; i++) {
-                String host = toks[i];
-                String hostname;
-                int tmpPort;
-                int colon = host.indexOf(':');
-                if (colon != -1) {
-                    hostname = host.substring(0, colon);
-                    tmpPort = Integer.parseInt(host.substring(colon + 1));
-                } else {
-                    hostname = host;
-                    tmpPort = DEFAULT_PORT;
-                }
-                InetAddress tmpAddress = (InetAddress) AccessController
-                    .doPrivileged(J2DoPrivHelper.getByNameAction(hostname)); 
-
-                // bleair: For each address we would rather make use of
-                // the jdk1.4 isLinkLocalAddress () || isLoopbackAddress ().
-                // (Though in practice on win32 they don't work anyways!)
-                // Instead we will check hostname. Not perfect, but
-                // it will match often enough (people will typically
-                // use the DNS machine names and be cutting/pasting.)
-                if (localhostName.equals(hostname)) {
-                    // This string matches the hostname for for ourselves, we
-                    // don't actually need to send ourselves messages.
-                    if (log.isTraceEnabled()) {
-                        log.trace(s_loc.get("tcp-address-asself",
-                            tmpAddress.getHostName() + ":" + tmpPort));
-                    }
-                } else {
-                    HostAddress newAddress = new HostAddress(host);
-                    _addresses.add(newAddress);
-                    if (log.isTraceEnabled()) {
-                        log.trace(s_loc.get("tcp-address-set",
-                            newAddress._address.getHostName() + ":"
-                                + newAddress._port));
-                    }
-                }
-            }
-        } catch (PrivilegedActionException pae) {
-            throw (UnknownHostException) pae.getException();
-        } finally {
-            _addressesLock.unlock();
-        }
-    }
-
-    // ---------- Configurable implementation ----------
-
-    /**
-     * Subclasses that need to perform actions in
-     * {@link Configurable#endConfiguration} must invoke this method.
-     */
-    public void endConfiguration() {
-        super.endConfiguration();
-        synchronized (s_portListenerMap) {
-            // see if a listener exists for this port.
-            _listener = (TCPPortListener) s_portListenerMap.get
-                (String.valueOf(_port));
-
-            if (_listener == null ||
-                (!_listener.isRunning() && _listener._port == _port)) {
-                try {
-                    _listener = new TCPPortListener(_port, log);
-                    _listener.listen();
-                    s_portListenerMap.put(String.valueOf(_port), _listener);
-                } catch (Exception e) {
-                    throw new GeneralException(s_loc.get("tcp-init-exception",
-                        String.valueOf(_port)), e).setFatal(true);
-                }
-            } else if (_listener.isRunning()) {
-                if (_listener._port != _port) {
-                    // this really shouldn't be able to happen.
-                    throw new GeneralException(s_loc.get
-                        ("tcp-not-equal", String.valueOf(_port))).
-                        setFatal(true);
-                }
-            } else
-                throw new InternalException(s_loc.get("tcp-listener-broken"));
-            _listener.addProvider(this);
-        }
-
-        _addressesLock.lock();
-        try {
-            HostAddress curAddress;
-            for (Iterator iter = _addresses.iterator();
-                iter.hasNext();) {
-                curAddress = (HostAddress) iter.next();
-                curAddress.setMaxActive(_maxActive);
-                curAddress.setMaxIdle(_maxIdle);
-            }
-        }
-        finally {
-            _addressesLock.unlock();
-        }
-    }
-
-    // ---------- RemoteCommitProvider implementation ----------
-
-    // pre 3.3.4	= <no version number transmitted>
-    // 3.3 Preview 	= 0x1428acfd;
-    // 3.4 			= 0x1428acff;
-    private static final long PROTOCOL_VERSION = 0x1428acff;
-
-    public void broadcast(RemoteCommitEvent event) {
-        try {
-            // build a packet notifying other JVMs of object changes.
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(baos);
-
-            oos.writeLong(PROTOCOL_VERSION);
-            oos.writeLong(_id);
-            oos.writeInt(_port);
-            oos.writeObject(_localhost);
-            oos.writeObject(event);
-            oos.flush();
-
-            byte[] bytes = baos.toByteArray();
-            baos.close();
-            if (_broadcastThreads.isEmpty())
-                sendUpdatePacket(bytes);
-            else
-                _broadcastQueue.addPacket(bytes);
-        } catch (IOException ioe) {
-            if (log.isWarnEnabled())
-                log.warn(s_loc.get("tcp-payload-create-error"), ioe);
-        }
-    }
-
-    /**
-     * Sends a change notification packet to other machines in this
-     * provider cluster.
-     */
-    private void sendUpdatePacket(byte[] bytes) {
-        _addressesLock.lock();
-        try {
-            for (Iterator iter = _addresses.iterator(); iter.hasNext();)
-                ((HostAddress) iter.next()).sendUpdatePacket(bytes);
-        } finally {
-            _addressesLock.unlock();
-        }
-    }
-
-    public void close() {
-        if (_listener != null)
-            _listener.removeProvider(this);
-
-        // Remove Broadcast Threads then close sockets.
-        _broadcastQueue.close();
-
-        // Wait for _broadcastThreads to get cleaned up.
-        while(!_broadcastThreads.isEmpty()) {
-            try {
-                Thread.sleep(500);
-            } catch (InterruptedException ie) {
-                // Ignore.
-            }
-        }
-        
-        _addressesLock.lock();
-        try {
-            for (Iterator iter = _addresses.iterator(); iter.hasNext();)
-                ((HostAddress) iter.next()).close();
-        } finally {
-            _addressesLock.unlock();
-        }
-    }
-
-    /**
-     * Utility class to hold messages to be sent. This
-     * allows calls to broadcast () to return without
-     * waiting for the send to complete.
-     */
-    private static class BroadcastQueue {
-
-        private LinkedList _packetQueue = new LinkedList();
-        private boolean _closed = false;
-
-        public synchronized void close() {
-            _closed = true;
-            notifyAll();
-        }
-
-        public synchronized boolean isClosed() {
-            return _closed;
-        }
-
-        public synchronized void addPacket(byte[] bytes) {
-            _packetQueue.addLast(bytes);
-            notify();
-        }
-
-        /**
-         * @return the bytes defining the packet to process, or
-         * <code>null</code> if the queue is empty.
-         */
-        public synchronized byte[] removePacket()
-            throws InterruptedException {
-            // only wait if the queue is still open. This allows processing
-            // of events in the queue to continue, while avoiding sleeping
-            // during shutdown.
-            while (!_closed && _packetQueue.isEmpty())
-                wait();
-            if (_packetQueue.isEmpty())
-                return null;
-            else
-                return (byte[]) _packetQueue.removeFirst();
-        }
-    }
-
-    /**
-     * Threads to broadcast packets placed in the {@link BroadcastQueue}.
-     */
-    private class BroadcastWorkerThread
-        extends Thread {
-
-        private boolean _keepRunning = true;
-
-        public void run() {
-            while (_keepRunning) {
-                try {
-                    // This will block until there is a packet to send, or
-                    // until the queue is closed.
-                    byte[] bytes = _broadcastQueue.removePacket();
-                    if (bytes != null)
-                        sendUpdatePacket(bytes);
-                    else if (_broadcastQueue.isClosed())
-                        _keepRunning = false;
-                } catch (InterruptedException e) {
-                    // End the thread.
-                    break;
-                }
-            }
-            remove();
-        }
-
-        public void setRunning(boolean keepRunning) {
-            _keepRunning = keepRunning;
-        }
-        
-        private void remove() {
-            _broadcastThreads.remove(this);
-        }
-    }
-
-    /**
-     * Responsible for listening for incoming packets and processing them.
-     */
-    private static class TCPPortListener
-        implements Runnable {
-
-        private final Log _log;
-        private ServerSocket _receiveSocket;
-        private Thread _acceptThread;
-        private Set _receiverThreads = new HashSet();
-        private final Set _providers = new HashSet();
-
-        /**
-         * Cache the local IP address
-         */
-        private byte[] _localhost;
-
-        /**
-         * The port that this listener should listen on. Configured
-         * by TCPRemoteCommitProvider.
-         */
-        private int _port;
-
-        /**
-         * Should be set to <code>true</code> once the listener is listening.
-         */
-        private boolean _isRunning = false;
-
-        /**
-         * Construct a new TCPPortListener configured to use the specified port.
-         */
-        private TCPPortListener(int port, Log log)
-            throws IOException {
-            _port = port;
-            _log = log;
-            try {
-                _receiveSocket = (ServerSocket) AccessController
-                    .doPrivileged(J2DoPrivHelper.newServerSocketAction(_port));
-            } catch (PrivilegedActionException pae) {
-                throw (IOException) pae.getException();
-            }
-            _localhost = InetAddress.getLocalHost().getAddress();
-
-            if (_log.isTraceEnabled())
-                _log.info(s_loc.get("tcp-start-listener",
-                    String.valueOf(_port)));
-        }
-
-        private void listen() {
-            _acceptThread = new Thread(this);
-            _acceptThread.setDaemon(true);
-            _acceptThread.start();
-        }
-
-        /**
-         * All providers added here will be notified of any incoming
-         * provider messages. There will be one of these per
-         * BrokerFactory in a given JVM.
-         * {@link TCPRemoteCommitProvider#endConfiguration} invokes
-         * <code>addProvider</code> with <code>this</code> upon
-         * completion of configuration.
-         */
-        private void addProvider(TCPRemoteCommitProvider provider) {
-            synchronized (_providers) {
-                _providers.add(provider);
-            }
-        }
-
-        /**
-         * Remove a provider from the list of providers to notify of
-         * commit events.
-         */
-        private synchronized void removeProvider
-            (TCPRemoteCommitProvider provider) {
-            synchronized (_providers) {
-                _providers.remove(provider);
-
-                // if the provider list is empty, shut down the thread.
-                if (_providers.size() == 0) {
-                    _isRunning = false;
-                    try {
-                        _receiveSocket.close();
-                    } catch (IOException ioe) {
-                        if (_log.isWarnEnabled())
-                            _log.warn(s_loc.get("tcp-close-error"), ioe);
-                    }
-                    _acceptThread.interrupt();
-                }
-            }
-        }
-
-        private boolean isRunning() {
-            synchronized (_providers) {
-                return _isRunning;
-            }
-        }
-
-        public void run() {
-            synchronized (_providers) {
-                _isRunning = true;
-            }
-
-            Socket s = null;
-            while (_isRunning) {
-                try {
-                    s = null;
-                    // Block, waiting to accept new connection from a peer
-                    s = (Socket) AccessController.doPrivileged(J2DoPrivHelper
-                        .acceptAction(_receiveSocket));
-                    if (_log.isTraceEnabled()) {
-                        _log.trace(s_loc.get("tcp-received-connection",
-                            s.getInetAddress().getHostAddress()
-                                + ":" + s.getPort()));
-                    }
-                    ReceiveSocketHandler sh = new ReceiveSocketHandler(s);
-                    Thread receiverThread = new Thread(sh);
-                    receiverThread.setDaemon(true);
-                    receiverThread.start();
-                    _receiverThreads.add(receiverThread);
-                } catch (Exception e) {
-                    if (e instanceof PrivilegedActionException)
-                        e = ((PrivilegedActionException) e).getException();
-                    if (!(e instanceof SocketException) || _isRunning)
-                        if (_log.isWarnEnabled())
-                            _log.warn(s_loc.get("tcp-accept-error"), e);
-
-                    // Nominal case (InterruptedException) because close ()
-                    // calls _acceptThread.interrupt ();
-                    try {
-                        if (s != null)
-                            s.close();
-                    } catch (Exception ee) {
-                        if (_log.isWarnEnabled())
-                            _log.warn(s_loc.get("tcp-close-error"), e);
-                    }
-                }
-            }
-
-            // We are done listening. Interrupt any worker threads.
-            Thread worker;
-            for (Iterator iter = _receiverThreads.iterator();
-                iter.hasNext();) {
-                worker = (Thread) iter.next();
-                // FYI, the worker threads are blocked
-                // reading from the socket's InputStream. InputStreams
-                // aren't interruptable, so this interrupt isn't
-                // really going to be delivered until something breaks
-                // the InputStream.
-                worker.interrupt();
-            }
-            synchronized (_providers) {
-                try {
-                    if (_isRunning)
-                        _receiveSocket.close();
-                } catch (Exception e) {
-                    if (_log.isWarnEnabled())
-                        _log.warn(s_loc.get("tcp-close-error"), e);
-                }
-                _isRunning = false;
-                if (_log.isTraceEnabled())
-                    _log.trace(s_loc.get("tcp-close-listener",
-                        _port + ""));
-            }
-        }
-
-        /**
-         * Utility class that acts as a worker thread to receive Events
-         * from broadcasters.
-         */
-        private class ReceiveSocketHandler
-            implements Runnable {
-
-            private InputStream _in;
-            private Socket _s;
-
-            private ReceiveSocketHandler(Socket s) {
-                // We are the receiving end and we don't send any messages
-                // back to the broadcaster. Turn off Nagle's so that
-                // we will send ack packets without waiting.
-                _s = s;
-                try {
-                    _s.setTcpNoDelay(true);
-                    _in = new BufferedInputStream(s.getInputStream());
-                } catch (IOException ioe) {
-                    if (_log.isInfoEnabled())
-                        _log.info(s_loc.get("tcp-socket-option-error"), ioe);
-                    _s = null;
-                } catch (Exception e) {
-                    if (_log.isWarnEnabled())
-                        _log.warn(s_loc.get("tcp-receive-error"), e);
-                    _s = null;
-                }
-            }
-
-            public void run() {
-                if (_s == null)
-                    return;
-                while (_isRunning && _s != null) {
-                    try {
-                        // This will block our thread, waiting to read
-                        // the next Event-object-message.
-                        handle(_in);
-                    } catch (EOFException eof) {
-                        // EOFException raised when peer is properly
-                        // closing its end.
-                        if (_log.isTraceEnabled()) {
-                            _log.trace(s_loc.get("tcp-close-socket",
-                                _s.getInetAddress().getHostAddress()
-                                    + ":" + _s.getPort()));
-                        }
-                        break;
-                    } catch (Throwable e) {
-                        if (_log.isWarnEnabled())
-                            _log.warn(s_loc.get("tcp-receive-error"), e);
-                        break;
-                    }
-                }
-                // We are done receiving on this socket and this worker
-                // thread is terminating.
-                try {
-                    _in.close();
-                    if (_s != null)
-                        _s.close();
-                } catch (IOException e) {
-                    _log.warn(s_loc.get("tcp-close-socket-error",
-                        _s.getInetAddress().getHostAddress() + ":"
-                            + _s.getPort()), e);
-                }
-            }
-
-            /**
-             * Process an {@link InputStream} containing objects written
-             * by {@link TCPRemoteCommitProvider#broadcast(RemoteCommitEvent)}.
-             */
-            private void handle(InputStream in)
-                throws IOException, ClassNotFoundException {
-                // This will block waiting for the next
-                ObjectInputStream ois = 
-                    new Serialization.ClassResolvingObjectInputStream(in);
-
-                long protocolVersion = ois.readLong();
-                if (protocolVersion != PROTOCOL_VERSION) {
-                    if (_log.isWarnEnabled()) {
-                        _log.warn(s_loc.get("tcp-wrong-version-error",
-                            _s.getInetAddress().getHostAddress() + ":"
-                                + _s.getPort()));
-                        return;
-                    }
-                }
-
-                long senderId = ois.readLong();
-                int senderPort = ois.readInt();
-                byte[] senderAddress = (byte[]) ois.readObject();
-                RemoteCommitEvent rce = (RemoteCommitEvent) ois.readObject();
-                if (_log.isTraceEnabled()) {
-                    _log.trace(s_loc.get("tcp-received-event",
-                        _s.getInetAddress().getHostAddress() + ":"
-                            + _s.getPort()));
-                }
-
-                boolean fromSelf = senderPort == _port &&
-                    Arrays.equals(senderAddress, _localhost);
-                TCPRemoteCommitProvider provider;
-                synchronized (_providers) {
-                    // bleair: We're iterating, but currenlty there can really
-                    // only be a single provider.
-                    for (Iterator iter = _providers.iterator();
-                        iter.hasNext();) {
-                        provider = (TCPRemoteCommitProvider) iter.next();
-                        if (senderId != provider._id || !fromSelf)
-                            provider.eventManager.fireEvent(rce);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Utility class to store an InetAddress and an int. Not using
-     * InetSocketAddress because it's a JDK1.4 API. This also
-     * provides a wrapper around the socket(s) associated with this address.
-     */
-    private class HostAddress {
-
-        private InetAddress _address;
-        private int _port;
-        private long _timeLastError; // millis
-        private boolean _isAvailable; // is peer thought to be up
-        private int _infosIssued = 0; // limit log entries
-
-        private GenericObjectPool _socketPool; // reusable open sockets
-
-        /**
-         * Construct a new host address from a string of the form
-         * "host:port" or of the form "host".
-         */
-        private HostAddress(String host)
-            throws UnknownHostException {
-            int colon = host.indexOf(':');
-            try {
-                if (colon != -1) {
-                    _address = (InetAddress) AccessController
-                        .doPrivileged(J2DoPrivHelper.getByNameAction(host
-                            .substring(0, colon)));
-                    _port = Integer.parseInt(host.substring(colon + 1));
-                } else {
-                    _address = (InetAddress) AccessController
-                        .doPrivileged(J2DoPrivHelper.getByNameAction(host));
-                    _port = DEFAULT_PORT;
-                }
-            } catch (PrivilegedActionException pae) {
-                throw (UnknownHostException) pae.getException();
-            }
-            // -1 max wait == as long as it takes
-            _socketPool = new GenericObjectPool
-                (new SocketPoolableObjectFactory(), _maxActive,
-                    GenericObjectPool.WHEN_EXHAUSTED_BLOCK, -1);
-            _isAvailable = true;
-        }
-
-        private void setMaxActive(int maxActive) {
-            _socketPool.setMaxActive(maxActive);
-        }
-
-        private void setMaxIdle(int maxIdle) {
-            _socketPool.setMaxIdle(maxIdle);
-        }
-
-        public void close() {
-            // Close the pool of sockets to this peer. This
-            // will close all sockets in the pool.
-            try {
-                _socketPool.close();
-            } catch (Exception e) {
-                if (log.isWarnEnabled()) {
-                    log.warn(s_loc.get("tcp-close-pool-error"), e);
-                }
-            }
-        }
-
-        private void sendUpdatePacket(byte[] bytes) {
-            if (!_isAvailable) {
-                long now = System.currentTimeMillis();
-                if (now - _timeLastError < _recoveryTimeMillis)
-                    // Not enough time has passed since the last error
-                    return;
-            }
-            Socket s = null;
-            try {
-                s = getSocket();
-                OutputStream os = s.getOutputStream();
-                os.write(bytes);
-                os.flush();
-
-                if (log.isTraceEnabled()) {
-                    log.trace(s_loc.get("tcp-sent-update",
-                        _address.getHostAddress() + ":" + _port,
-                        String.valueOf(s.getLocalPort())));
-                }
-                _isAvailable = true;
-                _infosIssued = 0;
-                // Return the socket to the pool; the socket is
-                // still good.
-                returnSocket(s);
-            } catch (Exception e) {
-                // There has been a problem sending to the peer.
-                // The OS socket that was being used is can no longer
-                // be used.
-                if (s != null)
-                    this.closeSocket(s);
-                this.clearAllSockets();
-
-                if (_isAvailable) {
-                    // Log a warning, the peer was up and has now gone down
-                    if (log.isWarnEnabled()) {
-                        log.warn(s_loc.get("tcp-send-error",
-                            _address.getHostAddress() + ":" + _port), e);
-                    }
-                    _isAvailable = false;
-                    // Once enough time has passed we will log another warning
-                    _timeLastError = System.currentTimeMillis();
-                } else {
-                    long now = System.currentTimeMillis();
-                    if (now - _timeLastError > _recoveryTimeMillis) {
-                        if (_infosIssued < 5) {
-                            // Enough time has passed, and peer is still down
-                            _timeLastError = System.currentTimeMillis();
-                            // We were trying to reestablish the connection,
-                            // but we failed again. Log a message, but
-                            // lower severity. This log will occur periodically
-                            // for 5 times until the peer comes back.
-                            if (log.isInfoEnabled()) {
-                                log.info(s_loc.get("tcp-send-still-error",
-                                    _address.getHostAddress() + ":"
-                                        + _port), e);
-                            }
-                            _infosIssued++;
-                        }
-                    }
-                }
-            }
-        }
-
-        private Socket getSocket()
-            throws Exception {
-            return (Socket) _socketPool.borrowObject();
-        }
-
-        private void returnSocket(Socket s)
-            throws Exception {
-            _socketPool.returnObject(s);
-        }
-
-        private void clearAllSockets() {
-            _socketPool.clear();
-        }
-
-        private void closeSocket(Socket s) {
-            // All sockets come from the pool.
-            // This socket is no longer usable, so delete it from the
-            // pool.
-            try {
-                _socketPool.invalidateObject(s);
-            } catch (Exception e) {
-            }
-        }
-
-        /**
-         * Factory for pooled sockets.
-         */
-        private class SocketPoolableObjectFactory
-            implements PoolableObjectFactory {
-
-            public Object makeObject()
-                throws IOException {
-                try {
-                    Socket s = (Socket) AccessController
-                        .doPrivileged(J2DoPrivHelper.newSocketAction(_address,
-                            _port));
-                    if (log.isTraceEnabled()) {
-                        log.trace(s_loc.get("tcp-open-connection", _address
-                            + ":" + _port, "" + s.getLocalPort()));
-                    }
-                    return s;
-                } catch (PrivilegedActionException pae) {
-                    throw (IOException) pae.getException();
-                }
-            }
-
-            public void destroyObject(Object obj) {
-                // silentClose ().
-                try {
-                    Socket s = (Socket) obj;
-                    if (log.isTraceEnabled())
-                        log.trace(s_loc.get("tcp-close-sending-socket",
-                            _address + ":" + _port, "" + s.getLocalPort()));
-                    s.close();
-                } catch (Exception e) {
-                    log.warn(s_loc.get("tcp-close-socket-error",
-                        _address.getHostAddress() + ":" + _port), e);
-                }
-            }
-
-            public boolean validateObject(Object obj) {
-                return true;
-            }
-
-            public void activateObject (Object value)
-			{
-			}
-
-			public void passivateObject (Object value)
-			{
-			}
-		}
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.event;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.Collections;
+
+import org.apache.commons.pool.PoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericObjectPool;
+import org.apache.openjpa.lib.conf.Configurable;
+import org.apache.openjpa.lib.log.Log;
+import org.apache.openjpa.lib.util.J2DoPrivHelper;
+import org.apache.openjpa.lib.util.Localizer;
+import org.apache.openjpa.util.GeneralException;
+import org.apache.openjpa.util.InternalException;
+import org.apache.openjpa.util.Serialization;
+import java.util.concurrent.locks.ReentrantLock;
+
+import serp.util.Strings;
+
+/**
+ * TCP-based implementation of {@link RemoteCommitProvider} that
+ * listens for object modifications and propagates those changes to
+ * other RemoteCommitProviders over TCP sockets.
+ *
+ * @author Brian Leair
+ * @author Patrick Linskey
+ * @since 0.2.5.0
+ */
+public class TCPRemoteCommitProvider
+    extends AbstractRemoteCommitProvider
+    implements Configurable {
+
+    private static final int DEFAULT_PORT = 5636;
+
+    private static final Localizer s_loc = Localizer.forPackage
+        (TCPRemoteCommitProvider.class);
+    private static long s_idSequence = System.currentTimeMillis();
+
+    //	A map of listen ports to listeners in this JVM. We might
+    //	want to look into allowing same port, different interface --
+    //	that is not currently possible in a single JVM.
+    private static final Map s_portListenerMap = new HashMap();
+
+    private long _id;
+    private byte[] _localhost;
+    private int _port = DEFAULT_PORT;
+    private int _maxActive = 2;
+    private int _maxIdle = 2;
+    private int _recoveryTimeMillis = 15000;
+    private TCPPortListener _listener;
+    private BroadcastQueue _broadcastQueue = new BroadcastQueue();
+    private final List _broadcastThreads = Collections.synchronizedList(
+        new LinkedList());
+
+    private ArrayList _addresses = new ArrayList();
+    private ReentrantLock _addressesLock;
+
+    public TCPRemoteCommitProvider()
+        throws UnknownHostException {
+        // obtain a unique ID.
+        synchronized (TCPRemoteCommitProvider.class) {
+            _id = s_idSequence++;
+        }
+
+        // cache the local IP address.
+        _localhost = InetAddress.getLocalHost().getAddress();
+        _addressesLock = new ReentrantLock();
+        setNumBroadcastThreads(2);
+    }
+
+    /**
+     * The port that this provider should listen on.
+     */
+    public int getPort() {
+        return _port;
+    }
+
+    /**
+     * The port that this provider should listen on. Set once only.
+     */
+    public void setPort(int port) {
+        _port = port;
+    }
+
+    /**
+     * The number of milliseconds to wait before retrying
+     * to reconnect to a peer after it becomes unreachable.
+     */
+    public void setRecoveryTimeMillis(int recoverytime) {
+        _recoveryTimeMillis = recoverytime;
+    }
+
+    /**
+     * The number of milliseconds to wait before retrying
+     * to reconnect to a peer after it becomes unreachable.
+     */
+    public int getRecoveryTimeMillis() {
+        return _recoveryTimeMillis;
+    }
+
+    /**
+     * The maximum number of sockets that this provider can
+     * simetaneously open to each peer in the cluster.
+     */
+    public void setMaxActive(int maxActive) {
+        _maxActive = maxActive;
+    }
+
+    /**
+     * The maximum number of sockets that this provider can
+     * simetaneously open to each peer in the cluster.
+     */
+    public int getMaxActive() {
+        return _maxActive;
+    }
+
+    /**
+     * The number of idle sockets that this provider can keep open
+     * to each peer in the cluster.
+     */
+    public void setMaxIdle(int maxIdle) {
+        _maxIdle = maxIdle;
+    }
+
+    /**
+     * The number of idle sockets that this provider can keep open
+     * to each peer in the cluster.
+     */
+    public int getMaxIdle() {
+        return _maxIdle;
+    }
+
+    /**
+     * The number of worker threads that are used for
+     * transmitting packets to peers in the cluster.
+     */
+    public void setNumBroadcastThreads(int numBroadcastThreads) {
+        synchronized (_broadcastThreads) {
+            int cur = _broadcastThreads.size();
+            if (cur > numBroadcastThreads) {
+                // Notify the extra worker threads so they stop themselves
+                // Threads will not end until they send another pk.
+                for (int i = numBroadcastThreads; i < cur; i++) {
+                    BroadcastWorkerThread worker = (BroadcastWorkerThread)
+                        _broadcastThreads.remove(0);
+                    worker.setRunning(false);
+                }
+            } else if (cur < numBroadcastThreads) {
+                // Create additional worker threads
+                for (int i = cur; i < numBroadcastThreads; i++) {
+                    BroadcastWorkerThread wt = new BroadcastWorkerThread();
+                    wt.setDaemon(true);
+                    wt.start();
+                    _broadcastThreads.add(wt);
+                }
+            }
+        }
+    }
+
+    /**
+     * The number of worker threads that are used for
+     * transmitting packets to peers in the cluster.
+     */
+    public int getNumBroadcastThreads() {
+        return _broadcastThreads.size();
+    }
+
+    /**
+     * Sets the list of addresses of peers to which this provider will
+     * send events to. The peers are semicolon-separated <code>names</code>
+     * list in the form of "myhost1:portA;myhost2:portB".
+     */
+    public void setAddresses(String names)
+        throws UnknownHostException {
+        // NYI. Could look for equivalence of addresses and avoid
+        // changing those that didn't change.
+
+        _addressesLock.lock();
+        try {
+            for (Iterator iter = _addresses.iterator(); iter.hasNext();) {
+                ((HostAddress) iter.next()).close();
+            }
+            String[] toks = Strings.split(names, ";", 0);
+            _addresses = new ArrayList(toks.length);
+
+            InetAddress localhost = InetAddress.getLocalHost();
+            String localhostName = localhost.getHostName();
+
+            for (int i = 0; i < toks.length; i++) {
+                String host = toks[i];
+                String hostname;
+                int tmpPort;
+                int colon = host.indexOf(':');
+                if (colon != -1) {
+                    hostname = host.substring(0, colon);
+                    tmpPort = Integer.parseInt(host.substring(colon + 1));
+                } else {
+                    hostname = host;
+                    tmpPort = DEFAULT_PORT;
+                }
+                InetAddress tmpAddress = (InetAddress) AccessController
+                    .doPrivileged(J2DoPrivHelper.getByNameAction(hostname)); 
+
+                // bleair: For each address we would rather make use of
+                // the jdk1.4 isLinkLocalAddress () || isLoopbackAddress ().
+                // (Though in practice on win32 they don't work anyways!)
+                // Instead we will check hostname. Not perfect, but
+                // it will match often enough (people will typically
+                // use the DNS machine names and be cutting/pasting.)
+                if (localhostName.equals(hostname)) {
+                    // This string matches the hostname for for ourselves, we
+                    // don't actually need to send ourselves messages.
+                    if (log.isTraceEnabled()) {
+                        log.trace(s_loc.get("tcp-address-asself",
+                            tmpAddress.getHostName() + ":" + tmpPort));
+                    }
+                } else {
+                    HostAddress newAddress = new HostAddress(host);
+                    _addresses.add(newAddress);
+                    if (log.isTraceEnabled()) {
+                        log.trace(s_loc.get("tcp-address-set",
+                            newAddress._address.getHostName() + ":"
+                                + newAddress._port));
+                    }
+                }
+            }
+        } catch (PrivilegedActionException pae) {
+            throw (UnknownHostException) pae.getException();
+        } finally {
+            _addressesLock.unlock();
+        }
+    }
+
+    // ---------- Configurable implementation ----------
+
+    /**
+     * Subclasses that need to perform actions in
+     * {@link Configurable#endConfiguration} must invoke this method.
+     */
+    public void endConfiguration() {
+        super.endConfiguration();
+        synchronized (s_portListenerMap) {
+            // see if a listener exists for this port.
+            _listener = (TCPPortListener) s_portListenerMap.get
+                (String.valueOf(_port));
+
+            if (_listener == null ||
+                (!_listener.isRunning() && _listener._port == _port)) {
+                try {
+                    _listener = new TCPPortListener(_port, log);
+                    _listener.listen();
+                    s_portListenerMap.put(String.valueOf(_port), _listener);
+                } catch (Exception e) {
+                    throw new GeneralException(s_loc.get("tcp-init-exception",
+                        String.valueOf(_port)), e).setFatal(true);
+                }
+            } else if (_listener.isRunning()) {
+                if (_listener._port != _port) {
+                    // this really shouldn't be able to happen.
+                    throw new GeneralException(s_loc.get
+                        ("tcp-not-equal", String.valueOf(_port))).
+                        setFatal(true);
+                }
+            } else
+                throw new InternalException(s_loc.get("tcp-listener-broken"));
+            _listener.addProvider(this);
+        }
+
+        _addressesLock.lock();
+        try {
+            HostAddress curAddress;
+            for (Iterator iter = _addresses.iterator();
+                iter.hasNext();) {
+                curAddress = (HostAddress) iter.next();
+                curAddress.setMaxActive(_maxActive);
+                curAddress.setMaxIdle(_maxIdle);
+            }
+        }
+        finally {
+            _addressesLock.unlock();
+        }
+    }
+
+    // ---------- RemoteCommitProvider implementation ----------
+
+    // pre 3.3.4	= <no version number transmitted>
+    // 3.3 Preview 	= 0x1428acfd;
+    // 3.4 			= 0x1428acff;
+    private static final long PROTOCOL_VERSION = 0x1428acff;
+
+    public void broadcast(RemoteCommitEvent event) {
+        try {
+            // build a packet notifying other JVMs of object changes.
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+
+            oos.writeLong(PROTOCOL_VERSION);
+            oos.writeLong(_id);
+            oos.writeInt(_port);
+            oos.writeObject(_localhost);
+            oos.writeObject(event);
+            oos.flush();
+
+            byte[] bytes = baos.toByteArray();
+            baos.close();
+            if (_broadcastThreads.isEmpty())
+                sendUpdatePacket(bytes);
+            else
+                _broadcastQueue.addPacket(bytes);
+        } catch (IOException ioe) {
+            if (log.isWarnEnabled())
+                log.warn(s_loc.get("tcp-payload-create-error"), ioe);
+        }
+    }
+
+    /**
+     * Sends a change notification packet to other machines in this
+     * provider cluster.
+     */
+    private void sendUpdatePacket(byte[] bytes) {
+        _addressesLock.lock();
+        try {
+            for (Iterator iter = _addresses.iterator(); iter.hasNext();)
+                ((HostAddress) iter.next()).sendUpdatePacket(bytes);
+        } finally {
+            _addressesLock.unlock();
+        }
+    }
+
+    public void close() {
+        if (_listener != null)
+            _listener.removeProvider(this);
+
+        // Remove Broadcast Threads then close sockets.
+        _broadcastQueue.close();
+
+        // Wait for _broadcastThreads to get cleaned up.
+        while(!_broadcastThreads.isEmpty()) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {
+                // Ignore.
+            }
+        }
+        
+        _addressesLock.lock();
+        try {
+            for (Iterator iter = _addresses.iterator(); iter.hasNext();)
+                ((HostAddress) iter.next()).close();
+        } finally {
+            _addressesLock.unlock();
+        }
+    }
+
+    /**
+     * Utility class to hold messages to be sent. This
+     * allows calls to broadcast () to return without
+     * waiting for the send to complete.
+     */
+    private static class BroadcastQueue {
+
+        private LinkedList _packetQueue = new LinkedList();
+        private boolean _closed = false;
+
+        public synchronized void close() {
+            _closed = true;
+            notifyAll();
+        }
+
+        public synchronized boolean isClosed() {
+            return _closed;
+        }
+
+        public synchronized void addPacket(byte[] bytes) {
+            _packetQueue.addLast(bytes);
+            notify();
+        }
+
+        /**
+         * @return the bytes defining the packet to process, or
+         * <code>null</code> if the queue is empty.
+         */
+        public synchronized byte[] removePacket()
+            throws InterruptedException {
+            // only wait if the queue is still open. This allows processing
+            // of events in the queue to continue, while avoiding sleeping
+            // during shutdown.
+            while (!_closed && _packetQueue.isEmpty())
+                wait();
+            if (_packetQueue.isEmpty())
+                return null;
+            else
+                return (byte[]) _packetQueue.removeFirst();
+        }
+    }
+
+    /**
+     * Threads to broadcast packets placed in the {@link BroadcastQueue}.
+     */
+    private class BroadcastWorkerThread
+        extends Thread {
+
+        private boolean _keepRunning = true;
+
+        public void run() {
+            while (_keepRunning) {
+                try {
+                    // This will block until there is a packet to send, or
+                    // until the queue is closed.
+                    byte[] bytes = _broadcastQueue.removePacket();
+                    if (bytes != null)
+                        sendUpdatePacket(bytes);
+                    else if (_broadcastQueue.isClosed())
+                        _keepRunning = false;
+                } catch (InterruptedException e) {
+                    // End the thread.
+                    break;
+                }
+            }
+            remove();
+        }
+
+        public void setRunning(boolean keepRunning) {
+            _keepRunning = keepRunning;
+        }
+        
+        private void remove() {
+            _broadcastThreads.remove(this);
+        }
+    }
+
+    /**
+     * Responsible for listening for incoming packets and processing them.
+     */
+    private static class TCPPortListener
+        implements Runnable {
+
+        private final Log _log;
+        private ServerSocket _receiveSocket;
+        private Thread _acceptThread;
+        private Set _receiverThreads = new HashSet();
+        private final Set _providers = new HashSet();
+
+        /**
+         * Cache the local IP address
+         */
+        private byte[] _localhost;
+
+        /**
+         * The port that this listener should listen on. Configured
+         * by TCPRemoteCommitProvider.
+         */
+        private int _port;
+
+        /**
+         * Should be set to <code>true</code> once the listener is listening.
+         */
+        private boolean _isRunning = false;
+
+        /**
+         * Construct a new TCPPortListener configured to use the specified port.
+         */
+        private TCPPortListener(int port, Log log)
+            throws IOException {
+            _port = port;
+            _log = log;
+            try {
+                _receiveSocket = (ServerSocket) AccessController
+                    .doPrivileged(J2DoPrivHelper.newServerSocketAction(_port));
+            } catch (PrivilegedActionException pae) {
+                throw (IOException) pae.getException();
+            }
+            _localhost = InetAddress.getLocalHost().getAddress();
+
+            if (_log.isTraceEnabled())
+                _log.info(s_loc.get("tcp-start-listener",
+                    String.valueOf(_port)));
+        }
+
+        private void listen() {
+            _acceptThread = new Thread(this);
+            _acceptThread.setDaemon(true);
+            _acceptThread.start();
+        }
+
+        /**
+         * All providers added here will be notified of any incoming
+         * provider messages. There will be one of these per
+         * BrokerFactory in a given JVM.
+         * {@link TCPRemoteCommitProvider#endConfiguration} invokes
+         * <code>addProvider</code> with <code>this</code> upon
+         * completion of configuration.
+         */
+        private void addProvider(TCPRemoteCommitProvider provider) {
+            synchronized (_providers) {
+                _providers.add(provider);
+            }
+        }
+
+        /**
+         * Remove a provider from the list of providers to notify of
+         * commit events.
+         */
+        private synchronized void removeProvider
+            (TCPRemoteCommitProvider provider) {
+            synchronized (_providers) {
+                _providers.remove(provider);
+
+                // if the provider list is empty, shut down the thread.
+                if (_providers.size() == 0) {
+                    _isRunning = false;
+                    try {
+                        _receiveSocket.close();
+                    } catch (IOException ioe) {
+                        if (_log.isWarnEnabled())
+                            _log.warn(s_loc.get("tcp-close-error"), ioe);
+                    }
+                    _acceptThread.interrupt();
+                }
+            }
+        }
+
+        private boolean isRunning() {
+            synchronized (_providers) {
+                return _isRunning;
+            }
+        }
+
+        public void run() {
+            synchronized (_providers) {
+                _isRunning = true;
+            }
+
+            Socket s = null;
+            while (_isRunning) {
+                try {
+                    s = null;
+                    // Block, waiting to accept new connection from a peer
+                    s = (Socket) AccessController.doPrivileged(J2DoPrivHelper
+                        .acceptAction(_receiveSocket));
+                    if (_log.isTraceEnabled()) {
+                        _log.trace(s_loc.get("tcp-received-connection",
+                            s.getInetAddress().getHostAddress()
+                                + ":" + s.getPort()));
+                    }
+                    ReceiveSocketHandler sh = new ReceiveSocketHandler(s);
+                    Thread receiverThread = new Thread(sh);
+                    receiverThread.setDaemon(true);
+                    receiverThread.start();
+                    _receiverThreads.add(receiverThread);
+                } catch (Exception e) {
+                    if (e instanceof PrivilegedActionException)
+                        e = ((PrivilegedActionException) e).getException();
+                    if (!(e instanceof SocketException) || _isRunning)
+                        if (_log.isWarnEnabled())
+                            _log.warn(s_loc.get("tcp-accept-error"), e);
+
+                    // Nominal case (InterruptedException) because close ()
+                    // calls _acceptThread.interrupt ();
+                    try {
+                        if (s != null)
+                            s.close();
+                    } catch (Exception ee) {
+                        if (_log.isWarnEnabled())
+                            _log.warn(s_loc.get("tcp-close-error"), e);
+                    }
+                }
+            }
+
+            // We are done listening. Interrupt any worker threads.
+            Thread worker;
+            for (Iterator iter = _receiverThreads.iterator();
+                iter.hasNext();) {
+                worker = (Thread) iter.next();
+                // FYI, the worker threads are blocked
+                // reading from the socket's InputStream. InputStreams
+                // aren't interruptable, so this interrupt isn't
+                // really going to be delivered until something breaks
+                // the InputStream.
+                worker.interrupt();
+            }
+            synchronized (_providers) {
+                try {
+                    if (_isRunning)
+                        _receiveSocket.close();
+                } catch (Exception e) {
+                    if (_log.isWarnEnabled())
+                        _log.warn(s_loc.get("tcp-close-error"), e);
+                }
+                _isRunning = false;
+                if (_log.isTraceEnabled())
+                    _log.trace(s_loc.get("tcp-close-listener",
+                        _port + ""));
+            }
+        }
+
+        /**
+         * Utility class that acts as a worker thread to receive Events
+         * from broadcasters.
+         */
+        private class ReceiveSocketHandler
+            implements Runnable {
+
+            private InputStream _in;
+            private Socket _s;
+
+            private ReceiveSocketHandler(Socket s) {
+                // We are the receiving end and we don't send any messages
+                // back to the broadcaster. Turn off Nagle's so that
+                // we will send ack packets without waiting.
+                _s = s;
+                try {
+                    _s.setTcpNoDelay(true);
+                    _in = new BufferedInputStream(s.getInputStream());
+                } catch (IOException ioe) {
+                    if (_log.isInfoEnabled())
+                        _log.info(s_loc.get("tcp-socket-option-error"), ioe);
+                    _s = null;
+                } catch (Exception e) {
+                    if (_log.isWarnEnabled())
+                        _log.warn(s_loc.get("tcp-receive-error"), e);
+                    _s = null;
+                }
+            }
+
+            public void run() {
+                if (_s == null)
+                    return;
+                while (_isRunning && _s != null) {
+                    try {
+                        // This will block our thread, waiting to read
+                        // the next Event-object-message.
+                        handle(_in);
+                    } catch (EOFException eof) {
+                        // EOFException raised when peer is properly
+                        // closing its end.
+                        if (_log.isTraceEnabled()) {
+                            _log.trace(s_loc.get("tcp-close-socket",
+                                _s.getInetAddress().getHostAddress()
+                                    + ":" + _s.getPort()));
+                        }
+                        break;
+                    } catch (Throwable e) {
+                        if (_log.isWarnEnabled())
+                            _log.warn(s_loc.get("tcp-receive-error"), e);
+                        break;
+                    }
+                }
+                // We are done receiving on this socket and this worker
+                // thread is terminating.
+                try {
+                    _in.close();
+                    if (_s != null)
+                        _s.close();
+                } catch (IOException e) {
+                    _log.warn(s_loc.get("tcp-close-socket-error",
+                        _s.getInetAddress().getHostAddress() + ":"
+                            + _s.getPort()), e);
+                }
+            }
+
+            /**
+             * Process an {@link InputStream} containing objects written
+             * by {@link TCPRemoteCommitProvider#broadcast(RemoteCommitEvent)}.
+             */
+            private void handle(InputStream in)
+                throws IOException, ClassNotFoundException {
+                // This will block waiting for the next
+                ObjectInputStream ois = 
+                    new Serialization.ClassResolvingObjectInputStream(in);
+
+                long protocolVersion = ois.readLong();
+                if (protocolVersion != PROTOCOL_VERSION) {
+                    if (_log.isWarnEnabled()) {
+                        _log.warn(s_loc.get("tcp-wrong-version-error",
+                            _s.getInetAddress().getHostAddress() + ":"
+                                + _s.getPort()));
+                        return;
+                    }
+                }
+
+                long senderId = ois.readLong();
+                int senderPort = ois.readInt();
+                byte[] senderAddress = (byte[]) ois.readObject();
+                RemoteCommitEvent rce = (RemoteCommitEvent) ois.readObject();
+                if (_log.isTraceEnabled()) {
+                    _log.trace(s_loc.get("tcp-received-event",
+                        _s.getInetAddress().getHostAddress() + ":"
+                            + _s.getPort()));
+                }
+
+                boolean fromSelf = senderPort == _port &&
+                    Arrays.equals(senderAddress, _localhost);
+                TCPRemoteCommitProvider provider;
+                synchronized (_providers) {
+                    // bleair: We're iterating, but currenlty there can really
+                    // only be a single provider.
+                    for (Iterator iter = _providers.iterator();
+                        iter.hasNext();) {
+                        provider = (TCPRemoteCommitProvider) iter.next();
+                        if (senderId != provider._id || !fromSelf)
+                            provider.eventManager.fireEvent(rce);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Utility class to store an InetAddress and an int. Not using
+     * InetSocketAddress because it's a JDK1.4 API. This also
+     * provides a wrapper around the socket(s) associated with this address.
+     */
+    private class HostAddress {
+
+        private InetAddress _address;
+        private int _port;
+        private long _timeLastError; // millis
+        private boolean _isAvailable; // is peer thought to be up
+        private int _infosIssued = 0; // limit log entries
+
+        private GenericObjectPool _socketPool; // reusable open sockets
+
+        /**
+         * Construct a new host address from a string of the form
+         * "host:port" or of the form "host".
+         */
+        private HostAddress(String host)
+            throws UnknownHostException {
+            int colon = host.indexOf(':');
+            try {
+                if (colon != -1) {
+                    _address = (InetAddress) AccessController
+                        .doPrivileged(J2DoPrivHelper.getByNameAction(host
+                            .substring(0, colon)));
+                    _port = Integer.parseInt(host.substring(colon + 1));
+                } else {
+                    _address = (InetAddress) AccessController
+                        .doPrivileged(J2DoPrivHelper.getByNameAction(host));
+                    _port = DEFAULT_PORT;
+                }
+            } catch (PrivilegedActionException pae) {
+                throw (UnknownHostException) pae.getException();
+            }
+            // -1 max wait == as long as it takes
+            _socketPool = new GenericObjectPool
+                (new SocketPoolableObjectFactory(), _maxActive,
+                    GenericObjectPool.WHEN_EXHAUSTED_BLOCK, -1);
+            _isAvailable = true;
+        }
+
+        private void setMaxActive(int maxActive) {
+            _socketPool.setMaxActive(maxActive);
+        }
+
+        private void setMaxIdle(int maxIdle) {
+            _socketPool.setMaxIdle(maxIdle);
+        }
+
+        public void close() {
+            // Close the pool of sockets to this peer. This
+            // will close all sockets in the pool.
+            try {
+                _socketPool.close();
+            } catch (Exception e) {
+                if (log.isWarnEnabled()) {
+                    log.warn(s_loc.get("tcp-close-pool-error"), e);
+                }
+            }
+        }
+
+        private void sendUpdatePacket(byte[] bytes) {
+            if (!_isAvailable) {
+                long now = System.currentTimeMillis();
+                if (now - _timeLastError < _recoveryTimeMillis)
+                    // Not enough time has passed since the last error
+                    return;
+            }
+            Socket s = null;
+            try {
+                s = getSocket();
+                OutputStream os = s.getOutputStream();
+                os.write(bytes);
+                os.flush();
+
+                if (log.isTraceEnabled()) {
+                    log.trace(s_loc.get("tcp-sent-update",
+                        _address.getHostAddress() + ":" + _port,
+                        String.valueOf(s.getLocalPort())));
+                }
+                _isAvailable = true;
+                _infosIssued = 0;
+                // Return the socket to the pool; the socket is
+                // still good.
+                returnSocket(s);
+            } catch (Exception e) {
+                // There has been a problem sending to the peer.
+                // The OS socket that was being used is can no longer
+                // be used.
+                if (s != null)
+                    this.closeSocket(s);
+                this.clearAllSockets();
+
+                if (_isAvailable) {
+                    // Log a warning, the peer was up and has now gone down
+                    if (log.isWarnEnabled()) {
+                        log.warn(s_loc.get("tcp-send-error",
+                            _address.getHostAddress() + ":" + _port), e);
+                    }
+                    _isAvailable = false;
+                    // Once enough time has passed we will log another warning
+                    _timeLastError = System.currentTimeMillis();
+                } else {
+                    long now = System.currentTimeMillis();
+                    if (now - _timeLastError > _recoveryTimeMillis) {
+                        if (_infosIssued < 5) {
+                            // Enough time has passed, and peer is still down
+                            _timeLastError = System.currentTimeMillis();
+                            // We were trying to reestablish the connection,
+                            // but we failed again. Log a message, but
+                            // lower severity. This log will occur periodically
+                            // for 5 times until the peer comes back.
+                            if (log.isInfoEnabled()) {
+                                log.info(s_loc.get("tcp-send-still-error",
+                                    _address.getHostAddress() + ":"
+                                        + _port), e);
+                            }
+                            _infosIssued++;
+                        }
+                    }
+                }
+            }
+        }
+
+        private Socket getSocket()
+            throws Exception {
+            return (Socket) _socketPool.borrowObject();
+        }
+
+        private void returnSocket(Socket s)
+            throws Exception {
+            _socketPool.returnObject(s);
+        }
+
+        private void clearAllSockets() {
+            _socketPool.clear();
+        }
+
+        private void closeSocket(Socket s) {
+            // All sockets come from the pool.
+            // This socket is no longer usable, so delete it from the
+            // pool.
+            try {
+                _socketPool.invalidateObject(s);
+            } catch (Exception e) {
+            }
+        }
+
+        /**
+         * Factory for pooled sockets.
+         */
+        private class SocketPoolableObjectFactory
+            implements PoolableObjectFactory {
+
+            public Object makeObject()
+                throws IOException {
+                try {
+                    Socket s = (Socket) AccessController
+                        .doPrivileged(J2DoPrivHelper.newSocketAction(_address,
+                            _port));
+                    if (log.isTraceEnabled()) {
+                        log.trace(s_loc.get("tcp-open-connection", _address
+                            + ":" + _port, "" + s.getLocalPort()));
+                    }
+                    return s;
+                } catch (PrivilegedActionException pae) {
+                    throw (IOException) pae.getException();
+                }
+            }
+
+            public void destroyObject(Object obj) {
+                // silentClose ().
+                try {
+                    Socket s = (Socket) obj;
+                    if (log.isTraceEnabled())
+                        log.trace(s_loc.get("tcp-close-sending-socket",
+                            _address + ":" + _port, "" + s.getLocalPort()));
+                    s.close();
+                } catch (Exception e) {
+                    log.warn(s_loc.get("tcp-close-socket-error",
+                        _address.getHostAddress() + ":" + _port), e);
+                }
+            }
+
+            public boolean validateObject(Object obj) {
+                return true;
+            }
+
+            public void activateObject (Object value)
+			{
+			}
+
+			public void passivateObject (Object value)
+			{
+			}
+		}
+	}
+}

Modified: openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TransactionEventManager.java
URL: http://svn.apache.org/viewvc/openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TransactionEventManager.java?rev=640685&r1=640684&r2=640685&view=diff
==============================================================================
--- openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TransactionEventManager.java (original)
+++ openjpa/trunk/openjpa-kernel/src/main/java/org/apache/openjpa/event/TransactionEventManager.java Mon Mar 24 20:37:56 2008
@@ -1,129 +1,129 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.    
- */
-package org.apache.openjpa.event;
-
-import org.apache.openjpa.lib.util.concurrent.AbstractConcurrentEventManager;
-
-/**
- * Manager that can be used to track and notify transaction listeners
- * of transaction-related events.
- *
- * @author Patrick Linskey
- * @author Abe White
- * @since 0.3.0
- * @nojavadoc
- */
-public class TransactionEventManager
-    extends AbstractConcurrentEventManager {
-
-    private int _begin = 0;
-    private int _flush = 0;
-    private int _end = 0;
-
-    public void addListener(Object listener) {
-        super.addListener(listener);
-        if (listener instanceof BeginTransactionListener)
-            _begin++;
-        if (listener instanceof FlushTransactionListener)
-            _flush++;
-        if (listener instanceof EndTransactionListener)
-            _end++;
-    }
-
-    public boolean removeListener(Object listener) {
-        if (!super.removeListener(listener))
-            return false;
-
-        if (listener instanceof BeginTransactionListener)
-            _begin--;
-        if (listener instanceof FlushTransactionListener)
-            _flush--;
-        if (listener instanceof EndTransactionListener)
-            _end--;
-        return true;
-    }
-
-    /**
-     * Whether there are any begin transaction listeners.
-     */
-    public boolean hasBeginListeners() {
-        return _begin > 0;
-    }
-
-    /**
-     * Whether there are any flush transaction listeners.
-     */
-    public boolean hasFlushListeners() {
-        return _flush > 0;
-    }
-
-    /**
-     * Whether there are any end transaction listeners.
-     */
-    public boolean hasEndListeners() {
-        return _end > 0;
-    }
-
-    /**
-     * Fire the given event to all registered listeners.
-     */
-    protected void fireEvent(Object event, Object listener) {
-        TransactionEvent ev = (TransactionEvent) event;
-        switch (ev.getType()) {
-            case TransactionEvent.AFTER_BEGIN:
-                if (listener instanceof BeginTransactionListener)
-                    ((BeginTransactionListener) listener).afterBegin(ev);
-                break;
-            case TransactionEvent.BEFORE_FLUSH:
-                if (listener instanceof FlushTransactionListener)
-                    ((FlushTransactionListener) listener).beforeFlush(ev);
-                break;
-            case TransactionEvent.AFTER_FLUSH:
-                if (listener instanceof FlushTransactionListener)
-                    ((FlushTransactionListener) listener).afterFlush(ev);
-                break;
-            case TransactionEvent.BEFORE_COMMIT:
-                if (listener instanceof EndTransactionListener)
-                    ((EndTransactionListener) listener).beforeCommit(ev);
-                break;
-            case TransactionEvent.AFTER_COMMIT:
-                if (listener instanceof EndTransactionListener)
-                    ((EndTransactionListener) listener).afterCommit(ev);
-                break;
-            case TransactionEvent.AFTER_ROLLBACK:
-                if (listener instanceof EndTransactionListener)
-                    ((EndTransactionListener) listener).afterRollback(ev);
-                break;
-            case TransactionEvent.AFTER_STATE_TRANSITIONS:
-                if (listener instanceof EndTransactionListener)
-                    ((EndTransactionListener) listener)
-                        .afterStateTransitions(ev);
-                break;
-            case TransactionEvent.AFTER_COMMIT_COMPLETE:
-                if (listener instanceof EndTransactionListener)
-                    ((EndTransactionListener) listener).afterCommitComplete(ev);
-                break;
-            case TransactionEvent.AFTER_ROLLBACK_COMPLETE:
-                if (listener instanceof EndTransactionListener)
-                    ((EndTransactionListener) listener)
-                        .afterRollbackComplete(ev);
-                break;
-        }
-	}
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.    
+ */
+package org.apache.openjpa.event;
+
+import org.apache.openjpa.lib.util.concurrent.AbstractConcurrentEventManager;
+
+/**
+ * Manager that can be used to track and notify transaction listeners
+ * of transaction-related events.
+ *
+ * @author Patrick Linskey
+ * @author Abe White
+ * @since 0.3.0
+ * @nojavadoc
+ */
+public class TransactionEventManager
+    extends AbstractConcurrentEventManager {
+
+    private int _begin = 0;
+    private int _flush = 0;
+    private int _end = 0;
+
+    public void addListener(Object listener) {
+        super.addListener(listener);
+        if (listener instanceof BeginTransactionListener)
+            _begin++;
+        if (listener instanceof FlushTransactionListener)
+            _flush++;
+        if (listener instanceof EndTransactionListener)
+            _end++;
+    }
+
+    public boolean removeListener(Object listener) {
+        if (!super.removeListener(listener))
+            return false;
+
+        if (listener instanceof BeginTransactionListener)
+            _begin--;
+        if (listener instanceof FlushTransactionListener)
+            _flush--;
+        if (listener instanceof EndTransactionListener)
+            _end--;
+        return true;
+    }
+
+    /**
+     * Whether there are any begin transaction listeners.
+     */
+    public boolean hasBeginListeners() {
+        return _begin > 0;
+    }
+
+    /**
+     * Whether there are any flush transaction listeners.
+     */
+    public boolean hasFlushListeners() {
+        return _flush > 0;
+    }
+
+    /**
+     * Whether there are any end transaction listeners.
+     */
+    public boolean hasEndListeners() {
+        return _end > 0;
+    }
+
+    /**
+     * Fire the given event to all registered listeners.
+     */
+    protected void fireEvent(Object event, Object listener) {
+        TransactionEvent ev = (TransactionEvent) event;
+        switch (ev.getType()) {
+            case TransactionEvent.AFTER_BEGIN:
+                if (listener instanceof BeginTransactionListener)
+                    ((BeginTransactionListener) listener).afterBegin(ev);
+                break;
+            case TransactionEvent.BEFORE_FLUSH:
+                if (listener instanceof FlushTransactionListener)
+                    ((FlushTransactionListener) listener).beforeFlush(ev);
+                break;
+            case TransactionEvent.AFTER_FLUSH:
+                if (listener instanceof FlushTransactionListener)
+                    ((FlushTransactionListener) listener).afterFlush(ev);
+                break;
+            case TransactionEvent.BEFORE_COMMIT:
+                if (listener instanceof EndTransactionListener)
+                    ((EndTransactionListener) listener).beforeCommit(ev);
+                break;
+            case TransactionEvent.AFTER_COMMIT:
+                if (listener instanceof EndTransactionListener)
+                    ((EndTransactionListener) listener).afterCommit(ev);
+                break;
+            case TransactionEvent.AFTER_ROLLBACK:
+                if (listener instanceof EndTransactionListener)
+                    ((EndTransactionListener) listener).afterRollback(ev);
+                break;
+            case TransactionEvent.AFTER_STATE_TRANSITIONS:
+                if (listener instanceof EndTransactionListener)
+                    ((EndTransactionListener) listener)
+                        .afterStateTransitions(ev);
+                break;
+            case TransactionEvent.AFTER_COMMIT_COMPLETE:
+                if (listener instanceof EndTransactionListener)
+                    ((EndTransactionListener) listener).afterCommitComplete(ev);
+                break;
+            case TransactionEvent.AFTER_ROLLBACK_COMPLETE:
+                if (listener instanceof EndTransactionListener)
+                    ((EndTransactionListener) listener)
+                        .afterRollbackComplete(ev);
+                break;
+        }
+	}
+}



Mime
View raw message