usergrid-commits mailing list archives

From mdun...@apache.org
Subject [07/15] usergrid git commit: Fixes bug with deleting minimum shard. This is no longer allowed.
Date Thu, 22 Oct 2015 18:29:36 GMT
Fixes a bug with deleting the minimum shard.  This is no longer allowed.

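In short, the commit adds a canonical Shard.MIN_SHARD constant and an isMinShard() check, teaches ShardEntryGroup.canBeDeleted() and ShardGroupDeletionImpl to skip that shard, and introduces a NO_OP delete result for the case where only the minimum shard remains. The listing below is a minimal, self-contained sketch of the rule being enforced; it mirrors the names in the diff that follows but is not the Usergrid code itself (ShardGroup is a simplified stand-in for ShardEntryGroup, and the compaction-target check is omitted).

    // Sketch only: models the "minimum shard can never be deleted" rule from this commit.
    final class Shard {

        // The minimum shard a shardIndex can possibly be set to (mirrors Shard.MIN_SHARD in the diff)
        static final Shard MIN_SHARD = new Shard( 0, 0, true );

        final long shardIndex;
        final long createdTime;
        final boolean compacted;

        Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
            this.shardIndex = shardIndex;
            this.createdTime = createdTime;
            this.compacted = compacted;
        }

        boolean isMinShard() {
            return shardIndex == MIN_SHARD.shardIndex;
        }
    }

    // Simplified stand-in for ShardEntryGroup: compacted shards and the minimum shard are never deletion candidates.
    final class ShardGroup {

        boolean canBeDeleted( final Shard shard ) {
            // The real ShardEntryGroup.canBeDeleted() also checks the compaction target; omitted here.
            return !shard.compacted && !shard.isMinShard();
        }

        public static void main( final String[] args ) {
            final ShardGroup group = new ShardGroup();
            System.out.println( group.canBeDeleted( Shard.MIN_SHARD ) );             // false: the min shard is protected
            System.out.println( group.canBeDeleted( new Shard( 1000, 1, false ) ) ); // true: an ordinary uncompacted shard
        }
    }

With that guard in place, ShardGroupDeletionImpl only issues removeShardMeta mutations for non-minimum shards and reports NO_OP when nothing was actually removed, which is what the new tests below assert.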

Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/beaed6ff
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/beaed6ff
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/beaed6ff

Branch: refs/heads/2.1-release
Commit: beaed6ffe1239ad404823cc4864b299e4113e135
Parents: 0286dcc
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Oct 21 18:19:40 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed Oct 21 18:19:40 2015 -0600

----------------------------------------------------------------------
 .../graph/serialization/impl/shard/Shard.java   |  15 +
 .../impl/shard/ShardEntryGroup.java             |   4 +-
 .../impl/shard/ShardGroupDeletion.java          |   7 +-
 .../shard/impl/NodeShardAllocationImpl.java     |  81 ++--
 .../impl/shard/impl/ShardGroupDeletionImpl.java |  30 +-
 .../graph/GraphManagerShardConsistencyIT.java   | 370 ++++++++++++++-----
 .../impl/shard/ShardEntryGroupTest.java         |  14 +
 .../shard/impl/ShardGroupDeletionImplTest.java  |  50 ++-
 8 files changed, 419 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 9ca6cbe..472e0a2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -21,6 +21,12 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 public class Shard implements Comparable<Shard> {
 
+
+    /**
+     * The minimum shard a shardIndex can possibly be set to
+     */
+    public static final Shard MIN_SHARD = new Shard(0, 0, true);
+
     private final long shardIndex;
     private final long createdTime;
     private final boolean compacted;
@@ -58,6 +64,15 @@ public class Shard implements Comparable<Shard> {
 
 
     /**
+     * Returns true if this is the minimum shard
+     * @return
+     */
+    public boolean isMinShard(){
+        return shardIndex == MIN_SHARD.shardIndex;
+    }
+
+
+    /**
      * Compare the shards based on the timestamp first, then the created time second
      */
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 555d467..543f1fb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -301,7 +301,7 @@ public class ShardEntryGroup {
      * @return
      */
     public boolean isNew(final long currentTime){
-        return currentTime - delta < maxCreatedTime;
+        return currentTime - delta <= maxCreatedTime;
     }
 
     /**
@@ -316,7 +316,7 @@ public class ShardEntryGroup {
         final Shard compactionTarget = getCompactionTarget();
 
 
-        return !shard.isCompacted() && ( compactionTarget != null && compactionTarget.getShardIndex() != shard
+        return !shard.isCompacted() && !shard.isMinShard() &&  ( compactionTarget != null && compactionTarget.getShardIndex() != shard
                 .getShardIndex() );
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
index 640e8bd..37aa9e3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupDeletion.java
@@ -68,6 +68,11 @@ public interface ShardGroupDeletion {
         /**
          * Our capacity was saturated, we didnt' check the shard
          */
-        NOT_CHECKED;
+        NOT_CHECKED,
+
+        /**
+         * We checked everything, but we didn't perform any operations.  Happens when only the min shard remains
+         */
+        NO_OP;
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 7a7fb3f..b0875af 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -60,8 +60,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
     private static final Logger LOG = LoggerFactory.getLogger( NodeShardAllocationImpl.class );
 
-    private static final Shard MIN_SHARD = new Shard( 0, 0, true );
-
     private final EdgeShardSerialization edgeShardSerialization;
     private final EdgeColumnFamilies edgeColumnFamilies;
     private final ShardedEdgeSerialization shardedEdgeSerialization;
@@ -99,29 +97,33 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         //its a new node, it doesn't need to check cassandra, it won't exist
         if ( isNewNode( directedEdgeMeta ) ) {
-            existingShards = Collections.singleton( MIN_SHARD ).iterator();
+            existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
         }
 
         else {
             existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
-        }
 
-        if ( existingShards == null || !existingShards.hasNext() ) {
+            /**
+             * We didn't get anything out of cassandra, so we need to create the minumum shard
+             */
+            if ( existingShards == null || !existingShards.hasNext() ) {
 
 
-            final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, MIN_SHARD, directedEdgeMeta );
-            try {
-                batch.execute();
-            }
-            catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to connect to casandra", e );
-            }
+                final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
+                try {
+                    batch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to connect to casandra", e );
+                }
 
-            existingShards = Collections.singleton( MIN_SHARD ).iterator();
+                existingShards = Collections.singleton( Shard.MIN_SHARD ).iterator();
+            }
         }
 
+
         return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope,
-                directedEdgeMeta );
+            directedEdgeMeta );
     }
 
 
@@ -173,17 +175,22 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             return false;
         }
 
-        if(LOG.isDebugEnabled()){
-            LOG.debug("Count of {} has exceeded shard config of {} will begin compacting", count, shardSize);
+        if ( LOG.isDebugEnabled() ) {
+            LOG.debug( "Count of {} has exceeded shard config of {} will begin compacting", count, shardSize );
         }
 
         /**
-         * We want to allocate a new shard as close to the max value as possible.  This way if we're filling up a shard rapidly, we split it near the head of the values.
-         * Further checks to this group will result in more splits, similar to creating a tree type structure and splitting each node.
+         * We want to allocate a new shard as close to the max value as possible.  This way if we're filling up a
+         * shard rapidly, we split it near the head of the values.
+         * Further checks to this group will result in more splits, similar to creating a tree type structure and
+         * splitting each node.
          *
-         * This means that the lower shard can be re-split later if it is still too large.  We do the division to truncate
-         * to a split point < what our current max is that would be approximately be our pivot ultimately if we split from the
-         * lower bound and moved forward.  Doing this will stop the current shard from expanding and avoid a point where we cannot
+         * This means that the lower shard can be re-split later if it is still too large.  We do the division to
+         * truncate
+         * to a split point < what our current max is that would be approximately be our pivot ultimately if we split
+         * from the
+         * lower bound and moved forward.  Doing this will stop the current shard from expanding and avoid a point
+         * where we cannot
          * ultimately compact to the correct shard size.
          */
 
@@ -193,13 +200,14 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
         final Iterator<MarkedEdge> edges = directedEdgeMeta
-                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
-                        SearchByEdgeType.Order.ASCENDING );
+            .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
+                SearchByEdgeType.Order.ASCENDING );
 
 
         if ( !edges.hasNext() ) {
-            LOG.warn( "Tried to allocate a new shard for edge meta data {}, "
-                    + "but no max value could be found in that row", directedEdgeMeta );
+            LOG.warn(
+                "Tried to allocate a new shard for edge meta data {}, " + "but no max value could be found in that row",
+                directedEdgeMeta );
             return false;
         }
 
@@ -214,12 +222,12 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
 
-        for(long i = 1;  edges.hasNext(); i++){
+        for ( long i = 1; edges.hasNext(); i++ ) {
             //we hit a pivot shard, set it since it could be the last one we encounter
-            if(i% shardSize == 0){
+            if ( i % shardSize == 0 ) {
                 marked = edges.next();
             }
-            else{
+            else {
                 edges.next();
             }
         }
@@ -228,7 +236,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         /**
           * Sanity check in case our counters become severely out of sync with our edge state in cassandra.
          */
-        if(marked == null){
+        if ( marked == null ) {
             LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup );
             return false;
         }
@@ -262,8 +270,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
         if ( minDelta < minimumAllowed ) {
-            throw new GraphRuntimeException( String.format(
-                    "You must configure the property %s to be >= 2 x %s.  Otherwise you risk losing data",
+            throw new GraphRuntimeException( String
+                .format( "You must configure the property %s to be >= 2 x %s.  Otherwise you risk losing data",
                     GraphFig.SHARD_MIN_DELTA, GraphFig.SHARD_CACHE_TIMEOUT ) );
         }
 
@@ -279,8 +287,9 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
         //TODO: TN this is broken....
-        //The timeout is in milliseconds.  Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
-        final long timeoutDelta = graphFig.getShardCacheTimeout() ;
+        //The timeout is in milliseconds.  Time for a time uuid is 1/10000 of a milli, so we need to get the units
+        // correct
+        final long timeoutDelta = graphFig.getShardCacheTimeout();
 
         final long timeNow = timeService.getCurrentTime();
 
@@ -289,16 +298,16 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
 
             //short circuit
-            if(!isNew || node.getId().getUuid().version() > 2){
+            if ( !isNew || node.getId().getUuid().version() > 2 ) {
                 return false;
             }
 
-            final long uuidTime =   TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid());
+            final long uuidTime = TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid() );
 
             final long newExpirationTimeout = uuidTime + timeoutDelta;
 
             //our expiration is after our current time, treat it as new
-            isNew = isNew && newExpirationTimeout >  timeNow;
+            isNew = isNew && newExpirationTimeout > timeNow;
         }
 
         return isNew;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
index 9c97d77..38a7834 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImpl.java
@@ -149,9 +149,18 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
 
         //now we can proceed based on the shard meta state and we don't have any edge
 
+        DeleteResult result = DeleteResult.NO_OP;
+
         MutationBatch rollup = null;
 
         for ( final Shard shard : shardEntryGroup.getReadShards() ) {
+
+            //skip the min shard
+            if(shard.isMinShard()){
+                continue;
+            }
+
+
             final MutationBatch shardRemovalMutation =
                 edgeShardSerialization.removeShardMeta( applicationScope, shard, directedEdgeMeta );
 
@@ -162,20 +171,23 @@ public class ShardGroupDeletionImpl implements ShardGroupDeletion {
             else {
                 rollup.mergeShallow( shardRemovalMutation );
             }
+
+            result = DeleteResult.DELETED;
         }
 
 
-        Preconditions.checkNotNull( rollup, "rollup should be assigned" );
+       if( rollup != null) {
 
-        try {
-            rollup.execute();
-        }
-        catch ( ConnectionException e ) {
-            logger.error( "Unable to execute shard deletion", e );
-            throw new RuntimeException( "Unable to execute shard deletion", e );
-        }
+           try {
+               rollup.execute();
+           }
+           catch ( ConnectionException e ) {
+               logger.error( "Unable to execute shard deletion", e );
+               throw new RuntimeException( "Unable to execute shard deletion", e );
+           }
+       }
 
-        return DeleteResult.DELETED;
+        return result;
     }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 2fb08a4..b82c28a 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -71,10 +71,8 @@ import com.google.inject.Injector;
 import com.netflix.config.ConfigurationManager;
 
 import rx.Observable;
-import rx.functions.Action1;
 
 import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
-import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -89,8 +87,8 @@ public class GraphManagerShardConsistencyIT {
     private static final Meter writeMeter = registry.meter( "writeThroughput" );
 
     private static final Slf4jReporter reporter =
-            Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
-                         .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
+        Slf4jReporter.forRegistry( registry ).outputTo( log ).convertRatesTo( TimeUnit.SECONDS )
+                     .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
 
 
     protected ApplicationScope scope;
@@ -102,6 +100,8 @@ public class GraphManagerShardConsistencyIT {
 
     protected Object originalShardDelta;
 
+    protected ListeningExecutorService executor;
+
 
     @Before
     public void setupOrg() {
@@ -141,12 +141,19 @@ public class GraphManagerShardConsistencyIT {
     public void tearDown() {
         reporter.stop();
         reporter.report();
+
+        executor.shutdownNow();
+    }
+
+
+    private void createExecutor( final int size ) {
+        executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
     }
 
 
     @Test
     public void writeThousandsSingleSource()
-            throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
+        throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
 
         final Id sourceId = IdGenerator.createId( "source" );
         final String edgeType = "test";
@@ -166,13 +173,13 @@ public class GraphManagerShardConsistencyIT {
             @Override
             public Observable<Edge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource(
-                        new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE,
-                                SearchByEdgeType.Order.DESCENDING,  Optional.<Edge>absent() ) );
+                    new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        Optional.<Edge>absent() ) );
             }
         };
 
 
-//        final int numInjectors = 2;
+        //        final int numInjectors = 2;
         final int numInjectors = 1;
 
         /**
@@ -200,16 +207,13 @@ public class GraphManagerShardConsistencyIT {
         final long numberOfEdges = shardSize * 4;
 
 
-        final long workerWriteLimit = numberOfEdges / numWorkersPerInjector;
-
+        final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
 
 
-        final long expectedShardCount = numberOfEdges/shardSize;
+        final long expectedShardCount = numberOfEdges / shardSize;
 
 
-        final ListeningExecutorService
-                executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( numWorkersPerInjector ) );
-
+        createExecutor( numWorkersPerInjector );
 
         final AtomicLong writeCounter = new AtomicLong();
 
@@ -219,24 +223,22 @@ public class GraphManagerShardConsistencyIT {
 
 
         log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
-                numInjectors );
+            numInjectors );
 
 
         final List<Future<Boolean>> futures = new ArrayList<>();
 
 
-
         for ( Injector injector : injectors ) {
             final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
 
 
             for ( int i = 0; i < numWorkersPerInjector; i++ ) {
-                Future<Boolean> future = executor
-                        .submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+                Future<Boolean> future =
+                    executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
 
                 futures.add( future );
             }
-
         }
 
         /**
@@ -262,11 +264,10 @@ public class GraphManagerShardConsistencyIT {
         /**
          * Start reading continuously while we migrate data to ensure our view is always correct
          */
-        final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
-
-        final List<Throwable> failures = new ArrayList<>(  );
-
+        final ListenableFuture<Long> future =
+            executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
 
+        final List<Throwable> failures = new ArrayList<>();
 
 
         //add the future
@@ -287,37 +288,31 @@ public class GraphManagerShardConsistencyIT {
         } );
 
 
-
         int compactedCount;
 
 
-
-
-
         //now start our readers
 
         while ( true ) {
 
-            if(!failures.isEmpty()){
+            if ( !failures.isEmpty() ) {
 
-                StringBuilder builder = new StringBuilder(  );
+                StringBuilder builder = new StringBuilder();
 
-                builder.append("Read runner failed!\n");
+                builder.append( "Read runner failed!\n" );
 
-                for(Throwable t: failures){
+                for ( Throwable t : failures ) {
                     builder.append( "Exception is: " );
-                    ByteArrayOutputStream output = new ByteArrayOutputStream(  );
+                    ByteArrayOutputStream output = new ByteArrayOutputStream();
 
                     t.printStackTrace( new PrintWriter( output ) );
 
-                    builder.append( output.toString( "UTF-8" ));
+                    builder.append( output.toString( "UTF-8" ) );
                     builder.append( "\n\n" );
-
                 }
 
 
-
-                fail(builder.toString());
+                fail( builder.toString() );
             }
 
             //reset our count.  Ultimately we'll have 4 groups once our compaction completes
@@ -344,25 +339,21 @@ public class GraphManagerShardConsistencyIT {
             if ( compactedCount >= expectedShardCount ) {
                 log.info( "All compactions complete, sleeping" );
 
-//                final Object mutex = new Object();
-//
-//                synchronized ( mutex ){
-//
-//                    mutex.wait();
-//                }
+                //                final Object mutex = new Object();
+                //
+                //                synchronized ( mutex ){
+                //
+                //                    mutex.wait();
+                //                }
 
                 break;
-
             }
 
 
             Thread.sleep( 2000 );
-
         }
 
         executor.shutdownNow();
-
-
     }
 
 
@@ -392,7 +383,236 @@ public class GraphManagerShardConsistencyIT {
     }
 
 
+    @Test
+    public void writeThousandsDelete()
+        throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
+
+        final Id sourceId = IdGenerator.createId( "source" );
+        final String edgeType = "test";
+
+        final EdgeGenerator generator = new EdgeGenerator() {
+
+
+            @Override
+            public Edge newEdge() {
+                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "target" ) );
+
+
+                return edge;
+            }
+
+
+            @Override
+            public Observable<Edge> doSearch( final GraphManager manager ) {
+                return manager.loadEdgesFromSource(
+                    new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                        Optional.<Edge>absent(), false ) );
+            }
+        };
+
+
+        //        final int numInjectors = 2;
+        final int numInjectors = 1;
+
+        /**
+         * create 3 injectors.  This way all the caches are independent of one another.  This is the same as
+         * multiple nodes
+         */
+        final List<Injector> injectors = createInjectors( numInjectors );
+
+
+        final GraphFig graphFig = getInstance( injectors, GraphFig.class );
+
+        final long shardSize = graphFig.getShardSize();
+
+
+        //we don't want to starve the cass runtime since it will be on the same box. Only take 50% of processing
+        // power for writes
+        final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
+
+        final int numWorkersPerInjector = numProcessors / numInjectors;
+
+
+        /**
+         * Do 4x shard size so we should have approximately 4 shards
+         */
+        final long numberOfEdges = shardSize * 4;
+
+
+        final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
+
+        createExecutor( numWorkersPerInjector );
+
+
+        final AtomicLong writeCounter = new AtomicLong();
+
 
+        //min stop time the min delta + 1 cache cycle timeout
+        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
+
+
+        log.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit, numWorkersPerInjector,
+            numInjectors );
+
+
+        final List<Future<Boolean>> futures = new ArrayList<>();
+
+
+        for ( Injector injector : injectors ) {
+            final GraphManagerFactory gmf = injector.getInstance( GraphManagerFactory.class );
+
+
+            for ( int i = 0; i < numWorkersPerInjector; i++ ) {
+                Future<Boolean> future =
+                    executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+
+                futures.add( future );
+            }
+        }
+
+        /**
+         * Wait for all writes to complete
+         */
+        for ( Future<Boolean> future : futures ) {
+            future.get();
+        }
+
+        //now get all our shards
+        final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
+
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
+
+        //now submit the readers.
+        final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
+
+
+        final long writeCount = writeCounter.get();
+        final Meter readMeter = registry.meter( "readThroughput" );
+
+
+        //check our shard state
+
+
+        final Iterator<ShardEntryGroup> existingShardGroups =
+            cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+        int shardCount = 0;
+
+        while ( existingShardGroups.hasNext() ) {
+            final ShardEntryGroup group = existingShardGroups.next();
+
+            shardCount++;
+
+            log.info( "Compaction pending status for group {} is {}", group, group.isCompactionPending() );
+        }
+
+
+        log.info( "found {} shard groups", shardCount );
+
+
+        //now mark and delete all the edges
+
+
+        final GraphManager manager = gmf.createEdgeManager( scope );
+
+        //sleep occasionally to stop pushing cassandra over
+
+        long count = Long.MAX_VALUE;
+
+        while(count != 0) {
+            //take 10000 then sleep
+            count = generator.doSearch( manager ).onBackpressureBlock().take( 1000 ).flatMap( edge -> manager.markEdge( edge ) )
+                     .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
+
+            Thread.sleep( 500 );
+        }
+
+
+        //now loop until with a reader until our shards are gone
+
+
+        /**
+         * Start reading continuously while we migrate data to ensure our view is always correct
+         */
+        final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
+
+        final List<Throwable> failures = new ArrayList<>();
+
+
+        //add the future
+        Futures.addCallback( future, new FutureCallback<Long>() {
+
+            @Override
+            public void onSuccess( @Nullable final Long result ) {
+                log.info( "Successfully ran the read, re-running" );
+                executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+            }
+
+
+            @Override
+            public void onFailure( final Throwable t ) {
+                failures.add( t );
+                log.error( "Failed test!", t );
+            }
+        } );
+
+
+        //now start our readers
+
+        while ( true ) {
+
+            if ( !failures.isEmpty() ) {
+
+                StringBuilder builder = new StringBuilder();
+
+                builder.append( "Read runner failed!\n" );
+
+                for ( Throwable t : failures ) {
+                    builder.append( "Exception is: " );
+                    ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+                    t.printStackTrace( new PrintWriter( output ) );
+
+                    builder.append( output.toString( "UTF-8" ) );
+                    builder.append( "\n\n" );
+                }
+
+
+                fail( builder.toString() );
+            }
+
+            //reset our count.  Ultimately we'll have 4 groups once our compaction completes
+            shardCount = 0;
+
+            //we have to get it from the cache, because this will trigger the compaction process
+            final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
+
+            ShardEntryGroup group = null;
+
+            while ( groups.hasNext() ) {
+
+                group = groups.next();;
+
+                log.info( "Shard size for group is {}", group.getReadShards() );
+
+                shardCount += group.getReadShards().size();
+            }
+
+
+            //we're done, 1 shard remains, we have a group, and it's our default shard
+            if ( shardCount == 1 && group != null &&  group.getMinShard().getShardIndex() == 0  ) {
+                log.info( "All compactions complete," );
+
+                break;
+            }
+
+
+            Thread.sleep( 2000 );
+        }
+
+        //now that we have finished expanding s
+
+        executor.shutdownNow();
+    }
 
 
     private class Worker implements Callable<Boolean> {
@@ -436,7 +656,6 @@ public class GraphManagerShardConsistencyIT {
                 writeCounter.incrementAndGet();
 
 
-
                 if ( i % 1000 == 0 ) {
                     log.info( "   Wrote: " + i );
                 }
@@ -454,8 +673,9 @@ public class GraphManagerShardConsistencyIT {
         private final long writeCount;
         private final Meter readMeter;
 
+
         private ReadWorker( final GraphManagerFactory factory, final EdgeGenerator generator, final long writeCount,
-                            final Meter readMeter) {
+                            final Meter readMeter ) {
             this.factory = factory;
             this.generator = generator;
             this.writeCount = writeCount;
@@ -467,72 +687,28 @@ public class GraphManagerShardConsistencyIT {
         public Long call() throws Exception {
 
 
-
-
             GraphManager gm = factory.createEdgeManager( scope );
 
 
-
-            while(true) {
-
-//                final long[] count = {0};
-//                final long[] duplicate = {0};
-//                final HashSet<Edge >  seen = new HashSet<>((int)writeCount);
+            while ( true ) {
 
 
                 //do a read to eventually trigger our group compaction. Take 2 pages of columns
                 final long returnedEdgeCount = generator.doSearch( gm )
 
-                                                        .doOnNext( new Action1<Edge>() {
-
-
-                                                            //                    private Edge last;
-
-
-                                                            @Override
-                                                            public void call( final Edge edge ) {
-                                                                readMeter.mark();
-
-                                                                //                        count[0]++;
-                                                                //
-                                                                //                        /**
-                                                                //                         * Added this check as part
-                                                                // of the read
-                                                                //                         */
-                                                                //                        if ( last != null && last
-                                                                // .equals(edge) ) {
-                                                                //                            fail( String.format( "Expected edges to be in order, however last was %s and current is %s",
-                                                                //                                    last, edge ) );
-                                                                //                        }
-                                                                //
-                                                                //                        last = edge;
-                                                                //
-                                                                //                        if( seen.contains( edge ) ){
-                                                                //                            fail( String.format("Returned an edge that was already seen! Edge was %s, last edge was %s", edge, last) );
-                                                                //                            duplicate[0]++;
-                                                                //                        }
-                                                                //
-                                                                //                        seen.add( edge );
-
-                                                            }
-                                                        } )
+                                                        .doOnNext( edge -> readMeter.mark() )
 
                                                         .countLong().toBlocking().last();
 
-
-//                if(returnedEdgeCount != count[0]-duplicate[0]){
-//                    log.warn( "Missing entries from the initial put" );
-//                }
-
                 log.info( "Completed reading {} edges", returnedEdgeCount );
 
-                if(writeCount != returnedEdgeCount){
-                    log.warn( "Unexpected edge count returned!!!  Expected {} but was {}", writeCount, returnedEdgeCount );
+                if ( writeCount != returnedEdgeCount ) {
+                    log.warn( "Unexpected edge count returned!!!  Expected {} but was {}", writeCount,
+                        returnedEdgeCount );
                 }
 
                 assertEquals( "Expected to read same edge count", writeCount, returnedEdgeCount );
             }
-
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
index 9289340..30634e2 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroupTest.java
@@ -392,6 +392,20 @@ public class ShardEntryGroupTest {
 
         assertFalse( "Shard added", result );
     }
+
+
+    @Test
+    public void minShardNotDeleted() {
+        Shard minShard = Shard.MIN_SHARD;
+
+        ShardEntryGroup shardEntryGroup = new ShardEntryGroup( 1 );
+
+        shardEntryGroup.addShard( minShard );
+
+        assertTrue( minShard.isMinShard() );
+
+        assertFalse( shardEntryGroup.canBeDeleted( minShard ) );
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/beaed6ff/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
index 70c9b01..9a3e407 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupDeletionImplTest.java
@@ -120,6 +120,46 @@ public class ShardGroupDeletionImplTest {
 
 
     @Test
+    public void shardIsMinShard() throws ExecutionException, InterruptedException {
+
+
+        final long currentTime = 1000;
+
+        final Shard shard0 = Shard.MIN_SHARD;
+
+
+        //set a 1 delta for testing
+        final ShardEntryGroup group = new ShardEntryGroup( 1 );
+
+        group.addShard( shard0 );
+
+        assertTrue( "this should return false for our test to succeed", shard0.isMinShard() );
+
+
+        final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
+
+        final TimeService timeService = mock( TimeService.class );
+
+        when( timeService.getCurrentTime() ).thenReturn( currentTime );
+
+        initExecutor( 1, 1 );
+
+        final ShardGroupDeletionImpl shardGroupDeletion =
+            new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
+
+        final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
+
+
+        final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
+            shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
+
+        final ShardGroupDeletion.DeleteResult result = future.get();
+
+        assertEquals( "should not delete min shard", ShardGroupDeletion.DeleteResult.NO_OP, result );
+    }
+
+
+    @Test
     public void shardTooNew() throws ExecutionException, InterruptedException {
 
         final long createTime = 10000;
@@ -232,7 +272,7 @@ public class ShardGroupDeletionImplTest {
 
         final long currentTime = createTime * 2;
 
-        final Shard shard0 = new Shard( 0, createTime, true );
+        final Shard shard0 = new Shard( 1000, createTime, true );
 
 
         ////set a delta for way in the future
@@ -245,15 +285,14 @@ public class ShardGroupDeletionImplTest {
         assertFalse( "this should return false for our test to succeed", group.isNew( currentTime ) );
 
 
-
         final DirectedEdgeMeta directedEdgeMeta = getDirectedEdgeMeta();
 
         //mock up returning a mutation
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
 
-        when(edgeShardSerialization.removeShardMeta( same(scope), same(shard0), same(directedEdgeMeta) )).thenReturn( mock(
-            MutationBatch.class) );
+        when( edgeShardSerialization.removeShardMeta( same( scope ), same( shard0 ), same( directedEdgeMeta ) ) )
+            .thenReturn( mock( MutationBatch.class ) );
 
         final TimeService timeService = mock( TimeService.class );
 
@@ -265,8 +304,6 @@ public class ShardGroupDeletionImplTest {
             new ShardGroupDeletionImpl( asyncTaskExecutor, edgeShardSerialization, timeService );
 
 
-
-
         final ListenableFuture<ShardGroupDeletion.DeleteResult> future =
             shardGroupDeletion.maybeDeleteShard( this.scope, directedEdgeMeta, group, Collections.emptyIterator() );
 
@@ -276,7 +313,6 @@ public class ShardGroupDeletionImplTest {
     }
 
 
-
     private DirectedEdgeMeta getDirectedEdgeMeta() {
 
         final Id sourceId = createId( "source" );

