usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [35/50] [abbrv] usergrid git commit: Simplify and rename Actorsystem configuration properties to be more generic, e.g. starting with "usergrid.cluster" instead of "collection.akka"
Date Mon, 11 Jul 2016 15:35:45 GMT
Simplify and rename Actorsystem configuration properties to be more generic, e.g. starting
with "usergrid.cluster" instead of "collection.akka"


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f0c9fd4b
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f0c9fd4b
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f0c9fd4b

Branch: refs/heads/release-2.1.1
Commit: f0c9fd4bd91a271ee1e9a93a6fa70bf69159f7db
Parents: 2d5ad05
Author: Dave Johnson <snoopdave@apache.org>
Authored: Fri Jul 1 11:09:37 2016 -0400
Committer: Dave Johnson <snoopdave@apache.org>
Committed: Fri Jul 1 11:09:37 2016 -0400

----------------------------------------------------------------------
 .../main/resources/usergrid-default.properties  |  56 ++---
 .../corepersistence/CpEntityManagerFactory.java |   2 +-
 .../persistence/actorsystem/ActorSystemFig.java |  60 ++---
 .../actorsystem/ActorSystemManagerImpl.java     |  53 ++---
 .../mvcc/stage/write/WriteCommit.java           |  10 +-
 .../mvcc/stage/write/WriteUniqueVerify.java     |  10 +-
 .../uniquevalues/UniqueValuesFig.java           |  28 +--
 .../mvcc/stage/delete/MarkCommitTest.java       |   2 +-
 .../mvcc/stage/write/WriteCommitTest.java       |   8 +-
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   2 +-
 .../org/apache/usergrid/rest/UniqueCatsIT.java  | 237 +++++++++++++++++++
 11 files changed, 344 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 29b8d36..fe70569 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -410,60 +410,44 @@ usergrid.queue.lock.timeout=5
 #usergrid.queue.publish.queuesize=850000
 
 
-#########################  Akka Actor System Configiuration ###################
+#########################  Usergrid Cluster Configuration ###################
 #
-# Usergrid includes Akka, an Actor-based system that allows for the
-# distribution of work across multiple Usergrid instances and multiple regions.
-#
-# All properties are required. If Akka is enabled then all properties in this
-# section MUST be specified.
-#
-# For more information: https://issues.apache.org/jira/browse/USERGRID-1268
+# Usergrid includes a multi-region clustering system.
+# To user it you must specify your region, the list of regions and seeds for each region.
 #
 
-# Currently, Akka is disable and not required for Usergrid
-collection.akka.enabled=false
+# This is an experimentation new feature, disabled by default
+usergrid.cluster.enabled=false
 
-# host name of this machine
-collection.akka.hostname=localhost
+# Comma-separated list of regions to be considered
+usergrid.cluster.region.list=default
 
-# The region of this Usergrid installation
-# Region MUST be in the region list specified in the 'usergrid.queue.regionList' property
-collection.akka.region=
+# The regions of this local instance of Usergrid
+usergrid.cluster.region.local=default
 
-# Comma-separated lists of Akka seeds each with format {region}:{hostname}:{port}.
-# All regions MUST be listed in the 'usergrid.queue.regionList'
-collection.akka.region.seeds=
-
-# The default authoritative region for when is not specified elsewhere
-# Region MUST be in the region list specified in the 'usergrid.queue.regionList' property
-collection.akka.authoritative.region=
+# Comma-separated lists of cluster seeds each with format {region}:{hostname}
+usergrid.cluster.seeds=default:localhost
 
-# Default number of Akka actors to start per instance / router producer
-collection.akka.instances-per-node=300
+# Port used for cluster communications.
+usergrid.cluster.port=2551
 
 
 #########################  Usergrid Unique Values Validation ##################
 #
-# Usergrid includes a distributed unique values validation that ensure that
-# unique values rename unique across a distributed and multi-region system.
-# This system is based on the Akka actor system and requires some additional
-# configuration.
-#
-# The system uses consistent hashing to ensure that one single-threaded actor
-# ever accesses a unique value record at one time.
-#
-# For more information: https://issues.apache.org/jira/browse/USERGRID-1268
+# These only apply if the above Usergrid cluster system is enabled.
 #
 
 # The number of unique value actors to start on each Usergrid instance.
-collection.akka.uniquevalue.actors=300
+collection.uniquevalues.actors=300
 
 # TTL of unique value reservation in in-memory cache
