usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [28/50] [abbrv] usergrid git commit: Use ClusterClient feature instead of roles to ensure that all writes are done only in the appropriate authoritative region.
Date Mon, 11 Jul 2016 15:35:38 GMT
Use ClusterClient feature instead of roles to ensure that all writes are done only in the appropriate
authoritative region.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/d12307bd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/d12307bd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/d12307bd

Branch: refs/heads/release-2.1.1
Commit: d12307bdb3219ac87550147cb23cbb0e14155200
Parents: 841409f
Author: Dave Johnson <snoopdave@apache.org>
Authored: Thu Jun 23 17:33:33 2016 -0400
Committer: Dave Johnson <snoopdave@apache.org>
Committed: Thu Jun 23 17:33:33 2016 -0400

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |   2 +-
 .../actorsystem/ActorSystemManager.java         |   8 +-
 .../actorsystem/ActorSystemManagerImpl.java     | 297 ++++++++++---------
 .../persistence/actorsystem/RouterProducer.java |   8 +-
 .../src/main/resources/application.conf         |   7 +-
 .../src/main/resources/cluster-singleton.conf   |  25 --
 .../uniquevalues/UniqueValueActor.java          |   2 +-
 .../uniquevalues/UniqueValuesServiceImpl.java   |  95 +++---
 8 files changed, 223 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 9bd589a..060ec18 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -155,7 +155,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
                 actorSystemManager.registerMessageType( UniqueValueActor.Cancellation.class,
"/user/uvProxy" );
                 actorSystemManager.registerMessageType( UniqueValueActor.Confirmation.class,
"/user/uvProxy" );
                 actorSystemManager.start();
