GitHub user jolynch commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/283#discussion_r237960560
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
@@ -154,20 +310,155 @@ private void registerMBean()
public void close()
{
- updateSchedular.cancel(false);
- resetSchedular.cancel(false);
+ if (updateScheduler != null)
+ updateScheduler.cancel(false);
+ if (latencyProbeScheduler != null)
+ latencyProbeScheduler.cancel(false);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
try
{
- mbs.unregisterMBean(new ObjectName(mbeanName));
+ if (mbeanRegistered)
+ mbs.unregisterMBean(new ObjectName(mbeanName));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
+ /**
+ * Determines if latency probes need to be sent, and potentially sends a single latency probe per invocation
+ */
+ protected void maybeSendLatencyProbe()
+ {
+ if (!StorageService.instance.isGossipActive())
+ return;
+
+ currentProbePosition = latencyProbeNeeded(samples, latencyProbeSequence, currentProbePosition);
+
+ if (currentProbePosition < latencyProbeSequence.size())
+ {
+ try
+ {
+ InetAddressAndPort peer = latencyProbeSequence.get(currentProbePosition);
+ sendPingMessageToPeer(peer);
+ }
+ catch (IndexOutOfBoundsException ignored) {}
+ }
+ }
+
+ /**
+ * This method (unfortunately) mutates a lot of state so that it doesn't create any garbage and only iterates the
+ * sample map a single time. In particular on every call we:
+ * - increment every sample's intervalsSinceLastMeasure
+ *
+ * When probes should be generated we also potentially:
+ * - reset sample's recentlyRequested that have reached the "CONSTANT" phase of probing (10 minutes by default)
+ * - add any InetAddressAndPort's that need probing to the provided endpointsToProbe
+ * - shuffle the endpointsToProbe
+ *
+ * If there are probes to be sent, this method short circuits all generation of probes and just returns the
+ * passed probePosition plus one.
+ * @return The position of the passed endpointsToProbe that should be probed.
+ */
+ @VisibleForTesting
+ int latencyProbeNeeded(Map<InetAddressAndPort, AnnotatedMeasurement> samples,
+ List<InetAddressAndPort> endpointsToProbe, int probePosition) {
+ boolean shouldGenerateProbes = (probePosition >= endpointsToProbe.size());
+
+ if (shouldGenerateProbes)
+ {
+ endpointsToProbe.clear();
+ samples.keySet().retainAll(Gossiper.instance.getLiveMembers());
+ }
+
+ // We have to increment intervalsSinceLastMeasure regardless of if we generate probes
+ for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry: samples.entrySet())
+ {
+ AnnotatedMeasurement measurement = entry.getValue();
+ long intervalsSinceLastMeasure = measurement.intervalsSinceLastMeasure.getAndIncrement();
--- End diff --
I believe I've addressed this feedback, although I didn't end up looking at the futures
in receiveTiming since that path is performance critical (at worst we'll send an extra
probe, which isn't a big deal imo).
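
For readers following the diff, the position contract described in the latencyProbeNeeded javadoc can be sketched roughly as below. This is a simplified illustration under stated assumptions, not the code in the PR: the class `ProbeCursorSketch`, the helper `nextProbePosition`, and the use of plain `String` endpoints are hypothetical, and returning 0 right after regenerating the sequence is an assumption (the quoted diff is cut off before the return statement).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ProbeCursorSketch
{
    /**
     * Hypothetical stand-in for the position handling in latencyProbeNeeded.
     * While probes are still pending it short circuits and returns position + 1;
     * once the sequence is exhausted it rebuilds and shuffles the sequence in place.
     */
    static int nextProbePosition(List<String> candidates, List<String> endpointsToProbe, int probePosition)
    {
        boolean shouldGenerateProbes = probePosition >= endpointsToProbe.size();
        if (!shouldGenerateProbes)
            return probePosition + 1;

        // Reuse the caller's list rather than allocating a new one
        endpointsToProbe.clear();
        endpointsToProbe.addAll(candidates);
        Collections.shuffle(endpointsToProbe);
        return 0; // assumption: restart at the head of the fresh sequence
    }

    public static void main(String[] args)
    {
        List<String> candidates = List.of("10.0.0.1", "10.0.0.2", "10.0.0.3");
        List<String> sequence = new ArrayList<>();
        int position = 0;

        // Each loop iteration plays the role of one maybeSendLatencyProbe invocation
        for (int tick = 0; tick < 6; tick++)
        {
            position = nextProbePosition(candidates, sequence, position);
            if (position < sequence.size())
                System.out.println("tick " + tick + ": probe " + sequence.get(position));
            else
                System.out.println("tick " + tick + ": nothing to probe");
        }
    }
}
```

In this sketch the caller sends at most one probe per tick, and the tick on which the position runs off the end of the list sends nothing; the sequence is regenerated on the following tick.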
---