cassandra-pr mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aweisberg <...@git.apache.org>
Subject [GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Date Mon, 10 Dec 2018 17:58:49 GMT
Github user aweisberg commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/283#discussion_r240288561
  
    --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
    @@ -21,133 +21,505 @@
     import java.net.InetAddress;
     import java.net.UnknownHostException;
     import java.util.*;
    +import java.util.concurrent.CancellationException;
     import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.ExecutionException;
     import java.util.concurrent.ScheduledFuture;
     import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.TimeoutException;
    +import java.util.concurrent.atomic.AtomicLong;
     import java.util.stream.Collectors;
     
    -import com.codahale.metrics.ExponentiallyDecayingReservoir;
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import com.google.common.util.concurrent.RateLimiter;
    +import com.google.common.util.concurrent.SettableFuture;
     
    -import com.codahale.metrics.Snapshot;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
     import org.apache.cassandra.concurrent.ScheduledExecutors;
     import org.apache.cassandra.config.DatabaseDescriptor;
    +import org.apache.cassandra.exceptions.RequestFailureReason;
     import org.apache.cassandra.gms.ApplicationState;
     import org.apache.cassandra.gms.EndpointState;
     import org.apache.cassandra.gms.Gossiper;
     import org.apache.cassandra.gms.VersionedValue;
    +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
    +import org.apache.cassandra.net.IAsyncCallbackWithFailure;
    +import org.apache.cassandra.net.LatencyMeasurementType;
    +import org.apache.cassandra.net.MessageIn;
    +import org.apache.cassandra.net.MessageOut;
     import org.apache.cassandra.net.MessagingService;
    +import org.apache.cassandra.net.PingMessage;
     import org.apache.cassandra.service.StorageService;
     import org.apache.cassandra.utils.FBUtilities;
     import org.apache.cassandra.utils.MBeanWrapper;
     
    +import static org.apache.cassandra.net.MessagingService.Verb.PING;
    +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
    +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
    +
    +
     /**
      * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
    + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible
for actually measuring
    + * latency and providing an ISnitchMeasurement implementation back to this class.
      */
    -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber,
DynamicEndpointSnitchMBean
    +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements
ILatencySubscriber, DynamicEndpointSnitchMBean
     {
    -    private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
    +    private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class);
    +
    +    // Latency measurement and ranking. The samples contain latency measurements and
the scores are used for ranking
    +    protected boolean registered = false;
    +    protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
    +    protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
     
    -    private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased
to towards the newer values
    -    private static final int WINDOW_SIZE = 100;
    +    protected final Map<InetAddressAndPort, AnnotatedMeasurement> samples = new
ConcurrentHashMap<>();
     
    -    private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
    -    private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
    -    private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
    +    // Latency probe functionality for actively probing endpoints that we haven't measured
recently but are ranking
    +    public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms",
60 * 10 * 1000L);
    +    public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms",
60 * 1000L) ;
    +    // The probe rate is set later when configuration is read in applyConfigChanges
    +    protected static final RateLimiter probeRateLimiter = RateLimiter.create(1);
    +    protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor
= new DebuggableScheduledThreadPoolExecutor("LatencyProbes");
    +    private long lastUpdateSamplesNanos = System.nanoTime();
    +
    +    // User configuration of the snitch tunables
    +    protected volatile int dynamicUpdateInterval = -1;
    +    protected volatile int dynamicSampleUpdateInterval = -1;
    +    protected volatile double dynamicBadnessThreshold = 0;
     
         // the score for a merged set of endpoints must be this much worse than the score
for separate endpoints to
         // warrant not merging two ranges into a single range
         private static final double RANGE_MERGING_PREFERENCE = 1.5;
     
         private String mbeanName;
    -    private boolean registered = false;
    -
    -    private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>();
    -    private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir>
