usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [39/50] [abbrv] incubator-usergrid git commit: Removed Hystrix. Causing thread OOM issues and not really used in production.
Date Thu, 19 Mar 2015 23:26:56 GMT
Removed Hystrix. It was causing thread OOM issues and was not really used in production.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5e6ed4a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5e6ed4a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5e6ed4a1

Branch: refs/heads/USERGRID-480
Commit: 5e6ed4a12c7aa154f1411d85c6fcce56fbcd4367
Parents: b09dc43
Author: Todd Nine <tnine@apigee.com>
Authored: Tue Mar 17 18:23:58 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Tue Mar 17 18:23:58 2015 -0600

----------------------------------------------------------------------
 .../core/astyanax/MultiRowColumnIterator.java   | 12 ++-
 .../core/hystrix/HystrixCassandra.java          | 94 --------------------
 .../persistence/core/task/TaskExecutor.java     |  4 +-
 .../graph/impl/GraphManagerImpl.java            | 30 +++++--
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  |  9 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    | 16 +++-
 .../impl/stage/NodeDeleteListenerImpl.java      | 16 +++-
 .../impl/NodeSerializationImpl.java             | 36 ++++----
 .../shard/count/NodeShardApproximationImpl.java |  4 +-
 .../NodeShardCounterSerializationImpl.java      | 25 +++---
 .../shard/impl/NodeShardAllocationImpl.java     | 16 +++-
 .../shard/impl/ShardGroupCompactionImpl.java    | 64 ++++---------
 12 files changed, 128 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index fdc4768..667992c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -33,10 +33,8 @@ import java.util.NoSuchElementException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
-
 import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.ColumnFamily;
 import com.netflix.astyanax.model.ColumnList;
@@ -184,7 +182,13 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
                 keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice(
rowKeys )
                         .withColumnRange( rangeBuilder.build() );
 
-        final Rows<R, C> result = HystrixCassandra.user( query ).getResult();
+        final Rows<R, C> result;
+        try {
+            result = query.execute().getResult();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to casandra", e );
+        }
 
 
         //now aggregate them together

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
deleted file mode 100644
index ab71782..0000000
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/hystrix/HystrixCassandra.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.core.hystrix;
-
-
-import com.netflix.astyanax.Execution;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-import com.netflix.hystrix.HystrixThreadPoolProperties;
-
-
-/**
- * A utility class that creates graph observables wrapped in Hystrix for timeouts and circuit breakers.
- */
-public class HystrixCassandra {
-
-
-
-
-    /**
-     * Command group used for realtime user commands
-     */
-    public static final HystrixCommand.Setter
-            USER_GROUP = HystrixCommand.Setter.withGroupKey(   HystrixCommandGroupKey.Factory.asKey(
"user" ) ).andThreadPoolPropertiesDefaults(
-            HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
-
-    /**
-     * Command group for asynchronous operations
-     */
-    public static final HystrixCommand.Setter
-            ASYNC_GROUP = HystrixCommand.Setter.withGroupKey( HystrixCommandGroupKey.Factory.asKey(
"async" ) ).andThreadPoolPropertiesDefaults(
-            HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
-
-
-    /**
-     * Execute an user operation
-     */
-    public static <R> OperationResult<R> user( final Execution<R> execution)
{
-        return new HystrixCommand<OperationResult<R>>( USER_GROUP ) {
-
-            @Override
-            protected OperationResult<R> run() {
-                try {
-                    return  execution.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( e );
-                }
-            }
-        }.execute();
-    }
-
-
-    /**
-     * Execute an an async operation
-     */
-    public static <R> OperationResult<R> async( final Execution<R> execution)
{
-
-
-        return new HystrixCommand<OperationResult<R>>( ASYNC_GROUP ) {
-
-            @Override
-            protected OperationResult<R> run() {
-                try {
-                    return  execution.execute();
-                }
-                catch ( ConnectionException e ) {
-                    throw new RuntimeException( e );
-                }
-            }
-        }.execute();
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
index 5e9aa4c..5728d2e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/task/TaskExecutor.java
@@ -25,7 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 /**
  * An interface for execution of tasks
  */
-public interface TaskExecutor {
+public interface  TaskExecutor {
 
     /**
      * Submit the task asynchronously
@@ -37,5 +37,5 @@ public interface TaskExecutor {
      * Stop the task executor without waiting for scheduled threads to run
      */
     public void shutdown();
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
index 246bbca..26d06ad 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/GraphManagerImpl.java
@@ -25,14 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import com.codahale.metrics.Meter;
-import com.codahale.metrics.Timer;
-import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -54,10 +51,12 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
-import com.google.inject.assistedinject.Assisted;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Notification;
 import rx.Observable;
@@ -202,7 +201,12 @@ public class GraphManagerImpl implements GraphManager {
 
                 mutation.mergeShallow( edgeMutation );
 
-                HystrixCassandra.user( mutation );
+                try {
+                    mutation.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation", e );
+                }
 
                 return edge;
             }
@@ -241,7 +245,12 @@ public class GraphManagerImpl implements GraphManager {
 
 
                 LOG.debug("Marking edge {} as deleted to commit log", edge);
-                HystrixCassandra.user(edgeMutation);
+                try {
+                    edgeMutation.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation", e );
+                }
 
 
                 //HystrixCassandra.async( edgeDeleteListener.receive( scope, markedEdge,
@@ -285,7 +294,12 @@ public class GraphManagerImpl implements GraphManager {
 
 
                 LOG.debug( "Marking node {} as deleted to node mark", node );
-                HystrixCassandra.user( nodeMutation );
+                try {
+                    nodeMutation.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute mutation", e );
+                }
 
 
                 //HystrixCassandra.async(nodeDeleteListener.receive(scope, id, eventTimestamp
 )).subscribeOn(

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 0137ba4..7dca0ce 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -26,7 +26,6 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -39,6 +38,7 @@ import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action1;
@@ -94,7 +94,12 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
 
 
                                     //remove from storage
-                                    HystrixCassandra.async(storageSerialization.deleteEdge(
scope, edge, timestamp ));
+                                    try {
+                                        storageSerialization.deleteEdge( scope, edge, timestamp
).execute();
+                                    }
+                                    catch ( ConnectionException e ) {
+                                        throw new RuntimeException( "Unable to connect to
casandra", e );
+                                    }
                                 }
                             }
                         } );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 0e1c4e2..ab141f7 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -47,6 +46,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action1;
@@ -188,7 +188,12 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                                                                                 + "Mutation
has {} rows to mutate ",
                                                                         edgeType, batch.getRowCount()
);
 
-                                                                HystrixCassandra.async( batch
);
+                                                                try {
+                                                                    batch.execute();
+                                                                }
+                                                                catch ( ConnectionException
e ) {
+                                                                    throw new RuntimeException(
"Unable to connect to casandra", e );
+                                                                }
                                                             }
                                                         }
 
@@ -219,7 +224,12 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
 
                 LOG.debug( "Type {} has no subtypes in use as of maxTimestamp {}.  Deleting
type.", edgeType,
                         maxTimestamp );
-                HystrixCassandra.async( serialization.removeEdgeType( scope, node, edgeType,
maxTimestamp ) );
+                try {
+                    serialization.removeEdgeType( scope, node, edgeType, maxTimestamp ).execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to connect to casandra", e );
+                }
             }
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index f167f0c..e8c224e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -29,7 +29,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.guice.ProxyImpl;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -47,6 +46,7 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action0;
@@ -129,7 +129,12 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                                 .doOnCompleted( new Action0() {
                                     @Override
                                     public void call() {
-                                        HystrixCassandra.async(nodeSerialization.delete(
scope, node, maxVersion.get() ));
+                                        try {
+                                            nodeSerialization.delete( scope, node, maxVersion.get()).execute();
+                                        }
+                                        catch ( ConnectionException e ) {
+                                            throw new RuntimeException( "Unable to connect
to casandra", e );
+                                        }
                                     }
                                 } );
                     }
