Github user jasobrown commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/253#discussion_r216157096
--- Diff: src/java/org/apache/cassandra/net/async/MessageInHandler.java ---
@@ -18,143 +18,296 @@
package org.apache.cassandra.net.async;
-import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.IOException;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiConsumer;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
-import org.apache.cassandra.io.util.DataInputBuffer;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.util.ReferenceCountUtil;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.net.MessageIn;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.ParameterType;
-import org.apache.cassandra.utils.vint.VIntCoding;
+import org.apache.cassandra.net.MessageIn.MessageInProcessor;
/**
* Parses incoming messages as per the 4.0 internode messaging protocol.
*/
-public class MessageInHandler extends BaseMessageInHandler
+public class MessageInHandler extends ChannelInboundHandlerAdapter
{
public static final Logger logger = LoggerFactory.getLogger(MessageInHandler.class);
- private MessageHeader messageHeader;
+ private final InetAddressAndPort peer;
- MessageInHandler(InetAddressAndPort peer, int messagingVersion)
+ private final BufferHandler bufferHandler;
+ private volatile boolean closed;
+
+ public MessageInHandler(InetAddressAndPort peer, MessageInProcessor messageProcessor,
boolean handlesLargeMessages)
+ {
+ this.peer = peer;
+
+ bufferHandler = handlesLargeMessages
+ ? new BlockingBufferHandler(messageProcessor)
+ : new NonblockingBufferHandler(messageProcessor);
+ }
+
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException
+ {
+ if (!closed)
+ {
+ bufferHandler.channelRead(ctx, (ByteBuf) msg);
+ }
+ else
+ {
+ ReferenceCountUtil.release(msg);
+ ctx.close();
+ }
+ }
+
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+ {
+ if (cause instanceof EOFException)
+ logger.trace("eof reading from socket; closing", cause);
+ else if (cause instanceof UnknownTableException)
+ logger.warn("Got message from unknown table while reading from socket; closing",
cause);
+ else if (cause instanceof IOException)
+ logger.trace("IOException reading from socket; closing", cause);
+ else
+ logger.warn("Unexpected exception caught in inbound channel pipeline from
" + ctx.channel().remoteAddress(), cause);
+
+ close();
+ ctx.close();
+ }
+
+ public void channelInactive(ChannelHandlerContext ctx)
+ {
+ logger.trace("received channel closed message for peer {} on local addr {}",
ctx.channel().remoteAddress(), ctx.channel().localAddress());
+ close();
+ ctx.fireChannelInactive();
+ }
+
+ void close()
{
- this (peer, messagingVersion, MESSAGING_SERVICE_CONSUMER);
+ closed = true;
+ bufferHandler.close();
}
- public MessageInHandler(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn,
Integer> messageConsumer)
+ boolean isClosed()
{
- super(peer, messagingVersion, messageConsumer);
+ return closed;
+ }
- assert messagingVersion >= MessagingService.VERSION_40 : String.format("wrong
messaging version for this handler: got %d, but expect %d or higher",
- messagingVersion,
MessagingService.VERSION_40);
- state = State.READ_FIRST_CHUNK;
+ /**
+ * An abstraction around how incoming buffers are handled: either in a non-blocking
manner ({@link NonblockingBufferHandler})
+ * or in a blocking manner ({@link BlockingBufferHandler}).
+ *
+ * The methods declared here will only be invoked on the netty event loop.
+ */
+ interface BufferHandler
+ {
+ void channelRead(ChannelHandlerContext ctx, ByteBuf in) throws IOException;
+
+ void close();
}
/**
- * For each new message coming in, builds up a {@link MessageHeader} instance incrementally.
This method
- * attempts to deserialize as much header information as it can out of the incoming
{@link ByteBuf}, and
- * maintains a trivial state machine to remember progress across invocations.
+ * Processes incoming buffers on the netty event loop, in a non-blocking manner.
If buffers are not completely consumed,
+ * it is stashed in {@link #retainedInlineBuffer}, and the next incoming buffer is
combined with it.
*/
- @SuppressWarnings("resource")
- public void handleDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out) throws Exception
+ class NonblockingBufferHandler implements BufferHandler
--- End diff --
done
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
|