Github user jolynch commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/283#discussion_r237956751
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
@@ -154,31 +326,203 @@ private void registerMBean()
public void close()
{
- updateSchedular.cancel(false);
- resetSchedular.cancel(false);
+ if (updateScoresScheduler != null)
+ updateScoresScheduler.cancel(false);
+ if (updateSamplesScheduler != null)
+ updateSamplesScheduler.cancel(false);
+
+ for (AnnotatedMeasurement measurement : samples.values())
+ {
+ if (measurement.probeFuture != null)
+ measurement.probeFuture.cancel(false);
+
+ measurement.millisSinceLastMeasure.set(0);
+ measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
+ measurement.probeTimerMillis = 0;
+ }
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- mbs.unregisterMBean(new ObjectName(mbeanName));
+ if (mbeanRegistered)
+ mbs.unregisterMBean(new ObjectName(mbeanName));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
+ /**
+ * Background task running on the samples dictionary. The default implementation
sends latency probes (PING)
+ * messages to explore nodes that we have not received timings for recently but have
ranked in
+ * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}.
+ */
+ protected void updateSamples()
+ {
+ // Split calculation of probe timers from sending probes for testability
+ calculateProbes(samples, dynamicLatencyProbeInterval);
+
+ if (!StorageService.instance.isGossipActive())
+ return;
+
+ schedulePings(samples);
+ }
+
+ /**
+ * This method mutates the passed AnnotatedMeasurements to implement capped exponential
backoff per endpoint.
+ *
+ * The algorithm is as follows:
+ * 1. All samples get their millisSinceLastMeasure and millisSinceLastRequest fields
+ * incremented by the passed interval
+ * 2. Any recently requested (ranked) endpoints that have not been measured recently
(e.g. because the snitch
+ * has sent them no traffic) get probes with exponential backoff.
+ *
+ * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the probes are stopped
after
+ * MAX_PROBE_INTERVAL_MS of no ranking requests as well.
+ *
+ * At the end of this method, any passed AnnotatedMeasurements that need latency
probes will have non zero
+ * probeTimerMillis members set.
+ */
+ @VisibleForTesting
+ static void calculateProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples,
long intervalMillis) {
+ for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry: samples.entrySet())
+ {
+ if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
+ continue;
+
+ AnnotatedMeasurement measurement = entry.getValue();
+ long lastMeasure = measurement.millisSinceLastMeasure.getAndAdd(intervalMillis);
+ long lastRequest = measurement.millisSinceLastRequest.getAndAdd(intervalMillis);
+
+ if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < MAX_PROBE_INTERVAL_MS)
+ {
+ if (measurement.probeTimerMillis == 0)
+ {
+ measurement.probeTimerMillis = intervalMillis;
+ }
+ else if (measurement.probeFuture != null && measurement.probeFuture.isDone())
+ {
+ measurement.probeTimerMillis = Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis
* 2);
+ }
+ }
+ else
+ {
+ measurement.probeTimerMillis = 0;
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void schedulePings(Map<InetAddressAndPort, AnnotatedMeasurement> samples)
+ {
+ for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry: samples.entrySet())
+ {
+ AnnotatedMeasurement measurement = entry.getValue();
+ long delay = measurement.probeTimerMillis;
+ long millisSinceLastRequest = measurement.millisSinceLastRequest.get();
+
+ if (millisSinceLastRequest > MAX_PROBE_INTERVAL_MS && !Gossiper.instance.isAlive(entry.getKey()))
+ {
+ samples.remove(entry.getKey());
+ }
+
+ if (delay > 0 && millisSinceLastRequest < MAX_PROBE_INTERVAL_MS
&&
+ (measurement.probeFuture == null || measurement.probeFuture.isDone())
&&
+ !updateSamplesScheduler.isCancelled())
--- End diff --
OK, I thought it might be good to check just to be safe, but sure, I took it out.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
|