-collection.akka.uniquevalue.cache.ttl=10
+collection.uniquevalues.cache.ttl=10
 
 # TTL of a unique value reservation when written to Cassandra
-collection.akka.uniquevalue.reservation.ttl=10
+collection.uniquevalues.reservation.ttl=10
+
+# The default authoritative region for when is not specified elsewhere
+collection.uniquevalues.authoritative.region=default
 
 
 ##############################  Usergrid Scheduler  ###########################

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index eca5927..e70a6fd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -142,7 +142,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
 
         logger.info("EntityManagerFactoring starting...");
 
-        if ( actorSystemFig.getAkkaEnabled() ) {
+        if ( actorSystemFig.getEnabled() ) {
             try {
                 logger.info("Akka cluster starting...");
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
index ec010d0..5d7b6aa 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemFig.java
@@ -30,64 +30,54 @@ import java.io.Serializable;
 @FigSingleton
 public interface ActorSystemFig extends GuicyFig, Serializable {
 
-    String AKKA_ENABLED = "collection.akka.enabled";
+    String CLUSTER_ENABLED = "usergrid.cluster.enabled";
 
-    String AKKA_HOSTNAME = "collection.akka.hostname";
+    String CLUSTER_REGIONS_LIST = "usergrid.cluster.region.list";
 
-    String AKKA_REGION = "collection.akka.region";
+    String CLUSTER_REGIONS_LOCAL = "usergrid.cluster.region.local";
 
-    String AKKA_REGION_LIST = "usergrid.queue.regionList"; // same region list used by queues
+    String CLUSTER_SEEDS = "usergrid.cluster.seeds";
 
-    String AKKA_REGION_SEEDS = "collection.akka.region.seeds";
-
-    String AKKA_AUTHORITATIVE_REGION = "collection.akka.authoritative.region";
-
-    String AKKA_INSTANCES_PER_NODE = "collection.akka.instances-per-node";
+    String CLUSTER_PORT = "usergrid.cluster.port";
 
 
     /**
-     * Use Akka or nah
+     * Use Cluster or nah
      */
-    @Key(AKKA_ENABLED)
+    @Key(CLUSTER_ENABLED)
     @Default("true")
-    boolean getAkkaEnabled();
-
-    /**
-     * Hostname to be used in Akka configuration.
-     */
-    @Key(AKKA_HOSTNAME)
-    String getHostname();
+    boolean getEnabled();
 
     /**
      * Local region to be used in Akka configuration.
      */
-    @Key(AKKA_REGION)
-    String getRegion();
+    @Key(CLUSTER_REGIONS_LOCAL)
+    @Default("default")
+    String getRegionLocal();
 
     /**
      * Comma separated list of regions known to cluster.
      */
-    @Key(AKKA_REGION_LIST)
-    String getRegionList();
+    @Key(CLUSTER_REGIONS_LIST)
+    @Default("default")
+    String getRegionsList();
 
     /**
-     * Comma-separated lists of seeds each with format {region}:{hostname}:{port}.
-     * Regions MUST be listed in the 'usergrid.queue.regionList'
+     * Comma-separated lists of seeds each with format {region}:{hostname}
      */
-    @Key(AKKA_REGION_SEEDS)
-    String getRegionSeeds();
+    @Key(CLUSTER_SEEDS)
+    @Default("default:localhost")
+    String getSeeds();
 
     /**
-     * If no region specified for type, use the authoritative region
+     * Port for cluster comms.
      */
-    @Key(AKKA_AUTHORITATIVE_REGION)
-    String getAkkaAuthoritativeRegion();
+    @Key(CLUSTER_PORT)
+    @Default("2551")
+    String getPort();
 
 
-    /**
-     * Number of actor instances to create on each node for each router.
-     */
-    @Key(AKKA_INSTANCES_PER_NODE)
-    @Default("300")
-    int getInstancesPerNode();
+    @Key("usergrid.cluster.hostname")
+    @Default("")
+    String getHostname();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 05f837d..a79f447 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -39,6 +39,8 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -77,8 +79,17 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
     @Override
     public void start() {
 
-        this.hostname = actorSystemFig.getHostname();
-        this.currentRegion = actorSystemFig.getRegion();
+        if ( !StringUtils.isEmpty( actorSystemFig.getHostname()) ) {
+            this.hostname = actorSystemFig.getHostname();
+        } else {
+            try {
+                this.hostname = InetAddress.getLocalHost().getHostName();
+            } catch (UnknownHostException e) {
+                logger.error("Cannot get hostname, defaulting to 'localhost': " + e.getMessage());
+            }
+        }
+
+        this.currentRegion = actorSystemFig.getRegionLocal();
         this.port = null;
 
         initAkka();
@@ -155,32 +166,22 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
         // Create one actor system with request actor for each region
 
-        if ( StringUtils.isEmpty( hostname )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_HOSTNAME
);
-        }
-
         if ( StringUtils.isEmpty( currentRegion )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION
);
+            throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LOCAL
);
         }
 
-        if ( StringUtils.isEmpty( actorSystemFig.getRegionList() )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_LIST
);
+        if ( StringUtils.isEmpty( actorSystemFig.getRegionsList() )) {
+            throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_REGIONS_LIST
);
         }
 
-        if ( StringUtils.isEmpty( actorSystemFig.getRegionSeeds() )) {
-            throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_REGION_SEEDS);
+        if ( StringUtils.isEmpty( actorSystemFig.getSeeds() )) {
+            throw new RuntimeException( "No value specified for " + ActorSystemFig.CLUSTER_SEEDS
);
         }
 
-        if ( StringUtils.isEmpty( actorSystemFig.getAkkaAuthoritativeRegion() )) {
-            logger.warn("No value for {} specified, will use current region as authoriative
region",
-                ActorSystemFig.AKKA_AUTHORITATIVE_REGION);
-            //throw new RuntimeException( "No value specified for " + ActorSystemFig.AKKA_AUTHORITATIVE_REGION);
-        }
-
-        List regionList = Arrays.asList( actorSystemFig.getRegionList().toLowerCase().split(",")
);
+        List regionList = Arrays.asList( actorSystemFig.getRegionsList().toLowerCase().split(",")
);
 
         logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}",
