cassandra-pr mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dineshjoshi <...@git.apache.org>
Subject [GitHub] cassandra pull request #253: 13630
Date Fri, 07 Sep 2018 23:09:51 GMT
Github user dineshjoshi commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216103691
  
    --- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java ---
    @@ -18,143 +18,296 @@
     
     package org.apache.cassandra.net.async;
     
    -import java.io.DataInputStream;
    +import java.io.EOFException;
     import java.io.IOException;
    -import java.util.Collections;
    -import java.util.EnumMap;
    -import java.util.List;
    -import java.util.Map;
    -import java.util.function.BiConsumer;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.RejectedExecutionHandler;
    +import java.util.concurrent.ThreadPoolExecutor;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
     
    -import com.google.common.primitives.Ints;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import io.netty.buffer.ByteBuf;
     import io.netty.channel.ChannelHandlerContext;
    -import org.apache.cassandra.io.util.DataInputBuffer;
    +import io.netty.channel.ChannelInboundHandlerAdapter;
    +import io.netty.handler.codec.ByteToMessageDecoder;
    +import io.netty.util.ReferenceCountUtil;
    +import org.apache.cassandra.concurrent.NamedThreadFactory;
    +import org.apache.cassandra.exceptions.UnknownTableException;
     import org.apache.cassandra.locator.InetAddressAndPort;
    -import org.apache.cassandra.net.MessageIn;
    -import org.apache.cassandra.net.MessagingService;
    -import org.apache.cassandra.net.ParameterType;
    -import org.apache.cassandra.utils.vint.VIntCoding;
    +import org.apache.cassandra.net.MessageIn.MessageInProcessor;
     
     /**
      * Parses incoming messages as per the 4.0 internode messaging protocol.
      */
    -public class MessageInHandler extends BaseMessageInHandler
    +public class MessageInHandler extends ChannelInboundHandlerAdapter
     {
         public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class);
     
    -    private MessageHeader messageHeader;
    +    private final InetAddressAndPort peer;
     
    -    MessageInHandler(InetAddressAndPort peer, int messagingVersion)
    +    private final BufferHandler bufferHandler;
    +    private volatile boolean closed;
    +
    +    public MessageInHandler(InetAddressAndPort peer, MessageInProcessor messageProcessor,
boolean handlesLargeMessages)
    +    {
    +        this.peer = peer;
    +
    +        bufferHandler = handlesLargeMessages
    +                        ? new BlockingBufferHandler(messageProcessor)
    +                        : new NonblockingBufferHandler(messageProcessor);
    +    }
    +
    +    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException
    +    {
    +        if (!closed)
    +        {
    +            bufferHandler.channelRead(ctx, (ByteBuf) msg);
    +        }
    +        else
    +        {
    +            ReferenceCountUtil.release(msg);
    +            ctx.close();
    +        }
    +    }
    +
    +    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
    +    {
    +        if (cause instanceof EOFException)
    +            logger.trace("eof reading from socket; closing", cause);
    +        else if (cause instanceof UnknownTableException)
    +            logger.warn("Got message from unknown table while reading from socket; closing",
cause);
    +        else if (cause instanceof IOException)
    +            logger.trace("IOException reading from socket; closing", cause);
    +        else
    +            logger.warn("Unexpected exception caught in inbound channel pipeline from
" + ctx.channel().remoteAddress(), cause);
    +
    +        close();
    +        ctx.close();
    +    }
    +
    +    public void channelInactive(ChannelHandlerContext ctx)
    +    {
    +        logger.trace("received channel closed message for peer {} on local addr {}",
ctx.channel().remoteAddress(), ctx.channel().localAddress());
    +        close();
    +        ctx.fireChannelInactive();
    +    }
    +
    +    void close()
         {
    -        this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER);
    +        closed = true;
    +        bufferHandler.close();
         }
     
    -    public MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn,
Integer> messageConsumer)
    +    boolean isClosed()
         {
    -        super(peer, messagingVersion, messageConsumer);
    +        return closed;
    +    }
     
    -        assert messagingVersion >= MessagingService.VERSION_40 : String.format("wrong
messaging version for this handler: got %d, but expect %d or higher",
    -                                                                              messagingVersion,
MessagingService.VERSION_40);
    -        state = State.READ_FIRST_CHUNK;
    +    /**
    +     * An abstraction around how incoming buffers are handled: either in a non-blocking
manner ({@link NonblockingBufferHandler})
    +     * or in a blocking manner ({@link BlockingBufferHandler}).
    +     *
    +     * The methods declared here will only be invoked on the netty event loop.
    +     */
    +    interface BufferHandler
    +    {
    +        void channelRead(ChannelHandlerContext ctx, ByteBuf in) throws IOException;
    +
    +        void close();
         }
     
         /**
    -     * For each new message coming in, builds up a {@link MessageHeader} instance incrementally.
This method
    -     * attempts to deserialize as much header information as it can out of the incoming
{@link ByteBuf}, and
    -     * maintains a trivial state machine to remember progress across invocations.
    +     * Processes incoming buffers on the netty event loop, in a non-blocking manner.
If buffers are not completely consumed,
    +     * it is stashed in {@link #retainedInlineBuffer}, and the next incoming buffer is
combined with it.
          */
    -    @SuppressWarnings("resource")
    -    public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception
    +    class NonblockingBufferHandler implements BufferHandler
    --- End diff --
    
    This can be made into a static inner class.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


Mime
View raw message