-                actorSystemManager.waitForRequestActors();
+                actorSystemManager.waitForClientActors();
 
             } catch (Throwable t) {
                 logger.error("Error starting Akka", t);

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
index e2c2913..c45ccac 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java
@@ -27,7 +27,7 @@ public interface ActorSystemManager {
 
     void start(String hostname, Integer port, String currentRegion);
 
-    void waitForRequestActors();
+    void waitForClientActors();
 
     boolean isReady();
 
@@ -35,5 +35,9 @@ public interface ActorSystemManager {
 
     void registerMessageType( Class messageType, String routerPath );
 
-    ActorRef getClientActor(String region );
+    ActorRef getClientActor();
+
+    ActorRef getClusterClient(String region );
+
+    String getCurrentRegion();
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
index 1f7bf70..5e23c20 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java
@@ -19,15 +19,15 @@
 package org.apache.usergrid.persistence.actorsystem;
 
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Props;
+import akka.actor.*;
+import akka.cluster.client.ClusterClient;
+import akka.cluster.client.ClusterClientReceptionist;
+import akka.cluster.client.ClusterClientSettings;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
 import com.google.inject.Inject;
-import com.google.inject.Injector;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -45,22 +45,25 @@ import java.util.concurrent.TimeUnit;
 public class ActorSystemManagerImpl implements ActorSystemManager {
     private static final Logger logger = LoggerFactory.getLogger( ActorSystemManagerImpl.class
);
 
+    private boolean started = false;
+
     private String  hostname;
     private Integer port;
     private String  currentRegion;
 
-    private static Injector             injector;
     private final ActorSystemFig        actorSystemFig;
-    private final Map<String, ActorRef> requestActorsByRegion;
     private final List<RouterProducer>  routerProducers = new ArrayList<>();
     private final Map<Class, String>    routersByMessageType = new HashMap<>();
+    private final Map<String, ActorRef> clusterClientsByRegion = new HashMap<String,
ActorRef>(20);
+
+    private ActorRef                    clientActor;
+
+    private ListMultimap<String, String> seedsByRegion;
 
 
     @Inject
-    public ActorSystemManagerImpl(Injector inj, ActorSystemFig actorSystemFig) {
-        injector = inj;
+    public ActorSystemManagerImpl( ActorSystemFig actorSystemFig ) {
         this.actorSystemFig = actorSystemFig;
-        this.requestActorsByRegion = new HashMap<>();
     }
 
 
@@ -75,7 +78,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
         this.port = null;
 
         initAkka();
-        waitForRequestActors();
+        waitForClientActors();
     }
 
 
@@ -95,7 +98,7 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
     @Override
     public boolean isReady() {
-        return !getRequestActorsByRegion().isEmpty();
+        return started;
     }
 
 
@@ -112,13 +115,20 @@ public class ActorSystemManagerImpl implements ActorSystemManager {
 
 
     @Override
-    public ActorRef getClientActor(String region) {
-        return getRequestActorsByRegion().get( region );
+    public ActorRef getClientActor() {
+        return clientActor;
+    }
+
+
+    @Override
+    public ActorRef getClusterClient(String region) {
+        return clusterClientsByRegion.get( region );
     }
 
 
-    private Map<String, ActorRef> getRequestActorsByRegion() {
-        return requestActorsByRegion;
+    @Override
+    public String getCurrentRegion() {
+        return currentRegion;
     }
 
 
@@ -152,215 +162,214 @@ public class ActorSystemManagerImpl implements ActorSystemManager
{
         logger.info("Initializing Akka for hostname {} region {} regionList {} seeds {}",
             hostname, currentRegion, regionList, actorSystemFig.getRegionSeeds() );
 
-        final Map<String, ActorSystem> systemMap = new HashMap<>();
-
-        Map<String, Config> configMap = readClusterSingletonConfigs();
+        Config config = readClusterSystemConfig();
 
-        ActorSystem localSystem = createClusterSingletonProxies( configMap, systemMap );
+        ActorSystem localSystem = createClusterSystemsFromConfigs( config );
 
-        createRequestActors( systemMap );
+        createClientActors( localSystem );
 
         for ( RouterProducer routerProducer : routerProducers ) {
-            routerProducer.createLocalSystemActors( localSystem, systemMap );
+            routerProducer.createLocalSystemActors( localSystem );
         }
     }
 
 
     /**
-     * Read configuration and create a Config for each region.
-     *
-     * @return Map of regions to Configs.
+     * Read Usergrid's list of seeds, put them in handy multi-map.
      */
-    private Map<String, Config> readClusterSingletonConfigs() {
+    private ListMultimap<String, String> getSeedsByRegion() {
 
-        Map<String, Config> configs = new HashMap<>();
+        if ( seedsByRegion == null ) {
 
-        ListMultimap<String, String> seedsByRegion = ArrayListMultimap.create();
+            seedsByRegion = ArrayListMultimap.create();
 
-        String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," );
+            String[] regionSeeds = actorSystemFig.getRegionSeeds().split( "," );
 
-        logger.info("Found region {} seeds {}", regionSeeds.length, regionSeeds);
+            logger.info( "Found region {} seeds {}", regionSeeds.length, regionSeeds );
 
-        try {
+            try {
 
-            if ( port != null ) {
+                if (port != null) {
 
-                // we are testing, create seeds-by-region map for one region, one seed
+                    // we are testing, create seeds-by-region map for one region, one seed
 
-                String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + port;
-                seedsByRegion.put( currentRegion, seed );
-                logger.info("Akka testing, only starting one seed");
+                    String seed = "akka.tcp://ClusterSystem" + "@" + hostname + ":" + port;
+                    seedsByRegion.put( currentRegion, seed );
+                    logger.info( "Akka testing, only starting one seed" );
 
-            } else { // create seeds-by-region map
+                } else { // create seeds-by-region map
 
-                for (String regionSeed : regionSeeds) {
+                    for (String regionSeed : regionSeeds) {
 
-                    String[] parts = regionSeed.split( ":" );
-                    String region = parts[0];
-                    String hostname = parts[1];
-                    String regionPortString = parts[2];
+                        String[] parts = regionSeed.split( ":" );
+                        String region = parts[0];
+                        String hostname = parts[1];
+                        String regionPortString = parts[2];
 
-                    // all seeds in same region must use same port
-                    // we assume 0th seed has the right port
-                    final Integer regionPort;
+                        // all seeds in same region must use same port
+                        // we assume 0th seed has the right port
+                        final Integer regionPort;
 
-                    if (port == null) {
-                        regionPort = Integer.parseInt( regionPortString );
-                    } else {
-                        regionPort = port; // unless we are testing
-                    }
+                        if (port == null) {
+                            regionPort = Integer.parseInt( regionPortString );
+                        } else {
+                            regionPort = port; // unless we are testing
+                        }
 
-                    String seed = "akka.tcp://ClusterSystem@" + hostname + ":" + regionPort;
+                        String seed = "akka.tcp://ClusterSystem" + "@" + hostname + ":" +
regionPort;
 
-                    logger.info("Adding seed {} for region {}", seed, region );
+                        logger.info( "Adding seed {} for region {}", seed, region );
 
-                    seedsByRegion.put( region, seed );
-                }
+                        seedsByRegion.put( region, seed );
+                    }
 
-                if (seedsByRegion.keySet().isEmpty()) {
-                    throw new RuntimeException(
-                        "No seeds listed in 'parsing collection.akka.region.seeds' property."
);
+                    if (seedsByRegion.keySet().isEmpty()) {
+                        throw new RuntimeException(
+                            "No seeds listed in 'parsing collection.akka.region.seeds' property."
);
+                    }
                 }
+
+            } catch (Exception e) {
+                throw new RuntimeException( "Error 'parsing collection.akka.region.seeds'
property", e );
             }
+        }
+
+        return seedsByRegion;
+    }
 
-            int numInstancesPerNode = actorSystemFig.getInstancesPerNode();
 
-            // read config file once for each region
+    /**
+     * Read cluster config and add seed nodes to it.
+     */
+    private Config readClusterSystemConfig() {
 
-            for ( String region : seedsByRegion.keySet() ) {
+        Config config = null;
 
-                List<String> seeds = seedsByRegion.get( region );
-                int lastColon = seeds.get(0).lastIndexOf(":") + 1;
-                final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon
));
+        try {
 
-                // cluster singletons only run role "io" nodes and NOT on "client" nodes
of other regions
-                String clusterRole = currentRegion.equals( region ) ? "io" : "client";
+            int numInstancesPerNode = actorSystemFig.getInstancesPerNode();
 
-                logger.info( "Akka Config for region {} is:\n" +
-                        "   Hostname {}\n" +
-                        "   Seeds {}\n" +
-                        "   Authoritative Region {}\n",
-                    region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion()
);
+            String region = currentRegion;
 
-                Map<String, Object> configMap = new HashMap<String, Object>()
{{
+            List<String> seeds = getSeedsByRegion().get( region );
+            int lastColon = seeds.get(0).lastIndexOf(":") + 1;
+            final Integer regionPort = Integer.parseInt( seeds.get(0).substring( lastColon
));
 
-                    put( "akka", new HashMap<String, Object>() {{
+            logger.info( "Akka Config for region {} is:\n" +
+                    "   Hostname {}\n" +
+                    "   Seeds {}\n" +
+                    "   Authoritative Region {}\n",
+                region, hostname, seeds, actorSystemFig.getAkkaAuthoritativeRegion() );
 
-                        put( "remote", new HashMap<String, Object>() {{
-                            put( "netty.tcp", new HashMap<String, Object>() {{
-                                put( "hostname", hostname );
-                                put( "bind-hostname", hostname );
-                                put( "port", regionPort );
-                            }} );
-                        }} );
+            Map<String, Object> configMap = new HashMap<String, Object>() {{
 
-                        put( "cluster", new HashMap<String, Object>() {{
-                            put( "max-nr-of-instances-per-node", 300);
-                            put( "roles", Collections.singletonList(clusterRole) );
-                            put( "seed-nodes", new ArrayList<String>() {{
-                                for (String seed : seeds) {
-                                    add( seed );
-                                }
-                            }} );
-                        }} );
+                put( "akka", new HashMap<String, Object>() {{
 
+                    put( "remote", new HashMap<String, Object>() {{
+                        put( "netty.tcp", new HashMap<String, Object>() {{
+                            put( "hostname", hostname );
+                            put( "bind-hostname", hostname );
+                            put( "port", regionPort );
+                        }} );
                     }} );
-                }};
 
-                for ( RouterProducer routerProducer : routerProducers ) {
-                    routerProducer.addConfiguration( configMap );
-                }
+                    put( "cluster", new HashMap<String, Object>() {{
+                        put( "max-nr-of-instances-per-node", numInstancesPerNode);
+                        put( "roles", Collections.singletonList("io") );
+                        put( "seed-nodes", new ArrayList<String>() {{
+                            for (String seed : seeds) {
+                                add( seed );
+                            }
+                        }} );
+                    }} );
 
-                Config config = ConfigFactory.parseMap( configMap )
-                    .withFallback( ConfigFactory.parseString( "akka.cluster.roles = [io]"
) )
-                    .withFallback( ConfigFactory.load( "application.conf" ) );
+                }} );
+            }};
 
-                configs.put( region, config );
+            for ( RouterProducer routerProducer : routerProducers ) {
+                routerProducer.addConfiguration( configMap );
             }
 
+            config = ConfigFactory.parseMap( configMap )
+                .withFallback( ConfigFactory.load( "application.conf" ) );
+
+
         } catch ( Exception e ) {
-            throw new RuntimeException("Error 'parsing collection.akka.region.seeds' property",
e );
+            throw new RuntimeException("Error reading and adding to cluster config", e );
         }
 
-        return configs;
+        return config;
     }
 
 
     /**
-     * Create ActorSystem and ClusterSingletonProxy for every region.
-     * Create ClusterSingletonManager for the current region.
-     *
-     * @param configMap Configurations to be used to create ActorSystems
-     * @param systemMap Map of ActorSystems created by this method
-     *
-     * @return ActorSystem for this region.
+     * Create actor system for this region, with cluster singleton manager & proxy.
      */
-    private ActorSystem createClusterSingletonProxies(
-        Map<String, Config> configMap, Map<String, ActorSystem> systemMap ) {
-
-        ActorSystem localSystem = null;
+    private ActorSystem createClusterSystemsFromConfigs( Config config ) {
 
-        for ( String region : configMap.keySet() ) {
-            Config config = configMap.get( region );
+        ActorSystem system = ActorSystem.create( "ClusterSystem", config );
 
-            ActorSystem system = ActorSystem.create( "ClusterSystem", config );
-            systemMap.put( region, system );
-
-            // cluster singletons only run role "io" nodes and NOT on "client" nodes of other
regions
-            if ( currentRegion.equals( region ) ) {
-
-                localSystem = system;
-
-                for ( RouterProducer routerProducer : routerProducers ) {
-                    routerProducer.createClusterSingletonManager( system );
-                }
-            }
+        for ( RouterProducer routerProducer : routerProducers ) {
+            logger.info("Creating {} for region {}", routerProducer.getName(), currentRegion
);
+            routerProducer.createClusterSingletonManager( system );
+        }
 
-            for ( RouterProducer routerProducer : routerProducers ) {
-                routerProducer.createClusterSingletonProxy( system );
-            }
+        for ( RouterProducer routerProducer : routerProducers ) {
+            logger.info("Creating {} proxy for region {} role 'io'", routerProducer.getName(),
currentRegion);
+            routerProducer.createClusterSingletonProxy( system, "io" );
         }
 
-        return localSystem;
+        return system;
     }
 
 
     /**
      * Create RequestActor for each region.
-     *
-     * @param systemMap Map of regions to ActorSystems.
      */
-    private void createRequestActors( Map<String, ActorSystem> systemMap ) {
+    private void createClientActors( ActorSystem system ) {
+
+        for ( String region : getSeedsByRegion().keySet() ) {
+
+            if ( currentRegion.equals( region )) {
+
+                logger.info( "Creating clientActor for region {}", region );
+
+                // Each clientActor needs to know path to ClusterSingletonProxy and region
+                clientActor = system.actorOf(
+                    Props.create( ClientActor.class, routersByMessageType ), "clientActor"
);
 
-        for ( String region : systemMap.keySet() ) {
+                ClusterClientReceptionist.get(system).registerService( clientActor );
 
-            logger.info("Creating request actor for region {}", region);
+            } else {
 
-            // Each RequestActor needs to know path to ClusterSingletonProxy and region
-            ActorRef requestActor = systemMap.get( region ).actorOf(
-                //Props.create( ClientActor.class, "/user/uvProxy" ), "requestActor" );
-                Props.create( ClientActor.class, routersByMessageType ), "requestActor" );
+                List<String> regionSeeds = getSeedsByRegion().get( region );
+                Set<ActorPath> seedPaths = new HashSet<>(20);
+                for ( String seed : getSeedsByRegion().get( region ) ) {
+                    seedPaths.add( ActorPaths.fromString( seed + "/system/receptionist")
);
+                }
+
+                ActorRef clusterClient = system.actorOf( ClusterClient.props(
+                    ClusterClientSettings.create(system).withInitialContacts( seedPaths )),
"client");
+
+                clusterClientsByRegion.put( region, clusterClient );
+            }
 
-            requestActorsByRegion.put( region, requestActor );
         }
     }
 
 
     @Override
-    public void waitForRequestActors() {
+    public void waitForClientActors() {
 
-        for ( String region : requestActorsByRegion.keySet() ) {
-            ActorRef ra = requestActorsByRegion.get( region );
-            waitForRequestActor( ra );
-        }
+        waitForClientActor( clientActor );
     }
 
-
-    private void waitForRequestActor( ActorRef ra ) {
+    private void waitForClientActor( ActorRef ra ) {
 
         logger.info( "Waiting on request actor {}...", ra.path() );
 
-        boolean started = false;
+        started = false;
+
         int retries = 0;
         int maxRetries = 60;
         while (retries < maxRetries) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
index 3aa91cf..d849dd9 100644
--- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
+++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/RouterProducer.java
@@ -24,6 +24,8 @@ import java.util.Map;
 
 public interface RouterProducer {
 
+    String getName();
+
     /**
      * Create cluster single manager for current region.
      * Will be called once per router per JVM.
@@ -34,16 +36,16 @@ public interface RouterProducer {
      * Create cluster singleton proxy for region.
      * Will be called once per router per JVM per region.
      */
-    void createClusterSingletonProxy( ActorSystem system );
+    void createClusterSingletonProxy( ActorSystem system, String role );
 
     /**
      * Create other actors needed to support the router produced by the implementation.
      */
-    void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem>
systemMap );
+    void createLocalSystemActors( ActorSystem localSystem );
 
     /**
      * Add configuration for the router to configuration map
      */
-    void addConfiguration( Map<String, Object> configMap );
+    void addConfiguration(Map<String, Object> configMap );
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/resources/application.conf b/stack/corepersistence/actorsystem/src/main/resources/application.conf
index a243163..5706610 100644
--- a/stack/corepersistence/actorsystem/src/main/resources/application.conf
+++ b/stack/corepersistence/actorsystem/src/main/resources/application.conf
@@ -38,7 +38,12 @@ akka {
 akka.cluster.metrics.enabled=off
 
 # Enable metrics extension in akka-cluster-metrics.
-akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension", "akka.cluster.pubsub.DistributedPubSub"]
+akka.extensions=[
+    "akka.cluster.metrics.ClusterMetricsExtension",
+    "akka.cluster.pubsub.DistributedPubSub",
+    "akka.cluster.client.ClusterClientReceptionist"
+]
+
 
 # Sigar native library extract location during tests.
 # Note: use per-jvm-instance folder when running multiple jvm on one host.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf
----------------------------------------------------------------------
diff --git a/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf b/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf
deleted file mode 100644
index 907aebb..0000000
--- a/stack/corepersistence/actorsystem/src/main/resources/cluster-singleton.conf
+++ /dev/null
@@ -1,25 +0,0 @@
-include "application"
-
-akka.actor.deployment {
-  /uvRouter/singleton/router {
-    router = consistent-hashing-pool
-    cluster {
-      enabled = on
-      allow-local-routees = on
-      
-      # singleton will only run on nodes with role "io"
-      use-role = io
-
-      # more forgiving failure detector
-      failure-detector {
-        threshold = 10
-        acceptable-heartbeat-pause = 3 s
-        heartbeat-interval = 1 s
-        heartbeat-request {
-          expected-response-after = 3 s
-        }
-      }
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
index bb30b92..e53710c 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -59,7 +59,7 @@ public class UniqueValueActor extends UntypedActor {
 
             count++;
             if (count % 10 == 0) {
-                logger.debug( "UniqueValueActor {} processed {} requests", name, count );
+                logger.info( "UniqueValueActor {} processed {} requests", name, count );
             }
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/d12307bd/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 119d6f6..b888b1f 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -22,6 +22,7 @@ import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.cluster.client.ClusterClient;
 import akka.cluster.singleton.ClusterSingletonManager;
 import akka.cluster.singleton.ClusterSingletonManagerSettings;
 import akka.cluster.singleton.ClusterSingletonProxy;
@@ -75,18 +76,25 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     }
 
 
-    private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem>
systemMap ) {
-
-        for ( String region : systemMap.keySet() ) {
-            ActorSystem actorSystem = systemMap.get( region );
-            if ( !actorSystem.equals( localSystem ) ) {
-                logger.info("Starting ReservationCacheUpdater for {}", region );
-                actorSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber");
-            }
-        }
+    @Override
+    public String getName() {
+        return "UniqueValues ClusterSingleton Router";
     }
 
 
+    // TODO: restore reservation cache
+//    private void subscribeToReservations( ActorSystem localSystem, Map<String, ActorSystem>
systemMap ) {
+//
+//        for ( String region : systemMap.keySet() ) {
+//            ActorSystem actorSystem = systemMap.get( region );
+//            if ( !actorSystem.equals( localSystem ) ) {
+//                logger.info("Starting ReservationCacheUpdater for {}", region );
+//                actorSystem.actorOf( Props.create( ReservationCacheActor.class ), "subscriber");
+//            }
+//        }
+//    }
+
+
     @Override
     public void reserveUniqueValues(
         ApplicationScope scope, Entity entity, UUID version, String region ) throws UniqueValueException
{
@@ -154,12 +162,6 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
     private void reserveUniqueField(
         ApplicationScope scope, Entity entity, UUID version, Field field, String region )
throws UniqueValueException {
 
-        final ActorRef requestActor = actorSystemManager.getClientActor( region );
-
-        if ( requestActor == null ) {
-            throw new RuntimeException( "No request actor for region " + region);
-        }
-
         UniqueValueActor.Request request = new UniqueValueActor.Reservation(
             scope, entity.getId(), version, field );
 
@@ -171,39 +173,35 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
             throw new UniqueValueException( "Error property not unique (cache)", field);
         }
 
-        sendUniqueValueRequest( entity, requestActor, request );
+        sendUniqueValueRequest( entity, region, request );
     }
 
 
     private void confirmUniqueField(
         ApplicationScope scope, Entity entity, UUID version, Field field, String region) throws UniqueValueException {
 
-        final ActorRef requestActor = actorSystemManager.getClientActor( region );
-
-        if ( requestActor == null ) {
-            throw new RuntimeException( "No request actor for type, cannot verify unique fields!" );
-        }
-
         UniqueValueActor.Confirmation request = new UniqueValueActor.Confirmation(
             scope, entity.getId(), version, field );
 
-        sendUniqueValueRequest( entity, requestActor, request );
+        sendUniqueValueRequest( entity, region, request );
     }
 
 
     private void cancelUniqueField(
         ApplicationScope scope, Entity entity, UUID version, Field field, String region ) throws UniqueValueException {
 
-        final ActorRef requestActor = actorSystemManager.getClientActor( region );
-
-        if ( requestActor == null ) {
-            throw new RuntimeException( "No request actor for type, cannot verify unique fields!" );
-        }
-
         UniqueValueActor.Cancellation request = new UniqueValueActor.Cancellation(
             scope, entity.getId(), version, field );
 
-        requestActor.tell( request, null );
+        if ( actorSystemManager.getCurrentRegion().equals( region ) ) {
+            ActorRef clientActor = actorSystemManager.getClientActor();
+            clientActor.tell( request, null );
+
+        } else {
+            ActorRef clusterClient = actorSystemManager.getClusterClient( region );
+            clusterClient.tell( new ClusterClient.Send("/user/clientActor", request), null );
+        }
+
     }
 
 
@@ -218,7 +216,7 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
 
     private void sendUniqueValueRequest(
-        Entity entity, ActorRef requestActor, UniqueValueActor.Request request ) throws UniqueValueException {
+        Entity entity, String region, UniqueValueActor.Request request ) throws UniqueValueException {
 
         int maxRetries = 5;
         int retries = 0;
@@ -230,7 +228,17 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
 
                 // ask RequestActor and wait (up to timeout) for response
 
-                Future<Object> fut = Patterns.ask( requestActor, request, t );
+                Future<Object> fut;
+
+                if ( actorSystemManager.getCurrentRegion().equals( region ) ) {
+                    ActorRef clientActor = actorSystemManager.getClientActor();
+                    fut = Patterns.ask( clientActor, request, t );
+
+                } else {
+                    ActorRef clusterClient = actorSystemManager.getClusterClient( region );
+                    fut = Patterns.ask( clusterClient, new ClusterClient.Send("/user/clientActor", request), t );
+                }
+
                 response = (UniqueValueActor.Response) Await.result( fut, t.duration() );
 
                 if ( response != null && (
@@ -280,34 +288,33 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
             ClusterSingletonManagerSettings.create( system ).withRole("io");
 
         system.actorOf( ClusterSingletonManager.props(
-            //Props.create( ClusterSingletonRouter.class, table ),
-            Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class),
-            PoisonPill.getInstance(), settings ), "uvRouter");
+            Props.create( GuiceActorProducer.class, injector, UniqueValuesRouter.class ),
+            PoisonPill.getInstance(), settings ), "uvRouter" );
+
     }
 
 
     @Override
-    public void createClusterSingletonProxy(ActorSystem system) {
+    public void createClusterSingletonProxy( ActorSystem system, String role ) {
 
         ClusterSingletonProxySettings proxySettings =
-            ClusterSingletonProxySettings.create( system ).withRole("io");
+            ClusterSingletonProxySettings.create( system ).withRole( role );
 
         system.actorOf( ClusterSingletonProxy.props( "/user/uvRouter", proxySettings ), "uvProxy" );
     }
 
 
     @Override
-    public void createLocalSystemActors( ActorSystem localSystem, Map<String, ActorSystem> systemMap ) {
-        subscribeToReservations( localSystem, systemMap );
+    public void createLocalSystemActors( ActorSystem localSystem ) {
+        // TODO: restore reservation cache
+        //subscribeToReservations( localSystem );
     }
 
     @Override
-    public void addConfiguration(Map<String, Object> configMap) {
+    public void addConfiguration( Map<String, Object> configMap ) {
 
         int numInstancesPerNode = uniqueValuesFig.getUniqueValueInstancesPerNode();
 
-        // TODO: will the below overwrite things other routers have added under "actor.deployment"?
-
         Map<String, Object> akka = (Map<String, Object>)configMap.get("akka");
 
         akka.put( "actor", new HashMap<String, Object>() {{
@@ -317,10 +324,10 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
                     put( "cluster", new HashMap<String, Object>() {{
                         put( "enabled", "on" );
                         put( "allow-local-routees", "on" );
-                        put( "user-role", "io" );
+                        put( "use-role", "io" );
                         put( "max-nr-of-instances-per-node", numInstancesPerNode );
                         put( "failure-detector", new HashMap<String, Object>() {{
-                            put( "threshold", "" );
+                            put( "threshold", "10" );
                             put( "acceptable-heartbeat-pause", "3 s" );
                             put( "heartbeat-interval", "1 s" );
                             put( "heartbeat-request", new HashMap<String, Object>() {{


Mime
View raw message