-            hostname, currentRegion, regionList, actorSystemFig.getRegionSeeds() );
+            hostname, currentRegion, regionList, actorSystemFig.getSeeds() );
 
         Config config = readClusterSystemConfig();
 
@@ -205,7 +206,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
             seedsByRegion = ArrayListMultimap.create();
 
-            String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," );
+            String[] regionSeeds = actorSystemFig.getSeeds().split( "," );
 
             logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds );
 
@@ -226,7 +227,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
                         String[] parts = regionSeed.split( ":" );
                         String region = parts[0];
                         String hostname = parts[1];
-                        String regionPortString = parts[2];
+
+                        String regionPortString = parts.length > 2 ? parts[2] : actorSystemFig.getPort();
 
                         // all seeds in same region must use same port
                         // we assume 0th seed has the right port
@@ -269,7 +271,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
         try {
 
-            int numInstancesPerNode = actorSystemFig.getInstancesPerNode();
+            int numInstancesPerNode = 300; // expect this to be overridden by RouterProducers
 
             String region = currentRegion;
 
@@ -277,11 +279,8 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
             int lastColon = seeds.get(0).lastIndexOf(":") + 1;
             final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon
));
 
-            logger.info( "Akka Config for region {} is:\n" +
-                    "   Hostname {}\n" +
-                    "   Seeds {}\n" +
-                    "   Authoritative Region {}\n",
-                region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() );
+            logger.info( "Akka Config for region {} is:\n" + "   Hostname {}\n" + "   Seeds
{}\n",
+                region, hostname, seeds );
 
             Map<String, Object> configMap = new HashMap<String, Object>() {{
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 65d1734..5b98ca5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemFig;
 import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
+import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
 import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +68,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>,
Collect
     private static final Logger logger = LoggerFactory.getLogger( WriteCommit.class );
 
     ActorSystemFig actorSystemFig;
+    UniqueValuesFig uniqueValuesFig;
     UniqueValuesService akkaUvService;
 
     @Inject
@@ -82,6 +84,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>,
Collect
                         final MvccEntitySerializationStrategy entryStrat,
                         final UniqueValueSerializationStrategy uniqueValueStrat,
                         final ActorSystemFig actorSystemFig,
+                        final UniqueValuesFig uniqueValuesFig,
                         final UniqueValuesService akkaUvService ) {
 
         Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required"
);
@@ -92,6 +95,7 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>,
Collect
         this.entityStrat = entryStrat;
         this.uniqueValueStrat = uniqueValueStrat;
         this.actorSystemFig = actorSystemFig;
+        this.uniqueValuesFig = uniqueValuesFig;
         this.akkaUvService = akkaUvService;
     }
 
