Github user jolynch commented on a diff in the pull request:
https://github.com/apache/cassandra/pull/283#discussion_r225361405
--- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
@@ -22,121 +22,194 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import com.codahale.metrics.ExponentiallyDecayingReservoir;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.codahale.metrics.Snapshot;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.net.LatencyMeasurementType;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.PingMessage;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
+import static org.apache.cassandra.net.MessagingService.Verb.PING;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE;
+import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE;
+
+
/**
* A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector
+ * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible
for actually measuring
+ * latency and populating the {@link #scores} map.
*/
-public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber,
DynamicEndpointSnitchMBean
+public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements
ILatencySubscriber, DynamicEndpointSnitchMBean
{
- private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
-
- private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased
to towards the newer values
- private static final int WINDOW_SIZE = 100;
-
- private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval();
- private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval();
- private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold();
+ private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class);
+
+ // Subclass specific functionality
+ protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity");
+ protected boolean registered = false;
+ // The scores map is updated via copy in updateScores
+ // We keep it in the base class for performance reasons (so it can be easily aliased)
+ protected volatile Map<InetAddressAndPort, Double> scores = new HashMap<>();
+
+ // Rate limit how often we generate latency probes
+ protected long nextAllowedProbeGenerationTime;
+ protected long nextProbeGenerationTime;
+ protected int currentProbePosition = 0;
+ protected final List<InetAddressAndPort> latencyProbeSequence = new ArrayList<>();
+
+ // DES general functionality
+ protected volatile int dynamicUpdateInterval = -1;
+ protected volatile int dynamicLatencyProbeInterval = -1;
+ protected volatile double dynamicBadnessThreshold = 0;
// the score for a merged set of endpoints must be this much worse than the score
for separate endpoints to
// warrant not merging two ranges into a single range
private static final double RANGE_MERGING_PREFERENCE = 1.5;
private String mbeanName;
- private boolean registered = false;
-
- private volatile HashMap<InetAddressAndPort, Double> scores = new HashMap<>();
- private final ConcurrentHashMap<InetAddressAndPort, ExponentiallyDecayingReservoir>
samples = new ConcurrentHashMap<>();
+ private boolean mbeanRegistered = false;
public final IEndpointSnitch subsnitch;
- private volatile ScheduledFuture<?> updateSchedular;
- private volatile ScheduledFuture<?> resetSchedular;
+ private volatile ScheduledFuture<?> updateScheduler;
+ private volatile ScheduledFuture<?> latencyProbeScheduler;
- private final Runnable update;
- private final Runnable reset;
+ private final Runnable update = this::updateScores;
+ private final Runnable latencyProbe = this::maybeSendLatencyProbe;
public DynamicEndpointSnitch(IEndpointSnitch snitch)
{
this(snitch, null);
}
- public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
+ protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance)
{
mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch";
if (instance != null)
mbeanName += ",instance=" + instance;
subsnitch = snitch;
- update = new Runnable()
- {
- public void run()
- {
- updateScores();
- }
- };
- reset = new Runnable()
- {
- public void run()
- {
- // we do this so that a host considered bad has a chance to recover,
otherwise would we never try
- // to read from it, which would cause its score to never change
- reset();
- }
- };
if (DatabaseDescriptor.isDaemonInitialized())
{
- updateSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update,
dynamicUpdateInterval, dynamicUpdateInterval, TimeUnit.MILLISECONDS);
- resetSchedular = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset,
dynamicResetInterval, dynamicResetInterval, TimeUnit.MILLISECONDS);
+ applyConfigChanges();
registerMBean();
}
+ nextProbeGenerationTime = System.nanoTime();
+ }
+
+ /**
+ * Records a latency. This MUST be cheap as it is called in the fast path
+ */
+ abstract public void receiveTiming(InetAddressAndPort address, long latency, LatencyMeasurementType
measurementType);
--- End diff --
Correct, and storing the latency is basically all that either implementation does. As part
of pulling samples (along with the annotations) back to the base class, I'll see how much of
this I can unify.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org
|