Author: cutting Date: Fri Dec 16 19:35:35 2011 New Revision: 1215248 URL: http://svn.apache.org/viewvc?rev=1215248&view=rev Log: AVRO-982. Java: Fix NettyTransceiver to not hang when server stops. Contributed by Bruno Dumon. Modified: avro/trunk/CHANGES.txt avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Modified: avro/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1215248&r1=1215247&r2=1215248&view=diff ============================================================================== --- avro/trunk/CHANGES.txt (original) +++ avro/trunk/CHANGES.txt Fri Dec 16 19:35:35 2011 @@ -49,6 +49,9 @@ Avro 1.6.2 (unreleased) AVRO-968. C: Fixed avro_value_cmp and avro_value_cmp_fast for string values. (Vivek Nadkarni via dcreager) + AVRO-982. Java: Fix NettyTransceiver to not hang when server stops. + (Bruno Dumon via cutting) + Avro 1.6.1 (8 November 2011) INCOMPATIBLE CHANGES Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1215248&r1=1215247&r2=1215248&view=diff ============================================================================== --- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original) +++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Fri Dec 16 19:35:35 2011 @@ -462,7 +462,7 @@ public class NettyTransceiver extends Tr if ((cse.getState() == ChannelState.OPEN) && (Boolean.FALSE.equals(cse.getValue()))) { // Server closed connection; disconnect client side LOG.debug("Remote peer " + remoteAddr + " closed connection."); - disconnect(); + disconnect(false, true, null); } } super.handleUpstream(ctx, e); Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java?rev=1215248&r1=1215247&r2=1215248&view=diff ============================================================================== --- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java (original) +++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCallbacks.java Fri Dec 16 19:35:35 2011 @@ -346,7 +346,7 @@ public class TestNettyServerWithCallback Assert.assertEquals(3, simpleClient2.add(1, 2)); // Now acquire the semaphore so that the server will block: - blockingSimpleImpl.acquirePermit(); + blockingSimpleImpl.acquireRunPermit(); simpleClient2.add(1, 2, addFuture); } finally { // When the transceiver is closed, the CallFuture should get @@ -365,11 +365,83 @@ public class TestNettyServerWithCallback } Assert.assertTrue("Expected IOException to be thrown", ioeThrown); } finally { - blockingSimpleImpl.releasePermit(); + blockingSimpleImpl.releaseRunPermit(); server2.close(); } } + @Test + public void cancelPendingRequestsAfterChannelCloseByServerShutdown() throws Exception { + // The purpose of this test is to verify that a client doesn't stay + // blocked when a server is unexpectedly killed (or when for some + // other reason the channel is suddenly closed) while the server + // was in the process of handling a request (thus after it received + // the request, and before it returned the response). + + // Start up a second server so that closing the server doesn't + // interfere with the other unit tests: + BlockingSimpleImpl blockingSimpleImpl = new BlockingSimpleImpl(); + Server server2 = new NettyServer(new SpecificResponder(Simple.class, + blockingSimpleImpl), new InetSocketAddress(0)); + server2.start(); + + Transceiver transceiver2 = null; + + try { + int serverPort = server2.getPort(); + System.out.println("server2 port : " + serverPort); + + transceiver2 = new NettyTransceiver(new InetSocketAddress( + serverPort), TestNettyServer.CONNECT_TIMEOUT_MILLIS); + + final Simple.Callback simpleClient2 = + SpecificRequestor.getClient(Simple.Callback.class, transceiver2); + + // Acquire the method-enter permit, which will be released by the + // server method once we call it + blockingSimpleImpl.acquireEnterPermit(); + + // Acquire the run permit, to avoid that the server method returns immediately + blockingSimpleImpl.acquireRunPermit(); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + simpleClient2.add(3, 4); + Assert.fail("Expected an exception"); + } catch (Exception e) { + // expected + } + } + }); + + // Start client call + t.start(); + + // Wait until method is entered on the server side + blockingSimpleImpl.acquireEnterPermit(); + + // The server side method is now blocked waiting on the run permit + // (= is busy handling the request) + + // Stop the server + server2.close(); + + // With the server gone, we expect the client to get some exception and exit + // Wait for client thread to exit + t.join(10000); + + Assert.assertFalse("Client request should not be blocked on server shutdown", t.isAlive()); + + } finally { + blockingSimpleImpl.releaseRunPermit(); + server2.close(); + if (transceiver2 != null) + transceiver2.close(); + } + } + @Ignore @Test public void performanceTest() throws Exception { @@ -459,7 +531,10 @@ public class TestNettyServerWithCallback * A SimpleImpl that requires a semaphore permit before executing any method. */ private static class BlockingSimpleImpl extends SimpleImpl { - private final Semaphore semaphore = new Semaphore(1); + /** Semaphore that is released when the method is entered. */ + private final Semaphore enterSemaphore = new Semaphore(1); + /** Semaphore that must be acquired for the method to run and exit. */ + private final Semaphore runSemaphore = new Semaphore(1); /** * Creates a BlockingSimpleImpl. @@ -470,76 +545,106 @@ public class TestNettyServerWithCallback @Override public String hello(String greeting) throws AvroRemoteException { - acquirePermit(); + releaseEnterPermit(); + acquireRunPermit(); try { return super.hello(greeting); } finally { - releasePermit(); + releaseRunPermit(); } } @Override public TestRecord echo(TestRecord record) throws AvroRemoteException { - acquirePermit(); + releaseEnterPermit(); + acquireRunPermit(); try { return super.echo(record); } finally { - releasePermit(); + releaseRunPermit(); } } @Override public int add(int arg1, int arg2) throws AvroRemoteException { - acquirePermit(); + releaseEnterPermit(); + acquireRunPermit(); try { return super.add(arg1, arg2); } finally { - releasePermit(); + releaseRunPermit(); } } @Override public ByteBuffer echoBytes(ByteBuffer data) throws AvroRemoteException { - acquirePermit(); + releaseEnterPermit(); + acquireRunPermit(); try { return super.echoBytes(data); } finally { - releasePermit(); + releaseRunPermit(); } } @Override public Void error() throws AvroRemoteException, TestError { - acquirePermit(); + releaseEnterPermit(); + acquireRunPermit(); try { return super.error(); } finally { - releasePermit(); + releaseRunPermit(); } } @Override public void ack() { - acquirePermit(); + releaseEnterPermit(); + acquireRunPermit(); try { super.ack(); } finally { - releasePermit(); + releaseRunPermit(); } } /** * Acquires a single permit from the semaphore. */ - public void acquirePermit() { - semaphore.acquireUninterruptibly(); + public void acquireRunPermit() { + try { + runSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } } /** * Releases a single permit to the semaphore. */ - public void releasePermit() { - semaphore.release(); + public void releaseRunPermit() { + runSemaphore.release(); + } + + /** + * Acquires a single permit from the semaphore. + */ + public void acquireEnterPermit() { + try { + enterSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } +} + + /** + * Releases a single permit to the semaphore. + */ + public void releaseEnterPermit() { + enterSemaphore.release(); } } }