usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [02/18] usergrid git commit: Fixes to get Akka unique values code running and working in a JUnit test.
Date Mon, 02 May 2016 21:42:14 GMT
Fixes to get Akka unique values code running and working in a JUnit test.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/eeb8a60a
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/eeb8a60a
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/eeb8a60a

Branch: refs/heads/usergrid-1268-akka-211
Commit: eeb8a60a6a37eed3254583e525e88ef29884da39
Parents: 52ee2fb
Author: Dave Johnson <snoopdave@apache.org>
Authored: Wed Apr 13 11:42:53 2016 -0400
Committer: Dave Johnson <snoopdave@apache.org>
Committed: Mon Apr 25 14:34:15 2016 -0400

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   6 +-
 .../mvcc/stage/write/WriteCommit.java           |  49 +++++++-
 .../mvcc/stage/write/WriteUniqueVerify.java     |  51 +++++++-
 .../collection/uniquevalues/AkkaFig.java        |  46 ++++---
 .../uniquevalues/ClusterSingletonRouter.java    |   6 +-
 .../uniquevalues/ReservationCache.java          |   4 +-
 .../uniquevalues/UniqueValueActor.java          | 122 +++++++++++--------
 .../uniquevalues/UniqueValueException.java      |  28 ++++-
 .../uniquevalues/UniqueValuesService.java       |   7 +-
 .../uniquevalues/UniqueValuesServiceImpl.java   | 107 ++++++++--------
 .../uniquevalues/UniqueValuesTable.java         |  17 ++-
 .../uniquevalues/UniqueValuesTableImpl.java     |  47 ++++++-
 .../mvcc/stage/delete/MarkCommitTest.java       |   7 +-
 .../mvcc/stage/write/WriteCommitTest.java       |   4 +-
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |   2 +-
 .../uniquevalues/LocalPreventDupsTest.java      |  18 ++-
 .../src/test/resources/usergrid.properties      |  13 --
 17 files changed, 358 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 3d794d1..45e519e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -20,9 +20,7 @@ package org.apache.usergrid.persistence.collection.guice;
 
 import java.util.concurrent.ThreadPoolExecutor;
 
-import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig;
-import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
-import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesServiceImpl;
+import org.apache.usergrid.persistence.collection.uniquevalues.*;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
@@ -72,6 +70,8 @@ public abstract class CollectionModule extends AbstractModule {
 
         bind( UniqueValuesService.class ).to( UniqueValuesServiceImpl.class );
 
+        bind( UniqueValuesTable.class ).to( UniqueValuesTableImpl.class );
+
         bind( ChangeLogGenerator.class).to( ChangeLogGeneratorImpl.class);
 
         configureMigrationProvider();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
index 7eb96e7..360e954 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommit.java
@@ -18,8 +18,14 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
 
+import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
+import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig;
+import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
+import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +66,9 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
 
     private static final Logger logger = LoggerFactory.getLogger( WriteCommit.class );
 
+    AkkaFig akkaFig;
+    UniqueValuesService akkaUvService;
+
     @Inject
     private UniqueValueSerializationStrategy uniqueValueStrat;
 
@@ -71,7 +80,9 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
     @Inject
     public WriteCommit( final MvccLogEntrySerializationStrategy logStrat,
                         final MvccEntitySerializationStrategy entryStrat,
-                        final UniqueValueSerializationStrategy uniqueValueStrat) {
+                        final UniqueValueSerializationStrategy uniqueValueStrat,
+                        final AkkaFig akkaFig,
+                        final UniqueValuesService akkaUvService ) {
 
         Preconditions.checkNotNull( logStrat, "MvccLogEntrySerializationStrategy is required" );
         Preconditions.checkNotNull( entryStrat, "MvccEntitySerializationStrategy is required" );
@@ -80,12 +91,44 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
         this.logEntryStrat = logStrat;
         this.entityStrat = entryStrat;
         this.uniqueValueStrat = uniqueValueStrat;
+        this.akkaFig = akkaFig;
+        this.akkaUvService = akkaUvService;
     }
 
 
     @Override
     public CollectionIoEvent<MvccEntity> call( final CollectionIoEvent<MvccEntity> ioEvent ) {
+        if ( akkaFig.getAkkaEnabled() ) {
+            return confirmUniqueFieldsAkka( ioEvent );
+        }
+        return confirmUniqueFields( ioEvent );
+    }
+
+    private CollectionIoEvent<MvccEntity> confirmUniqueFieldsAkka(CollectionIoEvent<MvccEntity> ioEvent) {
+
+        final MvccEntity mvccEntity = ioEvent.getEvent();
+        MvccValidationUtils.verifyMvccEntityWithEntity( mvccEntity );
+
+        final Id entityId = mvccEntity.getId();
+        final UUID version = mvccEntity.getVersion();
+        final ApplicationScope applicationScope = ioEvent.getEntityCollection();
 
+        // entity whose unique values are to be confirmed
+        final Entity entity = mvccEntity.getEntity().get();
+
+        try {
+            akkaUvService.confirmUniqueValues( applicationScope, entity, mvccEntity.getVersion() );
+
+        } catch (UniqueValueException e) {
+            Map<String, Field> violations = new HashMap<>();
+            violations.put( e.getField().getName(), e.getField() );
+            throw new WriteUniqueVerifyException( mvccEntity, applicationScope, violations  );
+        }
+
+        return ioEvent;
+    }
+
+    private CollectionIoEvent<MvccEntity> confirmUniqueFields(CollectionIoEvent<MvccEntity> ioEvent) {
         final MvccEntity mvccEntity = ioEvent.getEvent();
         MvccValidationUtils.verifyMvccEntityWithEntity( mvccEntity );
 
@@ -101,7 +144,8 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
         MvccValidationUtils.verifyMvccEntityWithEntity( ioEvent.getEvent() );
         ValidationUtils.verifyTimeUuid( version ,"version" );
 
-        final MvccLogEntry startEntry = new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
+        final MvccLogEntry startEntry =
+            new MvccLogEntryImpl( entityId, version, Stage.COMMITTED, MvccLogEntry.State.COMPLETE );
 
         MutationBatch logMutation = logEntryStrat.write( applicationScope, startEntry );
 
@@ -134,7 +178,6 @@ public class WriteCommit implements Func1<CollectionIoEvent<MvccEntity>, Collect
                 "Failed to execute write asynchronously ", e );
         }
 
