avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1515933 [4/4] - in /avro/trunk: ./ lang/csharp/ lang/csharp/lib/main/ lang/csharp/src/apache/codegen/ lang/csharp/src/apache/codegen/Properties/ lang/csharp/src/apache/ipc/ lang/csharp/src/apache/ipc/Generic/ lang/csharp/src/apache/ipc/Pro...
Date Tue, 20 Aug 2013 19:13:41 GMT
Added: avro/trunk/lang/csharp/src/apache/test/Ipc/SocketServerWithCallbacksTest.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/test/Ipc/SocketServerWithCallbacksTest.cs?rev=1515933&view=auto
==============================================================================
--- avro/trunk/lang/csharp/src/apache/test/Ipc/SocketServerWithCallbacksTest.cs (added)
+++ avro/trunk/lang/csharp/src/apache/test/Ipc/SocketServerWithCallbacksTest.cs Tue Aug 20
19:13:39 2013
@@ -0,0 +1,791 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Diagnostics;
+using System.Net.Sockets;
+using System.Threading;
+using Avro.ipc;
+using Avro.ipc.Specific;
+using NUnit.Framework;
+using org.apache.avro.test;
+
+namespace Avro.Test.Ipc
+{
+    [TestFixture]
+    public class SocketServerWithCallbacksTest
+    {
+        private static volatile bool ackFlag;
+        private static volatile CountdownLatch ackLatch = new CountdownLatch(1);
+
+        private SocketServer server;
+        private SocketTransceiver transceiver;
+        private SimpleCallback simpleClient;
+        
+        [TestFixtureSetUp]
+        public void Init()
+        {
+            var responder = new SpecificResponder<Simple>(new SimpleImpl());
+            server = new SocketServer("localhost", 0, responder);
+            server.Start();
+
+            transceiver = new SocketTransceiver("localhost", server.Port);
+            simpleClient = SpecificRequestor.CreateClient<SimpleCallback>(transceiver);
+        }
+
+        [TestFixtureTearDown]
+        public void TearDown()
+        {
+            try
+            {
+                if (transceiver != null)
+                {
+                    transceiver.Disconnect();
+                }
+            }
+            catch
+            {
+            }
+
+            try
+            {
+                server.Stop();
+            }
+            catch
+            {
+            }
+        }
+
+
+        // AVRO-625 [Test]
+        public void CancelPendingRequestsOnTransceiverClose()
+        {
+            // Start up a second server so that closing the server doesn't 
+            // interfere with the other unit tests:
+            var blockingSimpleImpl = new BlockingSimpleImpl();
+
+            var responder = new SpecificResponder<Simple>(blockingSimpleImpl);
+            var server2 = new SocketServer("localhost", 0, responder);
+
+            server2.Start();
+
+            try
+            {
+                int serverPort = server2.Port;
+
+                var transceiver2 = new SocketTransceiver("localhost", serverPort);
+
+                var addFuture = new CallFuture<int>();
+                try
+                {
+                    var simpleClient2 = SpecificRequestor.CreateClient<SimpleCallback>(transceiver2);
+
+                    // The first call has to block for the handshake:
+                    Assert.AreEqual(3, simpleClient2.add(1, 2));
+
+                    // Now acquire the semaphore so that the server will block:
+                    blockingSimpleImpl.acquireRunPermit();
+                    simpleClient2.add(1, 2, addFuture);
+                }
+                finally
+                {
+                    // When the transceiver is closed, the CallFuture should get 
+                    // an IOException
+                    transceiver2.Close();
+                }
+                bool ioeThrown = false;
+                try
+                {
+                    addFuture.WaitForResult(2000);
+                }
+                catch (Exception)
+                {
+                }
+                //catch (ExecutionException e) {
+                //  ioeThrown = e.getCause() instanceof IOException;
+                //  Assert.assertTrue(e.getCause() instanceof IOException);
+                //} catch (Exception e) {
+                //  e.printStackTrace();
+                //  Assert.fail("Unexpected Exception: " + e.toString());
+                //}
+                Assert.IsTrue(ioeThrown, "Expected IOException to be thrown");
+            }
+            finally
+            {
+                blockingSimpleImpl.releaseRunPermit();
+                server2.Stop();
+            }
+        }
+
+        // AVRO-625 [Test]
+        public void CancelPendingRequestsAfterChannelCloseByServerShutdown()
+        {
+            // The purpose of this test is to verify that a client doesn't stay
+            // blocked when a server is unexpectedly killed (or when for some
+            // other reason the channel is suddenly closed) while the server
+            // was in the process of handling a request (thus after it received
+            // the request, and before it returned the response).
+
+            // Start up a second server so that closing the server doesn't
+            // interfere with the other unit tests:
+            var blockingSimpleImpl = new BlockingSimpleImpl();
+
+            var responder = new SpecificResponder<Simple>(blockingSimpleImpl);
+            var server2 = new SocketServer("localhost", 0, responder);
+
+            server2.Start();
+            SocketTransceiver transceiver2 = null;
+
+            try
+            {
+                transceiver2 = new SocketTransceiver("localhost", server2.Port);
+
+                var simpleClient2 = SpecificRequestor.CreateClient<SimpleCallback>(transceiver2);
+
+                // Acquire the method-enter permit, which will be released by the
+                // server method once we call it
+                blockingSimpleImpl.acquireEnterPermit();
+
+                // Acquire the run permit, to avoid that the server method returns immediately
+                blockingSimpleImpl.acquireRunPermit();
+
+                var t = new Thread(() =>
+                                       {
+                                           try
+                                           {
+                                               simpleClient2.add(3, 4);
+                                               Assert.Fail("Expected an exception");
+                                           }
+                                           catch (Exception)
+                                           {
+                                               // expected
+                                           }
+                                       });
+                // Start client call
+                t.Start();
+
+                // Wait until method is entered on the server side
+                blockingSimpleImpl.acquireEnterPermit();
+
+                // The server side method is now blocked waiting on the run permit
+                // (= is busy handling the request)
+
+                // Stop the server
+                server2.Stop();
+
+                // With the server gone, we expect the client to get some exception and exit
+                // Wait for client thread to exit
+                t.Join(10000);
+
+                Assert.IsFalse(t.IsAlive, "Client request should not be blocked on server
shutdown");
+            }
+            finally
+            {
+                blockingSimpleImpl.releaseRunPermit();
+                server2.Stop();
+                if (transceiver2 != null)
+                    transceiver2.Close();
+            }
+        }
+
+
+        private class CallbackCallFuture<T> : CallFuture<T>
+        {
+            private readonly Action<Exception> _handleException;
+            private readonly Action<T> _handleResult;
+
+            public CallbackCallFuture(Action<T> handleResult = null, Action<Exception>
handleException = null)
+            {
+                _handleResult = handleResult;
+                _handleException = handleException;
+            }
+
+            public override void HandleResult(T result)
+            {
+                _handleResult(result);
+            }
+
+            public override void HandleException(Exception exception)
+            {
+                _handleException(exception);
+            }
+        }
+
+        private class NestedCallFuture<T> : CallFuture<T>
+        {
+            private readonly CallFuture<T> cf;
+
+            public NestedCallFuture(CallFuture<T> cf)
+            {
+                this.cf = cf;
+            }
+
+            public override void HandleResult(T result)
+            {
+                cf.HandleResult(result);
+            }
+
+            public override void HandleException(Exception exception)
+            {
+                cf.HandleException(exception);
+            }
+        }
+
+
+        private string Hello(string howAreYou)
+        {
+            var response = new CallFuture<string>();
+
+            simpleClient.hello(howAreYou, response);
+
+            return response.WaitForResult(2000);
+        }
+
+        private void Hello(string howAreYou, CallFuture<string> future1)
+        {
+            simpleClient.hello(howAreYou, future1);
+        }
+
+        private class BlockingSimpleImpl : SimpleImpl
+        {
+            /** Semaphore that is released when the method is entered. */
+            private readonly Semaphore enterSemaphore = new Semaphore(1, 1);
+            /** Semaphore that must be acquired for the method to run and exit. */
+            private readonly Semaphore runSemaphore = new Semaphore(1, 1);
+
+
+            public override string hello(string greeting)
+            {
+                releaseEnterPermit();
+                acquireRunPermit();
+                try
+                {
+                    return base.hello(greeting);
+                }
+                finally
+                {
+                    releaseRunPermit();
+                }
+            }
+
+            public override TestRecord echo(TestRecord record)
+            {
+                releaseEnterPermit();
+                acquireRunPermit();
+                try
+                {
+                    return base.echo(record);
+                }
+                finally
+                {
+                    releaseRunPermit();
+                }
+            }
+
+            public override int add(int arg1, int arg2)
+            {
+                releaseEnterPermit();
+                acquireRunPermit();
+                try
+                {
+                    return base.add(arg1, arg2);
+                }
+                finally
+                {
+                    releaseRunPermit();
+                }
+            }
+
+            public override byte[] echoBytes(byte[] data)
+            {
+                releaseEnterPermit();
+                acquireRunPermit();
+                try
+                {
+                    return base.echoBytes(data);
+                }
+                finally
+                {
+                    releaseRunPermit();
+                }
+            }
+
+            public override object error()
+            {
+                releaseEnterPermit();
+                acquireRunPermit();
+                try
+                {
+                    return base.error();
+                }
+                finally
+                {
+                    releaseRunPermit();
+                }
+            }
+
+            public override void ack()
+            {
+                releaseEnterPermit();
+                acquireRunPermit();
+                try
+                {
+                    base.ack();
+                }
+                finally
+                {
+                    releaseRunPermit();
+                }
+            }
+
+
+            /**
+     * Acquires a single permit from the semaphore.
+     */
+
+            public void acquireRunPermit()
+            {
+                try
+                {
+                    runSemaphore.WaitOne();
+                    //  } catch (InterruptedException e) {
+                    //    Thread.currentThread().interrupt();
+                    //    throw new RuntimeException(e);
+                    //}
+                }
+                finally
+                {
+                }
+            }
+
+            /**
+     * Releases a single permit to the semaphore.
+     */
+
+            public void releaseRunPermit()
+            {
+                try
+                {
+                    runSemaphore.Release();
+                }
+                catch //(SemaphoreFullException)
+                {
+                }
+            }
+
+            private void releaseEnterPermit()
+            {
+                try
+                {
+                    enterSemaphore.Release();
+                }
+                catch //(SemaphoreFullException)
+                {
+                }
+            }
+
+            /**
+     * Acquires a single permit from the semaphore.
+     */
+
+            public void acquireEnterPermit()
+            {
+                try
+                {
+                    enterSemaphore.WaitOne();
+                    //    } catch (InterruptedException e) {
+                    //      Thread.currentThread().interrupt();
+                    //      throw new RuntimeException(e);
+                    //}
+                }
+                finally
+                {
+                }
+            }
+        }
+
+        [Test]
+        public void Ack()
+        {
+            simpleClient.ack();
+
+            ackLatch.Wait(2000);
+            Assert.IsTrue(ackFlag, "Expected ack flag to be set");
+
+            ackLatch = new CountdownLatch(1);
+            simpleClient.ack();
+            ackLatch.Wait(2000);
+            Assert.IsFalse(ackFlag, "Expected ack flag to be cleared");
+        }
+
+        [Test]
+        public void Add()
+        {
+            // Test synchronous RPC:
+            Assert.AreEqual(8, simpleClient.add(2, 6));
+
+            // Test asynchronous RPC (future):
+            var future1 = new CallFuture<int>();
+            simpleClient.add(8, 8, future1);
+            Assert.AreEqual(16, future1.WaitForResult(2000));
+            Assert.IsNull(future1.Error);
+
+            // Test asynchronous RPC (callback):
+            var future2 = new CallFuture<int>();
+            simpleClient.add(512, 256, new NestedCallFuture<int>(future2));
+
+            Assert.AreEqual(768, future2.WaitForResult(2000));
+            Assert.IsNull(future2.Error);
+        }
+
+        [Test]
+        public void ClientReconnectAfterServerRestart()
+        {
+            // Start up a second server so that closing the server doesn't 
+            // interfere with the other unit tests:
+            SimpleImpl simpleImpl = new BlockingSimpleImpl();
+
+            var responder = new SpecificResponder<Simple>(simpleImpl);
+            var server2 = new SocketServer("localhost", 0, responder);
+
+            server2.Start();
+
+            try
+            {
+                int serverPort = server2.Port;
+
+                // Initialize a client, and establish a connection to the server:
+                Transceiver transceiver2 = new SocketTransceiver("localhost", serverPort);
+
+                var simpleClient2 =
+                    SpecificRequestor.CreateClient<SimpleCallback>(transceiver2);
+
+                Assert.AreEqual(3, simpleClient2.add(1, 2));
+
+                // Restart the server:
+                server2.Stop();
+                try
+                {
+                    simpleClient2.add(2, -1);
+                    Assert.Fail("Client should not be able to invoke RPCs because server
is no longer running");
+                }
+                catch (Exception)
+                {
+                    // Expected since server is no longer running
+                }
+
+                Thread.Sleep(2000);
+                server2 = new SocketServer("localhost", serverPort, new SpecificResponder<Simple>(new
SimpleImpl()));
+
+                server2.Start();
+
+                // Invoke an RPC using the same client, which should reestablish the 
+                // connection to the server:
+                Assert.AreEqual(3, simpleClient2.add(1, 2));
+            }
+            finally
+            {
+                server2.Stop();
+            }
+        }
+
+        [Test]
+        public void Echo()
+        {
+            var record = new TestRecord
+                             {
+                                 hash =
+                                     new MD5
+                                         {
+                                             Value =
+                                                 new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 1, 2,
3, 4, 5, 6, 7, 8}
+                                         },
+                                 kind = Kind.FOO,
+                                 name = "My Record"
+                             };
+
+            // Test synchronous RPC:
+            TestRecord testRecord = simpleClient.echo(record);
+            Assert.AreEqual(record, testRecord);
+
+            // Test asynchronous RPC (future):
+            var future1 = new CallFuture<TestRecord>();
+            simpleClient.echo(record, future1);
+            Assert.AreEqual(record, future1.WaitForResult(2000));
+            Assert.IsNull(future1.Error);
+
+            // Test asynchronous RPC (callback):
+            var future2 = new CallFuture<TestRecord>();
+            simpleClient.echo(record, new NestedCallFuture<TestRecord>(future2));
+
+            Assert.AreEqual(record, future2.WaitForResult(2000));
+            Assert.IsNull(future2.Error);
+        }
+
+        [Test]
+        public void EchoBytes()
+        {
+            var byteBuffer = new byte[] {1, 2, 3, 4, 5, 6, 7, 8};
+
+            // Test synchronous RPC:
+            Assert.AreEqual(byteBuffer, simpleClient.echoBytes(byteBuffer));
+
+            // Test asynchronous RPC (future):
+            var future1 = new CallFuture<byte[]>();
+            simpleClient.echoBytes(byteBuffer, future1);
+            Assert.AreEqual(byteBuffer, future1.WaitForResult(2000));
+            Assert.IsNull(future1.Error);
+
+            // Test asynchronous RPC (callback):
+            var future2 = new CallFuture<byte[]>();
+            simpleClient.echoBytes(byteBuffer, new NestedCallFuture<byte[]>(future2));
+
+            Assert.AreEqual(byteBuffer, future2.WaitForResult(2000));
+            Assert.IsNull(future2.Error);
+        }
+
+        [Test]
+        public void Error()
+        {
+            // Test synchronous RPC:
+            try
+            {
+                simpleClient.error();
+                Assert.Fail("Expected " + typeof (TestError).Name + " to be thrown");
+            }
+            catch (TestError)
+            {
+                // Expected
+            }
+            catch (Exception e)
+            {
+                Assert.Fail("Unexpected error: " + e);
+            }
+
+            // Test asynchronous RPC (future):
+            var future = new CallFuture<object>();
+            simpleClient.error(future);
+            try
+            {
+                future.WaitForResult(2000);
+                Assert.Fail("Expected " + typeof (TestError).Name + " to be thrown");
+            }
+            catch (TestError)
+            {
+            }
+
+            Assert.IsNotNull(future.Error);
+            Assert.AreEqual(typeof (TestError), future.Error.GetType());
+            Assert.IsNull(future.Result);
+
+            // Test asynchronous RPC (callback):
+            Exception errorRef = null;
+            var latch = new CountdownLatch(1);
+            simpleClient.error(new CallbackCallFuture<object>(
+                                   result => Assert.Fail("Expected " + typeof (TestError).Name),
+                                   exception =>
+                                       {
+                                           errorRef = exception;
+                                           latch.Signal();
+                                       }));
+
+            Assert.IsTrue(latch.Wait(2000), "Timed out waiting for error");
+            Assert.IsNotNull(errorRef);
+            Assert.AreEqual(typeof (TestError), errorRef.GetType());
+        }
+
+        [Test]
+        public void Greeting()
+        {
+            // Test synchronous RPC:
+            string response = Hello("how are you?");
+            Assert.AreEqual("Hello, how are you?", response);
+
+            // Test asynchronous RPC (future):
+            var future1 = new CallFuture<String>();
+            Hello("World!", future1);
+
+            string result = future1.WaitForResult();
+            Assert.AreEqual("Hello, World!", result);
+            Assert.IsNull(future1.Error);
+
+            // Test asynchronous RPC (callback):
+            var future2 = new CallFuture<String>();
+
+            Hello("what's up?", new NestedCallFuture<string>(future2));
+            Assert.AreEqual("Hello, what's up?", future2.WaitForResult());
+            Assert.IsNull(future2.Error);
+        }
+
+        //[Test]
+        public void PerformanceTest()
+        {
+            const int threadCount = 8;
+            const long runTimeMillis = 10*1000L;
+
+
+            long rpcCount = 0;
+            int[] runFlag = {1};
+
+            var startLatch = new CountdownLatch(threadCount);
+            for (int ii = 0; ii < threadCount; ii++)
+            {
+                new Thread(() =>
+                               {
+                                   {
+                                       try
+                                       {
+                                           startLatch.Signal();
+                                           startLatch.Wait(2000);
+
+                                           while (Interlocked.Add(ref runFlag[0], 0) == 1)
+                                           {
+                                               Interlocked.Increment(ref rpcCount);
+                                               Assert.AreEqual("Hello, World!", simpleClient.hello("World!"));
+                                           }
+                                       }
+                                       catch (Exception e)
+                                       {
+                                           Console.WriteLine(e);
+                                       }
+                                   }
+                               }).Start();
+            }
+
+            startLatch.Wait(2000);
+            Thread.Sleep(2000);
+            Interlocked.Exchange(ref runFlag[0], 1);
+
+            string results = "Completed " + rpcCount + " RPCs in " + runTimeMillis +
+                             "ms => " + ((rpcCount/(double) runTimeMillis)*1000) + " RPCs/sec,
" +
+                             (runTimeMillis/(double) rpcCount) + " ms/RPC.";
+
+            Debug.WriteLine(results);
+        }
+
+        [Test]
+        public void TestSendAfterChannelClose()
+        {
+            // Start up a second server so that closing the server doesn't 
+            // interfere with the other unit tests:
+
+            var responder = new SpecificResponder<Simple>(new SimpleImpl());
+            var server2 = new SocketServer("localhost", 0, responder);
+
+            server2.Start();
+
+            try
+            {
+                var transceiver2 = new SocketTransceiver("localhost", server2.Port);
+
+                try
+                {
+                    var simpleClient2 = SpecificRequestor.CreateClient<SimpleCallback>(transceiver2);
+
+                    // Verify that connection works:
+                    Assert.AreEqual(3, simpleClient2.add(1, 2));
+
+                    // Try again with callbacks:
+                    var addFuture = new CallFuture<int>();
+                    simpleClient2.add(1, 2, addFuture);
+                    Assert.AreEqual(3, addFuture.WaitForResult(2000));
+
+                    // Shut down server:
+                    server2.Stop();
+
+                    // Send a new RPC, and verify that it throws an Exception that 
+                    // can be detected by the client:
+                    bool ioeCaught = false;
+                    try
+                    {
+                        simpleClient2.add(1, 2);
+                        Assert.Fail("Send after server close should have thrown Exception");
+                    }
+                    catch (SocketException)
+                    {
+                        ioeCaught = true;
+                    }
+
+                    Assert.IsTrue(ioeCaught, "Expected IOException");
+
+                    // Send a new RPC with callback, and verify that the correct Exception

+                    // is thrown:
+                    ioeCaught = false;
+                    try
+                    {
+                        addFuture = new CallFuture<int>();
+                        simpleClient2.add(1, 2, addFuture);
+                        addFuture.WaitForResult(2000);
+
+                        Assert.Fail("Send after server close should have thrown Exception");
+                    }
+                    catch (SocketException)
+                    {
+                        ioeCaught = true;
+                    }
+
+                    Assert.IsTrue(ioeCaught, "Expected IOException");
+                }
+                finally
+                {
+                    transceiver2.Disconnect();
+                }
+            }
+            finally
+            {
+                server2.Stop();
+                Thread.Sleep(1000);
+            }
+        }
+
+        private class SimpleImpl : Simple
+        {
+            public override string hello(string greeting)
+            {
+                return "Hello, " + greeting;
+            }
+
+            public override TestRecord echo(TestRecord record)
+            {
+                return record;
+            }
+
+            public override int add(int arg1, int arg2)
+            {
+                return arg1 + arg2;
+            }
+
+            public override byte[] echoBytes(byte[] data)
+            {
+                return data;
+            }
+
+            public override object error()
+            {
+                throw new TestError { message = "Test Message" };
+            }
+
+            public override void ack()
+            {
+                ackFlag = !ackFlag;
+                ackLatch.Signal();
+            }
+        }
+
+    }
+}
\ No newline at end of file

Added: avro/trunk/lang/csharp/src/apache/test/Ipc/SocketTransceiverWhenServerStopsTest.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/test/Ipc/SocketTransceiverWhenServerStopsTest.cs?rev=1515933&view=auto
==============================================================================
--- avro/trunk/lang/csharp/src/apache/test/Ipc/SocketTransceiverWhenServerStopsTest.cs (added)
+++ avro/trunk/lang/csharp/src/apache/test/Ipc/SocketTransceiverWhenServerStopsTest.cs Tue
Aug 20 19:13:39 2013
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using Avro.ipc;
+using Avro.ipc.Specific;
+using NUnit.Framework;
+using org.apache.avro.test;
+
+namespace Avro.Test.Ipc
+{
+    [TestFixture]
+    public class SocketTransceiverWhenServerStopsTest
+    {
+        private static org.apache.avro.test.Message CreateMessage()
+        {
+            var msg = new org.apache.avro.test.Message
+                          {
+                              to = "wife",
+                              from = "husband",
+                              body = "I love you!"
+                          };
+            return msg;
+        }
+
+        private static readonly DateTime Jan1st1970 = new DateTime
+            (1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+
+        public static long CurrentTimeMillis()
+        {
+            return (long) (DateTime.UtcNow - Jan1st1970).TotalMilliseconds;
+        }
+
+        public class MailImpl : Mail
+        {
+            private CountdownLatch allMessages = new CountdownLatch(5);
+
+            // in this simple example just return details of the message
+            public override String send(org.apache.avro.test.Message message)
+            {
+                return "Sent message to [" + message.to +
+                       "] from [" + message.from + "] with body [" +
+                       message.body + "]";
+            }
+
+            public override void fireandforget(org.apache.avro.test.Message message)
+            {
+                allMessages.Signal();
+            }
+
+            public void reset()
+            {
+                allMessages = new CountdownLatch(5);
+            }
+        }
+
+        [Test]
+        public void TestSocketTransceiverWhenServerStops()
+        {
+            Responder responder = new SpecificResponder<Mail>(new MailImpl());
+            var server = new SocketServer("localhost", 0, responder);
+
+            server.Start();
+
+            var transceiver = new SocketTransceiver("localhost", server.Port);
+            var mail = SpecificRequestor.CreateClient<Mail>(transceiver);
+
+            int[] successes = {0};
+            int failures = 0;
+            int[] quitOnFailure = {0};
+            var threads = new List<Thread>();
+
+            // Start a bunch of client threads that use the transceiver to send messages
+            for (int i = 0; i < 100; i++)
+            {
+                var thread = new Thread(
+                    () =>
+                        {
+                            while (true)
+                            {
+                                try
+                                {
+                                    mail.send(CreateMessage());
+                                    Interlocked.Increment(ref successes[0]);
+                                }
+                                catch (Exception)
+                                {
+                                    Interlocked.Increment(ref failures);
+
+                                    if (Interlocked.Add(ref quitOnFailure[0], 0) == 1)
+                                    {
+                                        return;
+                                    }
+                                }
+                            }
+                        });
+
+                thread.Name = "Thread" + i;
+                threads.Add(thread);
+                thread.Start();
+            }
+
+            // Be sure the threads are running: wait until we get a good deal of successes
+            while (Interlocked.Add(ref successes[0], 0) < 10000)
+            {
+                Thread.Sleep(50);
+            }
+
+            // Now stop the server
+            server.Stop();
+
+            // Server is stopped: successes should not increase anymore: wait until we're
in that situation
+            while (true)
+            {
+                int previousSuccesses = Interlocked.Add(ref successes[0], 0);
+                Thread.Sleep(500);
+                if (previousSuccesses == Interlocked.Add(ref successes[0], 0))
+                {
+                    break;
+                }
+            }
+
+            server.Start();
+
+            long now = CurrentTimeMillis();
+
+            int previousSuccesses2 = successes[0];
+            while (true)
+            {
+                Thread.Sleep(500);
+                if (successes[0] > previousSuccesses2)
+                {
+                    break;
+                }
+                if (CurrentTimeMillis() - now > 5000)
+                {
+                    Console.WriteLine("FYI: requests don't continue immediately...");
+                    break;
+                }
+            }
+
+            // Stop our client, we would expect this to go on immediately
+            Console.WriteLine("Stopping transceiver");
+
+            Interlocked.Add(ref quitOnFailure[0], 1);
+            now = CurrentTimeMillis();
+            transceiver.Close();
+
+            // Wait for all threads to quit
+            while (true)
+            {
+                threads.RemoveAll(x => !x.IsAlive);
+
+                if (threads.Count > 0)
+                    Thread.Sleep(1000);
+                else 
+                    break;
+            }
+
+            if (CurrentTimeMillis() - now > 10000)
+            {
+                Assert.Fail("Stopping NettyTransceiver and waiting for client threads to
quit took too long.");
+            }
+            else
+            {
+                Console.WriteLine("Stopping NettyTransceiver and waiting for client threads
to quit took "
+                                  + (CurrentTimeMillis() - now) + " ms");
+            }
+        }
+    }
+}
\ No newline at end of file

Modified: avro/trunk/lang/csharp/src/apache/test/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/test/Properties/AssemblyInfo.cs?rev=1515933&r1=1515932&r2=1515933&view=diff
==============================================================================
--- avro/trunk/lang/csharp/src/apache/test/Properties/AssemblyInfo.cs (original)
+++ avro/trunk/lang/csharp/src/apache/test/Properties/AssemblyInfo.cs Tue Aug 20 19:13:39
2013
@@ -15,39 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 using System.Reflection;
-using System.Runtime.CompilerServices;
 using System.Runtime.InteropServices;
 
-// General Information about an assembly is controlled through the following 
-// set of attributes. Change these attribute values to modify the information
-// associated with an assembly.
 [assembly: AssemblyTitle("Avro.test")]
 [assembly: AssemblyDescription("")]
 [assembly: AssemblyConfiguration("")]
-[assembly: AssemblyCompany("Apache.org")]
+[assembly: AssemblyCompany("Apache")]
 [assembly: AssemblyProduct("Avro.test")]
-[assembly: AssemblyCopyright("Copyright © Apache 2010")]
+[assembly: AssemblyCopyright("Copyright © Apache 2013")]
 [assembly: AssemblyTrademark("")]
 [assembly: AssemblyCulture("")]
-
-// Setting ComVisible to false makes the types in this assembly not visible 
-// to COM components.  If you need to access a type in this assembly from 
-// COM, set the ComVisible attribute to true on that type.
 [assembly: ComVisible(false)]
-
-// The following GUID is for the ID of the typelib if this project is exposed to COM
-[assembly: Guid("9c47069f-9d58-4815-bfea-f7dac9a48b05")]
-
-// Version information for an assembly consists of the following four values:
-//
-//      Major Version
-//      Minor Version 
-//      Build Number
-//      Revision
-//
-// You can specify all the values or you can default the Build and Revision Numbers 
-// by using the '*' as shown below:
-// [assembly: AssemblyVersion("1.0.*")]
+[assembly: Guid("442785CE-3633-4A04-A103-434104F63D55")]
 [assembly: AssemblyVersion("0.9.0.0")]
-[assembly: AssemblyFileVersion("0.9.0.0")]
+[assembly: AssemblyFileVersion("0.9.0.0")]
\ No newline at end of file

Modified: avro/trunk/lang/csharp/src/apache/test/Specific/SpecificTests.cs
URL: http://svn.apache.org/viewvc/avro/trunk/lang/csharp/src/apache/test/Specific/SpecificTests.cs?rev=1515933&r1=1515932&r2=1515933&view=diff
==============================================================================
--- avro/trunk/lang/csharp/src/apache/test/Specific/SpecificTests.cs (original)
+++ avro/trunk/lang/csharp/src/apache/test/Specific/SpecificTests.cs Tue Aug 20 19:13:39 2013
@@ -154,7 +154,7 @@ namespace Avro.Test
 
 "}
 )]
-        public static void TestSpecific(string str, object[] result)
+        public void TestSpecific(string str, object[] result)
         {
             Protocol protocol = Protocol.Parse(str);
             var codegen = new CodeGen();



Mime
View raw message