cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aweisb...@apache.org
Subject cassandra git commit: Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters
Date Thu, 01 Nov 2018 16:13:06 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 0766f7e54 -> 877b08eaf


Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events
in mixed version 3.X/4.0 clusters

Patch by Ariel Weisberg; Reviewed by Dinesh Joshi for CASSANDRA-14841


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/877b08ea
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/877b08ea
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/877b08ea

Branch: refs/heads/trunk
Commit: 877b08eaf0e02542c9f6d9f8cd457a8e44b4febf
Parents: 0766f7e
Author: Ariel Weisberg <aweisberg@apple.com>
Authored: Mon Oct 29 15:26:22 2018 -0400
Committer: Ariel Weisberg <aweisberg@apple.com>
Committed: Thu Nov 1 12:12:34 2018 -0400

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 NEWS.txt                                        |  6 +++
 src/java/org/apache/cassandra/gms/Gossiper.java | 43 ++++++++++++++-
 .../apache/cassandra/gms/VersionedValue.java    |  7 +++
 .../repair/SystemDistributedKeyspace.java       | 16 ++++++
 .../cassandra/tracing/TraceStateImpl.java       | 11 ++++
 .../org/apache/cassandra/gms/GossiperTest.java  | 55 +++++++++++++++++---
 7 files changed, 130 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f49531c..b7c0398 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Don't write to system_distributed.repair_history, system_traces.sessions, system_traces.events in mixed version 3.X/4.0 clusters (CASSANDRA-14841)
  * Avoid running query to self through messaging service (CASSANDRA-14807)
  * Allow using custom script for chronicle queue BinLog archival (CASSANDRA-14373)
  * Transient->Full range movements mishandle consistency level upgrade (CASSANDRA-14759)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 5066378..3267c91 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -101,6 +101,12 @@ New features
 
 Upgrading
 ---------
+    - Additional columns have been added to system_distributed.repair_history,
+      system_traces.sessions and system_traces.events. As a result select * queries
+      against these tables will fail and generate an error in the log
+      during upgrade when the cluster is mixed version. Additionally these
+      tables will not be written to if repair or tracing occurs until
+      the entire cluster is upgraded and there are no 3.X version nodes in Gossip.
     - Timestamp ties between values resolve differently: if either value has a TTL,
       this value always wins. This is to provide consistent reconciliation before
       and after the value expires into a tombstone.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 170843b..aedcb04 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BooleanSupplier;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import javax.annotation.Nullable;
@@ -31,6 +32,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -108,7 +110,8 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     private final List<IEndpointStateChangeSubscriber> subscribers = new CopyOnWriteArrayList<IEndpointStateChangeSubscriber>();
 
     /* live member set */
-    private final Set<InetAddressAndPort> liveEndpoints = new ConcurrentSkipListSet<>();
+    @VisibleForTesting
+    final Set<InetAddressAndPort> liveEndpoints = new ConcurrentSkipListSet<>();
 
     /* unreachable member set */
     private final Map<InetAddressAndPort, Long> unreachableEndpoints = new ConcurrentHashMap<>();
@@ -136,6 +139,39 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
     private volatile long lastProcessedMessageAt = System.currentTimeMillis();
 
+    //This property and anything that checks it should be removed in 5.0
+    private boolean haveMajorVersion3Nodes = true;
+
+    final com.google.common.base.Supplier<Boolean> haveMajorVersion3NodesSupplier =
() ->
+    {
+        //Once there are no prior version nodes we don't need to keep rechecking
+        if (!haveMajorVersion3Nodes)
+            return false;
+
+        Iterable<InetAddressAndPort> allHosts = Iterables.concat(Gossiper.instance.getLiveMembers(),
Gossiper.instance.getUnreachableMembers());
+        CassandraVersion referenceVersion = null;
+
+        for (InetAddressAndPort host : allHosts)
+        {
+            CassandraVersion version = getReleaseVersion(host);
+
+            //Raced with changes to gossip state
+            if (version == null)
+                continue;
+
+            if (referenceVersion == null)
+                referenceVersion = version;
+
+            if (version.major < 4)
+                return true;
+        }
+
+        haveMajorVersion3Nodes = false;
+        return false;
+    };
+
+    private final Supplier<Boolean> haveMajorVersion3NodesMemoized = Suppliers.memoizeWithExpiration(haveMajorVersion3NodesSupplier,
1, TimeUnit.MINUTES);
+
     private class GossipTask implements Runnable
     {
         public void run()
@@ -1906,6 +1942,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         }
     }
 
