avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1499040 - in /avro/trunk: CHANGES.txt lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
Date Tue, 02 Jul 2013 18:00:12 GMT
Author: cutting
Date: Tue Jul  2 18:00:11 2013
New Revision: 1499040

URL: http://svn.apache.org/r1499040
Log:
AVRO-1292. Java: Fix potential client blocking in NettyTransceiver.
Contributed by James Baldassari.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1499040&r1=1499039&r2=1499040&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jul  2 18:00:11 2013
@@ -115,6 +115,9 @@ Trunk (not yet released)
     AVRO-1293. Java: Fix potential deadlock in NettyTransceiver.
     (James Baldassari via cutting)
 
+    AVRO-1292. Java: Fix potential client blocking in NettyTransceiver.
+    (James Baldassari via cutting)
+
 Avro 1.7.4 (22 February 2012)
 
   NEW FEATURES

Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java?rev=1499040&r1=1499039&r2=1499040&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyTransceiver.java Tue Jul  2 18:00:11 2013
@@ -40,11 +40,13 @@ import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelEvent;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.ChannelState;
 import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ChannelUpstreamHandler;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.ExceptionEvent;
 import org.jboss.netty.channel.MessageEvent;
@@ -62,6 +64,7 @@ public class NettyTransceiver extends Tr
   public static final String NETTY_CONNECT_TIMEOUT_OPTION = 
       "connectTimeoutMillis";
   public static final String NETTY_TCP_NODELAY_OPTION = "tcpNoDelay";
+  public static final String NETTY_KEEPALIVE_OPTION = "keepAlive";
   public static final boolean DEFAULT_TCP_NODELAY_VALUE = true;
   
   private static final Logger LOG = LoggerFactory.getLogger(NettyTransceiver.class
@@ -186,7 +189,7 @@ public class NettyTransceiver extends Tr
         ChannelPipeline p = Channels.pipeline();
         p.addLast("frameDecoder", new NettyFrameDecoder());
         p.addLast("frameEncoder", new NettyFrameEncoder());
-        p.addLast("handler", new NettyClientAvroHandler());
+        p.addLast("handler", createNettyClientAvroHandler());
         return p;
       }
     });
@@ -207,15 +210,25 @@ public class NettyTransceiver extends Tr
   }
   
   /**
+   * Creates a Netty ChannelUpstreamHandler for handling events on the 
+   * Netty client channel.
+   * @return the ChannelUpstreamHandler to use.
+   */
+  protected ChannelUpstreamHandler createNettyClientAvroHandler() {
+    return new NettyClientAvroHandler();
+  }
+  
+  /**
    * Creates the default options map for the Netty ClientBootstrap.
    * @param connectTimeoutMillis connection timeout in milliseconds, or null 
    * if no timeout is desired.
    * @return the map of Netty bootstrap options.
    */
-  private static Map<String, Object> buildDefaultBootstrapOptions(
+  protected static Map<String, Object> buildDefaultBootstrapOptions(
       Long connectTimeoutMillis) {
-    Map<String, Object> options = new HashMap<String, Object>(2);
+    Map<String, Object> options = new HashMap<String, Object>(3);
     options.put(NETTY_TCP_NODELAY_OPTION, DEFAULT_TCP_NODELAY_VALUE);
+    options.put(NETTY_KEEPALIVE_OPTION, true);
     options.put(NETTY_CONNECT_TIMEOUT_OPTION, 
         connectTimeoutMillis == null ? DEFAULT_CONNECTION_TIMEOUT_MILLIS : 
           connectTimeoutMillis);
@@ -254,7 +267,13 @@ public class NettyTransceiver extends Tr
             }
           }
           if (channelFuture != null) {
-          channelFuture.awaitUninterruptibly(connectTimeoutMillis);
+            try {
+              channelFuture.await(connectTimeoutMillis);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt(); // Reset interrupt flag
+              throw new IOException("Interrupted while connecting to " + 
+                  remoteAddr);
+            }
 
             synchronized(channelFutureLock) {
           if (!channelFuture.isSuccess()) {
@@ -351,7 +370,12 @@ public class NettyTransceiver extends Tr
     if (channelToClose != null) {
       ChannelFuture closeFuture = channelToClose.close();
       if (awaitCompletion && (closeFuture != null)) {
-        closeFuture.awaitUninterruptibly(connectTimeoutMillis);
+        try {
+          closeFuture.await(connectTimeoutMillis);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();   // Reset interrupt flag
+          LOG.warn("Interrupted while disconnecting", e);
+        }
       }
     }
   }
@@ -430,13 +454,26 @@ public class NettyTransceiver extends Tr
   
   @Override
   public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
+    ChannelFuture writeFuture;
     stateLock.readLock().lock();
     try {
-      writeDataPack(
+      writeFuture = writeDataPack(
           new NettyDataPack(serialGenerator.incrementAndGet(), buffers));
     } finally {
       stateLock.readLock().unlock();
     }
+    
+    if (!writeFuture.isDone()) {
+      try {
+        writeFuture.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();   // Reset interrupt flag
+        throw new IOException("Interrupted while writing Netty data pack", e);
+      }
+    }
+    if (!writeFuture.isSuccess()) {
+      throw new IOException("Error writing buffers", writeFuture.getCause());
+    }
   }
   
   /**
@@ -444,10 +481,11 @@ public class NettyTransceiver extends Tr
    * NOTE: The stateLock read lock *must* be acquired before calling this 
    * method.
    * @param dataPack the data pack to write.
+   * @return the Netty ChannelFuture for the write operation.
    * @throws IOException if an error occurs connecting to the remote peer.
    */
-  private void writeDataPack(NettyDataPack dataPack) throws IOException {
-    getChannel().write(dataPack);
+  private ChannelFuture writeDataPack(NettyDataPack dataPack) throws IOException {
+    return getChannel().write(dataPack);
   }
 
   @Override
@@ -484,11 +522,36 @@ public class NettyTransceiver extends Tr
       stateLock.writeLock().unlock();
     }
   }
+  
+  /**
+   * A ChannelFutureListener for channel write operations that notifies 
+   * a {@link Callback} if an error occurs while writing to the channel.
+   */
+  protected class WriteFutureListener implements ChannelFutureListener {
+    protected final Callback<List<ByteBuffer>> callback;
+    
+    /**
+     * Creates a WriteFutureListener that notifies the given callback 
+     * if an error occurs writing data to the channel.
+     * @param callback the callback to notify, or null to skip notification.
+     */
+    public WriteFutureListener(Callback<List<ByteBuffer>> callback) {
+      this.callback = callback;
+    }
+    
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (!future.isSuccess() && (callback != null)) {
+        callback.handleError(
+            new IOException("Error writing buffers", future.getCause()));
+      }
+    }
+  }
 
   /**
    * Avro client handler for the Netty transport 
    */
-  class NettyClientAvroHandler extends SimpleChannelUpstreamHandler {
+  protected class NettyClientAvroHandler extends SimpleChannelUpstreamHandler {
 
     @Override
     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
@@ -536,7 +599,7 @@ public class NettyTransceiver extends Tr
   /**
    * Creates threads with unique names based on a specified name prefix.
    */
-  private static class NettyTransceiverThreadFactory implements ThreadFactory {
+  protected static class NettyTransceiverThreadFactory implements ThreadFactory {
     private final AtomicInteger threadId = new AtomicInteger(0);
     private final String prefix;
     



Mime
View raw message