cassandra-commits mailing list archives

From "Jon Meredith (Jira)" <j...@apache.org>
Subject [jira] [Updated] (CASSANDRA-16808) Pre-4.0 FWD_FRM message parameter serialization and message-id forwarding is incorrect
Date Sat, 17 Jul 2021 04:38:00 GMT

     [ https://issues.apache.org/jira/browse/CASSANDRA-16808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jon Meredith updated CASSANDRA-16808:
-------------------------------------
    Test and Documentation Plan: 
[Branch|https://github.com/jonmeredith/cassandra/tree/test-fwd-frm]
[Pull Request|https://github.com/apache/cassandra/pull/1111]
[CircleCI|https://app.circleci.com/pipelines/github/jonmeredith/cassandra?branch=C16808]

The branch has three commits: one to demonstrate the issue and two to fix the issues it uncovered.
                         Status: Patch Available  (was: Open)

> Pre-4.0 FWD_FRM message parameter serialization and message-id forwarding is incorrect
> --------------------------------------------------------------------------------------
>
>                 Key: CASSANDRA-16808
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-16808
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Messaging/Internode
>            Reporter: Jon Meredith
>            Assignee: Jon Meredith
>            Priority: Normal
>             Fix For: 4.0-rc
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Fixing CASSANDRA-16797 has exposed an issue with the way {{FWD_FRM}} is serialized.
> During the code cleanup in the internode messaging refactor, the serialization for {{FWD_FRM}} (the
endpoint to respond to for forwarded messages) was implemented using the same serialization
format as {{CompactEndpointSerializationHelper}}, which prefixes the address bytes with their length.
However, pre-4.0 nodes do not expect a length inside the {{FWD_FRM}} parameter value; they just
convert the whole parameter value to an {{InetAddress}}.
> In a mixed-version cluster, this causes the pre-4.0 nodes to fail when deserializing the
mutation:
> {code:java}
> java.lang.RuntimeException: java.net.UnknownHostException: addr is of illegal length
>         at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:72) ~[dtest-3.0.25.jar:na]
>         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
>         at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$FutureTask.run(AbstractLocalAwareExecutorService.java:162) ~[dtest-3.0.25.jar:na]
>         at org.apache.cassandra.concurrent.AbstractLocalAwareExecutorService$LocalSessionFutureTask.run(AbstractLocalAwareExecutorService.java:134) ~[dtest-3.0.25.jar:na]
>         at org.apache.cassandra.concurrent.SEPWorker.run(SEPWorker.java:109) ~[dtest-3.0.25.jar:na]
>         at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
> Caused by: java.net.UnknownHostException: addr is of illegal length
>         at java.base/java.net.InetAddress.getByAddress(InetAddress.java:1208) ~[na:na]
>         at java.base/java.net.InetAddress.getByAddress(InetAddress.java:1571) ~[na:na]
>         at org.apache.cassandra.db.MutationVerbHandler.doVerb(MutationVerbHandler.java:57) ~[dtest-3.0.25.jar:na]
>         at org.apache.cassandra.net.MessageDeliveryTask.run(MessageDeliveryTask.java:67) ~[dtest-3.0.25.jar:na]
>         ... 5 common frames omitted
> {code}
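The exception can be reproduced outside Cassandra. The following is a minimal sketch, not Cassandra code: the class name and both helpers are hypothetical, and {{compactEncode}} only mimics the one-byte length prefix described above. It shows that {{InetAddress.getByAddress}} accepts a raw 4-byte value but rejects the same address once a length byte is prepended.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class FwdFrmLengthPrefixDemo
{
    // Hypothetical helper: one length byte followed by the raw address bytes,
    // imitating the compact endpoint encoding described in the ticket.
    static byte[] compactEncode(InetAddress address)
    {
        byte[] raw = address.getAddress();
        byte[] encoded = new byte[raw.length + 1];
        encoded[0] = (byte) raw.length;
        System.arraycopy(raw, 0, encoded, 1, raw.length);
        return encoded;
    }

    // What a pre-4.0 receiver effectively does: treat the whole
    // parameter value as raw address bytes.
    static InetAddress decodeAsRawAddress(byte[] parameterValue) throws UnknownHostException
    {
        return InetAddress.getByAddress(parameterValue);
    }

    public static void main(String[] args) throws Exception
    {
        InetAddress original = InetAddress.getByAddress(new byte[]{ 10, 0, 0, 1 });

        // Raw bytes (4 for IPv4) decode without error.
        System.out.println("raw value decodes to " + decodeAsRawAddress(original.getAddress()));

        // Length-prefixed bytes (5 for IPv4) are neither 4 nor 16 bytes long.
        try
        {
            decodeAsRawAddress(compactEncode(original));
        }
        catch (UnknownHostException e)
        {
            System.out.println("prefixed value fails: " + e.getMessage());
        }
    }
}
```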
> Unfortunately, I can't see a clean fix: {{org.apache.cassandra.io.IVersionedAsymmetricSerializer#deserialize}},
which is used to deserialize the FWD_FRM address, does not take a maximum length to deserialize, and
it's impossible to tell definitively whether it's an IPv4 or IPv6 address from the first four
bytes.
> The patch I'm submitting special-cases the deserialization of pre-4.0 {{FWD_FRM}} parameters
in the {{Message}} deserializer. That seems preferable to extending the deserialization interface
or creating a new {{DataInputBuffer}} limited by the parameter value length.
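To illustrate why knowing the parameter length resolves the ambiguity, here is a minimal sketch of a length-aware read. It is not the submitted patch: the class and method names are hypothetical, and it uses plain {{java.io}} streams rather than Cassandra's input types. It assumes the caller has already read the parameter's length field.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetAddress;

final class LegacyRespondToSketch
{
    // Hypothetical: reads a pre-4.0 FWD_FRM value when the enclosing
    // parameter length is known, so IPv4 (4 bytes) and IPv6 (16 bytes)
    // can be told apart without inspecting the address bytes.
    static InetAddress readLegacyRespondTo(DataInputStream in, int length) throws IOException
    {
        if (length != 4 && length != 16)
            throw new IOException("Unexpected FWD_FRM length: " + length);

        byte[] raw = new byte[length];
        in.readFully(raw);
        return InetAddress.getByAddress(raw);
    }

    public static void main(String[] args) throws IOException
    {
        byte[] ipv4 = { 10, 0, 0, 1 };
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(ipv4));
        System.out.println(readLegacyRespondTo(in, ipv4.length));
    }
}
```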
> Once that was fixed, the INSERT statements were still failing, which I tracked down to
the 4.0 optimization of serializing the forwarded message only once if the message id is the same:
>  [https://github.com/apache/cassandra/blob/cassandra-4.0/src/java/org/apache/cassandra/db/MutationVerbHandler.java#L76]
> In the test case I wrote, only one message was being forwarded, and it had a different
id from the original forwarded message. The {{useSameMessageID}} method only compared message
ids within the forwarded messages.
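The difference between the two checks can be sketched as follows. This is an illustration under assumptions, not the actual Cassandra method: the class, both method names, and the {{long[]}} representation of the forwarded ids are hypothetical. With a single forwarded id, a check that only compares forwarded ids with each other passes trivially, even when that id differs from the original message's id.

```java
final class SameMessageIdSketch
{
    // Behaviour as described in the ticket: only the forwarded ids
    // are compared with each other.
    static boolean forwardedIdsAllEqual(long[] forwardedIds)
    {
        for (int i = 1; i < forwardedIds.length; i++)
            if (forwardedIds[i] != forwardedIds[0])
                return false;
        return true;
    }

    // Stricter variant: every forwarded id must also match the id of
    // the message being forwarded.
    static boolean forwardedIdsMatchOriginal(long originalId, long[] forwardedIds)
    {
        for (long id : forwardedIds)
            if (id != originalId)
                return false;
        return true;
    }

    public static void main(String[] args)
    {
        long originalId = 7L;
        long[] forwardedIds = { 42L }; // one forwarded message with a different id

        System.out.println("forwarded ids agree with each other: " + forwardedIdsAllEqual(forwardedIds));
        System.out.println("forwarded ids agree with original:   " + forwardedIdsMatchOriginal(originalId, forwardedIds));
    }
}
```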
>  
> Code Details:
> When {{MutationVerbHandler.forwardToLocalNodes}} constructs the forwarding message, it
just stores the byte array representing the IPv4 or IPv6 address in the parameter array.
> (link [https://github.com/apache/cassandra/blob/44604b7316fcbfd7d0d7425e75cd7ebe267e3247/src/java/org/apache/cassandra/db/MutationVerbHandler.java#L90])
> {code:java}
>     private static void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
>     {
>         try (DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes)))
>         {
>             int size = in.readInt();
>             // tell the recipients who to send their ack to
>             MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
> {code}
> When the message is serialized in 3.0 {{MessageOut.serialize}}, that raw byte array is
written prefixed with its length
> (link [https://github.com/apache/cassandra/blob/44604b7316fcbfd7d0d7425e75cd7ebe267e3247/src/java/org/apache/cassandra/net/MessageOut.java#L119])
> {code:java}
>     public void serialize(DataOutputPlus out, int version) throws IOException
>     {
>         CompactEndpointSerializationHelper.serialize(from, out);
>         out.writeInt(MessagingService.Verb.convertForMessagingServiceVersion(verb, version).getId());
>         out.writeInt(parameters.size());
>         for (Map.Entry<String, byte[]> entry : parameters.entrySet())
>         {
>             out.writeUTF(entry.getKey());
>             out.writeInt(entry.getValue().length);
>             out.write(entry.getValue());
>         }
>         ....
>     }
> {code}
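The per-parameter layout written by the loop above can be reproduced with plain {{java.io}} classes. This is a minimal sketch with hypothetical class and method names; it assumes {{DataOutputStream}} writes {{writeUTF}} and {{writeInt}} the same way as Cassandra's {{DataOutputPlus}}. For an IPv4 {{FWD_FRM}} entry, the value is exactly the four raw address bytes, preceded by the key and a four-byte length.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;

final class LegacyParameterLayoutSketch
{
    // Hypothetical helper mirroring the 3.0 loop body:
    // UTF key, int length, then the raw value bytes.
    static byte[] writeParameter(String key, byte[] value) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            out.writeUTF(key);           // 2-byte length + encoded key
            out.writeInt(value.length);  // 4-byte value length
            out.write(value);            // raw value, no inner length prefix
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException
    {
        InetAddress from = InetAddress.getByAddress(new byte[]{ 10, 0, 0, 1 });
        byte[] entry = writeParameter("FWD_FRM", from.getAddress());
        System.out.println("serialized entry length: " + entry.length);
    }
}
```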
> We do the same on 4.0; however, in 4.0 the parameter value is serialized and deserialized
using the serializer registered for it in the {{ParamType}} enum
> (link [https://github.com/apache/cassandra/blob/fcd30b6e0db3622a8e78e9aa35221f630c77f6de/src/java/org/apache/cassandra/net/Message.java#L1154])
> {code:java}
>             for (int i = 0; i < count; i++)
>             {
>                 ParamType type = version >= VERSION_40
>                     ? ParamType.lookUpById(Ints.checkedCast(in.readUnsignedVInt()))
>                     : ParamType.lookUpByAlias(in.readUTF());
>                 int length = version >= VERSION_40
>                     ? Ints.checkedCast(in.readUnsignedVInt())
>                     : in.readInt();
>                 if (null != type)
>                     params.put(type, type.serializer.deserialize(in, version));
>                 else
>                     in.skipBytesFully(length); // forward compatibility with minor version changes
>             }
> {code}
> (link [https://github.com/apache/cassandra/blob/fcd30b6e0db3622a8e78e9aa35221f630c77f6de/src/java/org/apache/cassandra/net/ParamType.java#L45])
> {code:java}
> public enum ParamType
> {
>     FORWARD_TO          (0, "FWD_TO",        ForwardingInfo.serializer),
>     RESPOND_TO          (1, "FWD_FRM",       inetAddressAndPortSerializer),
>     ...
> }
> {code}
> The {{InetAddressAndPortSerializer}} is based on the 3.0 {{CompactEndpointSerializationHelper}}
encoding used in the message header.
>  However, that format includes a single byte holding the length of the address, whereas pre-4.0
nodes expect the parameter value
>  to contain only the raw address bytes.
> (link [https://github.com/apache/cassandra/blob/fcd30b6e0db3622a8e78e9aa35221f630c77f6de/src/java/org/apache/cassandra/locator/InetAddressAndPort.java#L308])
> {code:java}
>             if (version >= MessagingService.VERSION_40)
>             {
>                 out.writeByte(buf.length + 2);
>                 out.write(buf);
>                 out.writeShort(endpoint.port);
>             }
>             else
>             {
>                 out.writeByte(buf.length); //// Surprise!  Bonus byte!
>                 out.write(buf);
>             }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
