usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mdun...@apache.org
Subject [3/5] usergrid git commit: Removes approximation logic on shards to ensure that we're auditing the actual shard value.
Date Thu, 05 Nov 2015 23:43:45 GMT
Removes approximation logic on shards to ensure that we're auditing the actual shard value.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/591a2f1f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/591a2f1f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/591a2f1f

Branch: refs/heads/2.1-release
Commit: 591a2f1fd97b7259cf72d10855f2832b8342b5a4
Parents: 9c54de3
Author: Todd Nine <tnine@apigee.com>
Authored: Thu Nov 5 14:37:39 2015 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Nov 5 14:40:08 2015 -0700

----------------------------------------------------------------------
 .../persistence/graph/guice/GraphModule.java    |   7 -
 .../impl/shard/EdgeShardStrategy.java           |  10 +-
 .../impl/shard/NodeShardApproximation.java      |  66 --
 .../serialization/impl/shard/count/Counter.java | 131 ----
 .../shard/count/NodeShardApproximationImpl.java | 272 --------
 .../count/NodeShardCounterSerialization.java    |  48 --
 .../NodeShardCounterSerializationImpl.java      | 186 ------
 .../impl/shard/count/ShardKey.java              |  75 ---
 .../shard/impl/NodeShardAllocationImpl.java     |  19 +-
 .../impl/shard/impl/ShardGroupDeletionImpl.java |   7 +
 .../impl/ShardedEdgeSerializationImpl.java      |  28 -
 .../shard/impl/SizebasedEdgeShardStrategy.java  |  13 +-
 .../graph/GraphManagerShardingIT.java           | 208 ------
 .../impl/shard/NodeShardAllocationTest.java     |  48 +-
 .../shard/count/NodeShardApproximationTest.java | 627 -------------------
 .../NodeShardCounterSerializationTest.java      | 124 ----
 .../shard/impl/ShardGroupDeletionImplTest.java  |   3 +
 17 files changed, 25 insertions(+), 1847 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index d2476eb..cf0ffcb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -59,14 +59,10 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumn
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupDeletion;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.AsyncTaskExecutorImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
@@ -114,9 +110,7 @@ public abstract class GraphModule extends AbstractModule {
          */
 
         bind(NodeShardAllocation.class).to( NodeShardAllocationImpl.class );
-        bind( NodeShardApproximation.class ).to( NodeShardApproximationImpl.class );
         bind( NodeShardCache.class ).to( NodeShardCacheImpl.class );
-        bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class );
 
         /**
          * Binding for task tracker
@@ -182,7 +176,6 @@ public abstract class GraphModule extends AbstractModule {
         migrationBinding.addBinding().to( Key.get( EdgeColumnFamilies.class ) );
 
         migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
-        migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
 
         //Get the old version and the new one
         migrationBinding.addBinding().to( Key.get( EdgeMetadataSerializationV1Impl.class) );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
index 1e02a72..803e31e 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardStrategy.java
@@ -44,15 +44,7 @@ public interface EdgeShardStrategy {
      */
     public Iterator<ShardEntryGroup> getReadShards(final ApplicationScope scope, final long maxTimestamp, final DirectedEdgeMeta directedEdgeMeta );
 
