usergrid-commits mailing list archives

From mru...@apache.org
Subject [07/20] usergrid git commit: Throttle the compactions and auditing such that a given 'type' can only be compacted one at a time (source or target node shard(s))
Date Wed, 23 Mar 2016 17:34:35 GMT
Throttle the compactions and auditing such that a given 'type' can only be compacted one at a time (source or target node shard(s))


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/58ae197e
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/58ae197e
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/58ae197e

Branch: refs/heads/release-2.1.1
Commit: 58ae197ea581f271de644760531a9cd45287c7c9
Parents: 4e407ff
Author: Michael Russo <mrusso@apigee.com>
Authored: Fri Mar 18 14:31:30 2016 -0700
Committer: Michael Russo <mrusso@apigee.com>
Committed: Fri Mar 18 14:31:30 2016 -0700

----------------------------------------------------------------------
 .../core/astyanax/MultiRowColumnIterator.java   |  2 +-
 .../shard/impl/ShardGroupCompactionImpl.java    | 54 ++++++++++++++++----
 .../graph/GraphManagerShardConsistencyIT.java   | 25 ++++-----
 3 files changed, 54 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/58ae197e/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index d8b9097..6049c1f 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -375,7 +375,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
         // if a whole page is skipped OR the result size equals the difference of what's skipped,
         // it is likely during a shard transition and we should assume there is more to read
-        if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize ){
+        if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){
             moreToReturn = true;
         }
 
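For readers following the change: the added clause appears to cover the page that also carries the one-column seek overlap. A minimal standalone sketch of the termination heuristic, with names mirroring the iterator's locals (an illustration only, not the full iterator logic):

    static boolean moreToReturn( int selectSize, int skipSize, int size ) {
        // the whole page was skipped, or all but the seek-overlap column was
        // skipped, or the result size equals what was not skipped (with or
        // without the overlap column) -- likely a shard transition, keep reading
        return skipSize == selectSize
            || skipSize == selectSize - 1
            || size == selectSize - skipSize
            || size == ( selectSize - 1 ) - skipSize;
    }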

http://git-wip-us.apache.org/repos/asf/usergrid/blob/58ae197e/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 1890d53..8728c6c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
 
 import com.google.common.base.Optional;
 import com.netflix.astyanax.connectionpool.OperationResult;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
 import org.slf4j.Logger;
@@ -200,37 +201,61 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
                 edgeCount++;
-                shardEnd = edge;
+
 
                 // if we're at our count, execute the mutation of writing the edges to the new row, then remove them
                 // from the old rows
                 if ( edgeCount % maxWorkSize == 0 ) {
 
+
+
                     try {
 
                         // write the edges into the new shard atomically so we know they all succeed
                         newRowBatch.withAtomicBatch(true).execute();
 
+                        // set the shardEnd after the write is known to be successful
+                        shardEnd = edge;
+
+                        // Update the shard end after each batch so any reads during transition stay as close to current
+                        sourceShard.setShardEnd(
+                            Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
+                        );
+
+                        logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, edgeMeta.getNodes(), shardEnd );
+                        updateShardMetaBatch.mergeShallow(
+                            edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
+
+
+
                         // on purpose block this thread before deleting the old edges to be sure there are no gaps
                         // duplicates are filtered on graph seeking so this is OK
                         Thread.sleep(1000);
+                        logger.info("Deleting batch of {} from old shard", maxWorkSize);
                         deleteRowBatch.execute();
 
+
                     }
                     catch ( Throwable t ) {
                         logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
                     }
+                } else {
+
+                    shardEnd = edge;
+
                 }
+
+
+
             }
 
-            if (shardEnd != null){
+            if (shardEnd != null && edgeCount > 0){
 
                 sourceShard.setShardEnd(
                     Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
                 );
 
-
-                logger.info("Updating shard {} with shardEnd: {}", sourceShard, shardEnd );
+                logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, edgeMeta.getNodes(), shardEnd );
                 updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
             }
 
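The ordering in this hunk matters: the atomic write to the new shard happens first, the shardEnd checkpoint is advanced only once that write is known to be successful, and the delete of the old rows runs last after a deliberate pause. A hedged sketch of that ordering, with MutationBatch from Astyanax and markShardEnd standing in for the sourceShard.setShardEnd bookkeeping in the real code:

    import com.netflix.astyanax.MutationBatch;

    static void moveBatch( final MutationBatch newRowBatch, final MutationBatch deleteRowBatch,
                           final Runnable markShardEnd ) throws Exception {

        // 1. write the copied edges into the new shard atomically so they all succeed or none do
        newRowBatch.withAtomicBatch( true ).execute();

        // 2. only after the write is known good, advance the shardEnd checkpoint;
        //    a failed write must never move the checkpoint past uncopied edges
        markShardEnd.run();

        // 3. block briefly so concurrent readers see no gap, then delete the old rows;
        //    duplicates are filtered on graph seeking, so the overlap is safe
        Thread.sleep( 1000 );
        deleteRowBatch.execute();
    }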
@@ -247,9 +272,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             // on purpose block this thread before deleting the old edges to be sure there are no gaps
             // duplicates are filtered on graph seeking so this is OK
             Thread.sleep(1000);
+
+            logger.info("Deleting remaining edges from old shard");
             deleteRowBatch.execute();
 
+            // now update with our shard end
             updateShardMetaBatch.execute();
+
         }
         catch ( Throwable t ) {
             logger.error( "Unable to move edges to target shard {}", targetShard );
@@ -438,6 +467,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                  * It's already compacting, don't do anything
                  */
                 if ( !shardCompactionTaskTracker.canStartTask( scope, edgeMeta, group ) ) {
+                    logger.info("the group is already compacting");
                     return AuditResult.COMPACTING;
                 }
 
@@ -477,8 +507,9 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         public boolean canStartTask( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
                                      ShardEntryGroup group ) {
             final Long hash = doHash( scope, edgeMeta, group ).hash().asLong();
-
             final Boolean returned = runningTasks.putIfAbsent( hash, TRUE );
+            //logger.info("hash components are app: {}, edgeMeta: {}, group: {}", scope.getApplication(),
edgeMeta, group);
+            //logger.info("checking hash value of: {}, already started: {}", hash, returned
);
 
             /**
              * Someone already put the value
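canStartTask() relies on ConcurrentMap.putIfAbsent, which returns null only for the first registration of a key. A self-contained sketch of that guard (names are illustrative; the real tracker keys on the hash built in doHash below):

    import java.util.concurrent.ConcurrentHashMap;

    class TaskTrackerSketch {

        private final ConcurrentHashMap<Long, Boolean> runningTasks = new ConcurrentHashMap<>();

        boolean canStartTask( final long hash ) {
            // null means no task was registered for this hash: we won the race
            return runningTasks.putIfAbsent( hash, Boolean.TRUE ) == null;
        }

        void complete( final long hash ) {
            // free the slot so the same group can be compacted again later
            runningTasks.remove( hash );
        }
    }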
@@ -509,12 +540,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         /**
          * Hash our data into a consistent long
          */
+        @Override
         protected Hasher doHash( final ApplicationScope scope, final DirectedEdgeMeta directedEdgeMeta,
                                  final ShardEntryGroup shardEntryGroup ) {
 
             final Hasher hasher = super.doHash( scope, directedEdgeMeta, shardEntryGroup );
 
-            //add our compaction target to the hash
+            // add the compaction target to the hash
             final Shard compactionTarget = shardEntryGroup.getCompactionTarget();
 
             hasher.putLong( compactionTarget.getShardIndex() );
@@ -541,14 +573,16 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
             addToHash( hasher, scope.getApplication() );
 
-            /**
-             * add our edge meta data
-             */
+
+            /** Commenting out the full meta from the hash so we allocate/compact shards in a more controlled fashion
+
             for ( DirectedEdgeMeta.NodeMeta nodeMeta : directedEdgeMeta.getNodes() ) {
                 addToHash( hasher, nodeMeta.getId() );
                 hasher.putInt( nodeMeta.getNodeType().getStorageValue() );
             }
 
+            **/
+
 
             /**
              * Add our edge type
@@ -557,8 +591,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 hasher.putString( type, CHARSET );
             }
 
-            //add our compaction target to the hash
-
 
             return hasher;
         }

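With the node meta commented out of doHash(), the tracker hash depends only on the application, the edge type, and (for compaction) the target shard index, so tasks for the same 'type' collide on the hash and are serialized whether they touch source or target node shards. A hedged sketch of the narrowed hash using Guava's hashing API (parameter names are illustrative; the real code hashes the ApplicationScope and uses its own CHARSET constant):

    import java.nio.charset.StandardCharsets;
    import com.google.common.hash.Hasher;
    import com.google.common.hash.Hashing;

    static long compactionHash( final long applicationId, final String edgeType,
                                final long compactionTargetShardIndex ) {
        final Hasher hasher = Hashing.murmur3_128().newHasher();
        hasher.putLong( applicationId );                      // stands in for addToHash( hasher, scope.getApplication() )
        hasher.putString( edgeType, StandardCharsets.UTF_8 ); // the edge 'type'
        hasher.putLong( compactionTargetShardIndex );         // added by the override above
        return hasher.hash().asLong();
    }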
http://git-wip-us.apache.org/repos/asf/usergrid/blob/58ae197e/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 8fd7cea..9e6996d 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -102,6 +102,8 @@ public class GraphManagerShardConsistencyIT {
 
     protected ListeningExecutorService deleteExecutor;
 
+    protected int TARGET_NUM_SHARDS = 6;
+
 
 
     @Before
@@ -172,15 +174,15 @@ public class GraphManagerShardConsistencyIT {
     public void writeThousandsSingleSource()
         throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
 
-        final Id sourceId = IdGenerator.createId( "sourceWrite" );
-        final String edgeType = "testWrite_"+ UUIDGenerator.newTimeUUID().toString();
+        final Id sourceId = IdGenerator.createId( "sourceWrite_"+ UUIDGenerator.newTimeUUID().toString() );
+        final String edgeType = "testWrite";
 
         final EdgeGenerator generator = new EdgeGenerator() {
 
 
             @Override
             public Edge newEdge() {
-                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite" ) );
+                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite_"+ UUIDGenerator.newTimeUUID().toString() ) );
 
 
                 return edge;
@@ -196,7 +198,7 @@ public class GraphManagerShardConsistencyIT {
         };
 
 
-        final int numInjectors = 2;
+        final int numInjectors = 1;
 
         /**
          * create injectors.  This way all the caches are independent of one another.  This is the same as
@@ -218,10 +220,7 @@ public class GraphManagerShardConsistencyIT {
 
 
 
-        /**
-         * Do 4x shard size so we should have approximately 4 shards
-         */
-        final long numberOfEdges = shardSize * 4;
+        final long numberOfEdges = shardSize * TARGET_NUM_SHARDS;
 
 
         final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
@@ -233,7 +232,7 @@ public class GraphManagerShardConsistencyIT {
 
 
         //min stop time the min delta + 1 cache cycle timeout
-        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout() + 60000;
+        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout() + 120000;
 
 
         logger.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit,
numWorkersPerInjector,
@@ -279,7 +278,7 @@ public class GraphManagerShardConsistencyIT {
         final List<Throwable> failures = new ArrayList<>();
         Thread.sleep(3000); // let's make sure everything is written
 
-        for(int i = 0; i < 2; i ++) {
+        for(int i = 0; i < 1; i ++) {
 
 
             /**
@@ -452,11 +451,7 @@ public class GraphManagerShardConsistencyIT {
 
         final int numWorkersPerInjector = numProcessors / numInjectors;
 
-
-        /**
-         * Do 4x shard size so we should have approximately 4 shards
-         */
-        final long numberOfEdges = shardSize * 4;
+        final long numberOfEdges = shardSize * TARGET_NUM_SHARDS;
 
 
         final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;

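For a sense of the test sizing after this change, a worked example with assumed values (the real shardSize comes from graphFig and the processor count varies per machine):

    public class SizingExample {
        public static void main( String[] args ) {
            final long shardSize = 500;            // assumed graphFig shard size
            final int targetNumShards = 6;         // TARGET_NUM_SHARDS
            final int numInjectors = 1;            // reduced from 2 by this commit
            final int numProcessors = 4;           // assumed
            final int numWorkersPerInjector = numProcessors / numInjectors;  // 4

            final long numberOfEdges = shardSize * targetNumShards;          // 3000
            final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;  // 750

            System.out.println( workerWriteLimit + " edges per worker, "
                + numberOfEdges + " edges total" );
        }
    }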
