usergrid-commits mailing list archives

From g...@apache.org
Subject [05/52] [abbrv] Updated OrderedMerge to use a faster implementation at runtime. After initialization, it's an O(1) emit operation as long as our producers are fast enough.
Date Wed, 03 Sep 2014 22:11:00 GMT
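
The commit message describes OrderedMerge emitting in constant time per element once its producers have been primed. The real class merges RxJava observables inside the Usergrid core persistence module; the iterator below is only a rough, hypothetical sketch of the idea (names and structure assumed, not taken from this commit): after an initialization pass that pulls one head element per producer, every emit does a fixed amount of work per producer, independent of how many elements have already been consumed.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

//Hypothetical illustration only -- not the OrderedMerge class changed by this commit.
//Merges already-sorted producers; after the constructor primes one head per producer,
//each next() does a constant amount of work per producer, regardless of how many
//elements have been emitted so far.
final class OrderedMergeSketch<T> implements Iterator<T> {

    private final List<Iterator<T>> producers;
    private final List<T> heads;          // current head of each producer, null once exhausted
    private final Comparator<T> comparator;


    OrderedMergeSketch( final List<Iterator<T>> producers, final Comparator<T> comparator ) {
        this.producers = producers;
        this.comparator = comparator;
        this.heads = new ArrayList<>( producers.size() );

        //initialization: pull the first element from every producer exactly once
        for ( Iterator<T> producer : producers ) {
            heads.add( producer.hasNext() ? producer.next() : null );
        }
    }


    @Override
    public boolean hasNext() {
        for ( T head : heads ) {
            if ( head != null ) {
                return true;
            }
        }
        return false;
    }


    @Override
    public T next() {
        //emit: select the smallest head, then refill that slot from its producer
        int winner = -1;

        for ( int i = 0; i < heads.size(); i++ ) {
            final T head = heads.get( i );

            if ( head != null && ( winner == -1 || comparator.compare( head, heads.get( winner ) ) < 0 ) ) {
                winner = i;
            }
        }

        if ( winner == -1 ) {
            throw new NoSuchElementException();
        }

        final T result = heads.get( winner );
        heads.set( winner, producers.get( winner ).hasNext() ? producers.get( winner ).next() : null );

        return result;
    }
}

As long as each producer can hand over its next element quickly, the per-emit cost stays flat; a slow producer only stalls the merge at the refill step, which is the caveat the commit message calls out.
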
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
index d0a0525..d39e433 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerLoadTest.java
@@ -112,7 +112,7 @@ public class GraphManagerLoadTest {
 
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
-                 return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), null ) );
+                 return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null) );
             }
         };
 
@@ -139,7 +139,7 @@ public class GraphManagerLoadTest {
 
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
-                return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), null ) );
+                return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) );
             }
         };
 

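The hunks above (and the matching ones in GraphManagerStressTest and EdgeDeleteRepairTest further down) add an explicit SearchByEdgeType.Order argument to the search constructors. A hypothetical helper showing the updated call shape as these tests now use it (the class and method names here are illustrative and not part of this commit; the trailing null mirrors the final argument the tests pass):

import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.SearchByEdgeType;
import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
import org.apache.usergrid.persistence.model.entity.Id;

import rx.Observable;

//Illustrative helper only; the class and method names are not from this commit.
public class OrderedEdgeSearchExample {

    /**
     * Load edges from the given source node, newest first, the way the updated tests do.
     * The trailing null mirrors the last argument the tests pass.
     */
    public static Observable<Edge> newestFirst( final GraphManager manager, final Id sourceId,
                                                final String edgeType ) {
        return manager.loadEdgesFromSource(
                new SimpleSearchByEdgeType( sourceId, edgeType, System.currentTimeMillis(),
                        SearchByEdgeType.Order.DESCENDING, null ) );
    }
}
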
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 59bf014..c01d157 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -21,27 +21,31 @@
 package org.apache.usergrid.persistence.graph;
 
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintWriter;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nullable;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.Marker;
-
-import org.apache.commons.lang.time.StopWatch;
 
 import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
 import org.apache.usergrid.persistence.core.migration.MigrationException;
@@ -51,29 +55,33 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Slf4jReporter;
-import com.google.common.base.Optional;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.netflix.config.ConfigurationManager;
 
 import rx.Observable;
+import rx.functions.Action1;
 
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 
-@Ignore("A stress test, not part of functional testing")
+//@Ignore( "A stress test, not part of functional testing" )
 public class GraphManagerShardConsistencyIT {
     private static final Logger log = LoggerFactory.getLogger( GraphManagerShardConsistencyIT.class );
 
@@ -85,12 +93,9 @@ public class GraphManagerShardConsistencyIT {
 
     private static final Meter writeMeter = registry.meter( "writeThroughput" );
 
-    private static final Slf4jReporter reporter = Slf4jReporter.forRegistry( registry )
-                                                .outputTo( log )
-                                                .convertRatesTo( TimeUnit.SECONDS )
-                                                .convertDurationsTo( TimeUnit.MILLISECONDS )
-                                                .build();
-
+    private static final Slf4jReporter reporter =
+            Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
+                         .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
 
 
     protected ApplicationScope scope;
@@ -102,6 +107,7 @@ public class GraphManagerShardConsistencyIT {
 
     protected Object originalShardDelta;
 
+
     @Before
     public void setupOrg() {
 
@@ -110,82 +116,52 @@ public class GraphManagerShardConsistencyIT {
 
         originalShardTimeout = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_CACHE_TIMEOUT );
 
-        originalShardDelta =  ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
+        originalShardDelta = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
 
 
-        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 10000 );
+        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 1000 );
 
 
-        final long cacheTimeout = 10000;
-        //set our cache timeout to 10 seconds
+        final long cacheTimeout = 5000;
+        //set our cache timeout to the above value
         ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_CACHE_TIMEOUT, cacheTimeout );
 
 
-        final long minDelta = ( long ) (cacheTimeout * 2.5);
+        final long minDelta = ( long ) ( cacheTimeout * 2.5 );
 
         ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_MIN_DELTA, minDelta );
 
 
-
-
         //get the system property of the UUID to use.  If one is not set, use the defualt
         String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
 
         scope = new ApplicationScopeImpl( createId( UUID.fromString( uuidString ), "test" ) );
 
 
-
         reporter.start( 10, TimeUnit.SECONDS );
     }
 
 
     @After
-    public void tearDown(){
+    public void tearDown() {
         reporter.stop();
         reporter.report();
     }
 
 
-//    @Test
-//    public void writeThousandsSingleSource() throws InterruptedException, ExecutionException {
-//        EdgeGenerator generator = new EdgeGenerator() {
-//
-//            private Id sourceId = createId( "source" );
-//
-//
-//            @Override
-//            public Edge newEdge() {
-//                Edge edge = createEdge( sourceId, "test", createId( "target" ) );
-//
-//
-//                return edge;
-//            }
-//
-//
-//            @Override
-//            public Observable<Edge> doSearch( final GraphManager manager ) {
-//                return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), null ) );
-//            }
-//        };
-//
-//
-//
-//        doTest( generator );
-//    }
-
-
     @Test
-    public void writeThousandsSingleTarget() throws InterruptedException, ExecutionException, MigrationException {
+    public void writeThousandsSingleSource()
+            throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
 
-        final Id sourceId = createId("source");
+        final Id sourceId = createId( "source" );
         final String edgeType = "test";
 
-        EdgeGenerator generator = new EdgeGenerator() {
+        final EdgeGenerator generator = new EdgeGenerator() {
 
 
             @Override
             public Edge newEdge() {
-                Edge edge = createEdge( sourceId, edgeType,  createId( "target" ) );
+                Edge edge = createEdge( sourceId, edgeType, createId( "target" ) );
 
 
                 return edge;
@@ -194,59 +170,84 @@ public class GraphManagerShardConsistencyIT {
 
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
-                return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), null ) );
+                return manager.loadEdgesFromSource(
+                        new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE,
+                                SearchByEdgeType.Order.DESCENDING, null ) );
             }
         };
 
 
-        final int numInjectors = 2;
+//        final int numInjectors = 2;
+        final int numInjectors = 1;
 
         /**
          * create 3 injectors.  This way all the caches are independent of one another.  This is the same as
          * multiple nodes
          */
-        final List<Injector> injectors = createInjectors(numInjectors);
+        final List<Injector> injectors = createInjectors( numInjectors );
 
 
         final GraphFig graphFig = getInstance( injectors, GraphFig.class );
 
-        final long shardSize =  graphFig.getShardSize();
+        final long shardSize = graphFig.getShardSize();
 
 
-        //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing power for writes
-        final int numProcessors = Runtime.getRuntime().availableProcessors() /2 ;
+        //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing
+        // power for writes
+        final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
 
-        final int numWorkers = numProcessors/numInjectors;
+        final int numWorkersPerInjector = numProcessors / numInjectors;
 
 
         /**
          * Do 4x shard size so we should have approximately 4 shards
          */
-        final long numberOfEdges =  shardSize * 4;
+        final long numberOfEdges = shardSize * 4;
 
 
-        final long countPerWorker = numberOfEdges/numWorkers;
+        final long workerWriteLimit = numberOfEdges / numWorkersPerInjector;
 
-        final long writeLimit = countPerWorker;
 
 
+        final long expectedShardCount = numberOfEdges/shardSize;
+
+
+        final ListeningExecutorService
+                executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( numWorkersPerInjector ) );
+
+
+        final AtomicLong writeCounter = new AtomicLong();
+
 
         //min stop time the min delta + 1 cache cycle timeout
         final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
 
 
-
+        log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
+                numInjectors );
 
 
         final List<Future<Boolean>> futures = new ArrayList<>();
 
-        for(Injector injector: injectors) {
+
+
+        for ( Injector injector : injectors ) {
             final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
 
-            futures.addAll( doTest( gmf, generator, numWorkers,  writeLimit, minExecutionTime ) );
+
+            for ( int i = 0; i < numWorkersPerInjector; i++ ) {
+                Future<Boolean> future = executor
+                        .submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+
+                futures.add( future );
+            }
+
         }
 
-        for(Future<Boolean> future: futures){
+        /**
+         * Wait for all writes to complete
+         */
+        for ( Future<Boolean> future : futures ) {
             future.get();
         }
 
@@ -255,53 +256,134 @@ public class GraphManagerShardConsistencyIT {
 
         final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
 
-        int count = 0;
+        //now submit the readers.
+        final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
 
-        while(true) {
 
-            //reset our count.  Ultimately we'll have 4 groups once our compaction completes
-            count = 0;
+        final long writeCount = writeCounter.get();
+        final Meter readMeter = registry.meter( "readThroughput" );
+
 
+        /**
+         * Start reading continuously while we migrate data to ensure our view is always correct
+         */
+        final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+
+        final List<Throwable> failures = new ArrayList<>(  );
+
+
+
+
+        //add the future
+        Futures.addCallback( future, new FutureCallback<Long>() {
+
+            @Override
+            public void onSuccess( @Nullable final Long result ) {
+                log.info( "Successfully ran the read, re-running" );
+                executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+            }
+
+
+            @Override
+            public void onFailure( final Throwable t ) {
+                failures.add( t );
+                log.error( "Failed test!", t );
+            }
+        } );
+
+
+
+        int compactedCount;
+
+
+
+
+
+        //now start our readers
+
+        while ( true ) {
+
+            if(!failures.isEmpty()){
+
+                StringBuilder builder = new StringBuilder(  );
+
+                builder.append("Read runner failed!\n");
+
+                for(Throwable t: failures){
+                    builder.append( "Exception is: " );
+                    ByteArrayOutputStream output = new ByteArrayOutputStream(  );
+
+                    t.printStackTrace( new PrintWriter( output ) );
+
+                    builder.append( output.toString( "UTF-8" ));
+                    builder.append( "\n\n" );
+
+                }
+
+
+
+                fail(builder.toString());
+            }
+
+            //reset our count.  Ultimately we'll have 4 groups once our compaction completes
+            compactedCount = 0;
 
             //we have to get it from the cache, because this will trigger the compaction process
             final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+            final Set<ShardEntryGroup> shardEntryGroups = new HashSet<>();
+
+            while ( groups.hasNext() ) {
 
-            while(groups.hasNext()){
                 final ShardEntryGroup group = groups.next();
+                shardEntryGroups.add( group );
 
                 log.info( "Compaction pending status for group {} is {}", group, group.isCompactionPending() );
 
-                count++;
-
+                if ( !group.isCompactionPending() ) {
+                    compactedCount++;
+                }
             }
 
+
             //we're done
-            if(count == 4){
+            if ( compactedCount >= expectedShardCount ) {
+                log.info( "All compactions complete, sleeping" );
+
+//                final Object mutex = new Object();
+//
+//                synchronized ( mutex ){
+//
+//                    mutex.wait();
+//                }
+
                 break;
+
             }
 
-            Thread.sleep(5000);
-        }
 
+            Thread.sleep( 2000 );
 
+        }
+
+        executor.shutdownNow();
 
 
     }
 
-    private <T> T getInstance(final List<Injector> injectors, Class<T> clazz ){
+
+    private <T> T getInstance( final List<Injector> injectors, Class<T> clazz ) {
         return injectors.get( 0 ).getInstance( clazz );
     }
 
 
     /**
      * Create new Guice injector environments and return them
-     * @param count
      */
     private List<Injector> createInjectors( int count ) throws MigrationException {
 
-        final List<Injector> injectors = new ArrayList<>(count);
+        final List<Injector> injectors = new ArrayList<>( count );
 
-        for(int i = 0; i < count; i++){
+        for ( int i = 0; i < count; i++ ) {
             final Injector injector = Guice.createInjector( new TestGraphModule() );
             injectors.add( injector );
         }
@@ -312,42 +394,27 @@ public class GraphManagerShardConsistencyIT {
         migrationManager.migrate();
 
         return injectors;
-
-
     }
 
-    /**
-     * Execute the test with the generator
-     */
-    private List<Future<Boolean>> doTest(final GraphManagerFactory factory, final EdgeGenerator generator, final int numWorkers, final long writeCount, final long minExecutionTime ) throws InterruptedException, ExecutionException {
-
-        ExecutorService executor = Executors.newFixedThreadPool( numWorkers );
-
-        List<Future<Boolean>> futures = new ArrayList<>( numWorkers );
 
-        for ( int i = 0; i < numWorkers; i++ ) {
-            Future<Boolean> future = executor.submit( new Worker(factory, generator, writeCount, minExecutionTime ) );
-
-            futures.add( future );
-        }
 
 
-        return futures;
-    }
-
 
     private class Worker implements Callable<Boolean> {
         private final GraphManagerFactory factory;
         private final EdgeGenerator generator;
         private final long writeLimit;
         private final long minExecutionTime;
+        private final AtomicLong writeCounter;
 
 
-        private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit, final long minExecutionTime ) {
+        private Worker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeLimit,
+                        final long minExecutionTime, final AtomicLong writeCounter ) {
             this.factory = factory;
             this.generator = generator;
             this.writeLimit = writeLimit;
             this.minExecutionTime = minExecutionTime;
+            this.writeCounter = writeCounter;
         }
 
 
@@ -356,11 +423,10 @@ public class GraphManagerShardConsistencyIT {
             GraphManager manager = factory.createEdgeManager( scope );
 
 
-
             final long startTime = System.currentTimeMillis();
 
 
-            for ( long i = 0; i < writeLimit || System.currentTimeMillis() - startTime < minExecutionTime ; i++ ) {
+            for ( long i = 0; i < writeLimit || System.currentTimeMillis() - startTime < minExecutionTime; i++ ) {
 
                 Edge edge = generator.newEdge();
 
@@ -372,6 +438,10 @@ public class GraphManagerShardConsistencyIT {
 
                 writeMeter.mark();
 
+                writeCounter.incrementAndGet();
+
+
+
                 if ( i % 1000 == 0 ) {
                     log.info( "   Wrote: " + i );
                 }
@@ -383,6 +453,94 @@ public class GraphManagerShardConsistencyIT {
     }
 
 
+    private class ReadWorker implements Callable<Long> {
+        private final GraphManagerFactory factory;
+        private final EdgeGenerator generator;
+        private final long writeCount;
+        private final Meter readMeter;
+
+        private ReadWorker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeCount,
+                            final Meter readMeter) {
+            this.factory = factory;
+            this.generator = generator;
+            this.writeCount = writeCount;
+            this.readMeter = readMeter;
+        }
+
+
+        @Override
+        public Long call() throws Exception {
+
+
+
+
+            GraphManager gm = factory.createEdgeManager( scope );
+
+
+
+            while(true) {
+
+//                final long[] count = {0};
+//                final long[] duplicate = {0};
+//                final HashSet<Edge >  seen = new HashSet<>((int)writeCount);
+
+
+                //do a read to eventually trigger our group compaction. Take 2 pages of columns
+                final long returnedEdgeCount = generator.doSearch( gm )
+
+                                                        .doOnNext( new Action1<Edge>() {
+
+
+
+//                    private Edge last;
+
+
+                    @Override
+                    public void call( final Edge edge ) {
+                        readMeter.mark();
+
+//                        count[0]++;
+//
+//                        /**
+//                         * Added this check as part of the read
+//                         */
+//                        if ( last != null && last.equals(edge) ) {
+//                            fail( String.format( "Expected edges to be in order, however last was %s and current is %s",
+//                                    last, edge ) );
+//                        }
+//
+//                        last = edge;
+//
+//                        if( seen.contains( edge ) ){
+//                            fail( String.format("Returned an edge that was already seen! Edge was %s, last edge was %s", edge, last) );
+//                            duplicate[0]++;
+//                        }
+//
+//                        seen.add( edge );
+
+                    }
+                } )
+
+                                                        .longCount().toBlocking().last();
+
+
+//                if(returnedEdgeCount != count[0]-duplicate[0]){
+//                    log.warn( "Missing entries from the initial put" );
+//                }
+
+                log.info( "Completed reading {} edges", returnedEdgeCount );
+
+                if(writeCount != returnedEdgeCount){
+                    log.warn( "Unexpected edge count returned!!!  Expected {} but was {}", writeCount, returnedEdgeCount );
+                }
+
+                assertEquals( "Expected to read same edge count", writeCount, returnedEdgeCount );
+            }
+
+        }
+    }
+
+
     private interface EdgeGenerator {
 
         /**
@@ -392,13 +550,9 @@ public class GraphManagerShardConsistencyIT {
 
         /**
          * Perform the search returning an observable edge
-         * @param manager
-         * @return
          */
         public Observable<Edge> doSearch( final GraphManager manager );
     }
-
-
 }
 
 

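The rewritten GraphManagerShardConsistencyIT above keeps a reader running while shards compact by re-submitting a ReadWorker from a Guava FutureCallback and collecting any failure for the polling loop to report. A stripped-down, hypothetical sketch of that re-submission pattern follows (the ContinuousReadSketch and runContinuously names are illustrative, not from the commit):

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

//Illustrative sketch only; names are not from the commit.
public class ContinuousReadSketch {

    private final ListeningExecutorService executor =
            MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( 1 ) );

    private final List<Throwable> failures = new CopyOnWriteArrayList<>();


    /**
     * Submit the read task and re-submit it every time it completes successfully,
     * so reads run continuously while writes and shard compaction make progress.
     */
    public void runContinuously( final Callable<Long> readTask ) {

        final ListenableFuture<Long> future = executor.submit( readTask );

        Futures.addCallback( future, new FutureCallback<Long>() {

            @Override
            public void onSuccess( final Long readCount ) {
                //keep reading; a polling loop elsewhere inspects getFailures() and decides when to stop
                runContinuously( readTask );
            }


            @Override
            public void onFailure( final Throwable t ) {
                //record the failure so the polling loop can fail() with the full stack trace
                failures.add( t );
            }
        } );
    }


    public List<Throwable> getFailures() {
        return failures;
    }


    public void shutdown() {
        executor.shutdownNow();
    }
}

In the test itself the callback simply submits another ReadWorker, while the main thread polls the shard cache for compaction progress and fails fast if the failures list is non-empty.
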
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
index cf6dda8..e4b67c2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerStressTest.java
@@ -118,7 +118,7 @@ public class GraphManagerStressTest {
                             for ( Id sourceId : sourceIds ) {
 
                                 final Iterable<Edge> edges = manager.loadEdgesFromSource(
-                                        new SimpleSearchByEdgeType( sourceId, "test", timestamp, null ) )
+                                        new SimpleSearchByEdgeType( sourceId, "test", timestamp, SearchByEdgeType.Order.DESCENDING, null ) )
                                                                     .toBlocking().toIterable();
 
                                 for ( Edge edge : edges ) {
@@ -192,7 +192,7 @@ public class GraphManagerStressTest {
 
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
-                return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), null ) );
+                return manager.loadEdgesFromSource( new SimpleSearchByEdgeType( sourceId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) );
             }
         };
 
@@ -220,7 +220,7 @@ public class GraphManagerStressTest {
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
 
-                return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), null ) );
+                return manager.loadEdgesToTarget( new SimpleSearchByEdgeType( targetId, "test", System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) );
             }
         };
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
index 4b62ad1..e8af91c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairTest.java
@@ -34,6 +34,7 @@ import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.cassandra.ITRunner;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
@@ -131,7 +132,7 @@ public class EdgeDeleteRepairTest {
 
 
         Iterator<MarkedEdge> itr = storageEdgeSerialization.getEdgeVersions( scope,
-                new SimpleSearchByEdge( sourceId, edgeType, targetId, System.currentTimeMillis(), null ) );
+                new SimpleSearchByEdge( sourceId, edgeType, targetId, System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING, null ) );
 
         assertEquals( edge2, itr.next() );
         assertEquals( edge1, itr.next() );
@@ -142,7 +143,7 @@ public class EdgeDeleteRepairTest {
         assertEquals( edge1, deleted );
 
         itr = storageEdgeSerialization.getEdgeVersions( scope,
-                new SimpleSearchByEdge( sourceId, edgeType, targetId, System.currentTimeMillis(), null ) );
+                new SimpleSearchByEdge( sourceId, edgeType, targetId, System.currentTimeMillis(), SearchByEdgeType.Order.DESCENDING,  null ) );
 
         assertEquals( edge2, itr.next() );
         assertFalse( itr.hasNext() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 878674a..b8be5d2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -19,10 +19,12 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -87,6 +89,8 @@ public class NodeShardAllocationTest {
 
     @Test
     public void minTime() {
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
         final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
@@ -101,7 +105,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardCounterSerialization, timeService, graphFig );
+                        nodeShardCounterSerialization, timeService, graphFig, shardGroupCompaction );
 
 
         final long timeservicetime = System.currentTimeMillis();
@@ -116,9 +120,10 @@ public class NodeShardAllocationTest {
     }
 
 
-
     @Test
     public void existingFutureShardSameTime() {
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
         final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
@@ -131,10 +136,9 @@ public class NodeShardAllocationTest {
         final TimeService timeService = mock( TimeService.class );
 
 
-
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardCounterSerialization, timeService, graphFig );
+                        nodeShardCounterSerialization, timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -145,7 +149,7 @@ public class NodeShardAllocationTest {
 
         when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
 
-        final Shard firstShard = new Shard(0l, 0l, true);
+        final Shard firstShard = new Shard( 0l, 0l, true );
         final Shard futureShard = new Shard( 10000l, timeservicetime, false );
 
         final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
@@ -162,6 +166,8 @@ public class NodeShardAllocationTest {
 
     @Test
     public void lowCountFutureShard() {
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
         final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
@@ -175,7 +181,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig );
+                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -207,7 +213,9 @@ public class NodeShardAllocationTest {
 
 
     @Test
-    public void equalCountFutureShard() {
+    public void overAllocatedShard() {
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
         final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
@@ -221,7 +229,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig );
+                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -239,11 +247,58 @@ public class NodeShardAllocationTest {
 
         final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
 
-        final long shardCount = graphFig.getShardSize();
+
+        /**
+         * Allocate 2.5x what this shard should have.  We should ultimately have a split at 2x
+         */
+        final long shardCount = ( long ) (graphFig.getShardSize() * 2.5);
+
 
         //return a shard size equal to our max
         when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount );
 
+
+        //this is how many we should be iterating and should set the value of the last shard we keep
+        final int numToIterate = ( int ) (graphFig.getShardSize() *2);
+
+
+        /**
+         * Just use 2 edges.  It means that we won't generate a boatload of data and kill our test. We just want
+         * to check that the one we want to return is correct
+         */
+        SimpleMarkedEdge skipped = new SimpleMarkedEdge( nodeId, type, createId( subType ), 10000, false );
+        SimpleMarkedEdge keep = new SimpleMarkedEdge( nodeId, type, createId( subType ), 20000, false );
+
+        //allocate some extra to ensure we seek the right value
+        List<MarkedEdge> edges = new ArrayList( numToIterate + 100 );
+
+        int i = 0;
+
+        for (; i < numToIterate - 1; i++ ) {
+            edges.add( skipped );
+        }
+
+        //add our keep edge
+        edges.add( keep );
+        i++;
+
+        for ( ; i < shardCount; i++ ) {
+
+            edges.add( skipped );
+        }
+
+
+        final Iterator<MarkedEdge> edgeIterator = edges.iterator();
+
+        //mock up returning the value
+        when( shardedEdgeSerialization
+                .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
+                        any( Collection.class ) ) ).thenReturn( edgeIterator );
+
+
+        /**
+         * Mock up the write shard meta data
+         */
         ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
 
 
@@ -252,10 +307,89 @@ public class NodeShardAllocationTest {
                 .thenReturn( mock( MutationBatch.class ) );
 
 
+        final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta );
+
+        assertTrue( "Shard was split correctly", result );
+
+        final long savedTimestamp = shardValue.getValue().getCreatedTime();
+
+
+        assertEquals( "Expected time service time", timeservicetime, savedTimestamp );
+
+
+        //now check our max value was set.  Since our shard is significantly over allocated, we should be iterating
+        //through elements to move the pivot down to a more manageable size
+
+        final long savedShardPivot = shardValue.getValue().getShardIndex();
+
+        assertEquals( "Expected max value to be the same", keep.getTimestamp(), savedShardPivot );
+    }
+
+
+    @Test
+    public void equalCountFutureShard() {
+
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        final long timeservicetime = System.currentTimeMillis();
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+        final Shard futureShard = new Shard( 0l, 0l, true );
+
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        shardEntryGroup.addShard( futureShard );
+
+        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
+
+        final long shardCount = graphFig.getShardSize();
+
+
+        final SimpleMarkedEdge skippedEdge =   new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10000l, false );
         final SimpleMarkedEdge returnedEdge =
                 new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
 
-        final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
+        List<MarkedEdge> iteratedEdges = new ArrayList<>( ( int ) shardCount );
+
+        for(long i = 0; i < shardCount-1; i ++){
+            iteratedEdges.add( skippedEdge);
+        }
+
+        iteratedEdges.add( returnedEdge );
+
+        //return a shard size equal to our max
+        when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount );
+
+        ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
+
+
+        //mock up our mutation
+        when( edgeShardSerialization.writeShardMeta( same( scope ), shardValue.capture(), same( targetEdgeMeta ) ) )
+                .thenReturn( mock( MutationBatch.class ) );
+
+
+
+        final Iterator<MarkedEdge> edgeIterator = iteratedEdges.iterator();
 
         //mock up returning the value
         when( shardedEdgeSerialization
@@ -285,7 +419,76 @@ public class NodeShardAllocationTest {
 
 
     @Test
+    public void invalidCountNoShards() {
+
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final NodeShardApproximation nodeShardApproximation = mock( NodeShardApproximation.class );
+
+
+        final TimeService timeService = mock( TimeService.class );
+
+        NodeShardAllocation approximation =
+                new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
+                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
+
+        final Id nodeId = createId( "test" );
+        final String type = "type";
+        final String subType = "subType";
+
+
+        final long timeservicetime = System.currentTimeMillis();
+
+        when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
+
+        final Shard futureShard = new Shard( 0l, 0l, true );
+
+        final ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1000l );
+        shardEntryGroup.addShard( futureShard );
+
+        final DirectedEdgeMeta targetEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( nodeId, type, subType );
+
+        final long shardCount = graphFig.getShardSize();
+
+        //return a shard size equal to our max
+        when( nodeShardApproximation.getCount( scope, futureShard, targetEdgeMeta ) ).thenReturn( shardCount );
+
+        ArgumentCaptor<Shard> shardValue = ArgumentCaptor.forClass( Shard.class );
+
+
+        //mock up our mutation
+        when( edgeShardSerialization.writeShardMeta( same( scope ), shardValue.capture(), same( targetEdgeMeta ) ) )
+                .thenReturn( mock( MutationBatch.class ) );
+
+
+        final SimpleMarkedEdge returnedEdge =
+                new SimpleMarkedEdge( nodeId, type, createId( "subType" ), 10005l, false );
+
+        final Iterator<MarkedEdge> edgeIterator = Collections.singleton( ( MarkedEdge ) returnedEdge ).iterator();
+
+        //mock up returning the value
+        when( shardedEdgeSerialization
+                .getEdgesFromSourceByTargetType( same( edgeColumnFamilies ), same( scope ), any( SearchByIdType.class ),
+                        any( Collection.class ) ) ).thenReturn( edgeIterator );
+
+
+        final boolean result = approximation.auditShard( scope, shardEntryGroup, targetEdgeMeta );
+
+        assertFalse( "Shard should not be allocated", result );
+    }
+
+
+    @Test
     public void futureCountShardCleanup() {
+
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
         final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
@@ -300,7 +503,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig );
+                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -310,7 +513,7 @@ public class NodeShardAllocationTest {
         /**
          * Use the time service to generate timestamps
          */
-        final long timeservicetime = 10000;
+        final long timeservicetime = System.currentTimeMillis() + 60000;
 
 
         when( timeService.getCurrentTime() ).thenReturn( timeservicetime );
@@ -382,21 +585,16 @@ public class NodeShardAllocationTest {
 
         Collection<Shard> writeShards = shardEntryGroup.getWriteShards( minTime + minDelta );
 
-        assertEquals( "Shard size as expected", 4, writeShards.size() );
+        assertEquals( "Shard size as expected", 1, writeShards.size() );
 
-        assertTrue( writeShards.contains( futureShard1 ) );
-        assertTrue( writeShards.contains( futureShard2 ) );
-        assertTrue( writeShards.contains( futureShard3 ) );
         assertTrue( writeShards.contains( compactedShard ) );
 
 
         Collection<Shard> readShards = shardEntryGroup.getReadShards();
 
-        assertEquals( "Shard size as expected", 4, readShards.size() );
+        assertEquals( "Shard size as expected", 2, readShards.size() );
 
         assertTrue( readShards.contains( futureShard1 ) );
-        assertTrue( readShards.contains( futureShard2 ) );
-        assertTrue( readShards.contains( futureShard3 ) );
         assertTrue( readShards.contains( compactedShard ) );
 
 
@@ -423,6 +621,9 @@ public class NodeShardAllocationTest {
 
     @Test
     public void noShardsReturns() throws ConnectionException {
+
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
         final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
@@ -434,13 +635,15 @@ public class NodeShardAllocationTest {
 
         final TimeService timeService = mock( TimeService.class );
 
-        when( timeService.getCurrentTime() ).thenReturn( 10000l );
+        final long returnTime = System.currentTimeMillis()+graphFig.getShardCacheTimeout()*2 ;
+
+        when( timeService.getCurrentTime() ).thenReturn( returnTime );
 
         final MutationBatch batch = mock( MutationBatch.class );
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig );
+                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -499,6 +702,8 @@ public class NodeShardAllocationTest {
     @Test
     public void invalidConfiguration() {
 
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
         final GraphFig graphFig = mock( GraphFig.class );
 
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
@@ -531,7 +736,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, edgeColumnFamilies, shardedEdgeSerialization,
-                        nodeShardApproximation, timeService, graphFig );
+                        nodeShardApproximation, timeService, graphFig, shardGroupCompaction );
 
 
         /**

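The new and updated NodeShardAllocationTest cases above verify shard splitting by capturing the Shard passed to writeShardMeta with an ArgumentCaptor (captured during stubbing) and then asserting on its pivot and created time after auditShard() runs. Below is a minimal, generic sketch of the capture-then-assert idea in its more common verify-based form; the Repository interface and values are made up and only the Mockito mechanics are meant to carry over:

import org.mockito.ArgumentCaptor;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

//Illustrative only; the Repository interface and values are made up.
public class CaptorSketch {

    interface Repository {
        void save( String value );
    }


    public static void main( String[] args ) {
        final Repository repository = mock( Repository.class );

        //the code under test would normally make this call
        repository.save( "pivot-20000" );

        //capture what was actually passed so we can assert on it afterwards
        final ArgumentCaptor<String> captor = ArgumentCaptor.forClass( String.class );
        verify( repository ).save( captor.capture() );

        assertEquals( "pivot-20000", captor.getValue() );
    }
}
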
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
index 6096e58..ee4b94a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -264,13 +264,11 @@ public class ShardEntryGroupTest {
 
         Collection<Shard> readShards = shardEntryGroup.getReadShards();
 
-        assertEquals("Shard size correct", 3, readShards.size());
+        assertEquals("Shard size correct", 2, readShards.size());
 
-        assertTrue("First shard present",  readShards.contains( firstShard ) );
+        assertTrue("First shard present",  readShards.contains( secondShard ) );
 
-        assertTrue("Second shard present",  readShards.contains( firstShard ) );
-
-        assertTrue("Third shard present",  readShards.contains( firstShard ) );
+        assertTrue("Second shard present",  readShards.contains( compactedShard1 ) );
 
     }
 
@@ -308,23 +306,15 @@ public class ShardEntryGroupTest {
 
         Collection<Shard> writeShards = shardEntryGroup.getWriteShards( firstShard.getCreatedTime() + delta );
 
-        assertEquals("Shard size correct", 3, writeShards.size());
-
-        assertTrue("First shard present",  writeShards.contains( firstShard ) );
-
-        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+        assertEquals("Shard size correct", 1, writeShards.size());
 
-        assertTrue("Third shard present",  writeShards.contains( compactedShard ) );
+        assertTrue("Root shard present",  writeShards.contains( compactedShard ) );
 
 
 
         writeShards = shardEntryGroup.getWriteShards(secondShard.getCreatedTime()+delta);
 
-        assertEquals("Shard size correct", 3, writeShards.size());
-
-        assertTrue("First shard present",  writeShards.contains( firstShard ) );
-
-        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+        assertEquals("Shard size correct", 1, writeShards.size());
 
         assertTrue("Third shard present",  writeShards.contains( compactedShard ) );
 
@@ -334,13 +324,11 @@ public class ShardEntryGroupTest {
          */
         writeShards = shardEntryGroup.getWriteShards(secondShard.getCreatedTime() +1 + delta);
 
-        assertEquals("Shard size correct", 3, writeShards.size());
+        assertEquals("Shard size correct", 1, writeShards.size());
 
-        assertTrue("First shard present",  writeShards.contains( firstShard ) );
 
-        assertTrue("Second shard present",  writeShards.contains( secondShard ) );
+        assertTrue("Second shard present",  writeShards.contains( compactedShard ) );
 
-        assertTrue("Third shard present",  writeShards.contains( compactedShard ) );
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
new file mode 100644
index 0000000..1513e85
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompactionTest.java
@@ -0,0 +1,226 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import org.apache.usergrid.persistence.core.consistency.TimeService;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
+
+import com.netflix.astyanax.Keyspace;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class ShardGroupCompactionTest {
+
+    protected GraphFig graphFig;
+    protected ApplicationScope scope;
+
+
+    @Before
+    public void setup() {
+        graphFig = mock( GraphFig.class );
+
+        when( graphFig.getShardAuditWorkerCount() ).thenReturn( 10 );
+
+        when( graphFig.getShardAuditWorkerQueueSize() ).thenReturn( 1000 );
+
+        this.scope = new ApplicationScopeImpl( createId( "application" ) );
+    }
+
+
+    @Test
+    public void shouldNotCompact() {
+
+        final TimeService timeService = mock( TimeService.class );
+
+        final NodeShardAllocation nodeShardAllocation = mock( NodeShardAllocation.class );
+
+        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+
+        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+
+        final Keyspace keyspace = mock( Keyspace.class );
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final long delta = 10000;
+
+        final long createTime = 20000;
+
+        //we shouldn't be able to compact, should throw an exception
+        final long timeNow = createTime + delta - 1;
+
+        ShardEntryGroup group = new ShardEntryGroup( delta );
+        group.addShard( new Shard( 2000, createTime, false ) );
+        group.addShard( new Shard( 1000, 5000, true ) );
+
+
+        when( timeService.getCurrentTime() ).thenReturn( timeNow );
+
+        ShardGroupCompactionImpl compaction =
+                new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
+                        edgeColumnFamilies, keyspace, edgeShardSerialization );
+
+
+        DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId("source"), "test" );
+
+        try {
+            compaction.compact( this.scope, directedEdgeMeta, group );
+            fail( "I should not reach this point" );
+        }catch(Throwable t){
+            assertEquals("Correct error message returned", "Compaction cannot be run yet.  Ignoring compaction.", t.getMessage());
+        }
+
+    }
+
+
+//    /**
+//     * Tests that when we copy edges, we do not actually run the compaction, we can only run it after we get nothing
+//     * and the timeout has elapsed
+//     */
+//    @Test
+//    public void shouldOnlyCopy() {
+//
+//        final TimeService timeService = mock( TimeService.class );
+//
+//        final NodeShardAllocation nodeShardAllocation = mock( NodeShardAllocation.class );
+//
+//        final ShardedEdgeSerialization shardedEdgeSerialization = mock( ShardedEdgeSerialization.class );
+//
+//        final EdgeColumnFamilies edgeColumnFamilies = mock( EdgeColumnFamilies.class );
+//
+//        final Keyspace keyspace = mock( Keyspace.class );
+//
+//        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+//
+//        final long delta = 10000;
+//
+//        final long createTime = 20000;
+//
+//        //we shouldn't be able to compact, should throw an exception
+//        final long timeNow = createTime + delta ;
+//
+//
+//        final Shard targetShard = new Shard( 2000, createTime, false ) ;
+//        final Shard sourceShard =  new Shard( 1000, 5000, true );
+//        ShardEntryGroup group = new ShardEntryGroup( delta );
+//        group.addShard( targetShard );
+//        group.addShard( sourceShard );
+//
+//
+//        when( timeService.getCurrentTime() ).thenReturn( timeNow );
+//
+//        ShardGroupCompaction compaction =
+//                new ShardGroupCompactionImpl( timeService, graphFig, nodeShardAllocation, shardedEdgeSerialization,
+//                        edgeColumnFamilies, keyspace, edgeShardSerialization );
+//
+//
+//        DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId("source"), "test" );
+//
+//
+//        /**
+//         * Mock up returning edges from the source
+//         */
+//
+//        int count = 100;
+//
+//        for(int i = 0; i < count; i ++){
+//
+//
+//
+//            when(shardedEdgeSerialization.getEdgesFromSource( same(edgeColumnFamilies), same(scope), any(
+//                    SearchByEdgeType.class), Matchers.argThat(new ShardSetMatcher( Collections.singleton( sourceShard ) ))/*any(Set.class)*/ ));
+//            edgeMeta.loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope,
+//
+//                                Collections.singleton( sourceShard ),  SearchByEdgeType.Order.DESCENDING, Long.MAX_VALUE );
+//        }
+//
+//        try {
+//            compaction.compact( this.scope, directedEdgeMeta, group );
+//            fail( "I should not reach this point" );
+//        }catch(Throwable t){
+//            assertEquals("Correct error message returned", "Compaction cannot be run yet.  Ignoring compaction.", t.getMessage());
+//        }
+//
+//    }
+
+
+    private final class ShardSetMatcher extends BaseMatcher<Collection<Shard>>{
+
+        private final Collection<Shard> expected;
+
+
+        private ShardSetMatcher( final Collection<Shard> expected ) {this.expected = expected;}
+
+
+        @Override
+        public boolean matches( final Object o ) {
+            if(! (o instanceof Collection)){
+                return false;
+            }
+
+
+            Collection<Shard> passedShards = ( Collection<Shard> ) o;
+
+            return passedShards.containsAll( expected );
+        }
+
+
+        @Override
+        public void describeTo( final Description description ) {
+
+            StringBuilder builder = new StringBuilder();
+
+            builder.append("Collection of shards with shards {");
+
+            for(Shard shard: expected){
+              builder.append(shard).append( "," );
+            }
+
+            builder.setLength( builder.length() - 1 );
+
+            builder.append( "}" );
+
+            description.appendText( builder.toString() );
+        }
+    }
+}
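For context, shouldNotCompact above pins down a time gate: a shard group may not be compacted until delta milliseconds have passed since its newest shard was created. A minimal sketch of that precondition follows; the helper is hypothetical and not the actual ShardGroupCompactionImpl code, which may use a different exception type.

    // Sketch only: the guard shouldNotCompact exercises (assumes java.util.Collection and Shard are imported).
    static void assertCompactionAllowed( final long nowMillis, final Collection<Shard> shards, final long delta ) {
        long newestCreateTime = 0;

        for ( final Shard shard : shards ) {
            newestCreateTime = Math.max( newestCreateTime, shard.getCreatedTime() );
        }

        // in the test: createTime = 20000, delta = 10000, timeNow = 29999, so this branch must reject
        if ( nowMillis < newestCreateTime + delta ) {
            throw new IllegalStateException( "Compaction cannot be run yet.  Ignoring compaction." );
        }
    }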

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index 8e9ed5c..da96e33 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -489,6 +489,18 @@ public class NodeShardApproximationTest {
 
 
         @Override
+        public int getShardAuditWorkerCount() {
+            return 0;
+        }
+
+
+        @Override
+        public int getShardAuditWorkerQueueSize() {
+            return 0;
+        }
+
+
+        @Override
         public long getCounterFlushCount() {
             return 100000l;
         }
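The two new GraphFig settings stubbed above size the shard audit machinery. One plausible wiring, shown only as a sketch; the executor construction below is hypothetical, not code from this commit.

    // Sketch only: bound audit work by worker count and queue depth from GraphFig
    // (assumes java.util.concurrent imports; 10 and 1000 are the values mocked in the
    // ShardGroupCompactionTest setup above).
    ExecutorService buildAuditExecutor( final GraphFig graphFig ) {
        final int workers = graphFig.getShardAuditWorkerCount();
        final int queueSize = graphFig.getShardAuditWorkerQueueSize();

        // a bounded queue plus CallerRunsPolicy keeps audits from piling up without silently dropping them
        return new ThreadPoolExecutor( workers, workers, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>( queueSize ), new ThreadPoolExecutor.CallerRunsPolicy() );
    }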

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
index 5b20647..34bc079 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIteratorTest.java
@@ -29,62 +29,86 @@ import java.util.Iterator;
 
 import org.junit.Test;
 
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
 
 import static junit.framework.TestCase.assertTrue;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 
 public class ShardEntryGroupIteratorTest {
 
+
     @Test(expected = IllegalArgumentException.class)
-    public void noShards(){
+    public void noShards() {
+
+        final ApplicationScope scope = new ApplicationScopeImpl( createId( "application" ) );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId( "source" ), "test" );
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
         final long delta = 10000;
         final Iterator<Shard> noShards = Collections.<Shard>emptyList().iterator();
 
         //should blow up, our iterator is empty
-        new ShardEntryGroupIterator(noShards, delta);
-
+        new ShardEntryGroupIterator( noShards, delta, shardGroupCompaction, scope, directedEdgeMeta );
     }
 
+
     @Test
-    public void existingSingleShard(){
+    public void existingSingleShard() {
 
-        final Shard minShard = new Shard(0, 0, true);
+        final ApplicationScope scope = new ApplicationScopeImpl( createId( "application" ) );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId( "source" ), "test" );
+
+
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
+        final Shard minShard = new Shard( 0, 0, true );
         final long delta = 10000;
         final Iterator<Shard> noShards = Collections.singleton( minShard ).iterator();
 
-        ShardEntryGroupIterator entryGroupIterator = new ShardEntryGroupIterator(noShards, delta);
+        ShardEntryGroupIterator entryGroupIterator =
+                new ShardEntryGroupIterator( noShards, delta, shardGroupCompaction, scope, directedEdgeMeta );
+
 
-        assertTrue("Root shard always present", entryGroupIterator.hasNext());
+        assertTrue( "Root shard always present", entryGroupIterator.hasNext() );
 
         ShardEntryGroup group = entryGroupIterator.next();
 
-        assertNotNull("Group returned", group);
+        assertNotNull( "Group returned", group );
 
-        Collection<Shard> readShards = group.getReadShards();
+        //verify we ran our compaction check
+        verify( shardGroupCompaction ).evaluateShardGroup( same( scope ), same( directedEdgeMeta ), eq( group ) );
 
-        assertEquals("Min shard present", 1, readShards.size());
 
-        assertTrue("Min shard present", readShards.contains( minShard ));
+        Collection<Shard> readShards = group.getReadShards( );
 
+        assertEquals( "Min shard present", 1, readShards.size() );
 
-        Collection<Shard> writeShards = group.getWriteShards( 0 );
+        assertTrue( "Min shard present", readShards.contains( minShard ) );
 
-        assertEquals("Min shard present", 1, writeShards.size());
 
-        assertTrue("Min shard present", writeShards.contains( minShard ));
+        Collection<Shard> writeShards = group.getWriteShards( 0 );
 
+        assertEquals( "Min shard present", 1, writeShards.size() );
 
-        writeShards = group.getWriteShards( Long.MAX_VALUE );
+        assertTrue( "Min shard present", writeShards.contains( minShard ) );
 
-        assertEquals("Min shard present", 1, writeShards.size());
 
-        assertTrue("Min shard present", writeShards.contains( minShard ));
+        writeShards = group.getWriteShards( Long.MAX_VALUE );
 
+        assertEquals( "Min shard present", 1, writeShards.size() );
 
+        assertTrue( "Min shard present", writeShards.contains( minShard ) );
     }
 
 
@@ -93,140 +117,160 @@ public class ShardEntryGroupIteratorTest {
      * that only the last 1 or 2 groups will actually have more than 1 entry.
      */
     @Test
-    public void boundedShardSets(){
+    public void boundedShardSets() {
+
+        final ApplicationScope scope = new ApplicationScopeImpl( createId( "application" ) );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( createId( "source" ), "test" );
+
+        final ShardGroupCompaction shardGroupCompaction = mock( ShardGroupCompaction.class );
+
 
         /**
          * Next shard group
          */
-        final Shard shardGroup1Shard1 = new Shard(0, 0, true);
+        final Shard shardGroup1Shard1 = new Shard( 0, 0, true );
 
-        final Shard shardGroup1Shard2 = new Shard(10000, 100, false);
+        final Shard shardGroup1Shard2 = new Shard( 10000, 100, false );
 
-        final Shard shardGroup1Shard3 = new Shard(20000, 200, false);
+        final Shard shardGroup1Shard3 = new Shard( 20000, 200, false );
 
 
         /**
          * Middle shard group
          */
-        final Shard shardGroup2Shard1 = new Shard(30000, 300, true);
+        final Shard shardGroup2Shard1 = new Shard( 30000, 300, true );
 
-        final Shard shardGroup2Shard2 = new Shard(40000, 400, false);
+        final Shard shardGroup2Shard2 = new Shard( 40000, 400, false );
 
 
         /**
          * Highest shard group
          */
 
-        final Shard shardGroup3Shard1 = new Shard(50000, 500, true);
-
-        final Shard shardGroup3Shard2 = new Shard(60000, 600, false);
-
-        final Shard shardGroup3Shard3 = new Shard(70000, 700, false);
+        final Shard shardGroup3Shard1 = new Shard( 50000, 500, true );
 
+        final Shard shardGroup3Shard2 = new Shard( 60000, 600, false );
 
+        final Shard shardGroup3Shard3 = new Shard( 70000, 700, false );
 
 
         final long delta = 10000;
-        final Iterator<Shard> noShards = Arrays.asList(shardGroup3Shard3, shardGroup3Shard2, shardGroup3Shard1, shardGroup2Shard2, shardGroup2Shard1, shardGroup1Shard3, shardGroup1Shard2, shardGroup1Shard1 ).iterator();
-
 
+        final Iterator<Shard> noShards =
+                Arrays.asList( shardGroup3Shard3, shardGroup3Shard2, shardGroup3Shard1, shardGroup2Shard2,
+                        shardGroup2Shard1, shardGroup1Shard3, shardGroup1Shard2, shardGroup1Shard1 ).iterator();
 
 
-        ShardEntryGroupIterator entryGroupIterator = new ShardEntryGroupIterator(noShards, delta);
+        ShardEntryGroupIterator entryGroupIterator =
+                new ShardEntryGroupIterator( noShards, delta, shardGroupCompaction, scope, directedEdgeMeta );
 
-        assertTrue("max group present", entryGroupIterator.hasNext());
+        assertTrue( "max group present", entryGroupIterator.hasNext() );
 
         ShardEntryGroup group = entryGroupIterator.next();
 
-        assertNotNull("Group returned", group);
+        assertNotNull( "Group returned", group );
 
-        Collection<Shard> readShards = group.getReadShards();
+        //verify we ran our compaction check
+        verify( shardGroupCompaction ).evaluateShardGroup( same( scope ), same( directedEdgeMeta ), eq( group ) );
 
-        assertEquals("Min shard present", 3, readShards.size());
+        Collection<Shard> readShards = group.getReadShards( );
 
-        assertTrue("shardGroup3Shard3 shard present", readShards.contains( shardGroup3Shard3 ));
+        assertEquals( "Both shards present", 2, readShards.size() );
 
-        assertTrue("shardGroup3Shard2 shard present", readShards.contains( shardGroup3Shard2 ));
+        assertTrue( "shardGroup3Shard2 shard present", readShards.contains( shardGroup3Shard2 ) );
 
-        assertTrue("shardGroup3Shard1 shard present", readShards.contains( shardGroup3Shard1 ));
+        assertTrue( "shardGroup3Shard1 shard present", readShards.contains( shardGroup3Shard1 ) );
 
 
         Collection<Shard> writeShards = group.getWriteShards( 0 );
 
-        assertEquals("Min shard present", 3, writeShards.size());
+        assertEquals( "Min shard present", 1, writeShards.size() );
 
-        assertTrue("shardGroup3Shard3 shard present", writeShards.contains( shardGroup3Shard3 ));
 
-        assertTrue("shardGroup3Shard2 shard present", writeShards.contains( shardGroup3Shard2 ));
+        assertTrue( "shardGroup3Shard1 shard present", writeShards.contains( shardGroup3Shard1 ) );
 
-        assertTrue("shardGroup3Shard1 shard present", writeShards.contains( shardGroup3Shard1 ));
+        writeShards = group.getWriteShards( shardGroup3Shard3.getCreatedTime() + delta );
 
+        assertEquals( "Min shard present", 1, writeShards.size() );
 
 
+        assertTrue( "shardGroup3Shard2 shard present", readShards.contains( shardGroup3Shard2 ) );
 
+        assertTrue( "shardGroup3Shard1 shard present", writeShards.contains( shardGroup3Shard1 ) );
 
-        assertTrue("middle group present", entryGroupIterator.hasNext());
 
-        group = entryGroupIterator.next();
+        /****
+         * Middle group
+         */
 
-        assertNotNull("Group returned", group);
+        assertTrue( "middle group present", entryGroupIterator.hasNext() );
 
-       readShards = group.getReadShards();
+        group = entryGroupIterator.next();
 
-        assertEquals("Min shard present", 2, readShards.size());
+        assertNotNull( "Group returned", group );
 
-        assertTrue("shardGroup2Shard1 shard present", readShards.contains( shardGroup2Shard1 ));
+        //verify we ran our compaction check
+        verify( shardGroupCompaction ).evaluateShardGroup( same( scope ), same( directedEdgeMeta ), eq( group ) );
 
-        assertTrue("shardGroup2Shard2 shard present", readShards.contains( shardGroup2Shard2 ));
 
+        readShards = group.getReadShards( );
 
 
-        writeShards = group.getWriteShards( 0 );
+        assertEquals( "Both shards present", 2, readShards.size() );
 
-        assertEquals("Min shard present", 2, writeShards.size());
+        assertTrue( "shardGroup2Shard1 shard present", readShards.contains( shardGroup2Shard1 ) );
 
-        assertTrue("shardGroup2Shard1 shard present", writeShards.contains( shardGroup2Shard1 ));
+        assertTrue( "shardGroup2Shard2 shard present", readShards.contains( shardGroup2Shard2 ) );
 
-        assertTrue("shardGroup2Shard2 shard present", writeShards.contains( shardGroup2Shard2 ));
 
+        writeShards = group.getWriteShards( 0 );
 
+        assertEquals( "Min shard present", 1, writeShards.size() );
 
+        assertTrue( "shardGroup2Shard1 shard present", writeShards.contains( shardGroup2Shard1 ) );
 
 
+        writeShards = group.getWriteShards( shardGroup2Shard2.getCreatedTime() + delta + 1 );
 
-        assertTrue("min group present", entryGroupIterator.hasNext());
+        assertEquals( "Both shards present", 1, writeShards.size() );
 
-        group = entryGroupIterator.next();
+        assertTrue( "shardGroup2Shard2 shard present", writeShards.contains( shardGroup2Shard2 ) );
 
-        assertNotNull("Group returned", group);
 
-        readShards = group.getReadShards();
+        /*****
+         * Minimum group
+         */
 
-        assertEquals("Min shard present", 3, readShards.size());
+        assertTrue( "min group present", entryGroupIterator.hasNext() );
 
-        assertTrue("shardGroup1Shard3 shard present", readShards.contains( shardGroup1Shard3 ));
+        group = entryGroupIterator.next();
 
-        assertTrue("shardGroup1Shard2 shard present", readShards.contains( shardGroup1Shard2 ));
+        assertNotNull( "Group returned", group );
 
-        assertTrue("shardGroup1Shard1 shard present", readShards.contains( shardGroup1Shard1 ));
+        //verify we ran our compaction check
+        verify( shardGroupCompaction ).evaluateShardGroup( same( scope ), same( directedEdgeMeta ), eq( group ) );
 
 
 
-        writeShards = group.getWriteShards( 0 );
+        readShards = group.getReadShards();
 
-        assertEquals("Min shard present", 3, writeShards.size());
+        assertEquals( "Both shards present", 2, readShards.size() );
 
-        assertTrue("shardGroup1Shard3 shard present", writeShards.contains( shardGroup1Shard3 ));
+        assertTrue( "shardGroup1Shard1 shard present", readShards.contains( shardGroup1Shard1 ) );
+        assertTrue( "shardGroup1Shard2 shard present", readShards.contains( shardGroup1Shard2 ) );
 
-        assertTrue("shardGroup1Shard2 shard present", writeShards.contains( shardGroup1Shard2 ));
 
-        assertTrue("shardGroup1Shard1 shard present", writeShards.contains( shardGroup1Shard1 ));
+        writeShards = group.getWriteShards( 0 );
 
+        assertEquals( "Min shard present", 1, writeShards.size() );
 
+        assertTrue( "shardGroup1Shard1 shard present", writeShards.contains( shardGroup1Shard1 ) );
 
 
+        writeShards = group.getWriteShards( shardGroup1Shard3.getCreatedTime() + delta + 1 );
 
+        assertEquals( "Both shards present", 1, writeShards.size() );
 
+        assertTrue( "shardGroup1Shard2 shard present", writeShards.contains( shardGroup1Shard2 ) );
     }
-
-}
\ No newline at end of file
+}
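The rewritten iterator tests above also assert a side effect: every ShardEntryGroup the iterator emits is handed to ShardGroupCompaction.evaluateShardGroup. Condensed into a standalone usage sketch with the same classes as the test; the times and verifyNoMoreInteractions calls are extra Mockito checks not present in the test itself.

    // Sketch only: one group in, exactly one audit evaluation out
    // (assumes the test's static imports plus Mockito.times and Mockito.verifyNoMoreInteractions).
    final ShardGroupCompaction compaction = mock( ShardGroupCompaction.class );
    final ApplicationScope scope = new ApplicationScopeImpl( createId( "application" ) );
    final DirectedEdgeMeta meta = DirectedEdgeMeta.fromSourceNode( createId( "source" ), "test" );

    final Iterator<Shard> shards = Collections.singleton( new Shard( 0, 0, true ) ).iterator();
    final ShardEntryGroupIterator iterator =
            new ShardEntryGroupIterator( shards, 10000, compaction, scope, meta );

    final ShardEntryGroup group = iterator.next();

    // the mock records the evaluation scheduled by the iterator
    verify( compaction, times( 1 ) ).evaluateShardGroup( same( scope ), same( meta ), eq( group ) );
    verifyNoMoreInteractions( compaction );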

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparatorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparatorTest.java
new file mode 100644
index 0000000..d0adc1e
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/SourceDirectedEdgeDescendingComparatorTest.java
@@ -0,0 +1,136 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class SourceDirectedEdgeDescendingComparatorTest {
+
+    final SourceDirectedEdgeDescendingComparator comp = SourceDirectedEdgeDescendingComparator.INSTANCE;
+
+
+    @Test
+    public void sameEdges() {
+
+        final Id sourceId = createId( "source" );
+        final Id targetId = createId( "target" );
+        final String type = "type";
+        final long timestamp = 10000;
+
+        final SimpleMarkedEdge markedEdge1 = new SimpleMarkedEdge( sourceId, type, targetId, timestamp, true );
+        final SimpleMarkedEdge markedEdge2 = new SimpleMarkedEdge( sourceId, type, targetId, timestamp, true );
+
+
+        int compare = comp.compare( markedEdge1, markedEdge2 );
+
+        assertEquals( 0, compare );
+
+        compare = comp.compare( markedEdge2, markedEdge1 );
+
+        assertEquals( 0, compare );
+    }
+
+
+    @Test
+    public void timestampDifferent() {
+
+        final Id sourceId = createId( "source" );
+        final Id targetId = createId( "target" );
+        final String type = "type";
+        final long timestamp = 10000;
+
+        final SimpleMarkedEdge markedEdge1 = new SimpleMarkedEdge( sourceId, type, targetId, timestamp, true );
+        final SimpleMarkedEdge markedEdge2 = new SimpleMarkedEdge( sourceId, type, targetId, timestamp + 1, true );
+
+
+        //marked edge 1's timestamp is less than marked edge 2's, so it should be considered "greater"
+        int compare = comp.compare( markedEdge1, markedEdge2 );
+
+        assertEquals( 1, compare );
+
+        compare = comp.compare( markedEdge2, markedEdge1 );
+
+        assertEquals( -1, compare );
+    }
+
+
+    @Test
+    public void uuidDifferent() {
+
+        final Id sourceId1 = createId( "source" );
+        final Id sourceId2 = createId( "source" );
+        final Id targetId = createId( "target" );
+        final String type = "type";
+        final long timestamp = 10000;
+
+        final SimpleMarkedEdge markedEdge1 = new SimpleMarkedEdge( sourceId1, type, targetId, timestamp, true );
+        final SimpleMarkedEdge markedEdge2 = new SimpleMarkedEdge( sourceId2, type, targetId, timestamp, true );
+
+
+        //marked edge 1's source uuid is less than marked edge 2's, so it should be considered "greater"
+        int compare = comp.compare( markedEdge1, markedEdge2 );
+
+        assertTrue( compare > 0 );
+
+        compare = comp.compare( markedEdge2, markedEdge1 );
+
+        assertTrue( compare < 0 );
+    }
+
+
+    @Test
+    public void idTypeDifferent() {
+
+        final UUID sourceId = UUIDGenerator.newTimeUUID();
+
+        final Id sourceId1 = createId( sourceId,  "source1" );
+        final Id sourceId2 = createId( sourceId,  "source2" );
+        final Id targetId = createId( "target" );
+        final String type = "type";
+        final long timestamp = 10000;
+
+        final SimpleMarkedEdge markedEdge1 = new SimpleMarkedEdge( sourceId2, type, targetId, timestamp, true );
+        final SimpleMarkedEdge markedEdge2 = new SimpleMarkedEdge( sourceId1, type, targetId, timestamp, true );
+
+
+        //the ids differ only by type; marked edge 1 should be considered "greater"
+        int compare = comp.compare( markedEdge1, markedEdge2 );
+
+        assertEquals( 1, compare );
+
+        compare = comp.compare( markedEdge2, markedEdge1 );
+
+        assertEquals( -1, compare );
+    }
+}
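These assertions define what "descending" means here: the edge with the smaller timestamp compares as greater, so the newest edges sort first. A minimal comparator sketch consistent with timestampDifferent, assuming MarkedEdge exposes getTimestamp(); the real comparator additionally breaks ties on the directed node's uuid and type, which is omitted here.

    // Sketch only: descending by timestamp, ties left unresolved (assumes java.util.Comparator).
    final Comparator<MarkedEdge> descendingByTimestamp = new Comparator<MarkedEdge>() {
        @Override
        public int compare( final MarkedEdge first, final MarkedEdge second ) {
            // reversed arguments give descending order: a smaller timestamp is reported as "greater"
            return Long.compare( second.getTimestamp(), first.getTimestamp() );
        }
    };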

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparatorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparatorTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparatorTest.java
new file mode 100644
index 0000000..15df661
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/comparators/TargetDirectedEdgeDescendingComparatorTest.java
@@ -0,0 +1,136 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.comparators;
+
+
+import java.util.UUID;
+
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.graph.impl.SimpleMarkedEdge;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+public class TargetDirectedEdgeDescendingComparatorTest {
+
+    final TargetDirectedEdgeDescendingComparator comp = TargetDirectedEdgeDescendingComparator.INSTANCE;
+
+
+    @Test
+    public void sameEdges() {
+
+        final Id sourceId = createId( "source" );
+        final Id targetId = createId( "target" );
+        final String type = "type";
+        final long timestamp = 10000;
+
+        final SimpleMarkedEdge markedEdge1 = new SimpleMarkedEdge( sourceId, type, targetId, timestamp, true );
+        final SimpleMarkedEdge markedEdge2 = new SimpleMarkedEdge( sourceId, type, targetId, timestamp, true );
+
+
+        int compare = comp.compare( markedEdge1, markedEdge2 );
+
+        assertEquals( 0, compare );
+
+        compare = comp.compare( markedEdge2, markedEdge1 );
+
+        assertEquals( 0, compare );
+    }
+
+
+    @Test
+    public void timestampDifferent() {
+
+        final Id sourceId = createId( "source" );
+        final Id targetId = createId( "target" );
+        final String type = "type";
+        final long timestamp = 10000;
+
+        final SimpleMarkedEdge markedEdge1 = new SimpleMarkedEdge( sourceId, type, targetId, timestamp, true );
+        final SimpleMarkedEdge markedEdge2 = new SimpleMarkedEdge( sourceId, type, targetId, timestamp + 1, true );
+
+
+        //marked edge 1's timestamp is less than marked edge 2's, so it should be considered "greater"
+        int compare = comp.compare( markedEdge1, markedEdge2 );
+
+        assertEquals( 1, compare );
+
+        compare = comp.compare( markedEdge2, markedEdge1 );
+
+        assertEquals( -1, compare );
+    }
+
+
+    @Test
+    public void uuidDifferent() {
+
+        final Id sourceId = createId( "source" );
+        final Id targetId1 = createId( "target" );
+        final Id targetId2 = createId( "target" );
+        final String type = "type";
+        final long timestamp = 10000;
+
+        final SimpleMarkedEdge markedEdge1 = new SimpleMarkedEdge( sourceId, type, targetId1, timestamp, true );
+        final SimpleMarkedEdge markedEdge2 = new SimpleMarkedEdge( sourceId, type, targetId2, timestamp, true );
+
+
+        //marked edge 1's target uuid is less than marked edge 2's, so it should be considered "greater"
+        int compare = comp.compare( markedEdge1, markedEdge2 );
+
+        assertTrue( compare > 0 );
+
+        compare = comp.compare( markedEdge2, markedEdge1 );
+
+        assertTrue( compare < 0 );
+    }
+
+
+    @Test
+    public void idTypeDifferent() {
+
+        final UUID targetId = UUIDGenerator.newTimeUUID();
+
+        final Id sourceId = createId( "source" );
+        final Id targetId1 = createId( targetId, "target1" );
+        final Id targetId2 = createId( targetId, "target2" );
+        final String type = "type";
+        final long timestamp = 10000;
+
+        final SimpleMarkedEdge markedEdge1 = new SimpleMarkedEdge( sourceId, type, targetId2, timestamp, true );
+        final SimpleMarkedEdge markedEdge2 = new SimpleMarkedEdge( sourceId, type, targetId1, timestamp, true );
+
+
+        //the ids differ only by type; marked edge 1 should be considered "greater"
+        int compare = comp.compare( markedEdge1, markedEdge2 );
+
+        assertEquals( 1, compare );
+
+        compare = comp.compare( markedEdge2, markedEdge1 );
+
+        assertEquals( -1, compare );
+    }
+}
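The target-directed variant mirrors the source-directed one, tie-breaking on the target node instead. A short usage sketch of why callers care, assuming SimpleMarkedEdge satisfies the comparator's edge type and the usual java.util imports are present.

    // Sketch only: sorting with the descending comparator yields newest-first iteration,
    // which is the order the Order.DESCENDING searches elsewhere in this patch expect.
    final Id source = createId( "source" );
    final List<MarkedEdge> edges = new ArrayList<MarkedEdge>();
    edges.add( new SimpleMarkedEdge( source, "type", createId( "target" ), 10000, true ) );
    edges.add( new SimpleMarkedEdge( source, "type", createId( "target" ), 20000, true ) );

    Collections.sort( edges, TargetDirectedEdgeDescendingComparator.INSTANCE );

    // edges.get( 0 ) now holds the timestamp-20000 edge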

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
index c9507d4..ca82b8d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/test/util/EdgeTestUtils.java
@@ -168,7 +168,7 @@ public class EdgeTestUtils {
      */
     public static SearchByEdgeType createSearchByEdge( final Id sourceId, final String type, final long maxVersion,
                                                        final Edge last ) {
-        return new SimpleSearchByEdgeType( sourceId, type, maxVersion, last );
+        return new SimpleSearchByEdgeType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, last );
     }
 
 
@@ -183,7 +183,7 @@ public class EdgeTestUtils {
      */
     public static SearchByIdType createSearchByEdgeAndId( final Id sourceId, final String type, final long maxVersion,
                                                           final String idType, final Edge last ) {
-        return new SimpleSearchByIdType( sourceId, type, maxVersion, idType, last );
+        return new SimpleSearchByIdType( sourceId, type, maxVersion, SearchByEdgeType.Order.DESCENDING, idType, last );
     }
 
 
@@ -211,7 +211,7 @@ public class EdgeTestUtils {
      */
     public static SearchByEdge createGetByEdge( final Id sourceId, final String type, final Id targetId,
                                                 final long maxVersion, final Edge last ) {
-        return new SimpleSearchByEdge( sourceId, type, targetId, maxVersion, last );
+        return new SimpleSearchByEdge( sourceId, type, targetId, maxVersion, SearchByEdgeType.Order.DESCENDING, last );
     }
 
 //
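All three helpers above now hard-code SearchByEdgeType.Order.DESCENDING, so existing tests keep compiling while getting newest-first semantics. On the caller side that looks roughly like the sketch below; sourceId and graphManager are assumed to be in scope, and a test needing oldest-first results would build SimpleSearchByEdgeType directly with an ascending Order value, which this patch implies but does not show.

    // Sketch only: the helper hides the new Order argument from its callers.
    final SearchByEdgeType search = createSearchByEdge( sourceId, "test", System.currentTimeMillis(), null );
    final Observable<Edge> edges = graphManager.loadEdgesFromSource( search );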

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index 08d897c..f9ea207 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -35,5 +35,6 @@ log4j.logger.cassandra.db=ERROR
 
 log4j.logger.org.apache.usergrid.persistence.graph=TRACE
 log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE
+log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
index 61612f6..1cb970f 100644
--- a/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/graph/src/test/resources/usergrid-UNIT.properties
@@ -13,5 +13,7 @@ collections.keyspace.strategy.options=replication_factor:1
 collections.keyspace.strategy.class=SimpleStrategy
 collection.stage.transient.timeout=60
 
+usergrid.graph.shard.repair.chance=.20
+
 hystrix.threadpool.graph_user.coreSize=8
 hystrix.threadpool.graph_async.coreSize=8
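The new usergrid.graph.shard.repair.chance=.20 entry reads as a probability. One plausible interpretation, sketched with hypothetical names since the consuming code is not part of this hunk:

    // Sketch only: a random gate; with 0.20, roughly one in five shard-group reads
    // would schedule a repair/audit (assumes java.util.Random).
    boolean shouldAudit( final Random random, final double repairChance ) {
        return random.nextDouble() < repairChance;
    }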