-
         return ioEvent;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index d05f838..a37abf1 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -23,6 +23,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.usergrid.persistence.collection.uniquevalues.AkkaFig;
+import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValueException;
+import org.apache.usergrid.persistence.collection.uniquevalues.UniqueValuesService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +67,9 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
     private static final Logger logger = LoggerFactory.getLogger( WriteUniqueVerify.class );
 
+    AkkaFig akkaFig;
+    UniqueValuesService akkaUvService;
+
     private final UniqueValueSerializationStrategy uniqueValueStrat;
 
     public static int uniqueVerifyPoolSize = 100;
@@ -75,10 +81,17 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
 
     @Inject
-    public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
-                              final SerializationFig serializationFig, final Keyspace keyspace, final CassandraConfig cassandraFig ) {
+    public WriteUniqueVerify(final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
+                             final SerializationFig serializationFig,
+                             final Keyspace keyspace,
+                             final CassandraConfig cassandraFig,
+                             final AkkaFig akkaFig,
+                             final UniqueValuesService akkaUvService ) {
+
         this.keyspace = keyspace;
         this.cassandraFig = cassandraFig;
+        this.akkaFig = akkaFig;
+        this.akkaUvService = akkaUvService;
 
         Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy is required" );
         Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
@@ -92,6 +105,34 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
 
     @Override
     public void call( final CollectionIoEvent<MvccEntity> ioevent ) {
+        if ( akkaFig.getAkkaEnabled() ) {
+            verifyUniqueFieldsAkka( ioevent );
+        } else {
+            verifyUniqueFields( ioevent );
+        }
+    }
+
+    private void verifyUniqueFieldsAkka(CollectionIoEvent<MvccEntity> ioevent) {
+
+        MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
+
+        final MvccEntity mvccEntity = ioevent.getEvent();
+
+        final Entity entity = mvccEntity.getEntity().get();
+
+        final ApplicationScope applicationScope = ioevent.getEntityCollection();
+
+        try {
+            akkaUvService.reserveUniqueValues( applicationScope, entity, mvccEntity.getVersion() );
+
+        } catch (UniqueValueException e) {
+            Map<String, Field> violations = new HashMap<>();
+            violations.put( e.getField().getName(), e.getField() );
+            throw new WriteUniqueVerifyException( mvccEntity, applicationScope, violations  );
+        }
+    }
+
+    private void verifyUniqueFields(CollectionIoEvent<MvccEntity> ioevent) {
 
         MvccValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
 
@@ -139,9 +180,10 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
         }
 
         // use simple thread pool to verify fields in parallel
-        ConsistentReplayCommand cmd = new ConsistentReplayCommand(uniqueValueStrat,cassandraFig,scope, entity.getId().getType(), uniqueFields,entity);
+        ConsistentReplayCommand cmd = new ConsistentReplayCommand(
+            uniqueValueStrat,cassandraFig,scope, entity.getId().getType(), uniqueFields,entity);
 
-        Map<String,Field>  uniquenessViolations = cmd.execute();
+        Map<String,Field> uniquenessViolations = cmd.execute();
 
         //do we want to do this?
 
@@ -151,6 +193,7 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
         }
     }
 