@@ -130,13 +134,13 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>,
Collect
         logMutation.mergeShallow( entityMutation );
 
         // akkaFig may be null when this is called from JUnit tests
-        if ( actorSystemFig != null && actorSystemFig.getAkkaEnabled() ) {
+        if ( actorSystemFig != null && actorSystemFig.getEnabled() ) {
             String region = ioEvent.getRegion();
             if ( region == null ) {
-                region = actorSystemFig.getAkkaAuthoritativeRegion();
+                region = uniqueValuesFig.getAuthoritativeRegion();
             }
             if ( region == null ) {
-                region = actorSystemFig.getRegion();
+                region = actorSystemFig.getRegionLocal();
             }
             confirmUniqueFieldsAkka( mvccEntity, version, applicationScope, region );
         } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index f159096..985137b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -40,6 +40,7 @@ import org.apache.usergrid.persistence.collection.serialization.UniqueValueSeria
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
 import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
+import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesFig;
 import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
 import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -63,6 +64,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
     private static final Logger logger = LoggerFactory.getLogger( WriteUniqueVerify.class
);
 
     ActorSystemFig actorSystemFig;
+    UniqueValuesFig uniqueValuesFig;
     UniqueValuesService akkaUvService;
 
     private final UniqueValueSerializationStrategy uniqueValueStrat;
@@ -83,11 +85,13 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
                              final Keyspace keyspace,
                              final CassandraConfig cassandraFig,
                              final ActorSystemFig actorSystemFig,
+                             final UniqueValuesFig uniqueValuesFig,
                              final UniqueValuesService akkaUvService ) {
 
         this.keyspace = keyspace;
         this.cassandraFig = cassandraFig;
         this.actorSystemFig = actorSystemFig;
+        this.uniqueValuesFig = uniqueValuesFig;
         this.akkaUvService = akkaUvService;
 
         Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy
is required" );
@@ -102,7 +106,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
     @Override
     public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
-        if ( actorSystemFig != null && actorSystemFig.getAkkaEnabled() ) {
+        if ( actorSystemFig != null && actorSystemFig.getEnabled() ) {
             verifyUniqueFieldsAkka( ioevent );
         } else {
             verifyUniqueFields( ioevent );
@@ -121,10 +125,10 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
         String region = ioevent.getRegion();
         if ( region == null ) {
-            region = actorSystemFig.getAkkaAuthoritativeRegion();
+            region = uniqueValuesFig.getAuthoritativeRegion();
         }
         if ( region == null ) {
-            region = actorSystemFig.getRegion();
+            region = actorSystemFig.getRegionLocal();
         }
         try {
             akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion(),
region );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
index c99824f..edd0cbe 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesFig.java
@@ -28,40 +28,40 @@ import java.io.Serializable;
 @FigSingleton
 public interface UniqueValuesFig extends GuicyFig, Serializable {
 
-    String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors";
+    String UNIQUEVALUE_ACTORS = "collection.uniquevalues.actors";
 
-    String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl";
+    String UNIQUEVALUE_CACHE_TTL = "collection.uniquevalues.cache.ttl";
 
-    String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl";
+    String UNIQUEVALUE_RESERVATION_TTL= "collection.uniquevalues.reservation.ttl";
 
-    String AKKA_UNIQUEVALUE_INSTANCES_PER_NODE = "collection.akka.uniquevalue.instances-per-node";
+    String UNIQUEVALUE_AUTHORITATIVE_REGION = "collection.uniquevalues.authoritative.region";
 
 
     /**
-     * Number of UniqueValueActors to be started on each node
-     */
-    @Key(AKKA_UNIQUEVALUE_ACTORS)
-    @Default("300")
-    int getUniqueValueActors();
-
-    /**
      * Unique Value cache TTL in seconds.
      */
-    @Key(AKKA_UNIQUEVALUE_CACHE_TTL)
+    @Key(UNIQUEVALUE_CACHE_TTL)
     @Default("10")
     int getUniqueValueCacheTtl();
 
     /**
      * Unique Value Reservation TTL in seconds.
      */
-    @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL)
+    @Key(UNIQUEVALUE_RESERVATION_TTL)
     @Default("10")
     int getUniqueValueReservationTtl();
 
     /**
      * Number of actor instances to create on each.
      */
-    @Key(AKKA_UNIQUEVALUE_INSTANCES_PER_NODE)
+    @Key(UNIQUEVALUE_ACTORS)
     @Default("300")
     int getUniqueValueInstancesPerNode();
+
+    /**
+     * Primary authoritative region (used if none other specified).
+     */
+    @Key(UNIQUEVALUE_AUTHORITATIVE_REGION)
+    @Default("default")
+    String getAuthoritativeRegion();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index e7cee21..a0ee6be 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -71,7 +71,7 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
 
 
         //run the stage
-        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy,
null, null );
+        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy,
null, null, null);
 
 
         //verify the observable is correct

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 8665ee9..dcc473c 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -84,10 +84,12 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
 
 
         //run the stage
-        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy,
null, null );
+        WriteCommit newStage =
+            new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null,
null, null );
 
 
-        Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context,
mvccEntityInput ) ).getEvent().getEntity().get();
+        Entity result = newStage.call(
+            new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
 
 
         //verify the log entry is correct
@@ -131,7 +133,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
         when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class
) ) )
                 .thenReturn( entityMutation );
 