-    /**
-     * Increment our count meta data by the passed value.  Can be a positive or a negative number.
-     * @param scope The scope in the application
-     * @param shard The shard to use
-     * @param count The amount to increment or decrement
-     * @param directedEdgeMeta The edge meta data to use
-     * @return
-     */
-    public void increment(final ApplicationScope scope, Shard shard, long count, final DirectedEdgeMeta directedEdgeMeta );
+
 
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
deleted file mode 100644
index fc39e56..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardApproximation.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-
-/**
- * Interface for creating approximate estimates of shards
- */
-public interface NodeShardApproximation {
-
-
-    /**
-     * Increment the shard Id the specified amount
-     *
-     * @param scope The scope
-     * @param shard The shard to use
-     * @param count The count to increment
-     * @param directedEdgeMeta The directed edge meta data to use
-     */
-    public void increment( final ApplicationScope scope, final Shard shard,
-                           final long count, final DirectedEdgeMeta directedEdgeMeta );
-
-
-    /**
-     * Get the approximation of the number of unique items
-     *
-     * @param scope The scope
-     * @param directedEdgeMeta The directed edge meta data to use
-     */
-    public long getCount( final ApplicationScope scope, final Shard shard,  final DirectedEdgeMeta directedEdgeMeta );
-
-
-    /**
-     * Flush the current counters in the Approximation.  Will return immediately after the flush. You can then use flushPending
-     * to check the state.
-     */
-    public void beginFlush();
-
-    /**
-     * Return true if there is data to be flushed
-     * @return
-     */
-    public boolean flushPending();
-
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
deleted file mode 100644
index f5666a2..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * This class is synchronized for addition.  It is meant to be used across multiple threads
- */
-public class Counter {
-    /**
-     * The counter to tell us how often it was invoked
-     */
-    private final AtomicLong invokeCounter;
-
-    /**
-     * Pointer to our "current" counter map.  We beginFlush this when time expires or we hit our count
-     */
-    private final ConcurrentHashMap<ShardKey, AtomicLong> counts;
-
-    /**
-     * The timestamp the concurrent map was created
-     */
-    private final long createTimestamp;
-
-
-    /**
-     * Implementation of the internal counters
-     */
-    public Counter() {
-        this.createTimestamp = System.currentTimeMillis();
-        this.invokeCounter = new AtomicLong();
-        this.counts = new ConcurrentHashMap<>();
-    }
-
-
-    /**
-     * Add the count to the key.
-     */
-    public void add( final ShardKey key, final long count ) {
-        AtomicLong counter = counts.get( key );
-
-        if ( counter == null ) {
-            counter = new AtomicLong();
-            AtomicLong existingCounter = counts.putIfAbsent( key, counter );
-
-            if ( existingCounter != null ) {
-                counter = existingCounter;
-            }
-        }
-
-        counter.addAndGet( count );
-        invokeCounter.incrementAndGet();
-    }
-
-
-    /**
-     * Get the current valye from the cache
-     */
-    public long get( final ShardKey key ) {
-        AtomicLong counter = counts.get( key );
-
-        if ( counter == null ) {
-            return 0;
-        }
-
-        return counter.get();
-    }
-
-
-    /**
-     * Deep copy the counts from other into this counter
-     * @param other
-     */
-    public void merge(final Counter other){
-
-        Preconditions.checkNotNull(other, "other cannot be null");
-        Preconditions.checkNotNull( other.counts, "other.counts cannot be null" );
-
-        for(Map.Entry<ShardKey, AtomicLong> entry: other.counts.entrySet()){
-            add(entry.getKey(), entry.getValue().get());
-        }
-    }
-
-
-    /**
-     * Get all entries
-     * @return
-     */
-    public Set<Map.Entry<ShardKey, AtomicLong>> getEntries(){
-        return counts.entrySet();
-    }
-
-
-    /**
-     * Get the count of the number of times we've been incremented
-     * @return
-     */
-    public long getInvokeCount() {
-        return invokeCounter.get();
-    }
-
-
-
-    public long getCreateTimestamp() {
-        return createTimestamp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
deleted file mode 100644
index fceb32c..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import javax.inject.Inject;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.hystrix.HystrixCommand;
-import com.netflix.hystrix.HystrixCommandGroupKey;
-import com.netflix.hystrix.HystrixThreadPoolProperties;
-
-import rx.functions.Action0;
-import rx.schedulers.Schedulers;
-
-
-/**
- * Implementation for doing edge approximation based on counters.  Uses a guava loading cache to load values from
- * cassandra, and beginFlush them on cache eviction.
- */
-public class NodeShardApproximationImpl implements NodeShardApproximation {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NodeShardApproximationImpl.class);
-
-    /**
-     * Read write locks to ensure we atomically swap correctly
-     */
-    private final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
-    private final ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
-    private final ReentrantReadWriteLock.WriteLock writeLockLock = reentrantReadWriteLock.writeLock();
-
-    private final GraphFig graphFig;
-    private final NodeShardCounterSerialization nodeShardCounterSerialization;
-    private final TimeService timeService;
-
-    /**
-     * Counter currently implemented
-     */
-    private volatile Counter currentCounter;
-
-    /**
-     * The counter that is currently in process of flushing to Cassandra.  Can be null
-     */
-    private final BlockingQueue<Counter> flushQueue;
-
-    private final FlushWorker worker;
-
-    /**
-        * Command group used for realtime user commands
-        */
-       public static final HystrixCommand.Setter
-           COUNT_GROUP = HystrixCommand.Setter.withGroupKey(
-               HystrixCommandGroupKey.Factory.asKey( "BatchCounterRollup" ) ).andThreadPoolPropertiesDefaults(
-                   HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
-
-
-    /**
-     * Create a time shard approximation with the correct configuration.
-     */
-    @Inject
-    public NodeShardApproximationImpl( final GraphFig graphFig,
-                                       final NodeShardCounterSerialization nodeShardCounterSerialization,
-                                       final TimeService timeService ) {
-        this.graphFig = graphFig;
-        this.nodeShardCounterSerialization = nodeShardCounterSerialization;
-        this.timeService = timeService;
-        this.currentCounter = new Counter();
-        this.flushQueue = new LinkedBlockingQueue<>( graphFig.getCounterFlushQueueSize() );
-
-        this.worker = new FlushWorker( this.flushQueue, nodeShardCounterSerialization );
-
-        Schedulers.newThread().createWorker().schedule( worker );
-
-    }
-
-
-    @Override
-    public void increment(
-            final ApplicationScope scope, final Shard shard,
-            final long count, final DirectedEdgeMeta directedEdgeMeta  ) {
-
-
-        final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
-
-        readLock.lock();
-
-
-        try {
-            currentCounter.add( key, count );
-        }
-        finally {
-            readLock.unlock();
-        }
-
-        checkFlush();
-    }
-
-
-    @Override
-    public long getCount( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
-
-        final ShardKey key = new ShardKey( scope, shard, directedEdgeMeta );
-
-
-        readLock.lock();
-
-        long count;
-
-        try {
-            count = currentCounter.get( key );
-
-        }
-        finally {
-            readLock.unlock();
-        }
-
-
-        //read from Cassandra and add to get a "close enough" number
-        return count + nodeShardCounterSerialization.getCount( key );
-    }
-
-
-    @Override
-    public void beginFlush() {
-
-        writeLockLock.lock();
-
-        try {
-
-            final boolean queued = flushQueue.offer( currentCounter );
-
-            /**
-             * We were able to q the beginFlush, swap it
-             */
-            if ( queued ) {
-                currentCounter = new Counter();
-            }
-        }
-        finally {
-            writeLockLock.unlock();
-        }
-    }
-
-
-    @Override
-    public boolean flushPending() {
-        return flushQueue.size() > 0 || worker.isFlushing();
-    }
-
-
-    /**
-     * Check if we need to beginFlush.  If we do, perform the beginFlush
-     */
-    private void checkFlush() {
-
-        //there's no beginFlush pending and we're past the timeout or count
-        if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() > timeService.getCurrentTime()
-                || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount() ) {
-            beginFlush();
-        }
-    }
-
-
-    /**
-     * Worker that will take from the queue
-     */
-    private static class FlushWorker implements Action0 {
-
-        private final BlockingQueue<Counter> counterQueue;
-        private final NodeShardCounterSerialization nodeShardCounterSerialization;
-
-        private volatile Counter rollUp;
-
-
-        private FlushWorker( final BlockingQueue<Counter> counterQueue,
-                             final NodeShardCounterSerialization nodeShardCounterSerialization ) {
-            this.counterQueue = counterQueue;
-            this.nodeShardCounterSerialization = nodeShardCounterSerialization;
-        }
-
-
-        @Override
-        public void call() {
-
-
-            while ( true ) {
-                /**
-                 * Block taking the first element.  Once we take this, batch drain and roll up the rest
-                 */
-
-                try {
-                    rollUp = null;
-                    rollUp = counterQueue.take();
-                }
-                catch ( InterruptedException e ) {
-                    LOG.error( "Unable to read from counter queue", e );
-                    throw new RuntimeException( "Unable to read from counter queue", e );
-
-                }
-
-
-
-
-                //copy to the batch outside of the command for performance
-                final MutationBatch batch = nodeShardCounterSerialization.flush( rollUp );
-
-                /**
-                 * Execute the command in hystrix to avoid slamming cassandra
-                 */
-                new HystrixCommand( COUNT_GROUP ) {
-
-                    @Override
-                    protected Void run() throws Exception {
-                        batch.execute();
-
-                        return null;
-                    }
-
-
-                    @Override
-                    protected Object getFallback() {
-                        //we've failed to mutate.  Merge this count back into the current one
-                        counterQueue.offer( rollUp );
-
-                        return null;
-                    }
-                }.execute();
-            }
-
-        }
-
-
-        /**
-         * Return true if we're in the process of flushing
-         * @return
-         */
-        public boolean isFlushing(){
-            return rollUp != null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
deleted file mode 100644
index aafbd26..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import org.apache.usergrid.persistence.core.migration.schema.Migration;
-
-import com.netflix.astyanax.MutationBatch;
-
-
-/**
- * Serialization for flushing and reading counters
- */
-public interface NodeShardCounterSerialization extends Migration {
-
-
-    /**
-     * Flush the counter to the mutation batch
-     * @param counter
-     * @return
-     */
-    public MutationBatch flush(Counter counter);
-
-
-    /**
-     * Get the count of this shard, if it exists.
-     * @param key The shard key to get
-     * @return
-     */
-    public long getCount(ShardKey key);
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
deleted file mode 100644
index 6934275..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.CounterColumnType;
-
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-import com.netflix.astyanax.serializers.BooleanSerializer;
-
-
-@Singleton
-public class NodeShardCounterSerializationImpl implements NodeShardCounterSerialization {
-
-
-    private static final ShardKeySerializer SHARD_KEY_SERIALIZER = new ShardKeySerializer();
-
-    /**
-     * Edge shards
-     */
-    private static final MultiTennantColumnFamily<ScopedRowKey<ShardKey>, Boolean> EDGE_SHARD_COUNTS =
-            new MultiTennantColumnFamily<>( "Edge_Shard_Counts",
-                    new ScopedRowKeySerializer<>( SHARD_KEY_SERIALIZER ), BooleanSerializer.get() );
-
-
-    protected final Keyspace keyspace;
-    protected final CassandraConfig cassandraConfig;
-    protected final GraphFig graphFig;
-
-
-    @Inject
-    public NodeShardCounterSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
-                                            final GraphFig graphFig ) {
-        this.keyspace = keyspace;
-        this.cassandraConfig = cassandraConfig;
-        this.graphFig = graphFig;
-    }
-
-
-    @Override
-    public MutationBatch flush( final Counter counter ) {
-
-
-        Preconditions.checkNotNull( counter, "counter must be specified" );
-
-
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-
-        for ( Map.Entry<ShardKey, AtomicLong> entry : counter.getEntries() ) {
-
-            final ShardKey key = entry.getKey();
-            final long value = entry.getValue().get();
-
-
-            final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key );
-
-
-            batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn(true , value );
-        }
-
-
-        return batch;
-    }
-
-
-    @Override
-    public long getCount( final ShardKey key ) {
-
-        final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.scope.getApplication(), key );
-
-
-        OperationResult<Column<Boolean>> column = null;
-        try {
-            column = keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( true ).execute();
-        }
-        //column not found, return 0
-        catch ( NotFoundException nfe ) {
-            return 0;
-        }
-        catch ( ConnectionException e ) {
-            throw new RuntimeException( "Unable to read from cassandra", e );
-        }
-
-        return column.getResult().getLongValue();
-    }
-
-
-    @Override
-    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
-        return Collections.singleton(
-                new MultiTennantColumnFamilyDefinition( EDGE_SHARD_COUNTS, BytesType.class.getSimpleName(),
-                        ColumnTypes.BOOLEAN, CounterColumnType.class.getSimpleName(),
-                        MultiTennantColumnFamilyDefinition.CacheOption.ALL ) );
-    }
-
-
-
-    private static class ShardKeySerializer implements CompositeFieldSerializer<ShardKey> {
-
-
-        private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-        private static final EdgeShardRowKeySerializer EDGE_SHARD_ROW_KEY_SERIALIZER = EdgeShardRowKeySerializer.INSTANCE;
-
-
-        @Override
-        public void toComposite( final CompositeBuilder builder, final ShardKey key ) {
-
-            ID_SER.toComposite( builder, key.scope.getApplication() );
-
-            EDGE_SHARD_ROW_KEY_SERIALIZER.toComposite( builder, key.directedEdgeMeta );
-
-            builder.addLong( key.shard.getShardIndex() );
-
-            builder.addLong( key.shard.getCreatedTime() );
-        }
-
-
-        @Override
-        public ShardKey fromComposite( final CompositeParser composite ) {
-
-            final Id applicationId = ID_SER.fromComposite( composite );
-
-            final ApplicationScope scope = new ApplicationScopeImpl( applicationId );
-
-            final DirectedEdgeMeta directedEdgeMeta = EDGE_SHARD_ROW_KEY_SERIALIZER.fromComposite( composite );
-
-            final long shardIndex = composite.readLong();
-
-            final long shardCreatedTime = composite.readLong();
-
-            return new ShardKey( scope, new Shard( shardIndex, shardCreatedTime, false ), directedEdgeMeta );
-        }
-
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
deleted file mode 100644
index c976210..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-
-
-/**
- * Key for shards and counts
- */
-public class ShardKey {
-    public final ApplicationScope scope;
-    public final Shard shard;
-    public final DirectedEdgeMeta directedEdgeMeta;
-
-
-    public ShardKey( final ApplicationScope scope, final Shard shard, final DirectedEdgeMeta directedEdgeMeta ) {
-        this.scope = scope;
-        this.shard = shard;
-        this.directedEdgeMeta = directedEdgeMeta;
-    }
-
-
-    @Override
-    public boolean equals( final Object o ) {
-        if ( this == o ) {
-            return true;
-        }
-        if ( o == null || getClass() != o.getClass() ) {
-            return false;
-        }
-
-        final ShardKey shardKey = ( ShardKey ) o;
-
-        if ( !directedEdgeMeta.equals( shardKey.directedEdgeMeta ) ) {
-            return false;
-        }
-        if ( !scope.equals( shardKey.scope ) ) {
-            return false;
-        }
-        if ( !shard.equals( shardKey.shard ) ) {
-            return false;
-        }
-
-        return true;
-    }
-
-
-    @Override
-    public int hashCode() {
-        int result = scope.hashCode();
-        result = 31 * result + shard.hashCode();
-        result = 31 * result + directedEdgeMeta.hashCode();
-        return result;
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index b0875af..8943737 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -37,7 +37,6 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEd
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
@@ -63,7 +62,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
     private final EdgeShardSerialization edgeShardSerialization;
     private final EdgeColumnFamilies edgeColumnFamilies;
     private final ShardedEdgeSerialization shardedEdgeSerialization;
-    private final NodeShardApproximation nodeShardApproximation;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final ShardGroupCompaction shardGroupCompaction;
@@ -72,13 +70,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
     @Inject
     public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
                                     final EdgeColumnFamilies edgeColumnFamilies,
-                                    final ShardedEdgeSerialization shardedEdgeSerialization,
-                                    final NodeShardApproximation nodeShardApproximation, final TimeService timeService,
+                                    final ShardedEdgeSerialization shardedEdgeSerialization, final TimeService timeService,
                                     final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction ) {
         this.edgeShardSerialization = edgeShardSerialization;
         this.edgeColumnFamilies = edgeColumnFamilies;
         this.shardedEdgeSerialization = shardedEdgeSerialization;
-        this.nodeShardApproximation = nodeShardApproximation;
         this.timeService = timeService;
         this.graphFig = graphFig;
         this.shardGroupCompaction = shardGroupCompaction;
@@ -166,18 +162,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          * Check out if we have a count for our shard allocation
          */
 
-        final long count = nodeShardApproximation.getCount( scope, shard, directedEdgeMeta );
+
 
         final long shardSize = graphFig.getShardSize();
 
 
-        if ( count < shardSize ) {
-            return false;
-        }
-
-        if ( LOG.isDebugEnabled() ) {
-            LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize );
-        }
 
         /**
          * We want to allocate a new shard as close to the max value as possible.  This way if we're filling up a
@@ -234,10 +223,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
         /**
-         * Sanity check in case our counters become severely out of sync with our edge state in cassandra.
+         * Sanity check in case we audit before we have a full shard
          */
         if ( marked == null ) {
-            LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup );
+            LOG.info( "Shard {} in shard group {} not full, not splitting", shardEntryGroup );
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index 6d2a009..ea10ed5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -167,6 +167,13 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
                 continue;
             }
 
+            //The shard is not compacted, we cannot remove it.  This should never happen, a bit of an "oh shit" scenario.
+            //the isCompactionPending should return false in this case
+            if(!shard.isCompacted()){
+                logger.warn( "Shard {} in group {} is not compacted yet was checked.  Short circuiting", shard, shardEntryGroup );
+                return DeleteResult.NO_OP;
+            }
+
 
             final MutationBatch shardRemovalMutation =
                 edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index 2ef50f5..1060495 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -123,10 +123,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                             final Shard shard, final boolean isDeleted ) {
 
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
-                }
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -153,11 +149,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
 
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
-
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
-                }
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -182,10 +173,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).putColumn( edge, isDeleted );
 
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, targetEdgeMeta );
-                }
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -212,11 +199,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
                 batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
                      .putColumn( edge, isDeleted );