samples = new ConcurrentHashMap<>();
    +    private boolean mbeanRegistered = false;
     
         public final IEndpointSnitch subsnitch;
     
    -    private volatile ScheduledFuture<?> updateSchedular;
    -    private volatile ScheduledFuture<?> resetSchedular;
    -
    -    private final Runnable update;
    -    private final Runnable reset;
    +    private volatile ScheduledFuture<?> updateScoresScheduler = null;
    +    private volatile ScheduledFuture<?> updateSamplesScheduler = null;
     
         public DynamicEndpointSnitch(IEndpointSnitch snitch)
         {
             this(snitch, null);
         }
     
    -    public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
    +    protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
         {
             mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
             if (instance != null)
                 mbeanName += ",instance=" + instance;
             subsnitch = snitch;
    -        update = new Runnable()
    +
    +        if (DatabaseDescriptor.isDaemonInitialized())
             {
    -            public void run()
    +            open();
    +        }
    +    }
    +
    +    /**
    +     * Update configuration of the background tasks and restart the various scheduler
tasks
    +     * if the configured rates for these tasks have changed.
    +     */
    +    public void applyConfigChanges(int newDynamicUpdateInternal, int newDynamicSampleUpdateInterval,
double newDynamicBadnessThreshold)
    +    {
    +        if (DatabaseDescriptor.isDaemonInitialized())
    +        {
    +            if (dynamicUpdateInterval != newDynamicUpdateInternal || updateScoresScheduler
== null)
                 {
    -                updateScores();
    +                cancelAndWait(updateScoresScheduler, Math.max(1, dynamicUpdateInterval),
TimeUnit.MILLISECONDS);
    +                updateScoresScheduler = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::updateScores,
newDynamicUpdateInternal, newDynamicUpdateInternal, TimeUnit.MILLISECONDS);
                 }
    -        };
    -        reset = new Runnable()
    +
    +            if (dynamicSampleUpdateInterval != newDynamicSampleUpdateInterval || updateSamplesScheduler
== null)
    +            {
    +                cancelAndWait(updateSamplesScheduler, Math.max(1, dynamicSampleUpdateInterval),
TimeUnit.MILLISECONDS);
    +                if (newDynamicSampleUpdateInterval > 0)
    +                    updateSamplesScheduler = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::updateSamples,
newDynamicSampleUpdateInterval, newDynamicSampleUpdateInterval, TimeUnit.MILLISECONDS);
    +            }
    +        }
    +
    +        dynamicUpdateInterval = newDynamicUpdateInternal;
    +        dynamicSampleUpdateInterval = newDynamicSampleUpdateInterval;
    +        dynamicBadnessThreshold = newDynamicBadnessThreshold;
    +
    +        if (dynamicSampleUpdateInterval > 0)
    +            probeRateLimiter.setRate(dynamicSampleUpdateInterval);
    +    }
    +
    +    public synchronized void open()
    +    {
    +        applyConfigChanges(DatabaseDescriptor.getDynamicUpdateInterval(),
    +                           DatabaseDescriptor.getDynamicSampleUpdateInterval(),
    +                           DatabaseDescriptor.getDynamicBadnessThreshold());
    +
    +        MBeanWrapper.instance.registerMBean(this, mbeanName);
    +        mbeanRegistered = true;
    +    }
    +
    +    public synchronized void close()
    +    {
    +        cancelAndWait(updateScoresScheduler, Math.max(1, dynamicUpdateInterval), TimeUnit.MILLISECONDS);
    +        cancelAndWait(updateSamplesScheduler, Math.max(1, dynamicSampleUpdateInterval),
TimeUnit.MILLISECONDS);
    +        updateScoresScheduler = null;
    +        updateSamplesScheduler = null;
    +
    +        for (AnnotatedMeasurement measurement : samples.values())
             {
    -            public void run()
    +            cancelAndWait(measurement.probeFuture, PING.getTimeout(), TimeUnit.MILLISECONDS);
    +
    +            measurement.millisSinceLastMeasure.set(0);
    +            measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
    +            measurement.nextProbeDelayMillis = 0;
    +        }
    +
    +        if (mbeanRegistered)
    +            MBeanWrapper.instance.unregisterMBean(mbeanName);
    +
    +        mbeanRegistered = false;
    +    }
    +
    +    private static void cancelAndWait(ScheduledFuture future, long timeout, TimeUnit
unit)
    +    {
    +        if (future != null)
    +        {
    +            future.cancel(false);
    +            try
                 {
    -                // we do this so that a host considered bad has a chance to recover,
otherwise would we never try
    -                // to read from it, which would cause its score to never change
    -                reset();
    +                future.get(timeout, unit);
                 }
    -        };
    +            catch (CancellationException | InterruptedException | ExecutionException
| TimeoutException ignored)
    +            {
    +                // Exception is expected to happen eventually due to the cancel ->
get
    +            }
    +        }
    +    }
     
    -        if (DatabaseDescriptor.isDaemonInitialized())
    +    /**
    +     * Allows subclasses to inject new ways of measuring latency back to this abstract
base class.
    +     */
    +    protected interface ISnitchMeasurement
    +    {
    +        void sample(long value);
    +        double measure();
    +        Iterable<Double> measurements();
    +    }
    +
    +    /**
    +     * Adds some boookeeping that the DES uses over top of the various metrics techniques
used by the
    +     * implementations. This is used to allow CASSANDRA-14459 latency probes as well
as further safe
    +     * experimentation on new latency measurement techniques in CASSANDRA-14817
    +     *
    +     * {@link AnnotatedMeasurement#millisSinceLastRequest} is set to zero through
    +     * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}.
It defaults to
    +     * the maximum interval so that we only start probing once it has been ranked at
least once
    +     * {@link AnnotatedMeasurement#millisSinceLastMeasure} is set to zero from
    +     * {@link DynamicEndpointSnitch#receiveTiming(InetAddressAndPort, long, LatencyMeasurementType)}.
    +     *
    +     * {@link AnnotatedMeasurement#millisSinceLastMeasure and {@link AnnotatedMeasurement#nextProbeDelayMillis
}
    +     * are incremented via {@link DynamicEndpointSnitch#updateSamples()}
    +     */
    +    protected static class AnnotatedMeasurement
    +    {
    +        // Used to optimally target latency probes only on nodes that are both requested
for ranking
    +        // and are not being measured. For example with token aware clients a large portion
of the cluster will never
    +        // be ranked at all and therefore we won't probe them.
    +        public AtomicLong millisSinceLastRequest = new AtomicLong(MAX_PROBE_INTERVAL_MS);
    +        public AtomicLong millisSinceLastMeasure = new AtomicLong(0);
    +        public volatile long nextProbeDelayMillis = 0;
    +        public volatile ScheduledFuture<?> probeFuture = null;
    +
    +        // The underlying measurement technique. E.g. a median filter (histogram) or
an EMA filter, or ...
    +        public final ISnitchMeasurement measurement;
    +        public volatile double cachedMeasurement;
    +
    +        public AnnotatedMeasurement(ISnitchMeasurement measurement)
             {
    -            updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update,
dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
    -            resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset,
dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
    -            registerMBean();
    +            this.measurement = measurement;
    +            this.cachedMeasurement = measurement.measure();
    +        }
    +
    +        @Override
    +        public String toString()
    +        {
    +            return "AnnotatedMeasurement{" +
    +                   "millisSinceLastRequest=" + millisSinceLastRequest +
    +                   ", millisSinceLastMeasure=" + millisSinceLastMeasure +
    +                   ", nextProbeDelayMillis=" + nextProbeDelayMillis +
    +                   ", probeFuturePending=" + (probeFuture != null && !probeFuture.isDone())
+
    +                   ", measurementClass=" + measurement.getClass().getSimpleName() +
    +                   ", cachedMeasurement=" + cachedMeasurement +
    +                   '}';
             }
         }
     
         /**
    -     * Update configuration from {@link DatabaseDescriptor} and estart the update-scheduler
and reset-scheduler tasks
    -     * if the configured rates for these tasks have changed.
    +     * Allows the subclasses to inject ISnitchMeasurement instances back into this common
class so that common
    +     * functionality can be handled here
    +     * @param initialValue The initial value of the measurement, some implementations
may use this, others may not
    +     * @return a constructed instance of an ISnitchMeasurement interface
          */
    -    public void applyConfigChanges()
    +    abstract protected ISnitchMeasurement measurementImpl(long initialValue);
    +
    +    /**
    +     * Records a latency. This MUST be cheap as it is called in the fast path
    +     */
    +    public void receiveTiming(InetAddressAndPort address, long latencyMicros, LatencyMeasurementType
measurementType)
    +    {
    +        if (measurementType == LatencyMeasurementType.IGNORE)
    +           return;
    +
    +        AnnotatedMeasurement sample = samples.get(address);
    +
    +        if (sample == null)
    +        {
    +            AnnotatedMeasurement maybeNewSample = new AnnotatedMeasurement(measurementImpl(latencyMicros));
    +            sample = samples.putIfAbsent(address, maybeNewSample);
    +            if (sample == null)
    +                sample = maybeNewSample;
    +        }
    +
    +        if (measurementType == LatencyMeasurementType.READ && sample.millisSinceLastMeasure.get()
> 0)
    +            sample.millisSinceLastMeasure.lazySet(0);
    +
    +        sample.measurement.sample(latencyMicros);
    +    }
    +
    +    @VisibleForTesting
    +    protected void reset()
    +    {
    +        scores.clear();
    +        samples.clear();
    +    }
    +
    +    @VisibleForTesting
    +    protected void updateScores()
         {
    -        if (dynamicUpdateInterval != DatabaseDescriptor.getDynamicUpdateInterval())
    +        if (!StorageService.instance.isGossipActive())
    +            return;
    +
    +        if (!registered)
             {
    -            dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
    -            if (DatabaseDescriptor.isDaemonInitialized())
    +            if (MessagingService.instance() != null)
                 {
    -                updateSchedular.cancel(false);
    -                updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update,
dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
    +                MessagingService.instance().register(this);
    +                registered = true;
                 }
             }
     
    -        if (dynamicResetInterval != DatabaseDescriptor.getDynamicResetInterval())
    +        this.scores = calculateScores(samples);
    +    }
    +
    +    /**
    +     * This is generally expensive and is called periodically (semi-frequently) not on
the fast path.
    +     * The main concern here is generating garbage from the measurements (e.g. histograms
in particular)
    +     */
    +    @VisibleForTesting
    +    protected static Map<InetAddressAndPort, Double> calculateScores(Map<InetAddressAndPort,
AnnotatedMeasurement> samples)
    +    {
    +        double maxLatency = 1;
    +        HashMap<InetAddressAndPort, Double> newScores = new HashMap<>(samples.size());
    +
    +        // We're going to weight the latency for each host against the worst one we see,
to
    +        // arrive at sort of a 'badness percentage' for them. First, find the worst latency
for each:
    +        for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
             {
    -            dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
    -            if (DatabaseDescriptor.isDaemonInitialized())
    +            AnnotatedMeasurement annotatedMeasurement = entry.getValue();
    +
    +            // only compute the measurement, which probably generates the most garbage
(e.g. for this Histogram),
    +            // for endpoints that have been recently updated (millisSinceLastRequest)
or somewhat recently requested
    +            // for ranking.
    +            if (annotatedMeasurement.millisSinceLastMeasure.get() < MIN_PROBE_INTERVAL_MS
||
    --- End diff --
    
    Do we need to check millisSinceLastMeasure here? We could check only millisSinceLastRequest.
If the value has been requested, then we know independently that the endpoint will either be probed
or be getting real measurements, so it is worth calculating the scores.
    
    I think a comment to that effect and a simpler condition would make this easier to reason
about.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


Mime
View raw message