Github user jolynch commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/283#discussion_r232436157
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
@@ -23,128 +23,300 @@
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.codahale.metrics.Snapshot;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
/**
* A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring
+ * latency and providing an ISnitchMeasurement implementation back to this class.
*/
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean
{
- private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
-
- private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased towards the newer values
- private static final int WINDOW_SIZE = 100;
-
- private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
- private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
- private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
+ private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+ // Latency measurement and ranking. The samples contain latency measurements and the scores are used for ranking
+ protected boolean registered = false;
+ protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+ protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
+ protected final Map<InetAddressAndPort, AnnotatedMeasurement> samples = new ConcurrentHashMap<>();
+
+ // Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking
+ public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L);
+ public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L);
+ // The probe rate is set later when configuration is read
+ protected static final RateLimiter probeRateLimiter = RateLimiter.create(1);
+ protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, "LatencyProbes", Thread.MIN_PRIORITY);
+
+ // User configuration of the snitch tunables
+ protected volatile int dynamicUpdateInterval = -1;
+ protected volatile int dynamicLatencyProbeInterval = -1;
+ protected volatile double dynamicBadnessThreshold = 0;
// the score for a merged set of endpoints must be this much worse than the score for separate endpoints to
// warrant not merging two ranges into a single range
private static final double RANGE_MERGING_PREFERENCE = 1.5;
private String mbeanName;
- private boolean registered = false;
-
- private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>();
- private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir> samples = new ConcurrentHashMap<>();
+ private boolean mbeanRegistered = false;
public final IEndpointSnitch subsnitch;
- private volatile ScheduledFuture<?> updateSchedular;
- private volatile ScheduledFuture<?> resetSchedular;
-
- private final Runnable update;
- private final Runnable reset;
+ private volatile ScheduledFuture<?> updateScoresScheduler;
+ private volatile ScheduledFuture<?> updateSamplesScheduler;
public DynamicEndpointSnitch(IEndpointSnitch snitch)
{
this(snitch, null);
}
- public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
+ protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
{
mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
if (instance != null)
mbeanName += ",instance=" + instance;
subsnitch = snitch;
- update = new Runnable()
+
+ if (DatabaseDescriptor.isDaemonInitialized())
{
- public void run()
- {
- updateScores();
- }
- };
- reset = new Runnable()
+ applyConfigChanges(DatabaseDescriptor.getDynamicUpdateInterval(),
+ DatabaseDescriptor.getDynamicSampleUpdateInterval(),
+ DatabaseDescriptor.getDynamicBadnessThreshold());
+ open();
+ }
+ }
+
+ /**
+ * Allows subclasses to inject new ways of measuring latency back to this abstract base class.
+ */
+ protected interface ISnitchMeasurement
+ {
+ void sample(long value);
+ double measure();
+ Iterable<Double> measurements();
+ }
+
+ /**
+ * Adds some bookkeeping that the DES uses over top of the various metrics techniques used by the
+ * implementations. This is used to allow CASSANDRA-14459 latency probes as well as further experimentation
+ * on new latency measurement techniques in CASSANDRA-14817
+ *
+ * {@link AnnotatedMeasurement#millisSinceLastRequest} is set to zero through
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. It defaults to
+ * the maximum interval so that we only start probing once it has been ranked at least once.
+ * {@link AnnotatedMeasurement#millisSinceLastMeasure} is set to zero from
+ * {@link DynamicEndpointSnitch#receiveTiming(InetAddressAndPort, long, LatencyMeasurementType)}.
+ *
+ * {@link AnnotatedMeasurement#millisSinceLastMeasure} and {@link AnnotatedMeasurement#probeTimerMillis}
+ * are incremented via {@link DynamicEndpointSnitch#updateSamples()}
+ */
+ protected static class AnnotatedMeasurement
+ {
+ // Used to optimally target latency probes only on nodes that are both requested for ranking
+ // and are not being measured. For example with token aware clients a large portion of the cluster will never
+ // be ranked at all and therefore we won't probe them.
+ public AtomicLong millisSinceLastRequest = new AtomicLong(MAX_PROBE_INTERVAL_MS);
+ public AtomicLong millisSinceLastMeasure = new AtomicLong(0);
+ public long probeTimerMillis = 0;
+ public ScheduledFuture<?> probeFuture = null;
+
+ // The underlying measurement technique. E.g. a median filter (histogram) or an EMA filter, or ...
+ public final ISnitchMeasurement measurement;
+
+ public AnnotatedMeasurement(ISnitchMeasurement measurement)
+ {
+ this.measurement = measurement;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "AnnotatedMeasurement{" +
+ "millisSinceLastRequest=" + millisSinceLastRequest +
+ ", millisSinceLastMeasure=" + millisSinceLastMeasure +
+ ", probeTimerMillis=" + probeTimerMillis +
+ ", probeFuturePending=" + (probeFuture != null && !probeFuture.isDone())
+
+ ", measurementClass=" + measurement.getClass().getSimpleName() +
+ '}';
+ }
+ }
+
+ /**
+ * Allows the subclasses to inject ISnitchMeasurement instances back into this common class so that common
+ * functionality can be handled here
+ * @param initialValue The initial value of the measurement, some implementations may use this, others may not
+ * @return a constructed instance of an ISnitchMeasurement interface
+ */
+ abstract protected ISnitchMeasurement measurementImpl(long initialValue);
+
+ /**
+ * Records a latency. This MUST be cheap as it is called in the fast path
+ */
+ public void receiveTiming(InetAddressAndPort address, long latency, LatencyMeasurementType measurementType)
+ {
+ if (measurementType == LatencyMeasurementType.IGNORE)
+ return;
+
+ AnnotatedMeasurement sample = samples.get(address);
+
+ if (sample == null)
+ {
+ AnnotatedMeasurement maybeNewSample = new AnnotatedMeasurement(measurementImpl(latency));
+ sample = samples.putIfAbsent(address, maybeNewSample);
+ if (sample == null)
+ sample = maybeNewSample;
+ }
+
+ if (measurementType == LatencyMeasurementType.READ && sample.millisSinceLastMeasure.get() > 0)
+ sample.millisSinceLastMeasure.lazySet(0);
+
+ sample.measurement.sample(latency);
+ }
+
+ @VisibleForTesting
+ protected void reset()
+ {
+ scores.clear();
+ samples.clear();
+ }
+
+ @VisibleForTesting
+ void updateScores()
+ {
+ if (!StorageService.instance.isGossipActive())
+ return;
+
+ if (!registered)
{
- public void run()
+ if (MessagingService.instance() != null)
{
- // we do this so that a host considered bad has a chance to recover, otherwise would we never try
- // to read from it, which would cause its score to never change
- reset();
+ MessagingService.instance().register(this);
+ registered = true;
}
- };
+ }
- if (DatabaseDescriptor.isDaemonInitialized())
+ this.scores = calculateScores();
+ }
+
+ /**
+ * This is generally expensive and is called periodically (semi-frequently) not on the fast path.
+ * @return a freshly constructed scores map.
+ */
+ public Map<InetAddressAndPort, Double> calculateScores()
+ {
+ double maxLatency = 1;
+
+ Map<InetAddressAndPort, Double> measurements = new HashMap<>(samples.size());
+
+ // We're going to weight the latency for each host against the worst one we see, to
+ // arrive at sort of a 'badness percentage' for them. First, find the worst for each:
+ for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry : samples.entrySet())
{
- updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
- resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
- registerMBean();
+ // This is expensive for e.g. the Histogram, so do it once and cache the result
+ double measure = entry.getValue().measurement.measure();
+ if (measure > maxLatency)
+ maxLatency = measure;
+ measurements.put(entry.getKey(), measure);
}
+
+ HashMap<InetAddressAndPort, Double> newScores = new HashMap<>(measurements.size());
+
+ // now make another pass to do the weighting based on the maximums we found before
+ for (Map.Entry<InetAddressAndPort, Double> entry : measurements.entrySet())
+ {
+ double score = entry.getValue() / maxLatency;
+ // finally, add the severity without any weighting, since hosts scale this relative to their own load and the size of the task causing the severity.
+ // "Severity" is basically a measure of compaction activity (CASSANDRA-3722).
+ if (USE_SEVERITY)
+ score += getSeverity(entry.getKey());
+ // lowest score (least amount of badness) wins.
+ newScores.put(entry.getKey(), score);
+ }
+
+ return newScores;
}
/**
- * Update configuration from {@link DatabaseDescriptor} and estart the update-scheduler and reset-scheduler tasks
+ * Update configuration from {@link DatabaseDescriptor} and restart the various scheduler tasks
* if the configured rates for these tasks have changed.
*/
- public void applyConfigChanges()
+ public void applyConfigChanges(int newDynamicUpdateInternal, int newDynamicLatencyProbeInterval, double newDynamicBadnessThreshold)
{
- if (dynamicUpdateInterval != DatabaseDescriptor.getDynamicUpdateInterval())
+ if (dynamicUpdateInterval != newDynamicUpdateInternal)
{
- dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
+ dynamicUpdateInterval = newDynamicUpdateInternal;
if (DatabaseDescriptor.isDaemonInitialized())
{
- updateSchedular.cancel(false);
- updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
+ if (updateScoresScheduler != null)
+ updateScoresScheduler.cancel(false);
+ updateScoresScheduler = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(this::updateScores, dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
--- End diff --
Yea ... I can move it onto a dedicated STPE but I don't want to change too much behavior. Regarding the naming, the LegacyHistogram doesn't send any probes, so I thought it could just generically be an `updateSamples` method similar to `updateScores` ... I don't care strongly one way or the other; it's up to you :-)
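
Roughly what I'm picturing if we did split it out, just as a sketch reusing the `DebuggableScheduledThreadPoolExecutor` pattern that's already in this patch (the executor field, thread name, and helper method below are placeholders, not what's in the diff):

```java
// Sketch only: run updateSamples() on its own executor instead of the shared
// ScheduledExecutors.scheduledTasks pool. Field/thread/method names are placeholders.
private static final DebuggableScheduledThreadPoolExecutor sampleUpdateExecutor =
    new DebuggableScheduledThreadPoolExecutor(1, "SampleUpdates", Thread.MIN_PRIORITY);

private void restartSampleUpdates(int intervalMillis)
{
    if (updateSamplesScheduler != null)
        updateSamplesScheduler.cancel(false);
    updateSamplesScheduler = sampleUpdateExecutor.scheduleWithFixedDelay(this::updateSamples,
                                                                         intervalMillis, intervalMillis,
                                                                         TimeUnit.MILLISECONDS);
}
```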
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org