-        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null
).call( event );
+        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null,
null ).call( event );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index 635e262..46cfde1 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -96,7 +96,7 @@ public class WriteUniqueVerifyTest extends AbstractUniqueValueTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig,
null, null );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace, cassandraConfig,
null, null, null );
 
        newStage.call( new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f0c9fd4b/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java b/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java
new file mode 100644
index 0000000..0120660
--- /dev/null
+++ b/stack/rest/src/test/java/org/apache/usergrid/rest/UniqueCatsIT.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.rest;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import org.apache.commons.lang.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.ProcessingException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.ConnectException;
+import java.text.DecimalFormat;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.codahale.metrics.MetricRegistry.name;
+
+
+/**
+ * Tests that Catgrid will not allow creation of entities with duplicate names.
+ *
+ * Intended for use against a production-like cluster, not run during normal JUnit testing.
+ *
+ * Comment out the @Ignore annotation below and edit to add your target hosts.
+ */
+public class UniqueCatsIT {
+    private static final Logger logger = LoggerFactory.getLogger( UniqueCatsIT.class );
+
+    private static final AtomicInteger successCounter = new AtomicInteger( 0 );
+    private static final AtomicInteger errorCounter = new AtomicInteger( 0 );
+    private static final AtomicInteger dupCounter = new AtomicInteger( 0 );
+
+    @Test
+    //@Ignore("Intended for use against  prod-like cluster")
+    public void testDuplicatePrevention() throws Exception {
+
+        int numThreads = 20;
+        int poolSize = 20;
+        int numCats = 100;
+
+        Multimap<String, String> catsCreated = Multimaps.synchronizedMultimap( HashMultimap.create()
);
+        Multimap<String, Map<String, Object>> dupsRejected = Multimaps.synchronizedMultimap(
HashMultimap.create() );
+
+        ExecutorService execService = Executors.newFixedThreadPool( poolSize );
+
+        Client client = ClientBuilder.newClient();
+
+        final MetricRegistry metrics = new MetricRegistry();
+        final Timer responses = metrics.timer(name(UniqueCatsIT.class, "responses"));
+        long startTime = System.currentTimeMillis();
+
+        final AtomicBoolean failed = new AtomicBoolean(false);
+
+        //String[] targetHosts = {"http://localhost:8080"};
+
+        String[] targetHosts = {
+            "https://ug21-west.e2e.apigee.net",
+            "https://ug21-east.e2e.apigee.net"
+        };
+
+        for (int i = 0; i < numCats; i++) {
+
+            if ( failed.get() ) { break; }
+
+            String randomizer = RandomStringUtils.randomAlphanumeric( 8 );
+
+            // multiple threads simultaneously trying to create a cat with the same propertyName
+            for (int j = 0; j < numThreads; j++) {
+
+                if ( failed.get() ) { break; }
+
+                final String name = "uv_test_cat_" + randomizer;
+                final String host = targetHosts[ j % targetHosts.length ];
+
+                execService.submit( () -> {
+
+                    Map<String, Object> form = new HashMap<String, Object>()
{{
+                        put("name", name);
+                    }};
+
+                    Timer.Context time = responses.time();
+                    try {
+                        WebTarget target = client.target( host ).path(
+                            //"/test-organization/test-app/cats" );
+                            "/dmjohnson/sandbox/cats" );
+
+                        //logger.info("Posting cat {} to host {}", catname, host);
+
+                        Response response = target.request()
+                            //.post( Entity.entity( form, MediaType.APPLICATION_FORM_URLENCODED
));
+                            .post( Entity.entity( form, MediaType.APPLICATION_JSON));
+
+                        org.apache.usergrid.rest.test.resource.model.ApiResponse apiResponse
= null;
+                        String responseAsString = "";
+                        if ( response.getStatus() >= 400 ) {
+                            responseAsString = response.readEntity( String.class );
+                        } else {
+                            apiResponse = response.readEntity(
+                                org.apache.usergrid.rest.test.resource.model.ApiResponse.class
);
+                        }
+
+                        if ( response.getStatus() == 200 || response.getStatus() == 201 )
{
+                            catsCreated.put( name, apiResponse.getEntity().getUuid().toString()
);
+                            successCounter.incrementAndGet();
+
+                        } else if ( response.getStatus() == 400
+                                && responseAsString.contains("DuplicateUniquePropertyExistsException"))
{
+                            dupsRejected.put( name, form );
+                            dupCounter.incrementAndGet();
+
+                        } else {
+                            logger.error("Cat creation failed status {} message {}",
+                                response.getStatus(), responseAsString );
+                            errorCounter.incrementAndGet();
+                        }
+
+                    } catch ( ProcessingException e ) {
+                        errorCounter.incrementAndGet();
+                        if ( e.getCause() instanceof ConnectException ) {
+                            logger.error("Error connecting to " + host);
+                        } else {
+                            logger.error( "Error", e );
+                        }
+
+                    } catch ( Exception e ) {
+                        errorCounter.incrementAndGet();
+                        logger.error("Error", e);
+                    }
+                    time.stop();
+
+                } );
+            }
+        }
+        execService.shutdown();
+
+        try {
+            while (!execService.awaitTermination( 60, TimeUnit.SECONDS )) {
+                System.out.println( "Waiting..." );
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+
+        long endTime = System.currentTimeMillis();
+
+        logger.info( "Total time {}s", (endTime - startTime) / 1000 );
+
+        DecimalFormat format = new DecimalFormat("##.###");
+
+        logger.info( "Timed {} requests:\n" +
+                        "mean rate {}/s\n" +
+                        "min       {}s\n" +
+                        "max       {}s\n" +
+                        "mean      {}s",
+                responses.getCount(),
+                format.format( responses.getMeanRate() ),
+                format.format( (double)responses.getSnapshot().getMin()  / 1000000000 ),
+                format.format( (double)responses.getSnapshot().getMax()  / 1000000000 ),
+                format.format( responses.getSnapshot().getMean() / 1000000000 )
+        );
+
+        logger.info( "Error count {} ratio = {}",
+                errorCounter.get(), (float) errorCounter.get() / (float) responses.getCount()
);
+
+        logger.info( "Success count = {}", successCounter.get() );
+
+        logger.info( "Rejected dup count = {}", dupCounter.get() );
+
+//        for ( String catname : catsCreated.keys() ) {
+//            System.out.println( catname );
+//            Collection<Cat> cats = catsCreated.get( catname );
+//            for ( Cat cat : cats ) {
+//                System.out.println("   " + cat.getUuid() );
+//            }
+//        }
+
+//        int count = 0;
+//        for ( String catname : dupsRejected.keySet() ) {
+//            System.out.println( catname );
+//            Collection<Cat> cats = dupsRejected.get( catname );
+//            for ( Cat cat : cats ) {
+//                System.out.println("   " + (count++) + " rejected " + cat.getCatname()
+ ":" + cat.getUuid() );
+//            }
+//        }
+
+        int catCount = 0;
+        int catnamesWithDuplicates = 0;
+        for ( String name : catsCreated.keySet() ) {
+            //Collection<Map<String, String>> forms =
+            Collection<String> forms = catsCreated.get( name );
+            if ( forms.size() > 1 ) {
+                catnamesWithDuplicates++;
+                logger.info("Duplicate " + name);
+            }
+            catCount++;
+        }
+        Assert.assertEquals( 0, catnamesWithDuplicates );
+        Assert.assertEquals( 0, errorCounter.get() );
+        Assert.assertEquals( numCats, successCounter.get() );
+        Assert.assertEquals( numCats, catCount );
+
+
+    }
+
+}


Mime
View raw message