cassandra-pr mailing list archives

From aweisberg <...@git.apache.org>
Subject [GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Date Mon, 10 Dec 2018 17:58:49 GMT
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/283#discussion_r240295987
  
    --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
    @@ -21,133 +21,505 @@
     import java.net.InetAddress;
     import java.net.UnknownHostException;
     import java.util.*;
    +import java.util.concurrent.CancellationException;
     import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutionException;
     import java.util.concurrent.ScheduledFuture;
     import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.stream.Collectors;
     
    -import com.codahale.metrics.ExponentiallyDecayingReservoir;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.RateLimiter;
    +import com.google.common.util.concurrent.SettableFuture;
     
    -import com.codahale.metrics.Snapshot;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
     import org.apache.cassandra.concurrent.ScheduledExecutors;
     import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.exceptions.RequestFailureReason;
     import org.apache.cassandra.gms.ApplicationState;
     import org.apache.cassandra.gms.EndpointState;
     import org.apache.cassandra.gms.Gossiper;
     import org.apache.cassandra.gms.VersionedValue;
    +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
    +import org.apache.cassandra.net.IAsyncCallbackWithFailure;
    +import org.apache.cassandra.net.LatencyMeasurementType;
    +import org.apache.cassandra.net.MessageIn;
    +import org.apache.cassandra.net.MessageOut;
     import org.apache.cassandra.net.MessagingService;
    +import org.apache.cassandra.net.PingMessage;
     import org.apache.cassandra.service.StorageService;
     import org.apache.cassandra.utils.FBUtilities;
     import org.apache.cassandra.utils.MBeanWrapper;
     
    +import static org.apache.cassandra.net.MessagingService.Verb.PING;
    +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
    +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
    +
    +
     /**
      * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
     + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring
    + * latency and providing an ISnitchMeasurement implementation back to this class.
      */
     -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
     +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
     {
    -    private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
    +    private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class);
    +
     +    // Latency measurement and ranking. The samples contain latency measurements and the scores are used for ranking
    +    protected boolean registered = false;
    +    protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
    +    protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
     
     -    private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values
    -    private static final int WINDOW_SIZE = 100;
     +    protected final Map<InetAddressAndPort, AnnotatedMeasurement> samples = new ConcurrentHashMap<>();
     
    -    private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
    -    private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
    -    private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
     +    // Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking
     +    public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
     +    public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L);
    +    // The probe rate is set later when configuration is read in applyConfigChanges
    +    protected static final RateLimiter probeRateLimiter = RateLimiter.create(1);
     +    protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor("LatencyProbes");
    +    private long lastUpdateSamplesNanos = System.nanoTime();
    +
    +    // User configuration of the snitch tunables
    +    protected volatile int dynamicUpdateInterval = -1;
    +    protected volatile int dynamicSampleUpdateInterval = -1;
    +    protected volatile double dynamicBadnessThreshold = 0;
     
          // the score for a merged set of endpoints must be this much worse than the score for separate endpoints to
         // warrant not merging two ranges into a single range
         private static final double RANGE_MERGING_PREFERENCE = 1.5;
     
         private String mbeanName;
    -    private boolean registered = false;
    -
    -    private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>();
     -    private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>();
    +    private boolean mbeanRegistered = false;
     
         public final IEndpointSnitch subsnitch;
     
    -    private volatile ScheduledFuture<?> updateSchedular;
    -    private volatile ScheduledFuture<?> resetSchedular;
    -
    -    private final Runnable update;
    -    private final Runnable reset;
    +    private volatile ScheduledFuture<?> updateScoresScheduler = null;
    +    private volatile ScheduledFuture<?> updateSamplesScheduler = null;
     
         public DynamicEndpointSnitch(IEndpointSnitch snitch)
         {
             this(snitch, null);
         }
     
    -    public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
    +    protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
         {
             mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
             if (instance != null)
                 mbeanName += ",instance=" + instance;
             subsnitch = snitch;
    -        update = new Runnable()
    +
    +        if (DatabaseDescriptor.isDaemonInitialized())
             {
    -            public void run()
    +            open();
    +        }
    +    }
    +
    +    /**
     +     * Update configuration of the background tasks and restart the various scheduler tasks
    +     * if the configured rates for these tasks have changed.
    +     */
     +    public void applyConfigChanges(int newDynamicUpdateInternal, int newDynamicSampleUpdateInterval, double newDynamicBadnessThreshold)
    +    {
    +        if (DatabaseDescriptor.isDaemonInitialized())
    +        {
     +            if (dynamicUpdateInterval != newDynamicUpdateInternal || updateScoresScheduler == null)
                 {
    -                updateScores();
     +                cancelAndWait(updateScoresScheduler, Math.max(1, dynamicUpdateInterval), TimeUnit.MILLISECONDS);
     +                updateScoresScheduler = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::updateScores, newDynamicUpdateInternal, newDynamicUpdateInternal, TimeUnit.MILLISECONDS);
                 }
    -        };
    -        reset = new Runnable()
    +
     +            if (dynamicSampleUpdateInterval != newDynamicSampleUpdateInterval || updateSamplesScheduler == null)
    +            {
     +                cancelAndWait(updateSamplesScheduler, Math.max(1, dynamicSampleUpdateInterval), TimeUnit.MILLISECONDS);
    +                if (newDynamicSampleUpdateInterval > 0)
     +                    updateSamplesScheduler = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::updateSamples, newDynamicSampleUpdateInterval, newDynamicSampleUpdateInterval, TimeUnit.MILLISECONDS);
    +            }
    +        }
    +
    +        dynamicUpdateInterval = newDynamicUpdateInternal;
    +        dynamicSampleUpdateInterval = newDynamicSampleUpdateInterval;
    +        dynamicBadnessThreshold = newDynamicBadnessThreshold;
    +
    +        if (dynamicSampleUpdateInterval > 0)
    +            probeRateLimiter.setRate(dynamicSampleUpdateInterval);
    +    }
    +
    +    public synchronized void open()
    +    {
    +        applyConfigChanges(DatabaseDescriptor.getDynamicUpdateInterval(),
    +                           DatabaseDescriptor.getDynamicSampleUpdateInterval(),
    +                           DatabaseDescriptor.getDynamicBadnessThreshold());
    +
    +        MBeanWrapper.instance.registerMBean(this, mbeanName);
    +        mbeanRegistered = true;
    +    }
    +
    +    public synchronized void close()
    +    {
    +        cancelAndWait(updateScoresScheduler, Math.max(1, dynamicUpdateInterval), TimeUnit.MILLISECONDS);
     +        cancelAndWait(updateSamplesScheduler, Math.max(1, dynamicSampleUpdateInterval), TimeUnit.MILLISECONDS);
    +        updateScoresScheduler = null;
    +        updateSamplesScheduler = null;
    +
    +        for (AnnotatedMeasurement measurement : samples.values())
             {
    -            public void run()
    +            cancelAndWait(measurement.probeFuture, PING.getTimeout(), TimeUnit.MILLISECONDS);
    +
    +            measurement.millisSinceLastMeasure.set(0);
    +            measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
    +            measurement.nextProbeDelayMillis = 0;
    +        }
    +
    +        if (mbeanRegistered)
    +            MBeanWrapper.instance.unregisterMBean(mbeanName);
    +
    +        mbeanRegistered = false;
    +    }
    +
     +    private static void cancelAndWait(ScheduledFuture future, long timeout, TimeUnit unit)
    +    {
    +        if (future != null)
    +        {
    +            future.cancel(false);
    +            try
                 {
     -                // we do this so that a host considered bad has a chance to recover, otherwise would we never try
    -                // to read from it, which would cause its score to never change
    -                reset();
    +                future.get(timeout, unit);
                 }
    -        };
     +            catch (CancellationException | InterruptedException | ExecutionException | TimeoutException ignored)
    +            {
     +                // Exception is expected to happen eventually due to the cancel -> get
    +            }
    +        }
    +    }
     
    -        if (DatabaseDescriptor.isDaemonInitialized())
    +    /**
     +     * Allows subclasses to inject new ways of measuring latency back to this abstract base class.
    +     */
    +    protected interface ISnitchMeasurement
    +    {
    +        void sample(long value);
    +        double measure();
    +        Iterable<Double> measurements();
    +    }
    +
    +    /**
     +     * Adds some bookkeeping that the DES uses over top of the various metrics techniques used by the
     +     * implementations. This is used to allow CASSANDRA-14459 latency probes as well as further safe
    +     * experimentation on new latency measurement techniques in CASSANDRA-14817
    +     *
    +     * {@link AnnotatedMeasurement#millisSinceLastRequest} is set to zero through
     +     * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. It defaults to
     +     * the maximum interval so that we only start probing once it has been ranked at least once.
     +     * {@link AnnotatedMeasurement#millisSinceLastMeasure} is set to zero from
     +     * {@link DynamicEndpointSnitch#receiveTiming(InetAddressAndPort, long, LatencyMeasurementType)}.
     +     *
     +     * {@link AnnotatedMeasurement#millisSinceLastMeasure} and {@link AnnotatedMeasurement#nextProbeDelayMillis}
    +     * are incremented via {@link DynamicEndpointSnitch#updateSamples()}
    +     */
    +    protected static class AnnotatedMeasurement
    +    {
     +        // Used to optimally target latency probes only on nodes that are both requested for ranking
     +        // and are not being measured. For example with token aware clients a large portion of the cluster will never
    +        // be ranked at all and therefore we won't probe them.
    +        public AtomicLong millisSinceLastRequest = new AtomicLong(MAX_PROBE_INTERVAL_MS);
    +        public AtomicLong millisSinceLastMeasure = new AtomicLong(0);
    +        public volatile long nextProbeDelayMillis = 0;
    +        public volatile ScheduledFuture<?> probeFuture = null;
    +
     +        // The underlying measurement technique. E.g. a median filter (histogram) or an EMA filter, or ...
    +        public final ISnitchMeasurement measurement;
    +        public volatile double cachedMeasurement;
    +
    +        public AnnotatedMeasurement(ISnitchMeasurement measurement)
             {
     -            updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
     -            resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
    -            registerMBean();
    +            this.measurement = measurement;
    +            this.cachedMeasurement = measurement.measure();
    +        }
    +
    +        @Override
    +        public String toString()
    +        {
    +            return "AnnotatedMeasurement{" +
    +                   "millisSinceLastRequest=" + millisSinceLastRequest +
    +                   ", millisSinceLastMeasure=" + millisSinceLastMeasure +
    +                   ", nextProbeDelayMillis=" + nextProbeDelayMillis +
    +                   ", probeFuturePending=" + (probeFuture != null && !probeFuture.isDone())
+
    +                   ", measurementClass=" + measurement.getClass().getSimpleName() +
    +                   ", cachedMeasurement=" + cachedMeasurement +
    +                   '}';
             }
         }
     
         /**
     -     * Update configuration from {@link DatabaseDescriptor} and estart the update-scheduler and reset-scheduler tasks
    -     * if the configured rates for these tasks have changed.
     +     * Allows the subclasses to inject ISnitchMeasurement instances back into this common class so that common
    +     * functionality can be handled here
     +     * @param initialValue The initial value of the measurement, some implementations may use this, others may not
    +     * @return a constructed instance of an ISnitchMeasurement interface
          */
    -    public void applyConfigChanges()
    +    abstract protected ISnitchMeasurement measurementImpl(long initialValue);
    +
    +    /**
    +     * Records a latency. This MUST be cheap as it is called in the fast path
    +     */
     +    public void receiveTiming(InetAddressAndPort address, long latencyMicros, LatencyMeasurementType measurementType)
    +    {
    +        if (measurementType == LatencyMeasurementType.IGNORE)
    +           return;
    +
    +        AnnotatedMeasurement sample = samples.get(address);
    +
    +        if (sample == null)
    +        {
    +            AnnotatedMeasurement maybeNewSample = new AnnotatedMeasurement(measurementImpl(latencyMicros));
    +            sample = samples.putIfAbsent(address, maybeNewSample);
    +            if (sample == null)
    +                sample = maybeNewSample;
    +        }
    +
     +        if (measurementType == LatencyMeasurementType.READ && sample.millisSinceLastMeasure.get() > 0)
    +            sample.millisSinceLastMeasure.lazySet(0);
    +
    +        sample.measurement.sample(latencyMicros);
    +    }
    +
    +    @VisibleForTesting
    +    protected void reset()
    +    {
    +        scores.clear();
    +        samples.clear();
    +    }
    +
    +    @VisibleForTesting
    +    protected void updateScores()
         {
    -        if (dynamicUpdateInterval != DatabaseDescriptor.getDynamicUpdateInterval())
    +        if (!StorageService.instance.isGossipActive())
    +            return;
    +
    +        if (!registered)
             {
    -            dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
    -            if (DatabaseDescriptor.isDaemonInitialized())
    +            if (MessagingService.instance() != null)
                 {
    -                updateSchedular.cancel(false);
     -                updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
    +                MessagingService.instance().register(this);
    +                registered = true;
                 }
             }
     
    -        if (dynamicResetInterval != DatabaseDescriptor.getDynamicResetInterval())
    +        this.scores = calculateScores(samples);
    +    }
    +
    +    /**
     +     * This is generally expensive and is called periodically (semi-frequently) not on the fast path.
     +     * The main concern here is generating garbage from the measurements (e.g. histograms in particular)
    +     */
    +    @VisibleForTesting
     +    protected static Map<InetAddressAndPort, Double> calculateScores(Map<InetAddressAndPort, AnnotatedMeasurement> samples)
    +    {
    +        double maxLatency = 1;
    +        HashMap<InetAddressAndPort, Double> newScores = new HashMap<>(samples.size());
    +
     +        // We're going to weight the latency for each host against the worst one we see, to
     +        // arrive at sort of a 'badness percentage' for them. First, find the worst latency for each:
    +        for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
             {
    -            dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
    -            if (DatabaseDescriptor.isDaemonInitialized())
    +            AnnotatedMeasurement annotatedMeasurement = entry.getValue();
    +
     +            // only compute the measurement, which probably generates the most garbage (e.g. for this Histogram),
     +            // for endpoints that have been recently updated (millisSinceLastRequest) or somewhat recently requested
    +            // for ranking.
     +            if (annotatedMeasurement.millisSinceLastMeasure.get() < MIN_PROBE_INTERVAL_MS ||
    +                annotatedMeasurement.millisSinceLastRequest.get() <= MAX_PROBE_INTERVAL_MS)
                 {
    -                resetSchedular.cancel(false);
     -                resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
     +                // This is expensive for e.g. the Histogram, so do it once and cache the result
    +                annotatedMeasurement.cachedMeasurement = annotatedMeasurement.measurement.measure();
                 }
    +
    +            newScores.put(entry.getKey(), annotatedMeasurement.cachedMeasurement);
    +            maxLatency = Math.max(annotatedMeasurement.cachedMeasurement, maxLatency);
             }
     
    -        dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
     +        // now make another pass to normalize the latency scores based on the maximums we found before
    +        for (Map.Entry<InetAddressAndPort, Double> entry : newScores.entrySet())
    +        {
    +            double score = entry.getValue() / maxLatency;
     +            // finally, add the severity without any weighting, since hosts scale this relative to their own load and the size of the task causing the severity.
    +            // "Severity" is basically a measure of compaction activity (CASSANDRA-3722).
    +            if (USE_SEVERITY)
    +                score += getSeverity(entry.getKey());
    +            // lowest score (least amount of badness) wins.
    +            newScores.put(entry.getKey(), score);
    +        }
    +
    +        return newScores;
         }
     
    -    private void registerMBean()
    +    /**
     +     * Background task running on the samples dictionary. The default implementation sends latency probes (PING)
     +     * messages to explore nodes that we have not received timings for recently but have ranked in
    +     * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}.
    +     */
    +    protected void updateSamples()
         {
    -        MBeanWrapper.instance.registerMBean(this, mbeanName);
     +        long updateIntervalMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - lastUpdateSamplesNanos);
    +        lastUpdateSamplesNanos = System.nanoTime();
    +
    +        // Split calculation of probe timers from sending probes for testability
    +        calculateProbes(samples, updateIntervalMillis);
    +
     +        // We do this after the calculations so that the progression of the logical clocks continues regardless
     +        // if gossip is enabled or not. However if Gossip is not active we don't _send_ the probes
    +        if (!StorageService.instance.isGossipActive())
    +            return;
    +
    +        scheduleProbes(samples);
    +    }
    +
    +    /**
     +     * This method mutates the passed AnnotatedMeasurements to implement capped exponential backoff per endpoint.
    +     *
    +     * The algorithm is as follows:
    +     * 1. All samples get their millisSinceLastMeasure and millisSinceLastRequest fields
    +     *    incremented by the passed interval
     +     * 2. Any recently requested (ranked) endpoints that have not been measured recently (e.g. because the snitch
    +     *    has sent them no traffic) get probes with exponential backoff.
    +     *
     +     * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the probes are stopped after
    +     * MAX_PROBE_INTERVAL_MS of no ranking requests as well.
    +     *
     +     * At the end of this method, any passed AnnotatedMeasurements that need latency probes will have non zero
    +     * nextProbeDelayMillis members set.
    +     */
    +    @VisibleForTesting
     +    protected static void calculateProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples, long intervalMillis) {
    +        for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry: samples.entrySet())
    +        {
    +            if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
    +                continue;
    +
    +            AnnotatedMeasurement measurement = entry.getValue();
    +            long lastMeasure = measurement.millisSinceLastMeasure.addAndGet(intervalMillis);
    +            long lastRequest = measurement.millisSinceLastRequest.addAndGet(intervalMillis);
    +
    +            if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < MAX_PROBE_INTERVAL_MS)
    +            {
    +                if (measurement.nextProbeDelayMillis == 0)
    +                {
    +                    measurement.nextProbeDelayMillis = intervalMillis;
    +                }
    +                else if (measurement.probeFuture != null && measurement.probeFuture.isDone())
    +                {
     +                    measurement.nextProbeDelayMillis = Math.min(MAX_PROBE_INTERVAL_MS, measurement.nextProbeDelayMillis * 2);
    +                }
    +            }
    +            else
    +            {
    +                measurement.nextProbeDelayMillis = 0;
    +            }
    +        }
         }
     
    -    public void close()
    +    @VisibleForTesting
    +    void scheduleProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples)
         {
    -        updateSchedular.cancel(false);
    -        resetSchedular.cancel(false);
    +        for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry: samples.entrySet())
    +        {
    +            AnnotatedMeasurement measurement = entry.getValue();
    +            if (measurement.millisSinceLastRequest.get() > MAX_PROBE_INTERVAL_MS &&
    +                !Gossiper.instance.isAlive(entry.getKey()))
    +            {
    +                samples.remove(entry.getKey());
    +                continue;
    +            }
     
    -        MBeanWrapper.instance.unregisterMBean(mbeanName);
    +            long delay = measurement.nextProbeDelayMillis;
    +            if (delay > 0 && (measurement.probeFuture == null || measurement.probeFuture.isDone()))
    +            {
     +                logger.trace("Scheduled latency probe against {} in {}ms", entry.getKey(), delay);
    +                measurement.probeFuture = latencyProbeExecutor.schedule(() -> sendPingMessageToPeer(entry.getKey()),
    +                                                                        delay, TimeUnit.MILLISECONDS);
    +            }
    +        }
    +    }
    +
    +    /**
     +     * Method that actually sends latency probes as PING messages. This is the only function in this class
     +     * that operates on the latencyProbeExecutor thread and it records the maximum latency between a small and large
    +     * message channel ping.
    +     */
    +    private void sendPingMessageToPeer(InetAddressAndPort to)
    +    {
     +        // This method may have been scheduled (a long time) before it executes, so have to do
    +        // some quick sanity checks before sending a message to this host
    +        if (!StorageService.instance.isGossipActive() || !Gossiper.instance.isAlive(to))
    +            return;
    +
    +        probeRateLimiter.acquire(dynamicSampleUpdateInterval);
    --- End diff --
    
    Reminder to fix the bug where this always ends up rate limiting to one probe per second.
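
    For context, a minimal standalone sketch (not part of this patch) of the suspected interaction, assuming Guava
    RateLimiter's permits-per-second semantics and a hypothetical configured interval value: setting the rate to
    dynamicSampleUpdateInterval and then acquiring that same number of permits cancels out, so the acquire above
    throttles to roughly one call per second no matter how the interval is configured.

        import com.google.common.util.concurrent.RateLimiter;

        // Standalone illustration only; the interval value below is hypothetical.
        public class ProbeRateLimiterSketch
        {
            public static void main(String[] args)
            {
                int dynamicSampleUpdateInterval = 1000; // hypothetical configured interval (ms)

                RateLimiter probeRateLimiter = RateLimiter.create(1);
                probeRateLimiter.setRate(dynamicSampleUpdateInterval);      // rate = N permits per second

                long start = System.nanoTime();
                for (int i = 0; i < 3; i++)
                    probeRateLimiter.acquire(dynamicSampleUpdateInterval);  // acquire N permits => ~1 acquisition per second

                long elapsedMs = (System.nanoTime() - start) / 1_000_000;
                System.out.println("3 acquisitions took ~" + elapsedMs + " ms"); // roughly 2000 ms after the free first acquire
            }
        }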


---