@@ -210,7 +215,12 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                             targetNodes.add( new TargetPair( edge.getTargetNode(), edge.getType()
) );
                         }
 
-                        HystrixCassandra.async( batch );
+                        try {
+                            batch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( "Unable to connect to casandra",
e );
+                        }
 
                         //now  delete meta data
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
index 2cc0391..18062c4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/NodeSerializationImpl.java
@@ -38,7 +38,6 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.migration.schema.Migration;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
@@ -52,6 +51,7 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.Row;
@@ -150,22 +150,23 @@ public class NodeSerializationImpl implements NodeSerialization, Migration
{
                 keyspace.prepareQuery( GRAPH_DELETE ).setConsistencyLevel( fig.getReadCL()
);
 
 
+
+        Column<Boolean> result = null;
         try {
-            Column<Boolean> result = HystrixCassandra
-                    .user( query.getKey( ScopedRowKey.fromKey( scope.getApplication(), node
) ).getColumn( COLUMN_NAME ) )
+            result = query.getKey( ScopedRowKey.fromKey( scope.getApplication(), node ) ).getColumn(
COLUMN_NAME ).execute()
                     .getResult();
-
-            return Optional.of( result.getLongValue() );
         }
-        catch (RuntimeException re ) {
-            if(re.getCause().getCause() instanceof   NotFoundException) {
-                //swallow, there's just no column
-                return Optional.absent();
-            }
-
-            throw re;
+        catch(NotFoundException nfe){
+             //swallow, there's just no column
+            return Optional.absent();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to casandra", e );
         }
 
+        return Optional.of( result.getLongValue() );
+
+
     }
 
 
@@ -193,9 +194,14 @@ public class NodeSerializationImpl implements NodeSerialization, Migration
{
         }
 
 
-        final Rows<ScopedRowKey<Id>, Boolean> results = HystrixCassandra
-                .user( query.getRowSlice( keys ).withColumnSlice( Collections.singletonList(
COLUMN_NAME ) ) )
-                .getResult();
+        final Rows<ScopedRowKey<Id>, Boolean> results;
+        try {
+            results = query.getRowSlice( keys ).withColumnSlice( Collections.singletonList(
COLUMN_NAME )).execute()
+                    .getResult();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to casandra", e );
+        }
 
         for ( Row<ScopedRowKey<Id>, Boolean> row : results ) {
             Column<Boolean> column = row.getColumns().getColumnByName( COLUMN_NAME
);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index 47243e5..a47d528 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
@@ -39,6 +38,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.functions.Action0;
 import rx.schedulers.Schedulers;
@@ -229,7 +229,7 @@ public class NodeShardApproximationImpl implements NodeShardApproximation
{
                 /**
                  * Execute the command in hystrix to avoid slamming cassandra
                  */
-                new HystrixCommand( HystrixCassandra.ASYNC_GROUP ) {
+                new HystrixCommand( HystrixCommandGroupKey.Factory.asKey("BatchCounterRollup")
) {
 
                     @Override
                     protected Void run() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index 524a0cf..6934275 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -33,9 +33,8 @@ import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
 import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -50,6 +49,7 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
@@ -117,24 +117,19 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
         final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key
);
 
 
+        OperationResult<Column<Boolean>> column = null;
         try {
-            OperationResult<Column<Boolean>> column = HystrixCassandra.user(
-                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn(
true ) );
-
-            return column.getResult().getLongValue();
+            column = keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn(
true ).execute();
         }
         //column not found, return 0
-        catch ( RuntimeException re ) {
-
-            final Throwable cause = re.getCause();
-
-            if(cause != null && cause.getCause() instanceof NotFoundException) {
-                return 0;
-            }
-
-            throw  re;
+        catch ( NotFoundException nfe ) {
+            return 0;
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to read from cassandra", e );
         }
 
+        return column.getResult().getLongValue();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 0d34b63..ad64338 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -27,7 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
@@ -49,6 +48,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.util.TimeUUIDUtils;
 
 
@@ -110,7 +110,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
             final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta );
-            HystrixCassandra.user( batch  );
+            try {
+                batch.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to connect to casandra", e );
+            }
 
             existingShards = Collections.singleton( MIN_SHARD ).iterator();
         }
@@ -232,7 +237,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta );
 
-        HystrixCassandra.user( batch );
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to casandra", e );
+        }
 
 
         return true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5e6ed4a1/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 9f8efc8..6135121 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -29,13 +29,7 @@ import java.util.Iterator;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
@@ -43,7 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.task.Task;
 import org.apache.usergrid.persistence.core.task.TaskExecutor;
@@ -74,6 +67,7 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 
 /**
@@ -207,8 +201,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 if ( edgeCount % maxWorkSize == 0 ) {
 
                     try {
-                        HystrixCassandra.async( newRowBatch );
-                        HystrixCassandra.async( deleteRowBatch );
+                        newRowBatch.execute();
+                        deleteRowBatch.execute();
                     }
                     catch ( Throwable t ) {
                         LOG.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
@@ -219,8 +213,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
         try {
-            HystrixCassandra.async( newRowBatch );
-            HystrixCassandra.async( deleteRowBatch );
+            newRowBatch.execute();
+            deleteRowBatch.execute();
         }
         catch ( Throwable t ) {
             LOG.error( "Unable to move edges to target shard {}", targetShard );
@@ -260,7 +254,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             }
 
 
-            HystrixCassandra.async( shardRemovalRollup );
+            try {
+                shardRemovalRollup.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to connect to casandra", e );
+            }
 
 
             LOG.info( "Shard has been fully compacted.  Marking shard {} as compacted in Cassandra", targetShard );
@@ -268,7 +267,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             //Overwrite our shard index with a newly created one that has been marked as compacted
             Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
             final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
-            HystrixCassandra.async( updateMark );
+            try {
+                updateMark.execute();
+            }
+            catch ( ConnectionException e ) {
+                throw new RuntimeException( "Unable to connect to casandra", e );
+            }
 
             resultBuilder.withCompactedShard( compactedShard );
         }
@@ -530,40 +534,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
     }
 
 
-    /**
-     * Create a thread pool that will reject work if our audit tasks become overwhelmed
-     */
-    private final class MaxSizeThreadPool extends ThreadPoolExecutor {
-
-        public MaxSizeThreadPool( final int workerSize, final int queueLength ) {
-            super( 1, workerSize, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>( queueLength ),
-                    new CompactionThreadFactory(), new RejectionLogger() );
-        }
-    }
-
-
-    private final class CompactionThreadFactory implements ThreadFactory {
-
-        private final AtomicLong threadCounter = new AtomicLong();
-
-
-        @Override
-        public Thread newThread( final Runnable r ) {
-            final long newValue = threadCounter.incrementAndGet();
-
-            return new Thread( r, "Graph-Shard-Compaction-" + newValue );
-        }
-    }
-
-
-    private final class RejectionLogger implements RejectedExecutionHandler {
-
-
-        @Override
-        public void rejectedExecution( final Runnable r, final ThreadPoolExecutor executor ) {
-            LOG.warn( "Audit queue full, rejecting audit task {}", r );
-        }
-    }
 
 
     public static final class CompactionResult {


Mime
View raw message