-
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
-                }
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -241,11 +223,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                             final boolean isDeleted ) {
                 batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
                      .putColumn( column, isDeleted );
-
-
-                if ( !isDeleted ) {
-                    writeEdgeShardStrategy.increment( scope, shard, 1, directedEdgeMeta );
-                }
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -265,7 +242,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                             final Shard shard, final boolean isDeleted ) {
 
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -288,7 +264,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
                 batch.withRow( columnFamilies.getSourceNodeTargetTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
                      .deleteColumn( edge );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -308,7 +283,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                             final Shard shard, final boolean isDeleted ) {
 
                 batch.withRow( columnFamily, ScopedRowKey.fromKey( scope.getApplication(), rowKey ) ).deleteColumn( edge );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -331,7 +305,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
                 batch.withRow( columnFamilies.getTargetNodeSourceTypeCfName(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
                      .deleteColumn( edge );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }
@@ -351,7 +324,6 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
                             final boolean isDeleted ) {
                 batch.withRow( columnFamilies.getGraphEdgeVersions(), ScopedRowKey.fromKey( scope.getApplication(), rowKey ) )
                      .deleteColumn( column );
-                writeEdgeShardStrategy.increment( scope, shard, -1, directedEdgeMeta );
             }
         }.createBatch( scope, shards, timestamp );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
index 8787d97..cf164ba 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
@@ -25,9 +25,7 @@ import java.util.Iterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 
 import com.google.inject.Inject;
@@ -42,14 +40,11 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
 
 
     private final NodeShardCache shardCache;
-    private final NodeShardApproximation shardApproximation;
 
 
     @Inject
-    public SizebasedEdgeShardStrategy( final NodeShardCache shardCache,
-                                       final NodeShardApproximation shardApproximation ) {
+    public SizebasedEdgeShardStrategy( final NodeShardCache shardCache) {
         this.shardCache = shardCache;
-        this.shardApproximation = shardApproximation;
     }
 
 
@@ -65,10 +60,4 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
         return shardCache.getReadShardGroup( scope, maxTimestamp, directedEdgeMeta );
     }
 
-
-    @Override
-    public void increment( final ApplicationScope scope, final Shard shard,
-                           final long count, final DirectedEdgeMeta directedEdgeMeta) {
-        shardApproximation.increment( scope, shard,  count, directedEdgeMeta );
-    }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
deleted file mode 100644
index 2951efe..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-package org.apache.usergrid.persistence.graph;
-
-
-import java.util.concurrent.TimeoutException;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
-import org.apache.usergrid.persistence.core.test.ITRunner;
-import org.apache.usergrid.persistence.core.test.UseModules;
-import org.apache.usergrid.persistence.core.util.IdGenerator;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.inject.Inject;
-
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-
-@RunWith( ITRunner.class )
-@UseModules( TestGraphModule.class )
-@Ignore("Kills cassandra")
-public class GraphManagerShardingIT {
-
-
-    @Inject
-    @Rule
-    public MigrationManagerRule migrationManagerRule;
-
-
-    @Inject
-    protected GraphManagerFactory emf;
-
-
-    @Inject
-    protected GraphFig graphFig;
-
-    @Inject
-    protected NodeShardApproximation nodeShardApproximation;
-
-    protected ApplicationScope scope;
-
-
-
-
-    @Before
-    public void mockApp() {
-        this.scope = new ApplicationScopeImpl( IdGenerator.createId( "application" )  );
-    }
-
-
-    @Test
-    public void testWriteSourceType() throws TimeoutException, InterruptedException {
-
-        GraphManager gm = emf.createEdgeManager( scope ) ;
-
-        final Id sourceId = IdGenerator.createId( "source" );
-        final String edgeType = "test";
-
-
-
-
-        final long flushCount = graphFig.getCounterFlushCount();
-        final long maxShardSize = graphFig.getShardSize();
-
-
-
-
-        final long startTime = System.currentTimeMillis();
-
-        //each edge causes 4 counts
-        final long writeCount = flushCount/4;
-
-        assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount );
-
-        Id targetId = null;
-
-        for(long i = 0; i < writeCount; i ++){
-            targetId = IdGenerator.createId( "target" ) ;
-
-            final Edge edge = createEdge( sourceId, edgeType, targetId);
-
-            gm.writeEdge( edge ).toBlocking().last();
-
-        }
-
-
-
-        final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( sourceId, edgeType,
-                targetId.getType() );
-        final Shard shard = new Shard(0, 0, true);
-
-
-        long shardCount = nodeShardApproximation.getCount( scope, shard, sourceEdgeMeta );
-
-        assertEquals("Shard count for source node should be the same as write count", writeCount, shardCount);
-
-
-        //now verify it's correct for the target
-        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType(targetId,  edgeType, sourceId.getType() );
-
-
-        shardCount = nodeShardApproximation.getCount( scope, shard, targetEdgeMeta );
-
-        assertEquals(1, shardCount);
-
-    }
-
-
-    @Test
-    public void testWriteTargetType() throws TimeoutException, InterruptedException {
-
-        GraphManager gm = emf.createEdgeManager( scope ) ;
-
-        final Id targetId = IdGenerator.createId( "target" );
-        final String edgeType = "test";
-
-
-
-
-        final long flushCount = graphFig.getCounterFlushCount();
-        final long maxShardSize = graphFig.getShardSize();
-
-
-         //each edge causes 4 counts
-        final long writeCount = flushCount/4;
-
-        assertTrue( "Shard size must be >= beginFlush Count", maxShardSize >= flushCount );
-
-        Id sourceId = null;
-
-        for(long i = 0; i < writeCount; i ++){
-            sourceId = IdGenerator.createId( "source" ) ;
-
-            final Edge edge = createEdge( sourceId, edgeType, targetId);
-
-            gm.writeEdge( edge ).toBlocking().last();
-
-        }
-
-
-        //this is from target->source, since the target id doesn't change
-        final DirectedEdgeMeta targetMeta = DirectedEdgeMeta.fromTargetNode( targetId, edgeType );
-        final Shard shard = new Shard(0l, 0l, true);
-
-        long targetWithType = nodeShardApproximation.getCount( scope, shard, targetMeta );
-
-        assertEquals("Shard count for target node should be the same as write count", writeCount, targetWithType);
-
-
-        final DirectedEdgeMeta targetNodeSource = DirectedEdgeMeta.fromTargetNodeSourceType( targetId, edgeType, "source" );
-
-        long shardCount = nodeShardApproximation.getCount( scope, shard, targetNodeSource );
-
-        assertEquals("Shard count for target node should be the same as write count", writeCount, shardCount);
-
-
-        //now verify it's correct for the target
-
-        final DirectedEdgeMeta sourceMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
-
-        shardCount = nodeShardApproximation.getCount( scope, shard, sourceMeta );
-
-        assertEquals(1, shardCount);
-
-    }
-
-
-
-
-}
-
-
-
-
-

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index ac965cd..bc364cc 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -98,15 +98,13 @@ public class NodeShardAllocationTest {
 
         final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-
 
         final TimeService timeService = mock( TimeService.class );
 
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardCounterSerialization, timeService, graphFig, shardGroupCompaction );
+                         timeService, graphFig, shardGroupCompaction );
 
 
         final long timeservicetime = System.currentTimeMillis();
