cassandra-pr mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdeggleston <...@git.apache.org>
Subject [GitHub] cassandra pull request #224: 14405 replicas
Date Thu, 17 May 2018 22:14:53 GMT
Github user bdeggleston commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/224#discussion_r189117028
  
    --- Diff: src/java/org/apache/cassandra/service/StorageProxy.java ---
    @@ -1364,68 +1363,72 @@ public static void sendToHintedEndpoints(final Mutation mutation,
                 submitHint(mutation, endpointsToHint, responseHandler);
     
             if (insertLocal)
    -            performLocally(stage, Optional.of(mutation), mutation::apply, responseHandler);
    +        {
    +            Preconditions.checkNotNull(localReplica);
    +            performLocally(stage, localReplica, Optional.of(mutation), mutation::apply,
responseHandler);
    +        }
     
             if (localDc != null)
             {
    -            for (InetAddressAndPort destination : localDc)
    -                MessagingService.instance().sendRR(message, destination, responseHandler,
true);
    +            for (Replica destination : localDc)
    +                MessagingService.instance().sendWriteRR(message, destination, responseHandler,
true);
             }
             if (dcGroups != null)
             {
                 // for each datacenter, send the message to one node to relay the write to
other replicas
    -            for (Collection<InetAddressAndPort> dcTargets : dcGroups.values())
    +            for (Replicas dcTargets : dcGroups.values())
                     sendMessagesToNonlocalDC(message, dcTargets, responseHandler);
             }
         }
     
    -    private static void checkHintOverload(InetAddressAndPort destination)
    +    private static void checkHintOverload(Replica destination)
         {
             // avoid OOMing due to excess hints.  we need to do this check even for "live"
nodes, since we can
             // still generate hints for those if it's overloaded or simply dead but not yet
known-to-be-dead.
             // The idea is that if we have over maxHintsInProgress hints in flight, this
is probably due to
             // a small number of nodes causing problems, so we should avoid shutting down
writes completely to
             // healthy nodes.  Any node with no hintsInProgress is considered healthy.
             if (StorageMetrics.totalHintsInProgress.getCount() > maxHintsInProgress
    -                && (getHintsInProgressFor(destination).get() > 0 &&
shouldHint(destination)))
    +                && (getHintsInProgressFor(destination.getEndpoint()).get() >
0 && shouldHint(destination)))
             {
                 throw new OverloadedException("Too many in flight hints: " + StorageMetrics.totalHintsInProgress.getCount()
+
                                               " destination: " + destination +
    -                                          " destination hints: " + getHintsInProgressFor(destination).get());
    +                                          " destination hints: " + getHintsInProgressFor(destination.getEndpoint()).get());
             }
         }
     
         private static void sendMessagesToNonlocalDC(MessageOut<? extends IMutation>
message,
    -                                                 Collection<InetAddressAndPort>
targets,
    +                                                 Replicas targets,
                                                      AbstractWriteResponseHandler<IMutation>
handler)
         {
    -        Iterator<InetAddressAndPort> iter = targets.iterator();
    +        Iterator<Replica> iter = targets.iterator();
             int[] messageIds = new int[targets.size()];
    -        InetAddressAndPort target = iter.next();
    +        Replica target = iter.next();
     
             int idIdx = 0;
             // Add the other destinations of the same message as a FORWARD_HEADER entry
             while (iter.hasNext())
             {
    -            InetAddressAndPort destination = iter.next();
    -            int id = MessagingService.instance().addCallback(handler,
    -                                                             message,
    -                                                             destination,
    -                                                             message.getTimeout(),
    -                                                             handler.consistencyLevel,
    -                                                             true);
    +            Replica destination = iter.next();
    +            int id = MessagingService.instance().addWriteCallback(handler,
    +                                                                  message,
    +                                                                  destination,
    +                                                                  message.getTimeout(),
    +                                                                  handler.consistencyLevel,
    +                                                                  true);
                 messageIds[idIdx++] = id;
                 logger.trace("Adding FWD message to {}@{}", id, destination);
             }
    -        message = message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, new ForwardToContainer(targets,
messageIds));
    +        Replicas.checkFull(targets);
    +        message = message.withParameter(ParameterType.FORWARD_TO.FORWARD_TO, new ForwardToContainer(targets.asEndpointList(),
messageIds));
    --- End diff --
    
    fixed


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


Mime
View raw message