cassandra-pr mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jasobrown <...@git.apache.org>
Subject [GitHub] cassandra pull request #253: 13630
Date Fri, 07 Sep 2018 19:48:26 GMT
Github user jasobrown commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/253#discussion_r216069307
  
    --- Diff: src/java/org/apache/cassandra/net/MessageIn.java ---
    @@ -231,4 +241,437 @@ public String toString()
             sbuf.append("FROM:").append(from).append(" TYPE:").append(getMessageType()).append("
VERB:").append(verb);
             return sbuf.toString();
         }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion)
    +    {
    +        return getProcessor(peer, messagingVersion, MessageInProcessor.MESSAGING_SERVICE_CONSUMER);
    +
    +    }
    +
    +    public static MessageInProcessor getProcessor(InetAddressAndPort peer, int messagingVersion,
BiConsumer<MessageIn, Integer> messageConsumer)
    +    {
    +        return messagingVersion >= MessagingService.VERSION_40
    +               ? new MessageInProcessorAsOf40(peer, messagingVersion, messageConsumer)
    +               : new MessageInProcessorPre40(peer, messagingVersion, messageConsumer);
    +
    +    }
    +
    +    /**
    +     * Implementations contain the mechanics and logic of parsing incoming messages.
Allows for both non-blocking
    +     * and blocking styles of interaction via the {@link #process(ByteBuf)} and {@link
#process(RebufferingByteBufDataInputPlus)}
    +     * methods, respectively.
    +     *
    +     * Does not contain the actual deserialization code for message fields nor payload.
That is left to the
    +     * {@link MessageIn#read(DataInputPlus, int, int)} family of methods.
    +     */
    +    public static abstract class MessageInProcessor
    +    {
    +        /**
    +         * The current state of deserializing an incoming message. This enum is only
used in the nonblocking versions.
    +         */
    +        public enum State
    +        {
    +            READ_PREFIX,
    +            READ_IP_ADDRESS,
    +            READ_VERB,
    +            READ_PARAMETERS_SIZE,
    +            READ_PARAMETERS_DATA,
    +            READ_PAYLOAD_SIZE,
    +            READ_PAYLOAD
    +        }
    +
    +        static final int VERB_LENGTH = Integer.BYTES;
    +
    +        /**
    +         * The default target for consuming deserialized {@link MessageIn}.
    +         */
    +        private static final BiConsumer<MessageIn, Integer> MESSAGING_SERVICE_CONSUMER
= (messageIn, id) -> MessagingService.instance().receive(messageIn, id);
    +
    +        final InetAddressAndPort peer;
    +        final int messagingVersion;
    +
    +        /**
    +         * Abstracts out depending directly on {@link MessagingService#receive(MessageIn,
int)}; this makes tests more sane
    +         * as they don't require nor trigger the entire message processing circus.
    +         */
    +        final BiConsumer<MessageIn, Integer> messageConsumer;
    +
    +        /**
    +         * Captures the current {@link State} of processing a message. Primarily useful
in the non-blocking use case.
    +         */
    +        State state = State.READ_PREFIX;
    +
    +        /**
    +         * Captures the current data we've parsed out of an incoming message. Primarily
useful in the non-blocking use case.
    +         */
    +        MessageHeader messageHeader;
    +
    +        /**
    +         * Process the buffer in a non-blocking manner. Will try to read out as much
of a message(s) as possible,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(ByteBuf in) throws IOException;
    +
    +        /**
    +         * Process the buffer in a blocking manner. Will read as many messages as possible,
blocking for more data,
    +         * and send any fully deserialized messages to {@link #messageConsumer}.
    +         */
    +        public abstract void process(RebufferingByteBufDataInputPlus in) throws IOException;
    +
    +        MessageInProcessor(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn,
Integer> messageConsumer)
    +        {
    +            this.peer = peer;
    +            this.messagingVersion = messagingVersion;
    +            this.messageConsumer = messageConsumer;
    +        }
    +
    +        /**
    +         * Only applicable in the non-blocking use case, and should only be used for testing!!!
    +         */
    +        @VisibleForTesting
    +        public MessageHeader getMessageHeader()
    +        {
    +            return messageHeader;
    +        }
    +
    +        /**
    +         * A simple struct to hold the message header data as it is being built up.
    +         */
    +        public static class MessageHeader
    +        {
    +            public int messageId;
    +            long constructionTime;
    +            public InetAddressAndPort from;
    +            public MessagingService.Verb verb;
    +            int payloadSize;
    +
    +            Map<ParameterType, Object> parameters = Collections.emptyMap();
    +
    +            /**
    +             * Length of the parameter data. If the message's version is {@link MessagingService#VERSION_40}
or higher,
    +             * this value is the total number of header bytes; else, for legacy messaging,
this is the number of
    +             * key/value entries in the header.
    +             */
    +            int parameterLength;
    +        }
    +
    +        MessageHeader readPrefix(DataInputPlus in) throws IOException
    +        {
    +            MessagingService.validateMagic(in.readInt());
    +            MessageHeader messageHeader = new MessageHeader();
    +            messageHeader.messageId = in.readInt();
    +            int messageTimestamp = in.readInt(); // make sure to read the sent timestamp,
even if DatabaseDescriptor.hasCrossNodeTimeout() is not enabled
    +            messageHeader.constructionTime = MessageIn.deriveConstructionTime(peer, messageTimestamp,
ApproximateTime.currentTimeMillis());
    +
    +            return messageHeader;
    +        }
    +    }
    +
    +    /**
    +     * Reads the incoming stream of bytes in the 4.0 format.
    +     */
    +    static class MessageInProcessorAsOf40 extends MessageInProcessor
    +    {
    +        MessageInProcessorAsOf40(InetAddressAndPort peer, int messagingVersion, BiConsumer<MessageIn,
Integer> messageConsumer)
    +        {
    +            super(peer, messagingVersion, messageConsumer);
    +            assert messagingVersion >= MessagingService.VERSION_40;
    +        }
    +
    +        @SuppressWarnings("resource")
    +        public void process(ByteBuf in) throws IOException
    --- End diff --
    
    done. I also pulled the main loop logic into the base class.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


Mime
View raw message