usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [45/50] [abbrv] git commit: Added shard counting when serializing edges
Date Mon, 07 Jul 2014 18:17:02 GMT
Added shard counting when serializing edges


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ba04cb6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ba04cb6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ba04cb6a

Branch: refs/heads/two-dot-o
Commit: ba04cb6a60758020d0ec9f9a4a575ebf321373ef
Parents: 69c2580
Author: Todd Nine <tnine@apigee.com>
Authored: Mon Jun 30 16:24:50 2014 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Mon Jun 30 16:24:50 2014 -0600

----------------------------------------------------------------------
 .../impl/EdgeSerializationImpl.java             | 34 +++++++++++++-----
 .../impl/shard/NodeShardAllocation.java         | 15 ++++++--
 .../impl/shard/NodeShardCache.java              |  2 +-
 .../shard/impl/NodeShardAllocationImpl.java     | 19 +++++++---
 .../impl/shard/impl/NodeShardCacheImpl.java     | 10 ++----
 .../impl/shard/NodeShardAllocationTest.java     | 38 ++++++++++----------
 6 files changed, 75 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ba04cb6a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
index b61616e..10d1048 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeSerializationImpl.java
@@ -213,7 +213,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration
{
 
             @Override
             public void countEdge( final Id rowId, final long shardId, final String... types
) {
-                edgeShardStrategy.increment( scope, rowId, shardId, 1l, types );
+                if(!isDeleted){
+                    edgeShardStrategy.increment( scope, rowId, shardId, 1l, types );
+                }
             }
 
 
@@ -278,7 +280,9 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration
{
         EdgeUtils.validateEdge( edge );
 
         final Id sourceNodeId = edge.getSourceNode();
+        final String souceNodeType = sourceNodeId.getType();
         final Id targetNodeId = edge.getTargetNode();
+        final String targetNodeType = targetNodeId.getType();
         final long timestamp = edge.getTimestamp();
         final String type = edge.getType();
 
@@ -287,20 +291,24 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration
{
          * Key in the serializers based on the edge
          */
 
-        final RowKey sourceRowKey = new RowKey( sourceNodeId, type,
-                edgeShardStrategy.getWriteShard( scope, sourceNodeId, timestamp, type ) );
+        final long sourceRowKeyShard = edgeShardStrategy.getWriteShard( scope, sourceNodeId,
timestamp, type );
+        final RowKey sourceRowKey = new RowKey( sourceNodeId, type, sourceRowKeyShard);
 
-        final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId,
-                edgeShardStrategy.getWriteShard( scope, sourceNodeId, timestamp, type, targetNodeId.getType()
) );
+
+
+        final long sourceWithTypeRowKeyShard =  edgeShardStrategy.getWriteShard( scope, sourceNodeId,
timestamp, type, targetNodeType );
+
+        final RowKeyType sourceRowKeyType = new RowKeyType( sourceNodeId, type, targetNodeId,
sourceWithTypeRowKeyShard );
 
         final DirectedEdge sourceEdge = new DirectedEdge( targetNodeId, timestamp );
 
 
-        final RowKey targetRowKey = new RowKey( targetNodeId, type,
-                edgeShardStrategy.getWriteShard( scope, targetNodeId, timestamp, type ) );
 
-        final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId,
-                edgeShardStrategy.getWriteShard( scope, targetNodeId, timestamp, type, sourceNodeId.getType()
) );
+        final long targetRowKeyShard = edgeShardStrategy.getWriteShard( scope, targetNodeId,
timestamp, type );
+        final RowKey targetRowKey = new RowKey( targetNodeId, type, targetRowKeyShard);
+
+        final long targetWithTypeRowKeyShard = edgeShardStrategy.getWriteShard( scope, targetNodeId,
timestamp, type, souceNodeType );
+        final RowKeyType targetRowKeyType = new RowKeyType( targetNodeId, type, sourceNodeId,
targetWithTypeRowKeyShard );
 
         final DirectedEdge targetEdge = new DirectedEdge( sourceNodeId, timestamp );
 
@@ -315,22 +323,30 @@ public class EdgeSerializationImpl implements EdgeSerialization, Migration
{
          */
 
         op.writeEdge( sourceNodeEdgesCf, sourceRowKey, sourceEdge );
+        op.countEdge( sourceNodeId, sourceRowKeyShard, type );
 
         op.writeEdge( sourceNodeTargetTypeCf, sourceRowKeyType, sourceEdge );
+        op.countEdge( sourceNodeId, sourceWithTypeRowKeyShard, type, targetNodeType );
+
+
 
 
         /**
          * write edges from target<-source
          */
         op.writeEdge( targetNodeEdgesCf, targetRowKey, targetEdge );
+        op.countEdge( targetNodeId, targetRowKeyShard, type );
 
         op.writeEdge( targetNodeSourceTypeCf, targetRowKeyType, targetEdge );
+        op.countEdge( targetNodeId, targetWithTypeRowKeyShard, type, souceNodeType );
 
 
         /**
          * Write this in the timestamp log for this edge of source->target
          */
         op.writeVersion( graphEdgeVersionsCf, edgeRowKey, timestamp );
+
+
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ba04cb6a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
index 675d0dc..07ee490 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocation.java
@@ -38,7 +38,7 @@ public interface NodeShardAllocation {
     /**
      * Get all shards for the given info.  If none exist, a default shard should be allocated
      *
-     * @param scope
+     * @param scope The application scope
      * @param nodeId
      * @param maxShardId The max value to start seeking from.  Values <= this will be
returned if specified
      * @param edgeTypes
@@ -51,7 +51,7 @@ public interface NodeShardAllocation {
     /**
      * Audit our highest shard for it's maximum capacity.  If it has reached the max capacity
<=, it will allocate a new shard
      *
-     * @param scope The organization scope
+     * @param scope The app scope
      * @param nodeId The node id
      * @param edgeType The edge types
      * @return True if a new shard was allocated
@@ -59,4 +59,15 @@ public interface NodeShardAllocation {
     public boolean auditMaxShard(final ApplicationScope scope, final Id nodeId, final String...
edgeType);
 
 
+    /**
+     * Increment the underlying counts on the shards
+     * @param scope The app scope
+     * @param nodeId The id of the node
+     * @param shardId The id of the shard
+     * @param amount The amount to increment
+     * @param edgeTypes The edge types to include in the count operation
+     */
+    public void increment(final ApplicationScope scope, final Id nodeId, final long shardId,
final long amount, final String... edgeTypes);
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ba04cb6a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
index 7845ab0..6c83da2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardCache.java
@@ -61,5 +61,5 @@ public interface NodeShardCache {
      * @param count
      * @return The new local cached count
      */
-    public long increment(final ApplicationScope scope, final Id nodeId, final long shard,
final long count, final String... edgeTypes);
+    public void increment(final ApplicationScope scope, final Id nodeId, final long shard,
final long count, final String... edgeTypes);
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ba04cb6a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index dc34d73..c28b693 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.ShardKey;
 import org.apache.usergrid.persistence.model.entity.Id;
@@ -50,7 +51,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
     private final EdgeShardSerialization edgeShardSerialization;
-    private final NodeShardCounterSerialization edgeShardCounterSerialization;
+//    private final NodeShardCounterSerialization edgeShardCounterSerialization;
+    private final NodeShardApproximation nodeShardApproximation;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final Keyspace keyspace;
@@ -58,10 +60,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
     @Inject
     public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
-                                    final NodeShardCounterSerialization edgeShardCounterSerialization,
+                                    final  NodeShardApproximation nodeShardApproximation,
                                     final TimeService timeService, final GraphFig graphFig,
final Keyspace keyspace ) {
         this.edgeShardSerialization = edgeShardSerialization;
-        this.edgeShardCounterSerialization = edgeShardCounterSerialization;
+        this.nodeShardApproximation = nodeShardApproximation;
         this.timeService = timeService;
         this.graphFig = graphFig;
         this.keyspace = keyspace;
@@ -157,7 +159,9 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         /**
          * Check out if we have a count for our shard allocation
          */
-        final long count = edgeShardCounterSerialization.getCount( new ShardKey( scope, nodeId,
maxShard, edgeType ));
+
+
+        final long count = nodeShardApproximation.getCount( scope, nodeId, maxShard, edgeType
);
 
         if ( count < graphFig.getShardSize() ) {
             return false;
@@ -178,4 +182,11 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         return true;
     }
+
+
+    @Override
+    public void increment( final ApplicationScope scope, final Id nodeId, final long shardId,
final long amount,
+                           final String... edgeTypes ) {
+       nodeShardApproximation.increment( scope, nodeId, shardId, amount, edgeTypes );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ba04cb6a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index bda3b04..d34c625 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -52,10 +52,6 @@ import com.google.inject.Inject;
  */
 public class NodeShardCacheImpl implements NodeShardCache {
 
-
-    private static final int SHARD_PAGE_SIZE = 1000;
-
-
     private final NodeShardAllocation nodeShardAllocation;
     private final GraphFig graphFig;
 
@@ -145,10 +141,8 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
     @Override
-    public long increment( final ApplicationScope scope, final Id nodeId, final long shard,
final long count,
-                                     final String... edgeTypes ) {
-        //TODO, implement this
-        return 0;
+    public void increment( final ApplicationScope scope, final Id nodeId, final long shard,
final long count, final String... edgeTypes ) {
+        nodeShardAllocation.increment( scope, nodeId, shard, count, edgeTypes );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ba04cb6a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 5b4c4e7..3b9e074 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -84,8 +84,8 @@ public class NodeShardAllocationTest {
     public void noShards() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class
);
 
-        final NodeShardCounterSerialization nodeShardCounterSerialization =
-                mock( NodeShardCounterSerialization.class );
+        final NodeShardApproximation nodeShardCounterSerialization =
+                mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -121,8 +121,8 @@ public class NodeShardAllocationTest {
     public void existingFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class
);
 
-        final NodeShardCounterSerialization nodeShardCounterSerialization =
-                mock( NodeShardCounterSerialization.class );
+        final NodeShardApproximation nodeShardCounterSerialization =
+                mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -167,8 +167,8 @@ public class NodeShardAllocationTest {
     public void lowCountFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class
);
 
-        final NodeShardCounterSerialization NodeShardCounterSerialization =
-                mock( NodeShardCounterSerialization.class );
+        final NodeShardApproximation nodeShardApproximation =
+                mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -181,7 +181,7 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, NodeShardCounterSerialization,
timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation,
timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
@@ -206,7 +206,7 @@ public class NodeShardAllocationTest {
 
         final long count = graphFig.getShardSize() - 1;
 
-        when( NodeShardCounterSerialization.getCount( eq( new ShardKey( scope, nodeId, 0l,
type, subType ) ) ))
+        when( nodeShardApproximation.getCount(scope, nodeId, 0l, type, subType ))
                                            .thenReturn( count );
 
         final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType
);
@@ -219,8 +219,8 @@ public class NodeShardAllocationTest {
     public void equalCountFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class
);
 
-        final NodeShardCounterSerialization NodeShardCounterSerialization =
-                mock( NodeShardCounterSerialization.class );
+        final NodeShardApproximation nodeShardApproximation =
+                mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -233,7 +233,7 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, NodeShardCounterSerialization,
timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation,
timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
@@ -257,8 +257,8 @@ public class NodeShardAllocationTest {
         final long shardCount = graphFig.getShardSize();
 
         //return a shard size equal to our max
-        when( NodeShardCounterSerialization
-                .getCount(  eq(new ShardKey(  scope , nodeId, 0l,type , subType ) ) ))
+        when( nodeShardApproximation
+                .getCount(   scope , nodeId, 0l,type , subType  ))
                 .thenReturn( shardCount );
 
         ArgumentCaptor<Long> newUUIDValue = ArgumentCaptor.forClass( Long.class );
@@ -292,8 +292,8 @@ public class NodeShardAllocationTest {
     public void futureCountShardCleanup() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class
);
 
-        final NodeShardCounterSerialization NodeShardCounterSerialization =
-                mock( NodeShardCounterSerialization.class );
+        final NodeShardApproximation nodeShardApproximation =
+                mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -306,7 +306,7 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, NodeShardCounterSerialization,
timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation,
timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
@@ -397,8 +397,8 @@ public class NodeShardAllocationTest {
     public void noShardsReturns() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class
);
 
-        final NodeShardCounterSerialization NodeShardCounterSerialization =
-                mock( NodeShardCounterSerialization.class );
+        final NodeShardApproximation nodeShardApproximation =
+                mock( NodeShardApproximation.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -410,7 +410,7 @@ public class NodeShardAllocationTest {
         when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, NodeShardCounterSerialization,
timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardApproximation,
timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );


Mime
View raw message