+
     private static class ConsistentReplayCommand extends HystrixCommand<Map<String,Field>>{
 
         private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
index 3bb9fcf..67db3e9 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/AkkaFig.java
@@ -27,22 +27,33 @@ import org.safehaus.guicyfig.Key;
 @FigSingleton
 public interface AkkaFig extends GuicyFig {
 
+    String AKKA_ENABLED = "collection.akka.enabled";
+
     String AKKA_HOSTNAME = "collection.akka.hostname";
 
     String AKKA_PORT = "collection.akka.port";
 
     String AKKA_REGION = "collection.akka.region";
 
-    String AKKA_REGIONS = "collection.akka.regions";
+    String AKKA_REGION_SEEDS = "collection.akka.region.seeds";
 
-    String AKKA_UNIQUE_VALUE_ACTORS = "collection.akka.unique.value.actors";
+    String AKKA_UNIQUEVALUE_ACTORS = "collection.akka.uniquevalue.actors";
 
-    String AKKA_REGION_SEEDS = "collection.akka.region.seeds";
+    String AKKA_UNIQUEVALUE_CACHE_TTL = "collection.akka.uniquevalue.cache.ttl";
 
-    String AKKA_REGION_TYPES = "collection.akka.region.types";
+    String AKKA_UNIQUEVALUE_RESERVATION_TTL= "collection.akka.uniquevalue.reservation.ttl";
+
+    String AKKA_UNIQUEVALUE_REGION_TYPES = "collection.akka.uniquevalue.region.types";
 
 
     /**
+     * Whether Akka is used to verify and confirm unique values.
+     */
+    @Key(AKKA_ENABLED)
+    @Default("true")
+    boolean getAkkaEnabled();
+
+    /**
      * Hostname to be used in Akka configuration.
      */
     @Key(AKKA_HOSTNAME)
@@ -64,16 +75,9 @@ public interface AkkaFig extends GuicyFig {
     String getRegion();
 
     /**
-     * Comma-separated list of all regions to be used in Akka configuration.
-     */
-    @Key(AKKA_REGIONS)
-    @Default("us-east")
-    String getRegions();
-
-    /**
      * Number of UniqueValueActors to be started on each node
      */
-    @Key(AKKA_UNIQUE_VALUE_ACTORS)
+    @Key(AKKA_UNIQUEVALUE_ACTORS)
     @Default("300")
     int getUniqueValueActors();
 
@@ -89,7 +93,21 @@ public interface AkkaFig extends GuicyFig {
      * Comma-separated lists of region types each with format {region}:{type}
      */
     // TODO: allow this to be set via REST API
-    @Key(AKKA_REGION_TYPES)
-    @Default("")
+    @Key(AKKA_UNIQUEVALUE_REGION_TYPES)
+    @Default("us-east:user")
     String getRegionTypes();
+
+    /**
+     * Unique Value cache TTL in seconds.
+     */
+    @Key(AKKA_UNIQUEVALUE_CACHE_TTL)
+    @Default("5")
+    int getUniqueValueCacheTtl();
+
+    /**
+     * Unique Value Reservation TTL in seconds.
+     */
+    @Key(AKKA_UNIQUEVALUE_RESERVATION_TTL)
+    @Default("5")
+    int getUniqueValueReservationTtl();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java
index 8cd0ab0..d9c1aa4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ClusterSingletonRouter.java
@@ -15,9 +15,9 @@ public class ClusterSingletonRouter extends UntypedActor {
     private final ActorRef router;
 
 
-    public ClusterSingletonRouter( String injectorName ) {
+    public ClusterSingletonRouter( UniqueValuesTable table ) {
         router = getContext().actorOf(
-                FromConfig.getInstance().props(Props.create(UniqueValueActor.class, injectorName )), "router");
+                FromConfig.getInstance().props(Props.create(UniqueValueActor.class, table )), "router");
     }
 
     @Override
@@ -27,7 +27,7 @@ public class ClusterSingletonRouter extends UntypedActor {
             UniqueValueActor.Request request = (UniqueValueActor.Request)message;
 
             ConsistentHashingRouter.ConsistentHashableEnvelope envelope =
-                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, request.getRowKey() );
+                    new ConsistentHashingRouter.ConsistentHashableEnvelope( message, request.getConsistentHashKey() );
             router.tell( envelope, getSender());
 
         } else {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
index d5c67c3..c0911b0 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/ReservationCache.java
@@ -35,11 +35,11 @@ public class ReservationCache {
     }
 
     public void cacheReservation( UniqueValueActor.Reservation reservation ) {
-        cache.put( reservation.getRowKey(), reservation );
+        cache.put( reservation.getConsistentHashKey(), reservation );
     }
 
     public void cancelReservation( UniqueValueActor.Cancellation cancellation ) {
-        cache.invalidate( cancellation.getRowKey() );
+        cache.invalidate( cancellation.getConsistentHashKey() );
     }
 
     public CacheStats getStats() {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
index faf0433..ce1d72f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -5,6 +5,9 @@ import akka.actor.UntypedActor;
 import akka.cluster.pubsub.DistributedPubSub;
 import akka.cluster.pubsub.DistributedPubSubMediator;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,21 +21,16 @@ public class UniqueValueActor extends UntypedActor {
 
     //private MetricsService metricsService;
 
-    private UniqueValuesTable table = new UniqueValuesTableImpl();
+    final private UniqueValuesTable table;
 
-    private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
+    final private ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
 
     private int count = 0;
 
 
-    public UniqueValueActor( String injectorName ) {
+    public UniqueValueActor( UniqueValuesTable table ) {
+        this.table = table;
 
-//        UniqueValuesService uniqueValuesService =
-//                GuiceModule.getInjector( injectorName ).getInstance( UniqueValuesService.class );
-//
-//        terminateOnError = Boolean.parseBoolean( uniqueValuesService.getProperties()
-//                .getProperty( "akka.unique-value-actor-terminate-on-error", "false" ) );
-//
 //        chaos = Boolean.parseBoolean( uniqueValuesService.getProperties()
 //                .getProperty( "akka.test.chaos", "false" ) );
 
@@ -58,20 +56,20 @@ public class UniqueValueActor extends UntypedActor {
 //            final Timer.Context context = metricsService.getReservationTimer().time();
 
             try {
-                UUID owner = table.lookupOwner( res.getType(), res.getPropertyName(), res.getPropertyValue() );
+                Id owner = table.lookupOwner( res.getApplicationScope(), res.getOwner().getType(), res.getField() );
 
-                if ( owner != null && owner.equals( res.getUuid() )) {
+                if ( owner != null && owner.equals( res.getOwner() )) {
                     // sender already owns this unique value
                     getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() );
                     return;
 
-                } else if ( owner != null && !owner.equals( res.getUuid() )) {
+                } else if ( owner != null && !owner.equals( res.getOwner() )) {
                     // tell sender value is not unique
                     getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() );
                     return;
                 }
 
-                table.reserve( res.getUuid(), res.getType(), res.getPropertyName(), res.getPropertyValue() );
+                table.reserve( res.getApplicationScope(), res.getOwner(), res.getOwnerVersion(), res.getField() );
 
                 getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() );
 
@@ -89,14 +87,14 @@ public class UniqueValueActor extends UntypedActor {
             }
 
         } else if ( message instanceof Confirmation) {
-            Confirmation commit = (Confirmation) message;
+            Confirmation con = (Confirmation) message;
 
 //            final Timer.Context context = metricsService.getCommitmentTimer().time();
 
             try {
-                UUID owner = table.lookupOwner(  commit.getType(), commit.getPropertyName(), commit.getPropertyValue() );
+                Id owner = table.lookupOwner( con.getApplicationScope(), con.getOwner().getType(), con.getField() );
 
-                if ( owner != null && !owner.equals( commit.getUuid() )) {
+                if ( owner != null && !owner.equals( con.getOwner() )) {
                     // cannot reserve, somebody else owns the unique value
                     getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() );
                     return;
@@ -107,12 +105,12 @@ public class UniqueValueActor extends UntypedActor {
                     return;
                 }
 
-                table.commit( commit.getUuid(), commit.getType(), commit.getPropertyName(), commit.getPropertyValue() );
+                table.confirm( con.getApplicationScope(), con.getOwner(), con.getOwnerVersion(), con.getField() );
 
                 getSender().tell( new Response( Response.Status.IS_UNIQUE ), getSender() );
 
                 mediator.tell( new DistributedPubSubMediator.Publish( "content",
-                        new Reservation( commit ) ), getSelf() );
+                        new Reservation( con ) ), getSelf() );
 
             } catch (Throwable t) {
                 getSender().tell( new Response( Response.Status.ERROR ), getSender() );
@@ -127,9 +125,9 @@ public class UniqueValueActor extends UntypedActor {
             Cancellation can = (Cancellation) message;
 
             try {
-                UUID owner = table.lookupOwner(  can.getType(), can.getPropertyName(), can.getPropertyValue() );
+                Id owner = table.lookupOwner( can.getApplicationScope(), can.getOwner().getType(), can.getField() );
 
-                if ( owner != null && !owner.equals( can.getUuid() )) {
+                if ( owner != null && !owner.equals( can.getOwner() )) {
                     // cannot cancel, somebody else owns the unique value
                     getSender().tell( new Response( Response.Status.NOT_UNIQUE ), getSender() );
                     return;
@@ -140,7 +138,7 @@ public class UniqueValueActor extends UntypedActor {
                     return;
                 }
 
-                table.cancel( can.getType(), can.getPropertyName(), can.getPropertyValue() );
+                table.confirm( can.getApplicationScope(), can.getOwner(), can.getOwnerVersion(), can.getField() );
 
                 getSender().tell( new Response( Response.Status.SUCCESS ), getSender() );
 
@@ -162,41 +160,57 @@ public class UniqueValueActor extends UntypedActor {
      * UniqueValue actor receives and processes Requests.
      */
     public abstract static class Request implements Serializable {
-        final UUID uuid;
-        final String type;
-        final String propertyName;
-        final String propertyValue;
-        final String rowKey;
-
-        public Request(UUID uuid, String type, String propertyName, String value) {
-            this.uuid = uuid;
-            this.type = type;
-            this.propertyName = propertyName;
-            this.propertyValue = value;
-            this.rowKey = getType() + ":" + getPropertyName() + ":" + getPropertyValue();
+        final ApplicationScope applicationScope;
+        final Id owner;
+        final UUID ownerVersion;
+        final Field field;
+        final String consistentHashKey;
+
+        public Request( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field ) {
+            this.applicationScope = applicationScope;
+            this.owner = owner;
+            this.ownerVersion = ownerVersion;
+            this.field = field;
+            StringBuilder sb = new StringBuilder();
+            sb.append( applicationScope.getApplication() );
+            sb.append(":");
+            sb.append( owner.getType() );
+            sb.append(":");
+            sb.append( field.getName() );
+            sb.append(":");
+            sb.append( field.getValue().toString() );
+            this.consistentHashKey = sb.toString();
         }
         public Request( Request req ) {
-            this.uuid = req.uuid;
-            this.type = req.type;
-            this.propertyName = req.propertyName;
-            this.propertyValue = req.propertyValue;
-            this.rowKey = getType() + ":" + getPropertyName() + ":" + getPropertyValue();
+            this.applicationScope = req.applicationScope;
+            this.owner = req.owner;
+            this.ownerVersion = req.ownerVersion;
+            this.field = req.field;
+            StringBuilder sb = new StringBuilder();
+            sb.append( req.applicationScope.getApplication() );
+            sb.append(":");
+            sb.append( req.owner.getType() );
+            sb.append(":");
+            sb.append( req.field.getName() );
+            sb.append(":");
+            sb.append( req.field.getValue().toString() );
+            this.consistentHashKey = sb.toString();
 
         }
-        public String getRowKey() {
-            return rowKey;
+        public ApplicationScope getApplicationScope() {
+            return applicationScope;
         }
-        public UUID getUuid() {
-            return uuid;
+        public Id getOwner() {
+            return owner;
         }
-        public String getType() {
-            return type;
+        public Field getField() {
+            return field;
         }
-        public String getPropertyName() {
-            return propertyName;
+        public String getConsistentHashKey() {
+            return consistentHashKey;
         }
-        public String getPropertyValue() {
-            return propertyValue;
+        public UUID getOwnerVersion() {
+            return ownerVersion;
         }
     }
 
@@ -219,8 +233,8 @@ public class UniqueValueActor extends UntypedActor {
         public Reservation( Request req ) {
             super( req );
         }
-        public Reservation(UUID uuid, String type, String username, String value) {
-            super( uuid, type, username, value );
+        public Reservation( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field) {
+            super( applicationScope, owner, ownerVersion, field );
         }
     }
 
@@ -228,8 +242,8 @@ public class UniqueValueActor extends UntypedActor {
         public Cancellation( Request req ) {
             super( req );
         }
-        public Cancellation(UUID uuid, String type, String username, String value) {
-            super( uuid, type, username, value );
+        public Cancellation( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field) {
+            super( applicationScope, owner, ownerVersion, field );
         }
     }
 
@@ -237,8 +251,8 @@ public class UniqueValueActor extends UntypedActor {
         public Confirmation(Request req ) {
             super( req );
         }
-        public Confirmation(UUID uuid, String type, String username, String value) {
-            super( uuid, type, username, value );
+        public Confirmation( ApplicationScope applicationScope, Id owner, UUID ownerVersion, Field field) {
+            super( applicationScope, owner, ownerVersion, field );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java
index 5df8237..5ecfb68 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueException.java
@@ -1,7 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
 package org.apache.usergrid.persistence.collection.uniquevalues;
 
+import org.apache.usergrid.persistence.model.field.Field;
+
 public class UniqueValueException extends Exception {
-    public UniqueValueException(String message) {
+    final Field field;
+
+    public UniqueValueException(String message, Field field ) {
         super( message );
+        this.field = field;
+    }
+
+    public Field getField() {
+        return field;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java
index 2219df6..7ebab15 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesService.java
@@ -19,8 +19,11 @@
 package org.apache.usergrid.persistence.collection.uniquevalues;
 
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
+import java.util.UUID;
+
 /**
  * Service that reserves and confirms unique values.
  */
@@ -30,12 +33,12 @@ public interface UniqueValuesService {
      * Check that unique values are unique and reserve them for a limited time.
      * If the reservations are not confirmed, they will expire.
      */
-    void reserveUniqueValues( Entity entity ) throws UniqueValueException;
+    void reserveUniqueValues( ApplicationScope scope, Entity entity, UUID version ) throws UniqueValueException;
 
     /**
      * Confirm unique values that were reserved earlier.
      */
-    void confirmUniqueValues( Entity entity ) throws UniqueValueException;
+    void confirmUniqueValues( ApplicationScope scope, Entity entity, UUID version ) throws UniqueValueException;
 
     /**
      * For test purposes only.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 8897091..45909b8 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -31,9 +31,11 @@ import akka.util.Timeout;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
@@ -45,12 +47,17 @@ import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 
+@Singleton
 public class UniqueValuesServiceImpl implements UniqueValuesService {
     private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class );
 
     @Inject
     AkkaFig akkaFig;
 
+    @Inject
+    UniqueValuesTable table;
+
+
     private String hostname;
     private Integer port;
     private String currentRegion;
@@ -92,6 +99,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         waitForRequestActors();
     }
 
+
     /**
      * For testing purposes only; does not wait for request actors to start.
      */
@@ -103,10 +111,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         initAkka();
     }
 
+
     private Map<String, ActorRef> getRequestActorsByRegion() {
         return requestActorsByRegion;
     }
 
+
     private Map<String, String> getRegionsByType() {
         return regionsByType;
     }
@@ -148,24 +158,13 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
             throw new RuntimeException( "No value specified for akka.region");
         }
 
-        String regionsValue = akkaFig.getRegions();
-        if ( StringUtils.isEmpty( regionsValue )) {
-            throw new RuntimeException( "No value specified for akka.regions");
-        }
-
-        String[] regions = regionsValue.split( "," );
-        for ( String region : regions ) {
-
-            akkaFig.getKeyByMethod( "" );
-
-            String typesValue = akkaFig.getRegionTypes();
-            String[] regionTypes = StringUtils.isEmpty( typesValue ) ? new String[0] : typesValue.split(",");
-            for ( String regionType : regionTypes ) {
-                String[] parts = regionType.split(":");
-                String typeRegion = parts[0];
-                String type = parts[1];
-                this.regionsByType.put( type, typeRegion );
-            }
+        String typesValue = akkaFig.getRegionTypes();
+        String[] regionTypes = StringUtils.isEmpty( typesValue ) ? new String[0] : typesValue.split(",");
+        for ( String regionType : regionTypes ) {
+            String[] parts = regionType.split(":");
+            String typeRegion = parts[0];
+            String type = parts[1];
+            this.regionsByType.put( type, typeRegion );
         }
 
         final Map<String, ActorSystem> systemMap = new HashMap<>();
@@ -177,6 +176,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         subscribeToReservations( localSystem, systemMap );
     }
 
+
     private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
 
         for ( String region : systemMap.keySet() ) {
@@ -188,6 +188,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         }
     }
 
+
     /**
      * Create ActorSystem and ClusterSingletonProxy for every region.
      * Create ClusterSingletonManager for the current region.
@@ -217,7 +218,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
                 ClusterSingletonManagerSettings settings =
                         ClusterSingletonManagerSettings.create( system ).withRole("io");
                 system.actorOf( ClusterSingletonManager.props(
-                        Props.create( ClusterSingletonRouter.class, region ),
+                        Props.create( ClusterSingletonRouter.class, table ),
                         PoisonPill.getInstance(), settings ), "uvRouter");
             }
 
@@ -230,6 +231,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         return localSystem;
     }
 
+
     /**
      * Create RequestActor for each region.
      *
@@ -249,6 +251,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         }
     }
 
+
     public void waitForRequestActors() {
 
         for ( String region : requestActorsByRegion.keySet() ) {
@@ -257,6 +260,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         }
     }
 
+
     private void waitForRequestActor( ActorRef ra ) {
 
         logger.info( "Waiting on request actor {}...", ra.path() );
@@ -339,7 +343,8 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
                 }
 
                 if (seedsByRegion.keySet().isEmpty()) {
-                    throw new RuntimeException( "No seeds listed in 'parsing collection.akka.region.seeds' property." );
+                    throw new RuntimeException(
+                        "No seeds listed in 'parsing collection.akka.region.seeds' property." );
                 }
             }
 
@@ -413,14 +418,13 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     }
 
 
-
     @Override
-    public void reserveUniqueValues(Entity entity) throws UniqueValueException {
+    public void reserveUniqueValues( ApplicationScope scope, Entity entity, UUID version) throws UniqueValueException {
 
         try {
             for (Field field : entity.getFields()) {
                 if (field.isUnique()) {
-                    reserveUniqueField( entity, field.getName(), field.getValue().toString() );
+                    reserveUniqueField( scope, entity, version, field );
                 }
             }
 
@@ -428,7 +432,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
             for (Field field : entity.getFields()) {
                 try {
-                    cancelUniqueField( entity, field.getName(), field.getValue().toString() );
+                    cancelUniqueField( scope, entity, version, field );
                 } catch (Throwable ignored) {
                     logger.debug( "Error canceling unique field", ignored );
                 }
@@ -440,12 +444,12 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
 
     @Override
-    public void confirmUniqueValues(Entity entity) throws UniqueValueException {
+    public void confirmUniqueValues( ApplicationScope scope, Entity entity, UUID version ) throws UniqueValueException {
 
         try {
             for (Field field : entity.getFields()) {
                 if (field.isUnique()) {
-                    confirmUniqueField( entity, field.getName(), field.getValue().toString() );
+                    confirmUniqueField( scope, entity, version, field );
                 }
             }
 
@@ -453,7 +457,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
             for (Field field : entity.getFields()) {
                 try {
-                    cancelUniqueField( entity, field.getName(), field.getValue().toString() );
+                    cancelUniqueField( scope, entity, version, field );
                 } catch (Throwable ignored) {
                     logger.debug( "Error canceling unique field", ignored );
                 }
@@ -465,9 +469,9 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
 
     private void reserveUniqueField(
-        Entity entity, String propertyName, String propertyValue ) throws UniqueValueException {
+        ApplicationScope scope, Entity entity, UUID version, Field field ) throws UniqueValueException {
 
-        String region = getRegionsByType().get("user");
+        String region = getRegionsByType().get( entity.getId().getType() );
         ActorRef requestActor = getRequestActorsByRegion().get(region);
 
         if ( requestActor == null ) {
@@ -475,23 +479,24 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         }
 
         UniqueValueActor.Request request = new UniqueValueActor.Reservation(
-            entity.getId().getUuid(), "user",  propertyName, propertyValue );
+            scope, entity.getId(), version, field );
 
-        UniqueValueActor.Reservation res = reservationCache.get( request.getRowKey() );
+        UniqueValueActor.Reservation res = reservationCache.get( request.getConsistentHashKey() );
 //        if ( res != null ) {
 //            getCacheCounter().inc();
 //        }
-        if ( res != null && !res.getUuid().equals( request.getUuid() )) {
-            throw new UniqueValueException( "Error property not unique (cache)" );
+        if ( res != null && !res.getOwner().equals( request.getOwner() )) {
+            throw new UniqueValueException( "Error property not unique (cache)", field);
         }
 
-        sendUniqueValueRequest( entity, requestActor, request );
+        sendUniqueValueRequest( scope, entity, field, requestActor, request );
     }
 
+
     private void confirmUniqueField(
-        Entity entity, String propertyName, String propertyValue ) throws UniqueValueException {
+        ApplicationScope scope, Entity entity, UUID version, Field field ) throws UniqueValueException {
 
-        String region = getRegionsByType().get("user");
+        String region = getRegionsByType().get( entity.getId().getType() );
         ActorRef requestActor = getRequestActorsByRegion().get(region);
 
         if ( requestActor == null ) {
@@ -499,26 +504,27 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         }
 
         UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation(
-                entity.getId().getUuid(), "user",  propertyName, propertyValue );
+            scope, entity.getId(), version, field );
 
-        sendUniqueValueRequest( entity, requestActor, request );
+        sendUniqueValueRequest( scope, entity, field, requestActor, request );
     }
 
-    private void cancelUniqueField(
-        Entity entity, String propertyName, String propertyValue ) throws UniqueValueException {
 
-        ActorRef requestActor = lookupRequestActorForType( "user" );
+    private void cancelUniqueField( ApplicationScope scope, Entity entity, UUID version, Field field ) throws UniqueValueException {
+
+        ActorRef requestActor = lookupRequestActorForType( entity.getId().getType() );
 
         if ( requestActor == null ) {
             throw new RuntimeException( "No request actor for type, cannot verify unique fields!" );
         }
 
         UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation(
-                entity.getId().getUuid(), "user",  propertyName, propertyValue );
+            scope, entity.getId(), version, field );
 
         requestActor.tell( request, null );
     }
 
+
     private ActorRef lookupRequestActorForType( String type ) {
         String region = getRegionsByType().get( type );
         if ( region == null ) {
@@ -531,8 +537,9 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         return requestActor;
     }
 
-    private void sendUniqueValueRequest(
-           Entity entity, ActorRef requestActor, UniqueValueActor.Request request) throws UniqueValueException {
+
+    private void sendUniqueValueRequest( ApplicationScope scope, Entity entity, Field field,
+        ActorRef requestActor, UniqueValueActor.Request request) throws UniqueValueException {
 
         int maxRetries = 5;
         int retries = 0;
@@ -552,27 +559,27 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
                                 || response.getStatus().equals( UniqueValueActor.Response.Status.NOT_UNIQUE ))) {
                     if ( retries > 1 ) {
                         logger.debug("IS_UNIQUE after retrying {} for entity {} rowkey {}",
-                                retries, entity.getId().getUuid(), request.getRowKey());
+                                retries, entity.getId().getUuid(), request.getConsistentHashKey());
                     }
                     break;
 
                 } else if ( response != null  ) {
                     logger.debug("ERROR status retrying {} entity {} rowkey {}",
-                            retries, entity.getId().getUuid(), request.getRowKey());
+                            retries, entity.getId().getUuid(), request.getConsistentHashKey());
                 } else {
                     logger.debug("Timed-out retrying {} entity {} rowkey",
-                            retries, entity.getId().getUuid(), request.getRowKey());
+                            retries, entity.getId().getUuid(), request.getConsistentHashKey());
                 }
 
             } catch ( Exception e ) {
                 logger.debug("{} caused retry {} for entity {} rowkey {}",
-                        e.getClass().getSimpleName(), retries, entity.getId().getUuid(), request.getRowKey());
+                        e.getClass().getSimpleName(), retries, entity.getId().getUuid(), request.getConsistentHashKey());
             }
         }
 
         if ( response == null || response.getStatus().equals( UniqueValueActor.Response.Status.ERROR )) {
             logger.debug("ERROR after retrying {} for entity {} rowkey {}",
-                    retries, entity.getId().getUuid(), request.getRowKey());
+                    retries, entity.getId().getUuid(), request.getConsistentHashKey());
 
             // should result in an HTTP 503
             throw new RuntimeException( "Error verifying unique value after " + retries + " retries");
@@ -581,7 +588,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         if ( response.getStatus().equals( UniqueValueActor.Response.Status.NOT_UNIQUE )) {
 
             // should result in an HTTP 409 (conflict)
-            throw new UniqueValueException( "Error property not unique" );
+            throw new UniqueValueException( "Error property not unique", field );
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java
index 4309eb0..0e69ef7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTable.java
@@ -18,16 +18,25 @@
  */
 package org.apache.usergrid.persistence.collection.uniquevalues;
 
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
+
 import java.util.UUID;
 
 
 public interface UniqueValuesTable {
 
-    UUID lookupOwner(String entityType, String propertyName, String propertyValue);
+    Id lookupOwner(
+        ApplicationScope applicationScope, String type, Field field ) throws ConnectionException;
 
-    void reserve(UUID owner, String entityType, String propertyName, String propertyValue);
+    void reserve(
+        ApplicationScope applicationScope, Id owner, UUID version, Field field ) throws ConnectionException;
 
-    void commit(UUID owner, String entityType, String propertyName, String propertyValue);
+    void confirm(
+        ApplicationScope applicationScope, Id owner, UUID version, Field field ) throws ConnectionException;
 
-    void cancel(String entityType, String propertyName, String propertyValue);
+    void cancel(
+        ApplicationScope applicationScope, Id owner, UUID version, Field field ) throws ConnectionException;
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
index ee3d621..8ba7cbd 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesTableImpl.java
@@ -18,29 +18,66 @@
  */
 package org.apache.usergrid.persistence.collection.uniquevalues;
 
+import com.google.inject.Inject;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.UniqueValueSet;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.UUID;
 
 
 public class UniqueValuesTableImpl implements UniqueValuesTable {
     private static final Logger logger = LoggerFactory.getLogger( UniqueValuesTableImpl.class );
 
+    final UniqueValueSerializationStrategy strat;
+    final AkkaFig akkaFig;
+
+    @Inject
+    public UniqueValuesTableImpl( final UniqueValueSerializationStrategy strat, AkkaFig akkaFig ) {
+        this.strat = strat;
+        this.akkaFig = akkaFig;
+    }
+
+
     @Override
-    public UUID lookupOwner(String entityType, String propertyName, String propertyValue) {
-        return null;
+    public Id lookupOwner( ApplicationScope scope, String type, Field field) throws ConnectionException {
+
+        UniqueValueSet set = strat.load( scope, type, Collections.singletonList( field ) );
+        UniqueValue uv  = set.getValue( field.getName() );
+        return uv == null ? null : uv.getEntityId();
     }
 
     @Override
-    public void reserve(UUID owner, String entityType, String propertyName, String propertyValue) {
+    public void reserve( ApplicationScope scope, Id owner, UUID version, Field field ) throws ConnectionException {
+
+        UniqueValue uv = new UniqueValueImpl( field, owner, version);
+        final MutationBatch write = strat.write( scope, uv, akkaFig.getUniqueValueReservationTtl() );
+        write.execute();
     }
 
     @Override
-    public void commit(UUID owner, String entityType, String propertyName, String propertyValue) {
+    public void confirm( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException {
+
+        UniqueValue uv = new UniqueValueImpl( field, owner, version);
+        final MutationBatch write = strat.write( scope, uv );
+        write.execute();
+
     }
 
     @Override
-    public void cancel(String entityType, String propertyName, String propertyValue) {
+    public void cancel( ApplicationScope scope, Id owner, UUID version, Field field) throws ConnectionException {
+
+        UniqueValue uv = new UniqueValueImpl( field, owner, version );
+        final MutationBatch write = strat.delete( scope, uv );
+        write.execute();
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
index ad6eac6..e7cee21 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/MarkCommitTest.java
@@ -71,13 +71,12 @@ public class MarkCommitTest extends AbstractMvccEntityStageTest {
 
 
         //run the stage
-        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy );
+        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null );
 
 
         //verify the observable is correct
-        Entity result  = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
-
-
+        Entity result  = newStage.call(
+            new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
 
 
         //verify the log entry is correct

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
index 58642d3..8665ee9 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteCommitTest.java
@@ -84,7 +84,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
 
 
         //run the stage
-        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy );
+        WriteCommit newStage = new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null );
 
 
         Entity result = newStage.call( new CollectionIoEvent<MvccEntity>( context, mvccEntityInput ) ).getEvent().getEntity().get();
@@ -131,7 +131,7 @@ public class WriteCommitTest extends AbstractMvccEntityStageTest {
         when( mvccEntityStrategy.write( any( ApplicationScope.class ), any( MvccEntity.class ) ) )
                 .thenReturn( entityMutation );
 
-        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy ).call( event );
+        new WriteCommit( logStrategy, mvccEntityStrategy, uniqueValueStrategy, null, null ).call( event );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index b9a1565..1b0c7e4 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -82,7 +82,7 @@ public class WriteUniqueVerifyTest {
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig );
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uvstrat, fig, keyspace,cassandraConfig, null, null );
 
        newStage.call(
             new CollectionIoEvent<>( collectionScope, mvccEntity ) ) ;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java
index bee47eb..ef5c16f 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/uniquevalues/LocalPreventDupsTest.java
@@ -6,6 +6,7 @@ import com.google.common.collect.Multimaps;
 import com.google.inject.Inject;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
+import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -44,6 +45,9 @@ public class LocalPreventDupsTest {
     @Rule
     public MigrationManagerRule migrationManagerRule;
 
+    @Inject
+    UniqueValuesService uniqueValuesService;
+
 
     private static final AtomicInteger successCounter = new AtomicInteger( 0 );
     private static final AtomicInteger errorCounter = new AtomicInteger( 0 );
@@ -51,16 +55,8 @@ public class LocalPreventDupsTest {
     @Test
     public void testBasicOperation() throws Exception {
 
-        UniqueValuesService appEast1 =
-            TestCollectionModule.getInjector( "us-east" ).getInstance( UniqueValuesService.class );
-        appEast1.start("127.0.0.1", 2551, "us-east");
-
-        UniqueValuesService appEast2 =
-            TestCollectionModule.getInjector( "us-east" ).getInstance( UniqueValuesService.class );
-        appEast2.start("127.0.0.1", 2552, "us-east");
-
-        appEast1.waitForRequestActors();
-        appEast2.waitForRequestActors();
+        uniqueValuesService.start("127.0.0.1", 2551, "us-east");
+        uniqueValuesService.waitForRequestActors();
 
         int numUsers = 100;
         Multimap<String, Entity> usersCreated = generateDuplicateUsers( numUsers );
@@ -115,7 +111,7 @@ public class LocalPreventDupsTest {
                         logger.debug("Created user {}", username);
 
                     } catch ( Throwable t ) {
-                        if ( t instanceof UniqueValueException ) {
+                        if ( t instanceof WriteUniqueVerifyException) {
                             // we expect lots of these
                         } else {
                             errorCounter.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/eeb8a60a/stack/corepersistence/collection/src/test/resources/usergrid.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid.properties b/stack/corepersistence/collection/src/test/resources/usergrid.properties
index 9059f0e..8de5c27 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid.properties
@@ -1,16 +1,3 @@
 # This property is required to be set and cannot be defaulted anywhere
 usergrid.cluster_name=usergrid
 
-collection.akka.hostname=localhost
-
-collection.akka.port=2551
-
-collection.akka.region=us-east
-
-collection.akka.regions=us-east
-
-collection.akka.region.seeds=us-east:localhost:2551,us-east:localhost:2552
-
-collection.akka.region.types=us-east:users,us-east:cats
-
-collection.akka.unique.value.actors=400


Mime
View raw message