+    public boolean haveMajorVersion3Nodes()
+    {
+        return haveMajorVersion3NodesMemoized.get();
+    }
+
     private boolean nodesAgreeOnSchema(Collection<InetAddressAndPort> nodes)
     {
         UUID expectedVersion = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 691f544..94b8cb8 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 
 import static java.nio.charset.StandardCharsets.ISO_8859_1;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -265,6 +266,12 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(FBUtilities.getReleaseVersionString());
         }
 
+        @VisibleForTesting
+        public VersionedValue releaseVersion(String version)
+        {
+            return new VersionedValue(version);
+        }
+
         public VersionedValue networkVersion()
         {
             return new VersionedValue(String.valueOf(MessagingService.current_version));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
index fc09e71..d4b7259 100644
--- a/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
+++ b/src/java/org/apache/cassandra/repair/SystemDistributedKeyspace.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -187,6 +188,11 @@ public final class SystemDistributedKeyspace
 
     public static void startRepairs(UUID id, UUID parent_id, String keyspaceName, String[]
cfnames, CommonRange commonRange)
     {
+        //Don't record repair history if an upgrade is in progress as version 3 nodes generates
errors
+        //due to schema differences
+        if (Gossiper.instance.haveMajorVersion3Nodes())
+            return;
+
         InetAddressAndPort coordinator = FBUtilities.getBroadcastAddressAndPort();
         Set<String> participants = Sets.newHashSet();
         Set<String> participants_v2 = Sets.newHashSet();
@@ -230,6 +236,11 @@ public final class SystemDistributedKeyspace
 
     public static void successfulRepairJob(UUID id, String keyspaceName, String cfname)
     {
+        //Don't record repair history if an upgrade is in progress as version 3 nodes generates
errors
+        //due to schema differences
+        if (Gossiper.instance.haveMajorVersion3Nodes())
+            return;
+
         String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now())
WHERE keyspace_name = '%s' AND columnfamily_name = '%s' AND id = %s";
         String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, REPAIR_HISTORY,
                                         RepairState.SUCCESS.toString(),
@@ -241,6 +252,11 @@ public final class SystemDistributedKeyspace
 
     public static void failedRepairJob(UUID id, String keyspaceName, String cfname, Throwable
t)
     {
+        //Don't record repair history if an upgrade is in progress as version 3 nodes generates
errors
+        //due to schema differences
+        if (Gossiper.instance.haveMajorVersion3Nodes())
+            return;
+
         String query = "UPDATE %s.%s SET status = '%s', finished_at = toTimestamp(now()),
exception_message=?, exception_stacktrace=? WHERE keyspace_name = '%s' AND columnfamily_name
= '%s' AND id = %s";
         StringWriter sw = new StringWriter();
         PrintWriter pw = new PrintWriter(sw);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index 2722406..e16f778 100644
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -102,6 +103,11 @@ public class TraceStateImpl extends TraceState
 
     void executeMutation(final Mutation mutation)
     {
+        //Don't record trace state if an upgrade is in progress as version 3 nodes generates
errors
+        //due to schema differences
+        if (Gossiper.instance.haveMajorVersion3Nodes())
+            return;
+
         CompletableFuture<Void> fut = CompletableFuture.runAsync(new WrappedRunnable()
         {
             protected void runMayThrow()
@@ -117,6 +123,11 @@ public class TraceStateImpl extends TraceState
 
     static void mutateWithCatch(Mutation mutation)
     {
+        //Don't record trace state if an upgrade is in progress as version 3 nodes generates
errors
+        //due to schema differences
+        if (Gossiper.instance.haveMajorVersion3Nodes())
+            return;
+
         try
         {
             StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY,
System.nanoTime());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/877b08ea/test/unit/org/apache/cassandra/gms/GossiperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index b856983..a78d300 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -42,6 +42,8 @@ import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class GossiperTest
 {
@@ -74,6 +76,43 @@ public class GossiperTest
     }
 
     @Test
+    public void testHaveVersion3Nodes() throws Exception
+    {
+        VersionedValue.VersionedValueFactory factory = new VersionedValue.VersionedValueFactory(null);
+        EndpointState es = new EndpointState(null);
+        es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("4.0-SNAPSHOT"));
+        Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.1"),
es);
+        Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.1"));
+
+
+        es = new EndpointState(null);
+        es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.11.3"));
+        Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.2"),
es);
+        Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.2"));
+
+
+        es = new EndpointState(null);
+        es.addApplicationState(ApplicationState.RELEASE_VERSION, factory.releaseVersion("3.0.0"));
+        Gossiper.instance.endpointStateMap.put(InetAddressAndPort.getByName("127.0.0.3"),
es);
+        Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3"));
+
+
+        assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get());
+
+        Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.2"));
+        Gossiper.instance.liveEndpoints.remove(InetAddressAndPort.getByName("127.0.0.2"));
+
+
+        assertTrue(Gossiper.instance.haveMajorVersion3NodesSupplier.get());
+
+        Gossiper.instance.endpointStateMap.remove(InetAddressAndPort.getByName("127.0.0.3"));
+        Gossiper.instance.liveEndpoints.add(InetAddressAndPort.getByName("127.0.0.3"));
+
+        assertFalse(Gossiper.instance.haveMajorVersion3NodesSupplier.get());
+
+    }
+
+    @Test
     public void testLargeGenerationJump() throws UnknownHostException, InterruptedException
     {
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds,
2);
@@ -136,15 +175,15 @@ public class GossiperTest
         // Check that the new entry was added
         Assert.assertEquals(nextSize, loadedList.size());
         for (InetAddressAndPort a : nextSeeds)
-            Assert.assertTrue(loadedList.contains(a.toString()));
+            assertTrue(loadedList.contains(a.toString()));
 
         // Check that the return value of the reloadSeeds matches the content of the getSeeds
call
         // and that they both match the internal contents of the Gossiper seeds list
         Assert.assertEquals(loadedList.size(), gossiper.getSeeds().size());
         for (InetAddressAndPort a : gossiper.seeds)
         {
-            Assert.assertTrue(loadedList.contains(a.toString()));
-            Assert.assertTrue(gossiper.getSeeds().contains(a.toString()));
+            assertTrue(loadedList.contains(a.toString()));
+            assertTrue(gossiper.getSeeds().contains(a.toString()));
         }
 
         // Add a duplicate of the last address to the seed provider list
@@ -157,7 +196,7 @@ public class GossiperTest
         // Check that the number of seed nodes reported hasn't increased
         Assert.assertEquals(uniqueSize, loadedList.size());
         for (InetAddressAndPort a : nextSeeds)
-            Assert.assertTrue(loadedList.contains(a.toString()));
+            assertTrue(loadedList.contains(a.toString()));
 
         // Create a new list that has no overlaps with the previous list
         addr = InetAddressAndPort.getByAddress(InetAddress.getByName("127.99.2.1"));
@@ -176,8 +215,8 @@ public class GossiperTest
         Assert.assertEquals(disjointSize, loadedList.size());
         for (InetAddressAndPort a : disjointSeeds)
         {
-            Assert.assertTrue(gossiper.getSeeds().contains(a.toString()));
-            Assert.assertTrue(loadedList.contains(a.toString()));
+            assertTrue(gossiper.getSeeds().contains(a.toString()));
+            assertTrue(loadedList.contains(a.toString()));
         }
 
         // Set the seed node provider to return an empty list
@@ -187,7 +226,7 @@ public class GossiperTest
         // Check that the in memory seed node list was not modified
         Assert.assertEquals(disjointSize, loadedList.size());
         for (InetAddressAndPort a : disjointSeeds)
-            Assert.assertTrue(loadedList.contains(a.toString()));
+            assertTrue(loadedList.contains(a.toString()));
 
         // Change the seed provider to one that throws an unchecked exception
         DatabaseDescriptor.setSeedProvider(new ErrorSeedProvider());
@@ -199,7 +238,7 @@ public class GossiperTest
         // Check that the in memory seed node list was not modified and the exception was
caught
         Assert.assertEquals(disjointSize, gossiper.getSeeds().size());
         for (InetAddressAndPort a : disjointSeeds)
-            Assert.assertTrue(gossiper.getSeeds().contains(a.toString()));
+            assertTrue(gossiper.getSeeds().contains(a.toString()));
     }
 
     static class TestSeedProvider implements SeedProvider


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message