usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [49/50] [abbrv] git commit: Fixed local consistency issue during long flushes.
Date Mon, 07 Jul 2014 18:17:06 GMT
Fixed local consistency issue during long flushes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2a98cae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2a98cae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2a98cae9

Branch: refs/heads/two-dot-o
Commit: 2a98cae997bb418557d1e962c32eabd6026a0725
Parents: ba04cb6
Author: Todd Nine <tnine@apigee.com>
Authored: Tue Jul 1 18:00:29 2014 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Tue Jul 1 18:00:29 2014 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/graph/GraphFig.java    |  23 +--
 .../persistence/graph/guice/GraphModule.java    |   5 +-
 .../impl/shard/NodeShardAllocation.java         |  11 --
 .../impl/shard/NodeShardCache.java              |  10 -
 .../shard/count/NodeShardApproximationImpl.java |  61 +++---
 .../shard/impl/NodeShardAllocationImpl.java     | 118 ++++++------
 .../impl/shard/impl/NodeShardCacheImpl.java     |  19 +-
 .../shard/impl/SizebasedEdgeShardStrategy.java  |  16 +-
 .../persistence/graph/GraphManagerIT.java       |  10 +-
 .../graph/GraphManagerShardingIT.java           | 189 +++++++++++++++++++
 .../impl/shard/NodeShardAllocationTest.java     |  12 +-
 .../shard/count/NodeShardApproximationTest.java |  13 --
 12 files changed, 312 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index ca36ffc..e0ce45c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -35,12 +35,6 @@ public interface GraphFig extends GuicyFig {
 
     public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
 
-
-
-    public static final String WRITE_TIMEOUT = "usergrid.graph.write.timeout";
-
-    public static final String READ_TIMEOUT = "usergrid.graph.read.timeout";
-
     public static final String SHARD_SIZE = "usergrid.graph.shard.size";
 
     public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
@@ -59,17 +53,6 @@ public interface GraphFig extends GuicyFig {
     int getScanPageSize();
 
 
-    @Default("10000")
-    @Key(WRITE_TIMEOUT)
-    int getWriteTimeout();
-
-    /**
-     * Get the read timeout (in milliseconds) that we should allow when reading from the
data source
-     */
-    @Default("10000")
-    @Key(READ_TIMEOUT)
-    int getReadTimeout();
-
 
     @Default("5")
     @Key(REPAIR_CONCURRENT_SIZE)
@@ -77,13 +60,11 @@ public interface GraphFig extends GuicyFig {
 
 
 
-    @Default("10000")
+    @Default("500000")
     @Key(SHARD_SIZE)
     long getShardSize();
 
 
-
-
     @Default("30000")
     @Key(SHARD_CACHE_TIMEOUT)
     long getShardCacheTimeout();
@@ -93,7 +74,7 @@ public interface GraphFig extends GuicyFig {
     long getShardCacheSize();
 
 
-    @Default( "100000" )
+    @Default( "10000" )
     @Key( COUNTER_WRITE_FLUSH_COUNT )
     long getCounterFlushCount();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index f096740..aca8d00 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -145,9 +145,10 @@ public class GraphModule extends AbstractModule {
     @StorageEdgeSerialization
     public EdgeSerialization permanentStorageSerialization( final NodeShardCache cache, final
Keyspace keyspace,
                                                             final CassandraConfig cassandraConfig,
-                                                            final GraphFig graphFig ) {
+                                                            final GraphFig graphFig,
+                                                            final NodeShardApproximation
shardApproximation) {
 
-        final EdgeShardStrategy sizeBasedStrategy = new SizebasedEdgeShardStrategy( cache
);
+        final EdgeShardStrategy sizeBasedStrategy = new SizebasedEdgeShardStrategy( cache,
shardApproximation );
 
         final EdgeSerializationImpl edgeSerialization =
                 new EdgeSerializationImpl( keyspace, cassandraConfig, graphFig, sizeBasedStrategy
);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index 07ee490..1097ced 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -59,15 +59,4 @@ public interface NodeShardAllocation {
     public boolean auditMaxShard(final ApplicationScope scope, final Id nodeId, final String...
edgeType);
 
 
-    /**
-     * Increment the underlying counts on the shards
-     * @param scope The app scope
-     * @param nodeId The id of the node
-     * @param shardId The id of the shard
-     * @param amount The amount to increment
-     * @param edgeTypes The edge types to include in the count operation
-     */
-    public void increment(final ApplicationScope scope, final Id nodeId, final long shardId,
final long amount, final String... edgeTypes);
-
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index 6c83da2..667fdbf 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -52,14 +52,4 @@ public interface NodeShardCache {
      */
     public Iterator<Long> getVersions(final ApplicationScope scope, final Id nodeId,
final long  maxTimestamp, final String... edgeType);
 
-
-    /**
-     * Increment the cached counter amount for this shard
-     * @param scope
-     * @param nodeId
-     * @param shard
-     * @param count
-     * @return The new local cached count
-     */
-    public void increment(final ApplicationScope scope, final Id nodeId, final long shard,
final long count, final String... edgeTypes);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index a67b9fb..7dc763f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -24,8 +24,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import javax.inject.Inject;
 
-import org.apache.cassandra.thrift.Mutation;
-
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -57,8 +55,16 @@ public class NodeShardApproximationImpl implements NodeShardApproximation
{
     private final NodeShardCounterSerialization nodeShardCounterSerialization;
     private final TimeService timeService;
 
+    /**
+     * Counter currently implemented
+     */
     private volatile Counter currentCounter;
 
+    /**
+     * The counter that is currently in process of flushing to Cassandra.  Can be null
+     */
+    private volatile Counter flushPending;
+
 
     /**
      * Create a time shard approximation with the correct configuration.
@@ -108,6 +114,10 @@ public class NodeShardApproximationImpl implements NodeShardApproximation
{
 
         try {
             count = currentCounter.get( key );
+
+            if ( flushPending != null ) {
+                count += flushPending.get( key );
+            }
         }
         finally {
             readLock.unlock();
@@ -121,12 +131,11 @@ public class NodeShardApproximationImpl implements NodeShardApproximation
{
 
     @Override
     public void flush() {
-        final Counter toFlush;
 
         writeLockLock.lock();
 
         try {
-            toFlush = currentCounter;
+            flushPending = currentCounter;
             currentCounter = new Counter();
         }
         finally {
@@ -135,7 +144,7 @@ public class NodeShardApproximationImpl implements NodeShardApproximation
{
 
 
         //copy to the batch outside of the command for performance
-        final MutationBatch batch =  nodeShardCounterSerialization.flush( toFlush );
+        final MutationBatch batch = nodeShardCounterSerialization.flush( flushPending );
 
         /**
          * Execute the command in hystrix to avoid slamming cassandra
@@ -156,11 +165,20 @@ public class NodeShardApproximationImpl implements NodeShardApproximation
{
             @Override
             protected Object getFallback() {
                 //we've failed to mutate.  Merge this count back into the current one
-                currentCounter.merge( toFlush );
+                currentCounter.merge( flushPending );
 
                 return null;
             }
         }.execute();
+
+        writeLockLock.lock();
+
+        try {
+            flushPending = null;
+        }
+        finally {
+            writeLockLock.unlock();
+        }
     }
 
 
@@ -169,22 +187,21 @@ public class NodeShardApproximationImpl implements NodeShardApproximation
{
      */
     private void checkFlush() {
 
-        //we shouldn't flush we've not pass our timeout
-        if ( currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval() >
timeService.getCurrentTime()
-                //or we're not past the invocation count
-                && currentCounter.getInvokeCount() < graphFig.getCounterFlushCount()
) {
-            return;
+        //there's no flush pending and we're past the timeout or count
+        if ( flushPending == null && (
+                currentCounter.getCreateTimestamp() + graphFig.getCounterFlushInterval()
> timeService.getCurrentTime()
+                        || currentCounter.getInvokeCount() >= graphFig.getCounterFlushCount()
) ) {
+
+
+            /**
+             * Fire the flush action asynchronously
+             */
+            Schedulers.immediate().createWorker().schedule( new Action0() {
+                @Override
+                public void call() {
+                    flush();
+                }
+            } );
         }
-
-
-        /**
-         * Fire the flush action asynchronously
-         */
-        Schedulers.immediate().createWorker().schedule( new Action0() {
-            @Override
-            public void call() {
-                flush();
-            }
-        } );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index c28b693..cf70669 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -55,18 +55,16 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
     private final NodeShardApproximation nodeShardApproximation;
     private final TimeService timeService;
     private final GraphFig graphFig;
-    private final Keyspace keyspace;
 
 
     @Inject
     public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
-                                    final  NodeShardApproximation nodeShardApproximation,
-                                    final TimeService timeService, final GraphFig graphFig,
final Keyspace keyspace ) {
+                                    final NodeShardApproximation nodeShardApproximation,
+                                    final TimeService timeService, final GraphFig graphFig
) {
         this.edgeShardSerialization = edgeShardSerialization;
         this.nodeShardApproximation = nodeShardApproximation;
         this.timeService = timeService;
         this.graphFig = graphFig;
-        this.keyspace = keyspace;
     }
 
 
@@ -77,59 +75,59 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
                 edgeShardSerialization.getEdgeMetaData( scope, nodeId, maxShardId, edgeTypes
);
 
         final PushbackIterator<Long> pushbackIterator = new PushbackIterator( existingShards
);
-
-
-        final long now = timeService.getCurrentTime();
-
-
-        final List<Long> futures = new ArrayList<Long>();
-
-
-        //loop through all shards, any shard > now+1 should be deleted
-        while ( pushbackIterator.hasNext() ) {
-
-            final Long value = pushbackIterator.next();
-
-            //we're done, our current time uuid is greater than the value stored
-            if ( now >= value ) {
-                //push it back into the iterator
-                pushbackIterator.pushback( value );
-                break;
-            }
-
-            futures.add( value );
-        }
-
-
-        //we have more than 1 future value, we need to remove it
-
-        MutationBatch cleanup = keyspace.prepareMutationBatch();
-
-        //remove all futures except the last one, it is the only value we shouldn't lazy
remove
-        for ( int i = 0; i < futures.size() -1; i++ ) {
-            final long toRemove = futures.get( i );
-
-            final MutationBatch batch = edgeShardSerialization.removeEdgeMeta( scope, nodeId,
toRemove, edgeTypes );
-
-            cleanup.mergeShallow( batch );
-        }
-
-
-        try {
-            cleanup.execute();
-        }
-        catch ( ConnectionException e ) {
-            throw new GraphRuntimeException( "Unable to remove future shards, mutation error",
e );
-        }
-
-
-        final int futuresSize =  futures.size();
-
-        if ( futuresSize > 0 ) {
-            pushbackIterator.pushback( futures.get( futuresSize - 1 ) );
-        }
-
-
+//
+//
+//        final long now = timeService.getCurrentTime();
+//
+//
+//        final List<Long> futures = new ArrayList<Long>();
+//
+//
+//        //loop through all shards, any shard > now+1 should be deleted
+//        while ( pushbackIterator.hasNext() ) {
+//
+//            final Long value = pushbackIterator.next();
+//
+//            //we're done, our current time uuid is greater than the value stored
+//            if ( now >= value ) {
+//                //push it back into the iterator
+//                pushbackIterator.pushback( value );
+//                break;
+//            }
+//
+//            futures.add( value );
+//        }
+//
+//
+//        //we have more than 1 future value, we need to remove it
+//
+//        MutationBatch cleanup = keyspace.prepareMutationBatch();
+//
+//        //remove all futures except the last one, it is the only value we shouldn't lazy
remove
+//        for ( int i = 0; i < futures.size() -1; i++ ) {
+//            final long toRemove = futures.get( i );
+//
+//            final MutationBatch batch = edgeShardSerialization.removeEdgeMeta( scope, nodeId,
toRemove, edgeTypes );
+//
+//            cleanup.mergeShallow( batch );
+//        }
+//
+//
+//        try {
+//            cleanup.execute();
+//        }
+//        catch ( ConnectionException e ) {
+//            throw new GraphRuntimeException( "Unable to remove future shards, mutation
error", e );
+//        }
+//
+//
+//        final int futuresSize =  futures.size();
+//
+//        if ( futuresSize > 0 ) {
+//            pushbackIterator.pushback( futures.get( futuresSize - 1 ) );
+//        }
+//
+//
         /**
          * Nothing to iterate, return an iterator with 0.
          */
@@ -183,10 +181,4 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         return true;
     }
 
-
-    @Override
-    public void increment( final ApplicationScope scope, final Id nodeId, final long shardId,
final long amount,
-                           final String... edgeTypes ) {
-       nodeShardApproximation.increment( scope, nodeId, shardId, amount, edgeTypes );
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index d34c625..3b78898 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -139,13 +139,6 @@ public class NodeShardCacheImpl implements NodeShardCache {
         return iterator;
     }
 
-
-    @Override
-    public void increment( final ApplicationScope scope, final Id nodeId, final long shard,
final long count, final String... edgeTypes ) {
-        nodeShardAllocation.increment( scope, nodeId, shard, count, edgeTypes );
-    }
-
-
     /**
      * This is a race condition.  We could re-init the shard while another thread is reading
it.  This is fine, the read
      * doesn't have to be precise.  The algorithm accounts for stale data.
@@ -160,12 +153,12 @@ public class NodeShardCacheImpl implements NodeShardCache {
                       @Override
                       public CacheEntry load( final CacheKey key ) throws Exception {
 
-
-                          /**
-                           * Perform an audit in case we need to allocate a new shard
-                           */
-                          nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types
);
-                          //TODO, we need to put some sort of upper bounds on this, it could
possibly get too large
+//
+//                          /**
+//                           * Perform an audit in case we need to allocate a new shard
+//                           */
+//                          nodeShardAllocation.auditMaxShard( key.scope, key.id, key.types
);
+//                          //TODO, we need to put some sort of upper bounds on this, it
could possibly get too large
 
 
                           final Iterator<Long> edges = nodeShardAllocation

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
index c1dd2dc..f246f23 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/SizebasedEdgeShardStrategy.java
@@ -21,10 +21,10 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
 import java.util.Iterator;
-import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -40,11 +40,15 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
 
 
     private final NodeShardCache shardCache;
+    private final NodeShardApproximation shardApproximation;
 
 
     @Inject
-    public SizebasedEdgeShardStrategy( final NodeShardCache shardCache ) {this.shardCache
= shardCache;}
-
+    public SizebasedEdgeShardStrategy( final NodeShardCache shardCache,
+                                       final NodeShardApproximation shardApproximation )
{
+        this.shardCache = shardCache;
+        this.shardApproximation = shardApproximation;
+    }
 
 
     @Override
@@ -62,9 +66,9 @@ public class SizebasedEdgeShardStrategy implements EdgeShardStrategy {
 
 
     @Override
-    public void increment( final ApplicationScope scope, final Id rowKeyId,
-                           final long shardId, final long count, final String... types )
{
-        shardCache.increment(  scope, rowKeyId, shardId, count, types );
+    public void increment( final ApplicationScope scope, final Id rowKeyId, final long shardId,
final long count,
+                           final String... types ) {
+        shardApproximation.increment( scope, rowKeyId, shardId, count, types );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
index 364bd23..9fa4cbf 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerIT.java
@@ -29,6 +29,7 @@ import org.junit.Test;
 
 import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -76,14 +77,7 @@ public abstract class GraphManagerIT {
 
     @Before
     public void mockApp() {
-        this.scope = mock( ApplicationScope.class );
-
-        Id orgId = mock( Id.class );
-
-        when( orgId.getType() ).thenReturn( "organization" );
-        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
-        when( this.scope.getApplication() ).thenReturn( orgId );
+        this.scope = new ApplicationScopeImpl(createId("application")  );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
new file mode 100644
index 0000000..01e07b1
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardingIT.java
@@ -0,0 +1,189 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+package org.apache.usergrid.persistence.graph;
+
+
+import java.util.concurrent.TimeoutException;
+
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.cassandra.ITRunner;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createEdge;
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@RunWith( ITRunner.class )
+@UseModules( TestGraphModule.class )
+public class GraphManagerShardingIT {
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    protected GraphManagerFactory emf;
+
+
+    @Inject
+    protected GraphFig graphFig;
+
+    @Inject
+    protected NodeShardApproximation nodeShardApproximation;
+
+    protected ApplicationScope scope;
+
+
+
+
+    @Before
+    public void mockApp() {
+        this.scope = new ApplicationScopeImpl(createId("application")  );
+    }
+
+
+    @Test
+    public void testWriteSourceType() throws TimeoutException, InterruptedException {
+
+        GraphManager gm = emf.createEdgeManager( scope ) ;
+
+        final Id sourceId = createId( "source" );
+        final String edgeType = "test";
+
+
+
+
+        final long flushCount = graphFig.getCounterFlushCount();
+        final long maxShardSize = graphFig.getShardSize();
+
+
+
+
+        final long startTime = System.currentTimeMillis();
+
+        //each edge causes 4 counts
+        final long writeCount = flushCount/4;
+
+        assertTrue( "Shard size must be >= flush Count", maxShardSize >= flushCount
);
+
+        Id targetId = null;
+
+        for(long i = 0; i < writeCount; i ++){
+            targetId = createId("target") ;
+
+            final Edge edge = createEdge( sourceId, edgeType, targetId);
+
+            gm.writeEdge( edge ).toBlocking().last();
+
+        }
+
+
+        long shardCount = nodeShardApproximation.getCount( scope, sourceId, 0l, edgeType
);
+
+        assertEquals("Shard count for source node should be the same as write count", writeCount,
shardCount);
+
+
+        //now verify it's correct for the target
+
+        shardCount = nodeShardApproximation.getCount( scope, targetId, 0l, edgeType );
+
+        assertEquals(1, shardCount);
+
+    }
+
+
+    @Test
+    public void testWriteTargetType() throws TimeoutException, InterruptedException {
+
+        GraphManager gm = emf.createEdgeManager( scope ) ;
+
+        final Id targetId = createId( "target" );
+        final String edgeType = "test";
+
+
+
+
+        final long flushCount = graphFig.getCounterFlushCount();
+        final long maxShardSize = graphFig.getShardSize();
+
+
+
+
+        final long startTime = System.currentTimeMillis();
+
+        //each edge causes 4 counts
+        final long writeCount = flushCount/4;
+
+        assertTrue( "Shard size must be >= flush Count", maxShardSize >= flushCount
);
+
+        Id sourceId = null;
+
+        for(long i = 0; i < writeCount; i ++){
+            sourceId = createId("source") ;
+
+            final Edge edge = createEdge( sourceId, edgeType, targetId);
+
+            gm.writeEdge( edge ).toBlocking().last();
+
+        }
+
+
+        long shardCount = nodeShardApproximation.getCount( scope, targetId, 0l, edgeType
);
+
+        assertEquals("Shard count for source node should be the same as write count", writeCount,
shardCount);
+
+
+        //now verify it's correct for the target
+
+        shardCount = nodeShardApproximation.getCount( scope, sourceId, 0l, edgeType );
+
+        assertEquals(1, shardCount);
+
+    }
+
+
+
+
+}
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 3b9e074..5c846f1 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -98,7 +98,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, nodeShardCounterSerialization,
timeService,
-                        graphFig, keyspace );
+                        graphFig );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -137,7 +137,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, nodeShardCounterSerialization,
timeService,
-                        graphFig, keyspace );
+                        graphFig );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -182,7 +182,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation,
timeService,
-                        graphFig, keyspace );
+                        graphFig );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -234,7 +234,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation,
timeService,
-                        graphFig, keyspace );
+                        graphFig );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -307,7 +307,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation,
timeService,
-                        graphFig, keyspace );
+                        graphFig );
 
         final Id nodeId = createId( "test" );
         final String type = "type";
@@ -411,7 +411,7 @@ public class NodeShardAllocationTest {
 
         NodeShardAllocation approximation =
                 new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation,
timeService,
-                        graphFig, keyspace );
+                        graphFig );
 
         final Id nodeId = createId( "test" );
         final String type = "type";

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2a98cae9/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index e849725..da19ce5 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -423,19 +423,6 @@ public class NodeShardApproximationTest {
             return 0;  //To change body of implemented methods use File | Settings | File
Templates.
         }
 
-
-        @Override
-        public int getWriteTimeout() {
-            return 0;  //To change body of implemented methods use File | Settings | File
Templates.
-        }
-
-
-        @Override
-        public int getReadTimeout() {
-            return 0;  //To change body of implemented methods use File | Settings | File
Templates.
-        }
-
-
         @Override
         public int getRepairConcurrentSize() {
             return 0;  //To change body of implemented methods use File | Settings | File
Templates.


Mime
View raw message