@@ -131,15 +129,12 @@ public class NodeShardAllocationTest {
 
         final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardCounterSerialization = mock( NodeShardApproximation.class );
-
-
         final TimeService timeService = mock( TimeService.class );
 
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardCounterSerialization, timeService, graphFig, shardGroupCompaction );
+                      timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -175,14 +170,11 @@ public class NodeShardAllocationTest {
 
         final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
-
         final TimeService timeService = mock( TimeService.class );
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+                        timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -205,8 +197,6 @@ public class NodeShardAllocationTest {
 
         final long count = graphFig.getShardSize() - 1;
 
-        when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( count );
-
         final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta );
 
         assertFalse( "Shard allocated", result );
@@ -223,14 +213,12 @@ public class NodeShardAllocationTest {
 
         final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
 
         final TimeService timeService = mock( TimeService.class );
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+                        timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -255,9 +243,6 @@ public class NodeShardAllocationTest {
         final long shardCount = ( long ) ( graphFig.getShardSize() * 2.5 );
 
 
-        //return a shard size equal to our max
-        when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount );
-
 
         //this is how many we should be iterating and should set the value of the last shard we keep
         final int numToIterate = ( int ) ( graphFig.getShardSize() * 2 );
@@ -338,14 +323,12 @@ public class NodeShardAllocationTest {
 
         final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
 
         final TimeService timeService = mock( TimeService.class );
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+                         timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -378,9 +361,6 @@ public class NodeShardAllocationTest {
 
         iteratedEdges.add( returnedEdge );
 
-        //return a shard size equal to our max
-        when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount );
-
         ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
 
 
@@ -429,14 +409,12 @@ public class NodeShardAllocationTest {
 
         final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
 
         final TimeService timeService = mock( TimeService.class );
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+                       timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -456,8 +434,6 @@ public class NodeShardAllocationTest {
 
         final long shardCount = graphFig.getShardSize();
 
-        //return a shard size equal to our max
-        when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount );
 
         ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
 
@@ -495,7 +471,6 @@ public class NodeShardAllocationTest {
 
         final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -503,7 +478,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+                         timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -630,9 +605,6 @@ public class NodeShardAllocationTest {
 
         final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
-
         final TimeService timeService = mock( TimeService.class );
 
         final long returnTime = System.currentTimeMillis() + graphFig.getShardCacheTimeout() * 2;
@@ -643,7 +615,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+                       timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = IdGenerator.createId( "test" );
         final String type = "type";
@@ -712,8 +684,6 @@ public class NodeShardAllocationTest {
 
         final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
 
-        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
-
 
         /**
          * Return 100000 milliseconds
@@ -736,7 +706,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+                      timeService, graphFig, shardGroupCompaction );
 
 
         /**

http://git-wip-us.apache.org/repos/asf/usergrid/blob/591a2f1f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
deleted file mode 100644
index 32a0cda..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ /dev/null
@@ -1,627 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
-
-
-import java.beans.PropertyChangeListener;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.safehaus.guicyfig.Bypass;
-import org.safehaus.guicyfig.OptionState;
-import org.safehaus.guicyfig.Overrides;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.consistency.TimeService;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.IdGenerator;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.netflix.astyanax.ColumnListMutation;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.WriteAheadLog;
-import com.netflix.astyanax.connectionpool.Host;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.model.ColumnFamily;
-import com.netflix.astyanax.model.ConsistencyLevel;
-import com.netflix.astyanax.retry.RetryPolicy;
-
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class NodeShardApproximationTest {
-
-    private static final Logger LOG = LoggerFactory.getLogger( NodeShardApproximation.class );
-
-    private GraphFig graphFig;
-
-    private NodeShardCounterSerialization nodeShardCounterSerialization;
-    private TimeService timeService;
-
-    protected ApplicationScope scope;
-
-
-    @Before
-    public void setup() {
-        scope = mock( ApplicationScope.class );
-
-        Id orgId = mock( Id.class );
-
-        when( orgId.getType() ).thenReturn( "organization" );
-        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
-        when( scope.getApplication() ).thenReturn( orgId );
-
-        graphFig = mock( GraphFig.class );
-
-        when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
-        when( graphFig.getShardSize() ).thenReturn( 250000l );
-        when( graphFig.getCounterFlushQueueSize() ).thenReturn( 10000 );
-
-        nodeShardCounterSerialization = mock( NodeShardCounterSerialization.class );
-
-        when( nodeShardCounterSerialization.flush( any( Counter.class ) ) ).thenReturn( mock( MutationBatch.class ) );
-
-
-        timeService = mock( TimeService.class );
-
-        when( timeService.getCurrentTime() ).thenReturn( System.currentTimeMillis() );
-    }
-
-
-    @Test
-    public void testSingleShard() throws InterruptedException {
-
-
-        when( graphFig.getCounterFlushCount() ).thenReturn( 100000l );
-        NodeShardApproximation approximation =
-                new NodeShardApproximationImpl( graphFig, nodeShardCounterSerialization, timeService );
-
-
-        final Id id = IdGenerator.createId( "test" );
-        final Shard shard = new Shard( 0, 0, true );
-        final String type = "type";
-        final String type2 = "subType";
-
-        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
-
-        long count = approximation.getCount( scope, shard, directedEdgeMeta );
-
-        waitForFlush( approximation );
-
-        assertEquals( 0, count );
-    }
-
-
-    @Ignore("outdated and no longer relevant test")
-    @Test
-    public void testSingleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
-        NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
-
-        final NodeShardApproximation approximation =
-                new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() );
-
-
-        final int increments = 1000000;
-        final int workers = Runtime.getRuntime().availableProcessors() * 2;
-
-        final Id id = IdGenerator.createId( "test" );
-        final String type = "type";
-        final String type2 = "subType";
-
-        final Shard shard = new Shard( 10000, 0, true );
-
-        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
-
-        ExecutorService executor = Executors.newFixedThreadPool( workers );
-
-        List<Future<Long>> futures = new ArrayList<>( workers );
-
-        for ( int i = 0; i < workers; i++ ) {
-
-            final Future<Long> future = executor.submit( new Callable<Long>() {
-                @Override
-                public Long call() throws Exception {
-
-                    for ( int i = 0; i < increments; i++ ) {
-                        approximation.increment( scope, shard, 1, directedEdgeMeta );
-                    }
-
-                    return 0l;
-                }
-            } );
-
-            futures.add( future );
-        }
-
-
-        for ( Future<Long> future : futures ) {
-            future.get();
-        }
-
-        waitForFlush( approximation );
-        //get our count.  It should be accurate b/c we only have 1 instance
-
-        final long returnedCount = approximation.getCount( scope, shard, directedEdgeMeta );
-        final long expected = workers * increments;
-
-
-        assertEquals( expected, returnedCount );
-
-        //test we get nothing with the other type
-
-        final long emptyCount =
-                approximation.getCount( scope, shard, DirectedEdgeMeta.fromSourceNodeTargetType( id, type, type2 ) );
-
-
-        assertEquals( 0, emptyCount );
-    }
-
-
-    @Ignore("outdated and no longer relevant test")
-    @Test
-    public void testMultipleShardMultipleThreads() throws ExecutionException, InterruptedException {
-
-
-        NodeShardCounterSerialization serialization = new TestNodeShardCounterSerialization();
-
-        final NodeShardApproximation approximation =
-                new NodeShardApproximationImpl( new TestGraphFig(), serialization, new TestTimeService() );
-
-
-        final int increments = 1000000;
-        final int workers = Runtime.getRuntime().availableProcessors() * 2;
-
-        final Id id = IdGenerator.createId( "test" );
-        final String type = "type";
-        final String type2 = "subType";
-
-        final AtomicLong shardIdCounter = new AtomicLong();
-
-
-        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromTargetNodeSourceType( id, type, type2 );
-
-
-        ExecutorService executor = Executors.newFixedThreadPool( workers );
-
-        List<Future<Shard>> futures = new ArrayList<>( workers );
-
-        for ( int i = 0; i < workers; i++ ) {
-
-            final Future<Shard> future = executor.submit( new Callable<Shard>() {
-                @Override
-                public Shard call() throws Exception {
-
-                    final long threadShardId = shardIdCounter.incrementAndGet();
-
-                    final Shard shard = new Shard( threadShardId, 0, true );
-
-                    for ( int i = 0; i < increments; i++ ) {
-                        approximation.increment( scope, shard, 1, directedEdgeMeta );
-                    }
-
-                    return shard;
-                }
-            } );
-
-            futures.add( future );
-        }
-
-
-        for ( Future<Shard> future : futures ) {
-            final Shard shardId = future.get();
-
-            waitForFlush( approximation );
-
-            final long returnedCount = approximation.getCount( scope, shardId, directedEdgeMeta );
-
-            assertEquals( increments, returnedCount );
-        }
-    }
-
-
-    private void waitForFlush( NodeShardApproximation approximation ) throws InterruptedException {
-
-        approximation.beginFlush();
-
-        while ( approximation.flushPending() ) {
-
-            LOG.info( "Waiting on beginFlush to complete" );
-
-            Thread.sleep( 100 );
-        }
-    }
-
-
-    /**
-     * These are created b/c we can't use Mockito.  It OOM's with keeping track of all the mock invocations
-     */
-
-    private static class TestNodeShardCounterSerialization implements NodeShardCounterSerialization {
-
-        private Counter copy = new Counter();
-
-
-        @Override
-        public MutationBatch flush( final Counter counter ) {
-            copy.merge( counter );
-            return new TestMutationBatch();
-        }
-
-
-        @Override
-        public long getCount( final ShardKey key ) {
-            return copy.get( key );
-        }
-
-
-        @Override
-        public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-    }
-
-
-    /**
-     * Simple test mutation to no-op during tests
-     */
-    private
-    static class TestMutationBatch implements MutationBatch {
-
-        @Override
-        public <K, C> ColumnListMutation<C> withRow( final ColumnFamily<K, C> columnFamily, final K rowKey ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public <K> void deleteRow( final Iterable<? extends ColumnFamily<K, ?>> columnFamilies, final K rowKey ) {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public void discardMutations() {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public void mergeShallow( final MutationBatch other ) {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public boolean isEmpty() {
-            return false;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public int getRowCount() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public Map<ByteBuffer, Set<String>> getRowKeys() {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch pinToHost( final Host host ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch setConsistencyLevel( final ConsistencyLevel consistencyLevel ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch withConsistencyLevel( final ConsistencyLevel consistencyLevel ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch withRetryPolicy( final RetryPolicy retry ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch usingWriteAheadLog( final WriteAheadLog manager ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch lockCurrentTimestamp() {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch setTimeout( final long timeout ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch setTimestamp( final long timestamp ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch withTimestamp( final long timestamp ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public MutationBatch withAtomicBatch( final boolean condition ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public ByteBuffer serialize() throws Exception {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public void deserialize( final ByteBuffer data ) throws Exception {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public OperationResult<Void> execute() throws ConnectionException {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public ListenableFuture<OperationResult<Void>> executeAsync() throws ConnectionException {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-    }
-
-
-    private static class TestGraphFig implements GraphFig {
-
-        @Override
-        public int getScanPageSize() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public int getRepairConcurrentSize() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public double getShardRepairChance() {
-            return 0;
-        }
-
-
-        @Override
-        public long getShardSize() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public long getShardCacheTimeout() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public long getShardMinDelta() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public long getShardCacheSize() {
-            return 0;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public int getShardCacheRefreshWorkerCount() {
-            return 0;
-        }
-
-
-        @Override
-        public int getShardAuditWorkerCount() {
-            return 0;
-        }
-
-
-        @Override
-        public int getShardAuditWorkerQueueSize() {
-            return 0;
-        }
-
-
-        @Override
-        public long getCounterFlushCount() {
-            return 100000l;
-        }
-
-
-        @Override
-        public long getCounterFlushInterval() {
-            return 30000l;
-        }
-
-
-        @Override
-        public int getCounterFlushQueueSize() {
-            return 10000;
-        }
-
-
-        @Override
-        public void addPropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public void removePropertyChangeListener( final PropertyChangeListener propertyChangeListener ) {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public OptionState[] getOptions() {
-            return new OptionState[0];  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public OptionState getOption( final String s ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public String getKeyByMethod( final String s ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public Object getValueByMethod( final String s ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public Properties filterOptions( final Properties properties ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public Map<String, Object> filterOptions( final Map<String, Object> stringObjectMap ) {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public void override( final String s, final String s2 ) {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public boolean setOverrides( final Overrides overrides ) {
-            return false;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public Overrides getOverrides() {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public void bypass( final String s, final String s2 ) {
-            //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public boolean setBypass( final Bypass bypass ) {
-            return false;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public Bypass getBypass() {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public Class getFigInterface() {
-            return null;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-
-
-        @Override
-        public boolean isSingleton() {
-            return false;  //To change body of implemented methods use File | Settings | File Templates.
-        }
-    }
-
-
-    private static class TestTimeService implements TimeService {
-
-        @Override
-        public long getCurrentTime() {
-            return System.currentTimeMillis();
-        }
-    }
-}


Mime
View raw message