Github user jolynch commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/283#discussion_r232435703
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
@@ -23,128 +23,300 @@
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.codahale.metrics.Snapshot;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
/**
* A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible
for actually measuring
+ * latency and providing an ISnitchMeasurement implementation back to this class.
*/
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber,
DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements
ILatencySubscriber, DynamicEndpointSnitchMBean
{
- private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
-
- private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased
to towards the newer values
- private static final int WINDOW_SIZE = 100;
-
- private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
- private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
- private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
+ private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+ // Latency measurement and ranking. The samples contain latency measurements and
the scores are used for ranking
+ protected boolean registered = false;
+ protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+ protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
+ protected final Map<InetAddressAndPort, AnnotatedMeasurement> samples = new
ConcurrentHashMap<>();
+
+ // Latency probe functionality for actively probing endpoints that we haven't measured
recently but are ranking
+ public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms",
60 * 10 * 1000L);
+ public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms",
60 * 1000L) ;
+ // The probe rate is set later when configuration is read
+ protected static final RateLimiter probeRateLimiter = RateLimiter.create(1);
+ protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor
= new DebuggableScheduledThreadPoolExecutor(1, "LatencyProbes", Thread.MIN_PRIORITY);
+
+ // User configuration of the snitch tunables
+ protected volatile int dynamicUpdateInterval = -1;
+ protected volatile int dynamicLatencyProbeInterval = -1;
+ protected volatile double dynamicBadnessThreshold = 0;
// the score for a merged set of endpoints must be this much worse than the score
for separate endpoints to
// warrant not merging two ranges into a single range
private static final double RANGE_MERGING_PREFERENCE = 1.5;
private String mbeanName;
- private boolean registered = false;
-
- private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>();
- private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir>
samples = new ConcurrentHashMap<>();
+ private boolean mbeanRegistered = false;
public final IEndpointSnitch subsnitch;
- private volatile ScheduledFuture<?> updateSchedular;
- private volatile ScheduledFuture<?> resetSchedular;
-
- private final Runnable update;
- private final Runnable reset;
+ private volatile ScheduledFuture<?> updateScoresScheduler;
+ private volatile ScheduledFuture<?> updateSamplesScheduler;
public DynamicEndpointSnitch(IEndpointSnitch snitch)
{
this(snitch, null);
}
- public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
+ protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
{
mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
if (instance != null)
mbeanName += ",instance=" + instance;
subsnitch = snitch;
- update = new Runnable()
+
+ if (DatabaseDescriptor.isDaemonInitialized())
{
- public void run()
- {
- updateScores();
- }
- };
- reset = new Runnable()
+ applyConfigChanges(DatabaseDescriptor.getDynamicUpdateInterval(),
+ DatabaseDescriptor.getDynamicSampleUpdateInterval(),
+ DatabaseDescriptor.getDynamicBadnessThreshold());
+ open();
+ }
+ }
+
+ /**
+ * Allows subclasses to inject new ways of measuring latency back to this abstract
base class.
+ */
+ protected interface ISnitchMeasurement
+ {
+ void sample(long value);
+ double measure();
+ Iterable<Double> measurements();
+ }
+
+ /**
+ * Adds some bookkeeping that the DES uses on top of the various metrics techniques
used by the
+ * implementations. This is used to allow CASSANDRA-14459 latency probes as well
as further experimentation
+ * on new latency measurement techniques in CASSANDRA-14817
+ *
+ * {@link AnnotatedMeasurement#millisSinceLastRequest} is set to zero through
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}.
It defaults to
+ * the maximum interval so that we only start probing once it has been ranked at
least once.
+ * {@link AnnotatedMeasurement#millisSinceLastMeasure} is set to zero from
+ * {@link DynamicEndpointSnitch#receiveTiming(InetAddressAndPort, long, LatencyMeasurementType)}.
+ *
+ * {@link AnnotatedMeasurement#millisSinceLastMeasure} and {@link AnnotatedMeasurement#probeTimerMillis}
+ * are incremented via {@link DynamicEndpointSnitch#updateSamples()}
+ */
+ protected static class AnnotatedMeasurement
+ {
+ // Used to optimally target latency probes only on nodes that are both requested
for ranking
+ // and are not being measured. For example with token aware clients a large portion
of the cluster will never
+ // be ranked at all and therefore we won't probe them.
+ public AtomicLong millisSinceLastRequest = new AtomicLong(MAX_PROBE_INTERVAL_MS);
+ public AtomicLong millisSinceLastMeasure = new AtomicLong(0);
+ public long probeTimerMillis = 0;
--- End diff --
Done.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
|