cassandra-pr mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jolynch <...@git.apache.org>
Subject [GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Date Mon, 15 Oct 2018 23:28:48 GMT
Github user jolynch commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/283#discussion_r225348901
  
    --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitchEMA.java ---
    @@ -0,0 +1,181 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.cassandra.locator;
    +
    +import java.net.UnknownHostException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +import java.util.concurrent.ConcurrentHashMap;
    +
    +import org.apache.cassandra.gms.Gossiper;
    +import org.apache.cassandra.metrics.ExponentialMovingAverage;
    +import org.apache.cassandra.net.LatencyMeasurementType;
    +
    +
    +/**
    + * A dynamic snitching implementation that uses an Exponential Moving Average (EMA) of
    + * observed latencies to prefer or de-prefer hosts
    + *
    + * This was the default implementation prior to Cassandra 4.0 and is being left as the default
    + * in 4.0
    + */
    +public class DynamicEndpointSnitchEMA extends DynamicEndpointSnitch
    +{
    +    // A ~10 sample EMA
    +    private static final double EMA_ALPHA = 0.10;
    +
    +    // Latency average tracked per host. Entries are created in receiveTiming, pruned to the
    +    // live gossip members in updateLatencyProbeSequence, and dropped wholesale in reset.
    +    private final ConcurrentHashMap<InetAddressAndPort, AnnotatedEMA> samples = new ConcurrentHashMap<>();
    +
    +    /**
    +     * Adds two boolean markers to the ExponentialMovingAverage for telling if the data has been
    +     * updated or requested recently.
    +     *
    +     * recentlyMeasured is updated through {@link AnnotatedEMA#update(long, boolean)}
    +     * recentlyRequested is updated through {@link DynamicEndpointSnitch#markRequested}
    +     *
    +     * Both markers are periodically reset via {@link DynamicEndpointSnitch#latencyProbeNeeded(long)}
    +     * (in this class the actual clearing happens in updateLatencyProbeSequence).
    +     */
    +    private static class AnnotatedEMA extends ExponentialMovingAverage
    +    {
    +        // Both flags are written by request/measurement threads and cleared by the probe
    +        // scheduling pass, hence volatile.
    +        volatile boolean recentlyRequested = false;
    +        volatile boolean recentlyMeasured = false;
    +
    +        AnnotatedEMA(double alpha, double initialValue)
    +        {
    +            super(alpha, initialValue);
    +        }
    +
    +        /**
    +         * Folds a latency observation into the average and, for real reads, marks the host as
    +         * recently measured.
    +         *
    +         * @param value      the observed latency
    +         * @param isRealRead true when the observation comes from a real read rather than a probe
    +         */
    +        public void update(long value, boolean isRealRead)
    +        {
    +            // Only ever store true here. The previous "flag = flag || isRealRead" form read the
    +            // volatile and wrote the value back even for probes, so a probe racing with the
    +            // periodic reset could re-publish a stale true and undo that reset.
    +            if (isRealRead)
    +                recentlyMeasured = true;
    +            super.update(value);
    +        }
    +    }
    +
    +    /**
    +     * Wraps the given snitch, delegating to the two-argument constructor with the
    +     * instance name "ema".
    +     *
    +     * @param snitch the underlying snitch to wrap
    +     */
    +    // Called via reflection
    +    public DynamicEndpointSnitchEMA(IEndpointSnitch snitch)
    +    {
    +        this(snitch, "ema");
    +    }
    +
    +    /**
    +     * Passes both arguments straight through to the DynamicEndpointSnitch constructor.
    +     *
    +     * @param snitch   the underlying snitch to wrap
    +     * @param instance instance name handed to the superclass
    +     */
    +    protected DynamicEndpointSnitchEMA(IEndpointSnitch snitch, String instance)
    +    {
    +        super(snitch, instance);
    +    }
    +
    +    /**
    +     * Records one latency observation for a host, creating the host's average on first sight.
    +     *
    +     * @param host       the host the latency was observed for
    +     * @param latency    the observed latency
    +     * @param isRealRead true for a real read, false for a latency probe
    +     */
    +    private void receiveTiming(InetAddressAndPort host, long latency, boolean isRealRead) // this is cheap
    +    {
    +        AnnotatedEMA ema = samples.get(host);
    +        if (ema == null)
    +        {
    +            // First observation for this host: seed a new average with it. If another thread
    +            // registered the host in the meantime, putIfAbsent hands back that thread's
    +            // instance and ours is discarded.
    +            AnnotatedEMA seeded = new AnnotatedEMA(EMA_ALPHA, latency);
    +            AnnotatedEMA existing = samples.putIfAbsent(host, seeded);
    +            ema = existing == null ? seeded : existing;
    +        }
    +        ema.update(latency, isRealRead);
    +    }
    +
    +    public void receiveTiming(InetAddressAndPort address, long latency, LatencyMeasurementType
measurementType)
    +    {
    +        if (measurementType == LatencyMeasurementType.READ)
    +            receiveTiming(address, latency, true);
    +        else if (measurementType == LatencyMeasurementType.PROBE)
    +            receiveTiming(address, latency, false);
    +    }
    +
    +    /**
    +     * Forgets every per-host latency average, then runs the superclass reset.
    +     */
    +    @Override
    +    protected void reset()
    +    {
    +        samples.clear();
    +        super.reset();
    +    }
    +
    +    /**
    +     * Computes a "badness" score per host: the host's average latency relative to the worst
    +     * average currently observed, plus the host's severity when USE_SEVERITY is set.
    +     * The lowest score wins.
    +     *
    +     * @return a new map from host to score, one entry per host currently in samples
    +     */
    +    @Override
    +    public Map<InetAddressAndPort, Double> calculateScores()
    +    {
    +        // We're going to weight the latency for each host against the worst one we see, to
    +        // arrive at sort of a 'badness percentage' for them. First, find the worst for each:
    +        Map<InetAddressAndPort, Double> newScores = new HashMap<>(samples.size());
    +        final double maxAvgLatency = samples.values().stream()
    +                                            .map(ExponentialMovingAverage::getAvg)
    +                                            .max(Double::compare)
    +                                            .orElse(1.0);
    +
    +        // now make another pass to do the weighting based on the maximums we found before
    +        for (Map.Entry<InetAddressAndPort, AnnotatedEMA> entry : samples.entrySet())
    +        {
    +            // Samples may have changed but rather than creating garbage by copying samples we just ensure
    +            // that all scores are less than 1.0
    +            double addrAvg = entry.getValue().getAvg();
    +            double worst = Math.max(addrAvg, maxAvgLatency);
    +            // When every observed average is zero the ratio would be 0.0 / 0.0 = NaN, which
    +            // cannot be ordered against other scores; a zero latency is simply zero badness.
    +            double score = worst == 0.0 ? 0.0 : addrAvg / worst;
    +            // finally, add the severity without any weighting, since hosts scale this relative to their own load
    +            // and the size of the task causing the severity.
    +            // "Severity" is basically a measure of compaction activity (CASSANDRA-3722).
    +            if (USE_SEVERITY)
    +                score += getSeverity(entry.getKey());
    +            // lowest score (least amount of badness) wins.
    +            newScores.put(entry.getKey(), score);
    +        }
    +        return newScores;
    +    }
    +
    +    /**
    +     * Returns the current average latency tracked for the named host.
    +     *
    +     * @param hostname name resolved through InetAddressAndPort.getByName
    +     * @return a list holding the host's average, or an empty list when the host has no sample
    +     * @throws UnknownHostException if the hostname cannot be resolved
    +     */
    +    @Override
    +    public List<Double> dumpTimings(String hostname) throws UnknownHostException
    +    {
    +        InetAddressAndPort endpoint = InetAddressAndPort.getByName(hostname);
    +        List<Double> result = new ArrayList<>();
    +        ExponentialMovingAverage ema = samples.get(endpoint);
    +        if (ema == null)
    +            return result;
    +
    +        result.add(ema.getAvg());
    +        return result;
    +    }
    +
    +    /**
    +     * Drops samples for hosts that are no longer live gossip members, appends every host that
    +     * was requested but not measured since the last pass to the probe sequence, and clears
    +     * both markers on every remaining sample.
    +     *
    +     * @param probeSequence list that receives the hosts to probe; it is appended to in place
    +     */
    +    @Override
    +    protected void updateLatencyProbeSequence(List<InetAddressAndPort> probeSequence)
    +    {
    +        samples.keySet().retainAll(Gossiper.instance.getLiveMembers());
    +
    +        for (Map.Entry<InetAddressAndPort, AnnotatedEMA> entry : samples.entrySet())
    +        {
    +            AnnotatedEMA ema = entry.getValue();
    +
    +            // We only send latency probes to nodes that we may plausibly talk to (requested is true)
    +            // but we have not talked to since the last reset of this information (recentlyMeasured is false)
    +            if (ema.recentlyRequested && !ema.recentlyMeasured)
    +                probeSequence.add(entry.getKey());
    +
    +            // Start the next observation window with both markers cleared.
    +            ema.recentlyMeasured = false;
    +            ema.recentlyRequested = false;
    +        }
    +    }
    +
    +    @Override
    +    protected void markRequested(InetAddressAndPort address)
    +    {
    +        AnnotatedEMA ema = samples.get(address);
    +        if (ema != null)
    +            ema.recentlyRequested = true;
    --- End diff --
    
    I was curious whether a load before the set would be faster or slower than just a set.
    I guess I decided that the performance difference was probably so negligible that I should
    just go for the most readable option. I don't care though, heh; changing it along with the flag update.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


Mime
View raw message