usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [02/33] usergrid git commit: Integrate datastax cluster into migration manager for creation of keyspaces using database/setup.
Date Wed, 11 May 2016 05:45:55 GMT
Integrate datastax cluster into migration manager for creation of keyspaces using database/setup.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/01c4970a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/01c4970a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/01c4970a

Branch: refs/heads/datastax-cass-driver
Commit: 01c4970a80fa080f7deccf3f004112d9112f624d
Parents: a631975
Author: Michael Russo <michaelarusso@gmail.com>
Authored: Tue Feb 9 10:18:27 2016 -0800
Committer: Michael Russo <michaelarusso@gmail.com>
Committed: Tue Feb 9 10:18:27 2016 -0800

----------------------------------------------------------------------
 .../usergrid/corepersistence/CpSetup.java       |  15 +--
 .../persistence/core/datastax/CQLUtils.java     |  49 ++++++++
 .../core/datastax/DatastaxSessionProvider.java  |  22 +++-
 .../core/datastax/impl/DatastaxClusterImpl.java |  68 +++--------
 .../persistence/core/guice/CommonModule.java    |  10 +-
 .../migration/schema/MigrationManagerImpl.java  | 113 +++++++------------
 6 files changed, 142 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
index e97be3f..d2c38e4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpSetup.java
@@ -83,6 +83,9 @@ public class CpSetup implements Setup {
         //a no op, creating the injector creates the connections
         //init our index if required
         this.emf.initializeManagementIndex();
+
+        logger.info( "Initialize application keyspace" );
+        migrate();
         setupStaticKeyspace();
         setupSystemKeyspace();
 
@@ -129,9 +132,7 @@ public class CpSetup implements Setup {
     @Override
     public void setupSystemKeyspace() throws Exception {
 
-        logger.info( "Initialize system keyspace" );
-
-        migrate();
+        logger.info( "Initialize system tables" );
 
         cass.createColumnFamily( getApplicationKeyspace(),
             createColumnFamilyDefinition( getApplicationKeyspace(), APPLICATIONS_CF, ComparatorType.BYTESTYPE
) );
@@ -145,7 +146,7 @@ public class CpSetup implements Setup {
         cass.createColumnFamily( getApplicationKeyspace(),
             createColumnFamilyDefinition( getApplicationKeyspace(), PRINCIPAL_TOKEN_CF, ComparatorType.UUIDTYPE
) );
 
-        logger.info( "System keyspace initialized" );
+        logger.info( "System tables initialized" );
     }
 
 
@@ -167,11 +168,9 @@ public class CpSetup implements Setup {
     @Override
     public void setupStaticKeyspace() throws Exception {
 
-        migrate();
-
         // Need this legacy stuff for queues
 
-        logger.info( "Creating static application keyspace {}", getApplicationKeyspace()
);
+        logger.info( "Initialize application tables" );
 
         cass.createColumnFamily( getApplicationKeyspace(),
             createColumnFamilyDefinition( getApplicationKeyspace(), APPLICATIONS_CF,
@@ -183,6 +182,8 @@ public class CpSetup implements Setup {
         cass.createColumnFamilies( getApplicationKeyspace(),
             getCfDefs( QueuesCF.class, getApplicationKeyspace() ) );
 
+        logger.info( "Application tables initialized" );
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
new file mode 100644
index 0000000..b663934
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/CQLUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.datastax;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CQLUtils {
+
+    public static String getFormattedReplication(String strategy, String strategyOptions)
throws JsonProcessingException {
+
+        Map<String, String> replicationSettings = new HashMap<>();
+        replicationSettings.put("class", strategy);
+        String[] strategyOptionsSplit = strategyOptions.split(",");
+        for ( String option : strategyOptionsSplit){
+            String[] splitOptions = option.split(":");
+            replicationSettings.put(splitOptions[0], splitOptions[1]);
+        }
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.writeValueAsString(replicationSettings).replace("\"", "'");
+    }
+
+
+    public static void createColumnFamily(){
+
+
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java
index eeca763..1b39cb8 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/DatastaxSessionProvider.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.usergrid.persistence.core.datastax;
 
 
@@ -5,12 +23,12 @@ import com.datastax.driver.core.Session;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 
-public class DatastaxSessionProvider implements Provider<Session> {
+public class DataStaxSessionProvider implements Provider<Session> {
 
     private final DataStaxCluster dataStaxCluster;
 
     @Inject
-    public DatastaxSessionProvider( final DataStaxCluster dataStaxCluster){
+    public DataStaxSessionProvider( final DataStaxCluster dataStaxCluster ){
 
         this.dataStaxCluster = dataStaxCluster;
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java
index 0582c4e..ffe61e6 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/datastax/impl/DatastaxClusterImpl.java
@@ -21,8 +21,6 @@ package org.apache.usergrid.persistence.core.datastax.impl;
 import com.datastax.driver.core.*;
 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
 import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
@@ -30,15 +28,11 @@ import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 
 @Singleton
-public class DatastaxClusterImpl implements DataStaxCluster {
+public class DataStaxClusterImpl implements DataStaxCluster {
 
-    private static final Logger logger = LoggerFactory.getLogger( DatastaxClusterImpl.class
);
+    private static final Logger logger = LoggerFactory.getLogger( DataStaxClusterImpl.class
);
 
 
     private final CassandraFig cassandraFig;
@@ -47,7 +41,7 @@ public class DatastaxClusterImpl implements DataStaxCluster {
     private Session clusterSession;
 
     @Inject
-    public DatastaxClusterImpl( final CassandraFig cassandraFig ) throws Exception {
+    public DataStaxClusterImpl(final CassandraFig cassandraFig ) throws Exception {
         this.cassandraFig = cassandraFig;
 
         ConsistencyLevel defaultConsistencyLevel;
@@ -99,15 +93,12 @@ public class DatastaxClusterImpl implements DataStaxCluster {
         }
 
         this.cluster = datastaxCluster.build();
-        this.clusterSession = cluster.connect();
-        logger.info("Initialized datastax client cluster. Hosts={}, Idle Timeout={}s,  Request
Timeout={}s",
+        logger.info("Initialized datastax cluster client. Hosts={}, Idle Timeout={}s,  Request
Timeout={}s",
             cluster.getMetadata().getAllHosts().toString(),
             cluster.getConfiguration().getPoolingOptions().getIdleTimeoutSeconds(),
             cluster.getConfiguration().getPoolingOptions().getPoolTimeoutMillis() / 1000);
 
-        logger.info("Creating keyspaces if they do not already exist.");
-        createKeyspaces();
-        this.applicationSession = cluster.connect( "\""+cassandraFig.getApplicationKeyspace()+"\""
);
+
 
 
 
@@ -115,55 +106,26 @@ public class DatastaxClusterImpl implements DataStaxCluster {
     }
 
     public Cluster getCluster(){
+
         return cluster;
     }
 
     public Session getClusterSession(){
-        return clusterSession;
-    }
-
-    public Session getApplicationSession(){
-        return applicationSession;
-    }
-
-    private void createKeyspaces() throws Exception{
-
-        final String createApplicationKeyspace = String.format(
-            "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = %s",
-            cassandraFig.getApplicationKeyspace(),
-            getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions()
)
-
-        );
-
-        final String createLocksKeyspace = String.format(
-            "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = %s",
-            cassandraFig.getLocksKeyspace(),
-            getFormattedReplication(
-                cassandraFig.getLocksKeyspaceStrategy(),
-                cassandraFig.getLocksKeyspaceReplication()
-            )
-        );
-
-        logger.info("Creating application keyspace with the following CQL: {}", createApplicationKeyspace);
-        clusterSession.execute(createApplicationKeyspace);
-
-        logger.info("Creating locks keyspace with the following CQL: {}", createLocksKeyspace);
-        clusterSession.execute(createLocksKeyspace);
 
+        if ( clusterSession == null || clusterSession.isClosed() ){
+            clusterSession = cluster.connect();
+        }
 
+        return clusterSession;
     }
 
-    private String getFormattedReplication(String strategy, String strategyOptions) throws
JsonProcessingException{
+    public Session getApplicationSession(){
 
-        Map<String, String> replicationSettings = new HashMap<>();
-        replicationSettings.put("class", strategy);
-        String[] strategyOptionsSplit = strategyOptions.split(",");
-        for ( String option : strategyOptionsSplit){
-            String[] splitOptions = option.split(":");
-            replicationSettings.put(splitOptions[0], splitOptions[1]);
+        if ( applicationSession == null || applicationSession.isClosed() ){
+            applicationSession = cluster.connect( "\""+cassandraFig.getApplicationKeyspace()+"\""
);
         }
-            ObjectMapper mapper = new ObjectMapper();
-            return mapper.writeValueAsString(replicationSettings).replace("\"", "'");
+        return applicationSession;
     }
 
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
index a06a6e7..460efa5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/guice/CommonModule.java
@@ -19,9 +19,12 @@
 package org.apache.usergrid.persistence.core.guice;
 
 
+import com.datastax.driver.core.Session;
 import com.netflix.astyanax.Keyspace;
 import org.apache.usergrid.persistence.core.astyanax.*;
-import org.apache.usergrid.persistence.core.datastax.impl.DatastaxClusterImpl;
+import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
+import org.apache.usergrid.persistence.core.datastax.DataStaxSessionProvider;
+import org.apache.usergrid.persistence.core.datastax.impl.DataStaxClusterImpl;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
@@ -63,7 +66,10 @@ public class CommonModule extends AbstractModule {
         bind(CassandraCluster.class).to(CassandraClusterImpl.class).asEagerSingleton();
 
         // bind our Datastax cluster
-        bind(DatastaxClusterImpl.class).asEagerSingleton();
+        bind(DataStaxCluster.class).to(DataStaxClusterImpl.class).asEagerSingleton();
+
+        // bind our Session to the DataStaxSessionProvider
+        bind(Session.class).toProvider(DataStaxSessionProvider.class).asEagerSingleton();
 
         // bind our keyspace to the AstyanaxKeyspaceProvider
         bind(Keyspace.class).toProvider(AstyanaxKeyspaceProvider.class).asEagerSingleton();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/01c4970a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
index db694fe..e7e0cb5 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
@@ -20,22 +20,21 @@ package org.apache.usergrid.persistence.core.migration.schema;
 
 
 import java.util.Collection;
-import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
+import com.datastax.driver.core.Session;
+import org.apache.usergrid.persistence.core.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.core.datastax.CQLUtils;
+import org.apache.usergrid.persistence.core.datastax.DataStaxCluster;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.astyanax.MultiTenantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.migration.util.AstayanxUtils;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.ddl.ColumnFamilyDefinition;
 import com.netflix.astyanax.ddl.KeyspaceDefinition;
 
@@ -51,18 +50,20 @@ public class MigrationManagerImpl implements MigrationManager {
 
     private static final Logger logger = LoggerFactory.getLogger( MigrationManagerImpl.class
);
 
+    private final CassandraFig cassandraFig;
     private final Set<Migration> migrations;
     private final Keyspace keyspace;
-
-    private final MigrationManagerFig fig;
+    private final DataStaxCluster dataStaxCluster;
 
 
     @Inject
-    public MigrationManagerImpl( final Keyspace keyspace, final Set<Migration> migrations,
-                                 MigrationManagerFig fig ) {
+    public MigrationManagerImpl( final CassandraFig cassandraFig, final Keyspace keyspace,
+                                 final Set<Migration> migrations, final DataStaxCluster
dataStaxCluster) {
+
+        this.cassandraFig = cassandraFig;
         this.keyspace = keyspace;
         this.migrations = migrations;
-        this.fig = fig;
+        this.dataStaxCluster = dataStaxCluster;
     }
 
 
@@ -72,7 +73,7 @@ public class MigrationManagerImpl implements MigrationManager {
 
         try {
 
-            testAndCreateKeyspace();
+            createOrUpdateKeyspace();
 
             for ( Migration migration : migrations ) {
 
@@ -116,86 +117,54 @@ public class MigrationManagerImpl implements MigrationManager {
 
         logger.info( "Created column family {}", columnFamily.getColumnFamily().getName()
);
 
-        waitForMigration();
+        waitForSchemaAgreement();
     }
 
 
     /**
-     * Check if they keyspace exists.  If it doesn't create it
+     * Execute CQL to create the keyspace if it does not already exist.  Always update the
keyspace with the
+     * configured strategy options to allow for real time replication updates.
+     *
+     * @throws Exception
      */
-    private void testAndCreateKeyspace() throws ConnectionException {
-
-
-        KeyspaceDefinition keyspaceDefinition = null;
-
-        try {
-            keyspaceDefinition = keyspace.describeKeyspace();
-
-        }catch( NotFoundException nfe){
-            //if we execute this immediately after a drop keyspace in 1.2.x, Cassandra is
returning the NFE instead of a BadRequestException
-            //swallow and log, then continue to create the keyspaces.
-            logger.info( "Received a NotFoundException when attempting to describe keyspace.
 It does not exist" );
-        }
-        catch(Exception e){
-            AstayanxUtils.isKeyspaceMissing("Unable to connect to cassandra", e);
-        }
-
-
-        if ( keyspaceDefinition != null ) {
-            return;
-        }
-
+    private void createOrUpdateKeyspace() throws Exception {
 
-        ImmutableMap.Builder<String, Object> strategyOptions = getKeySpaceProps();
+        Session clusterSession = dataStaxCluster.getClusterSession();
 
+        final String createApplicationKeyspace = String.format(
+            "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH replication = %s",
+            cassandraFig.getApplicationKeyspace(),
+            CQLUtils.getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions()
)
 
-        ImmutableMap<String, Object> options =
-                ImmutableMap.<String, Object>builder().put( "strategy_class", fig.getStrategyClass()
)
-                            .put( "strategy_options", strategyOptions.build() ).build();
+        );
 
+        final String updateApplicationKeyspace = String.format(
+            "ALTER KEYSPACE \"%s\" WITH replication = %s",
+            cassandraFig.getApplicationKeyspace(),
+            CQLUtils.getFormattedReplication( cassandraFig.getStrategy(), cassandraFig.getStrategyOptions()
)
+        );
 
-        keyspace.createKeyspace( options );
+        logger.info("Creating application keyspace with the following CQL: {}", createApplicationKeyspace);
+        clusterSession.execute(createApplicationKeyspace);
+        logger.info("Updating application keyspace with the following CQL: {}", updateApplicationKeyspace);
+        clusterSession.execute(updateApplicationKeyspace);
 
-        strategyOptions.toString();
+        // this session pool is only used when running database setup so close it when finished
to clear resources
+        clusterSession.close();
 
-        logger.info( "Created keyspace {} with options {}", keyspace.getKeyspaceName(), options.toString()
);
-
-        waitForMigration();
+        waitForSchemaAgreement();
     }
 
 
     /**
-     * Get keyspace properties
+     * Wait until all Cassandra nodes agree on the schema.  Sleeps 100ms between checks.
+     *
      */
-    private ImmutableMap.Builder<String, Object> getKeySpaceProps() {
-        ImmutableMap.Builder<String, Object> keyspaceProps = ImmutableMap.<String,
Object>builder();
-
-        String optionString = fig.getStrategyOptions();
-
-        if(optionString == null){
-            return keyspaceProps;
-        }
-
-
-
-        for ( String key : optionString.split( "," ) ) {
-
-            final String[] options = key.split( ":" );
-
-            keyspaceProps.put( options[0], options[1] );
-        }
-
-        return keyspaceProps;
-    }
-
-
-    private void waitForMigration() throws ConnectionException {
+    private void waitForSchemaAgreement() {
 
         while ( true ) {
 
-            final Map<String, List<String>> versions = keyspace.describeSchemaVersions();
-
-            if ( versions != null && versions.size() == 1 ) {
+            if( dataStaxCluster.getCluster().getMetadata().checkSchemaAgreement() ){
                 return;
             }
 
@@ -208,4 +177,6 @@ public class MigrationManagerImpl implements MigrationManager {
             }
         }
     }
+
+
 }


Mime
View raw message