usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [01/50] [abbrv] git commit: Implemented shard count persistence.
Date Mon, 07 Jul 2014 18:16:18 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-164 [deleted] 280f51707
  refs/heads/USERGRID-174 2781766e1 -> 5163de914
  refs/heads/akka-job-scheduler-replacement [created] c7b861275
  refs/heads/cloud-formation-scripts [created] 9046e8899
  refs/heads/master 7dd337212 -> c7b861275
  refs/heads/performance-fixes e3f61ecb5 -> b64af62ec
  refs/heads/two-dot-o b6364bd9f -> 71cb9c266
  refs/pull/182/merge [deleted] bd2f138ed
  refs/pull/188/head 280f51707 -> c030dbaea
  refs/pull/188/merge [deleted] 05d1fa64d
  refs/pull/194/merge [deleted] 0baccb5c3
  refs/pull/195/head 3faf865cf -> 3472adaee
  refs/pull/195/merge [deleted] 046b0cb78
  refs/pull/198/merge 5a8dbe895 -> 7e318fa60 (forced update)
  refs/pull/200/head 470128269 -> b7a88d9fc
  refs/pull/200/merge [deleted] 405a02645
  refs/pull/203/head [created] c1127f92c
  refs/pull/204/head [created] 04d2839ff
  refs/pull/205/head [created] f4cd72c47
  refs/pull/206/head [created] 6f4423e89
  refs/pull/206/merge [created] 3d34a4390
  refs/pull/207/head [created] 19116416f
  refs/pull/208/head [created] 914c6fa1f
  refs/pull/209/head [created] 98e89bb62
  refs/pull/210/head [created] 9046e8899
  refs/pull/211/head [created] 5163de914
  refs/pull/211/merge [created] 58918e901
  refs/pull/212/head [created] 23f5a1137
  refs/pull/213/head [created] fe88acbc2
  refs/pull/214/head [created] 3f3bb46a3
  refs/pull/215/head [created] c54849ed9
  refs/pull/216/head [created] 0ae961a65
  refs/pull/217/head [created] 6853f7ceb
  refs/pull/218/head [created] efd3292be
  refs/pull/218/merge [created] 0f479fa74
  refs/pull/219/head [created] d3753f32e
  refs/pull/220/head [created] 5127c83bc
  refs/pull/221/head [created] ceeb66f82
  refs/pull/222/head [created] 7e4fc083b
  refs/pull/223/head [created] 1b11c501e
  refs/pull/224/head [created] fd29736d1
  refs/pull/225/head [created] db037b180
  refs/pull/225/merge [created] 488ecbe26
  refs/pull/226/head [created] 630830d5a
  refs/pull/227/head [created] f2fd25a4e
  refs/pull/227/merge [created] 756327144
  refs/pull/228/head [created] 2b2bb8556
  refs/pull/228/merge [created] 540a449d5
  refs/pull/229/head [created] 97eaead30
  refs/pull/229/merge [created] 49b490ed8
  refs/pull/230/head [created] 97e5dfbf8
  refs/pull/231/head [created] 1143d43aa
  refs/pull/231/merge [created] 0a48ce648
  refs/pull/232/head [created] f2fd25a4e
  refs/pull/232/merge [created] e3e2dadd8
  refs/pull/233/head [created] 598644d6c
  refs/pull/234/head [created] b6e445d6b
  refs/pull/235/head [created] 4fe70f4c3
  refs/pull/236/head [created] 9b6d81978
  refs/pull/237/head [created] 639157137
  refs/pull/238/head [created] 155c8095b
  refs/pull/239/head [created] 2a98cae99


Implemented shard count persistence.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a8fc5a56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a8fc5a56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a8fc5a56

Branch: refs/heads/two-dot-o
Commit: a8fc5a56e59941f9591bfea7211e3d4df6106483
Parents: b56e641
Author: Todd Nine <tnine@apigee.com>
Authored: Thu Jun 26 18:41:34 2014 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Jun 26 18:41:34 2014 -0600

----------------------------------------------------------------------
 .../persistence/graph/guice/GraphModule.java    |  14 +-
 .../shard/EdgeShardCounterSerialization.java    |  63 --------
 .../serialization/impl/shard/count/Counter.java |  10 ++
 .../count/NodeShardCounterSerialization.java    |   4 +-
 .../NodeShardCounterSerializationImpl.java      | 139 +++++++++++++++++
 .../impl/shard/count/ShardKey.java              |  24 ++-
 .../impl/EdgeShardCounterSerializationImpl.java | 142 ------------------
 .../shard/impl/NodeShardAllocationImpl.java     |   9 +-
 .../EdgeShardCounterSerializationTest.java      | 148 -------------------
 .../impl/shard/NodeShardAllocationTest.java     |  47 +++---
 .../shard/count/NodeShardApproximationTest.java |  13 +-
 .../NodeShardCounterSerializationTest.java      | 126 ++++++++++++++++
 12 files changed, 347 insertions(+), 392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index dcf0718..f096740 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -43,16 +43,16 @@ import org.apache.usergrid.persistence.graph.serialization.NodeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.EdgeMetadataSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.EdgeSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.NodeSerializationImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardCounterSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardStrategy;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardCounterSerializationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
 
@@ -92,13 +92,14 @@ public class GraphModule extends AbstractModule {
         bind( NodeShardAllocation.class ).to( NodeShardAllocationImpl.class );
         bind( NodeShardApproximation.class ).to( NodeShardApproximationImpl.class );
         bind( NodeShardCache.class ).to( NodeShardCacheImpl.class );
+        bind( NodeShardCounterSerialization.class ).to( NodeShardCounterSerializationImpl.class );
 
         /**
          * Bind our strategies based on their internal annotations.
          */
 
         bind( EdgeShardSerialization.class ).to( EdgeShardSerializationImpl.class );
-        bind( EdgeShardCounterSerialization.class ).to( EdgeShardCounterSerializationImpl.class );
+
 
 
         //Repair/cleanup classes.
@@ -113,6 +114,9 @@ public class GraphModule extends AbstractModule {
         bind( EdgeDeleteListener.class).to( EdgeDeleteListenerImpl.class );
 
 
+        /**
+         * Bind our implementation
+         */
 
         /********
          * Migration bindings
@@ -128,7 +132,7 @@ public class GraphModule extends AbstractModule {
         migrationBinding.addBinding().to( Key.get( EdgeSerialization.class, StorageEdgeSerialization.class ) );
 
         migrationBinding.addBinding().to( Key.get( EdgeShardSerialization.class ) );
-        migrationBinding.addBinding().to( Key.get( EdgeShardCounterSerialization.class ) );
+        migrationBinding.addBinding().to( Key.get( NodeShardCounterSerialization.class ) );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardCounterSerialization.java
deleted file mode 100644
index a9aa1ba..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardCounterSerialization.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard;
-
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.migration.Migration;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.MutationBatch;
-
-
-/**
- * The interface to define counter operations.  Note that the implementation may not be immediately consistent.
- *
- * TODO: Ivestigate this further with CountMinSketch.  Since our cardinality needs to be "good enough", we may be able
- * to offer much better performance than the Cassandra counters by using CountMinSketch in a time series manner on each node, and persisting it's map
- * in memory with period flush into a standard CF.  On query, we can read a unioned column.
- * On flush, we can flush, then read+union and set the timestamp on the column so that only 1 union will be the max.
- *
- */
-public interface EdgeShardCounterSerialization extends Migration{
-
-    /**
-     * Write a new time shard for the meta data
-     * @param scope The scope to write
-     * @param nodeId The id in the edge
-     * @param shardId The shard Id to use
-     * @param types The types to write to.  Can be edge type, or edgeType+id type
-     */
-    public MutationBatch writeMetaDataLog( ApplicationScope scope, Id nodeId, long shardId, long count, String... types );
-
-
-    /**
-     * Get the most recent rollup of all of the given summations.  If one is not present the optional will be empty
-     * @param scope
-     * @param nodeId
-     * @param shardId
-     * @param types
-     * @return
-     */
-    public long getCount(ApplicationScope scope, Id nodeId, long shardId, String... types);
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
index 47d89e9..4318200 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/Counter.java
@@ -20,6 +20,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
 
 
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -100,6 +101,15 @@ public class Counter {
 
 
     /**
+     * Get all entries
+     * @return
+     */
+    public Set<Map.Entry<ShardKey, AtomicLong>> getEntries(){
+        return counts.entrySet();
+    }
+
+
+    /**
      * Get the count of the number of times we've been incremented
      * @return
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
index aa95e69..41eb525 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerialization.java
@@ -19,13 +19,15 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
 
 
+import org.apache.usergrid.persistence.core.migration.Migration;
+
 import com.netflix.astyanax.MutationBatch;
 
 
 /**
  * Serialization for flushing and reading counters
  */
-public interface NodeShardCounterSerialization {
+public interface NodeShardCounterSerialization  extends Migration {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
new file mode 100644
index 0000000..da318bf
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
+
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CounterColumnType;
+
+import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
+import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKey;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeRowKeySerializer;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.OperationResult;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.serializers.LongSerializer;
+
+
+
+@Singleton
+public class NodeShardCounterSerializationImpl implements NodeShardCounterSerialization {
+
+
+    /**
+     * Edge shards
+     */
+    private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_SHARD_COUNTS =
+            new MultiTennantColumnFamily<>( "Edge_Shard_Counts",
+                    new OrganizationScopedRowKeySerializer<>( new EdgeRowKeySerializer() ), LongSerializer.get() );
+
+
+    protected final Keyspace keyspace;
+    protected final CassandraConfig cassandraConfig;
+    protected final GraphFig graphFig;
+
+
+    @Inject
+    public NodeShardCounterSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
+                                            final GraphFig graphFig ) {
+        this.keyspace = keyspace;
+        this.cassandraConfig = cassandraConfig;
+        this.graphFig = graphFig;
+    }
+
+
+    @Override
+    public MutationBatch flush( final Counter counter ) {
+
+
+        Preconditions.checkNotNull( counter, "counter must be specified" );
+
+
+        final MutationBatch batch = keyspace.prepareMutationBatch();
+
+        for ( Map.Entry<ShardKey, AtomicLong> entry : counter.getEntries() ) {
+
+            final ShardKey key = entry.getKey();
+            final long value = entry.getValue().get();
+
+            final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
+
+            final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+
+
+            batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn( key.getShardId(), value );
+        }
+
+
+        return batch;
+    }
+
+
+    @Override
+    public long getCount( final ShardKey key ) {
+
+        final EdgeRowKey edgeRowKey = new EdgeRowKey( key.getNodeId(), key.getEdgeTypes() );
+
+        final ScopedRowKey rowKey = ScopedRowKey.fromKey( key.getScope(), edgeRowKey );
+
+
+        try {
+            OperationResult<Column<Long>> column =
+                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( key.getShardId() ).execute();
+
+            return column.getResult().getLongValue();
+        }
+        //column not found, return 0
+        catch ( NotFoundException nfe ) {
+            return 0;
+        }
+        catch ( ConnectionException e ) {
+            throw new GraphRuntimeException( "An error occurred connecting to cassandra", e );
+        }
+    }
+
+
+    @Override
+    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+        return Collections.singleton(
+                new MultiTennantColumnFamilyDefinition( EDGE_SHARD_COUNTS, BytesType.class.getSimpleName(),
+                        ColumnTypes.LONG_TYPE_REVERSED, CounterColumnType.class.getSimpleName(),
+                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
index 2e6cc1d..63c87d3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/ShardKey.java
@@ -35,9 +35,7 @@ public class ShardKey {
     private final String[] edgeTypes;
 
 
-    public ShardKey( final ApplicationScope scope, final Id nodeId, final long shardId, final String[] edgeTypes ) {
-
-
+    public ShardKey( final ApplicationScope scope, final Id nodeId, final long shardId, final String... edgeTypes ) {
         this.scope = scope;
         this.nodeId = nodeId;
         this.shardId = shardId;
@@ -73,6 +71,26 @@ public class ShardKey {
     }
 
 
+    public ApplicationScope getScope() {
+        return scope;
+    }
+
+
+    public Id getNodeId() {
+        return nodeId;
+    }
+
+
+    public long getShardId() {
+        return shardId;
+    }
+
+
+    public String[] getEdgeTypes() {
+        return edgeTypes;
+    }
+
+
     @Override
     public int hashCode() {
         int result = scope.hashCode();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardCounterSerializationImpl.java
deleted file mode 100644
index 5e46c3a..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardCounterSerializationImpl.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import java.util.Collection;
-import java.util.Collections;
-
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.db.marshal.CounterColumnType;
-
-import org.apache.usergrid.persistence.core.astyanax.ColumnTypes;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamily;
-import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
-import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.ValidationUtils;
-import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.core.astyanax.CassandraConfig;
-import org.apache.usergrid.persistence.core.astyanax.OrganizationScopedRowKeySerializer;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardCounterSerialization;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
-import com.netflix.astyanax.model.Column;
-import com.netflix.astyanax.serializers.LongSerializer;
-
-
-@Singleton
-public class EdgeShardCounterSerializationImpl implements EdgeShardCounterSerialization {
-
-
-    /**
-     * Edge shards
-     */
-    private static final MultiTennantColumnFamily<ApplicationScope, EdgeRowKey, Long> EDGE_SHARD_COUNTS =
-            new MultiTennantColumnFamily<>( "Edge_Shard_Counts",
-                    new OrganizationScopedRowKeySerializer<>( new EdgeRowKeySerializer() ), LongSerializer.get() );
-
-
-    protected final Keyspace keyspace;
-    protected final CassandraConfig cassandraConfig;
-    protected final GraphFig graphFig;
-
-
-    @Inject
-    public EdgeShardCounterSerializationImpl( final Keyspace keyspace, final CassandraConfig cassandraConfig,
-                                              final GraphFig graphFig ) {
-        this.keyspace = keyspace;
-        this.cassandraConfig = cassandraConfig;
-        this.graphFig = graphFig;
-    }
-
-
-    @Override
-    public MutationBatch writeMetaDataLog( final ApplicationScope scope, final Id nodeId, final long shardId,
-                                           final long count, final String... types ) {
-
-        ValidationUtils.validateApplicationScope( scope );
-        ValidationUtils.verifyIdentity(nodeId);
-        Preconditions.checkArgument( shardId > -1, "shardId must be greater than -1" );
-        Preconditions.checkNotNull( types );
-
-        final EdgeRowKey key = new EdgeRowKey( nodeId, types );
-
-        final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, key );
-
-        final MutationBatch batch = keyspace.prepareMutationBatch();
-
-        batch.withRow( EDGE_SHARD_COUNTS, rowKey ).incrementCounterColumn( shardId, count );
-
-        return batch;
-    }
-
-
-    @Override
-    public long getCount( final ApplicationScope scope, final Id nodeId, final long shardId, final String... types ) {
-
-
-        ValidationUtils.validateApplicationScope( scope );
-        ValidationUtils.verifyIdentity(nodeId);
-        Preconditions.checkArgument( shardId > -1, "shardId must be greater than -1" );
-        Preconditions.checkNotNull( types );
-
-
-        final EdgeRowKey key = new EdgeRowKey( nodeId, types );
-
-        final ScopedRowKey rowKey = ScopedRowKey.fromKey( scope, key );
-
-
-        try {
-            OperationResult<Column<Long>> column =
-                    keyspace.prepareQuery( EDGE_SHARD_COUNTS ).getKey( rowKey ).getColumn( shardId ).execute();
-
-            return column.getResult().getLongValue();
-        }
-        //column not found, return 0
-        catch ( NotFoundException nfe ) {
-            return 0;
-        }
-        catch ( ConnectionException e ) {
-            throw new GraphRuntimeException( "An error occurred connecting to cassandra", e );
-        }
-
-    }
-
-
-    @Override
-    public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
-        return Collections.singleton(
-                new MultiTennantColumnFamilyDefinition( EDGE_SHARD_COUNTS, BytesType.class.getSimpleName(),
-                        ColumnTypes.LONG_TYPE_REVERSED,  CounterColumnType.class.getSimpleName(),
-                        MultiTennantColumnFamilyDefinition.CacheOption.KEYS ) );
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index cec2737..dc34d73 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -30,9 +30,10 @@ import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardCounterSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.ShardKey;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
@@ -49,7 +50,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
     private final EdgeShardSerialization edgeShardSerialization;
-    private final EdgeShardCounterSerialization edgeShardCounterSerialization;
+    private final NodeShardCounterSerialization edgeShardCounterSerialization;
     private final TimeService timeService;
     private final GraphFig graphFig;
     private final Keyspace keyspace;
@@ -57,7 +58,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
     @Inject
     public NodeShardAllocationImpl( final EdgeShardSerialization edgeShardSerialization,
-                                    final EdgeShardCounterSerialization edgeShardCounterSerialization,
+                                    final NodeShardCounterSerialization edgeShardCounterSerialization,
                                     final TimeService timeService, final GraphFig graphFig, final Keyspace keyspace ) {
         this.edgeShardSerialization = edgeShardSerialization;
         this.edgeShardCounterSerialization = edgeShardCounterSerialization;
@@ -156,7 +157,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
         /**
          * Check out if we have a count for our shard allocation
          */
-        final long count = edgeShardCounterSerialization.getCount( scope, nodeId, maxShard, edgeType );
+        final long count = edgeShardCounterSerialization.getCount( new ShardKey( scope, nodeId, maxShard, edgeType ));
 
         if ( count < graphFig.getShardSize() ) {
             return false;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardCounterSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardCounterSerializationTest.java
deleted file mode 100644
index ccc3de1..0000000
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardCounterSerializationTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard;
-
-
-import java.util.Stack;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.jukito.UseModules;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.usergrid.persistence.core.cassandra.ITRunner;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
-import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
-import org.apache.usergrid.persistence.model.entity.Id;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
-
-import com.google.inject.Inject;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-
-import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-@RunWith( ITRunner.class )
-@UseModules( { TestGraphModule.class } )
-public class EdgeShardCounterSerializationTest {
-
-
-    @Inject
-    @Rule
-    public MigrationManagerRule migrationManagerRule;
-
-
-    @Inject
-    private EdgeShardCounterSerialization edgeShardCounterSerialization;
-
-    protected ApplicationScope scope;
-
-
-    @Before
-    public void setup() {
-        scope = mock( ApplicationScope.class );
-
-        Id orgId = mock( Id.class );
-
-        when( orgId.getType() ).thenReturn( "organization" );
-        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
-
-        when( scope.getApplication() ).thenReturn( orgId );
-    }
-
-
-    @Test
-    public void testSingleCount() throws ConnectionException {
-
-        final Id id = createId( "test" );
-        final long shard = 1000l;
-        final String[] types = { "type", "subtype" };
-
-        final long toWrite = 1000l;
-
-        edgeShardCounterSerialization.writeMetaDataLog( scope, id, shard, toWrite, types ).execute();
-
-
-        final long count = edgeShardCounterSerialization.getCount( scope, id, shard, types );
-
-        assertEquals( "Correct amount returned", toWrite, count );
-    }
-
-
-    @Test
-    public void testConcurrentWrites() throws ConnectionException, ExecutionException, InterruptedException {
-
-        final Id id = createId( "test" );
-        final long shard = 1000l;
-        final String[] types = { "type", "subtype" };
-
-        final long toWrite = 1000l;
-
-
-        final int workerCount = 2;
-        final int iterations = 1000;
-
-        ExecutorService executors = Executors.newFixedThreadPool( workerCount );
-
-        Stack<Future<Void>> futures = new Stack<Future<Void>>();
-
-        for ( int i = 0; i < workerCount; i++ ) {
-
-           final Future<Void> future =  executors.submit( new Callable<Void>() {
-
-                @Override
-                public Void call() throws Exception {
-
-                    for ( int i = 0; i < iterations; i++ ) {
-                        edgeShardCounterSerialization.writeMetaDataLog( scope, id, shard, toWrite, types ).execute();
-                    }
-
-                    return null;
-                }
-            } );
-
-            futures.push( future );
-        }
-
-        /**
-         * Wait until they're all done
-         */
-        for(Future<Void> future: futures){
-            future.get();
-        }
-
-
-        final long count = edgeShardCounterSerialization.getCount( scope, id, shard, types );
-
-        final long expected = toWrite * iterations * workerCount;
-
-        assertEquals( "Correct amount returned", expected, count );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
index 1578be2..5b4c4e7 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/NodeShardAllocationTest.java
@@ -31,6 +31,8 @@ import org.mockito.ArgumentCaptor;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.ShardKey;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -82,8 +84,8 @@ public class NodeShardAllocationTest {
     public void noShards() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeShardCounterSerialization edgeShardCounterSerialization =
-                mock( EdgeShardCounterSerialization.class );
+        final NodeShardCounterSerialization nodeShardCounterSerialization =
+                mock( NodeShardCounterSerialization.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -95,7 +97,7 @@ public class NodeShardAllocationTest {
         when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeShardCounterSerialization, timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardCounterSerialization, timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
@@ -119,8 +121,8 @@ public class NodeShardAllocationTest {
     public void existingFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeShardCounterSerialization edgeShardCounterSerialization =
-                mock( EdgeShardCounterSerialization.class );
+        final NodeShardCounterSerialization nodeShardCounterSerialization =
+                mock( NodeShardCounterSerialization.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -134,7 +136,7 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeShardCounterSerialization, timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, nodeShardCounterSerialization, timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
@@ -165,8 +167,8 @@ public class NodeShardAllocationTest {
     public void lowCountFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeShardCounterSerialization edgeShardCounterSerialization =
-                mock( EdgeShardCounterSerialization.class );
+        final NodeShardCounterSerialization NodeShardCounterSerialization =
+                mock( NodeShardCounterSerialization.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -179,7 +181,7 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeShardCounterSerialization, timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, NodeShardCounterSerialization, timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
@@ -204,9 +206,8 @@ public class NodeShardAllocationTest {
 
         final long count = graphFig.getShardSize() - 1;
 
-        when( edgeShardCounterSerialization
-                .getCount( same( scope ), same( nodeId ), eq( 0l ), same( type ), same( subType ) ) )
-                .thenReturn( count );
+        when( NodeShardCounterSerialization.getCount( eq( new ShardKey( scope, nodeId, 0l, type, subType ) ) ))
+                                           .thenReturn( count );
 
         final boolean result = approximation.auditMaxShard( scope, nodeId, type, subType );
 
@@ -218,8 +219,8 @@ public class NodeShardAllocationTest {
     public void equalCountFutureShard() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeShardCounterSerialization edgeShardCounterSerialization =
-                mock( EdgeShardCounterSerialization.class );
+        final NodeShardCounterSerialization NodeShardCounterSerialization =
+                mock( NodeShardCounterSerialization.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -232,7 +233,7 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeShardCounterSerialization, timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, NodeShardCounterSerialization, timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
@@ -256,8 +257,8 @@ public class NodeShardAllocationTest {
         final long shardCount = graphFig.getShardSize();
 
         //return a shard size equal to our max
-        when( edgeShardCounterSerialization
-                .getCount( same( scope ), same( nodeId ), eq( 0l ), same( type ), same( subType ) ) )
+        when( NodeShardCounterSerialization
+                .getCount(  eq(new ShardKey(  scope , nodeId, 0l,type , subType ) ) ))
                 .thenReturn( shardCount );
 
         ArgumentCaptor<Long> newUUIDValue = ArgumentCaptor.forClass( Long.class );
@@ -291,8 +292,8 @@ public class NodeShardAllocationTest {
     public void futureCountShardCleanup() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeShardCounterSerialization edgeShardCounterSerialization =
-                mock( EdgeShardCounterSerialization.class );
+        final NodeShardCounterSerialization NodeShardCounterSerialization =
+                mock( NodeShardCounterSerialization.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -305,7 +306,7 @@ public class NodeShardAllocationTest {
 
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeShardCounterSerialization, timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, NodeShardCounterSerialization, timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );
@@ -396,8 +397,8 @@ public class NodeShardAllocationTest {
     public void noShardsReturns() {
         final EdgeShardSerialization edgeShardSerialization = mock( EdgeShardSerialization.class );
 
-        final EdgeShardCounterSerialization edgeShardCounterSerialization =
-                mock( EdgeShardCounterSerialization.class );
+        final NodeShardCounterSerialization NodeShardCounterSerialization =
+                mock( NodeShardCounterSerialization.class );
 
 
         final TimeService timeService = mock( TimeService.class );
@@ -409,7 +410,7 @@ public class NodeShardAllocationTest {
         when( keyspace.prepareMutationBatch() ).thenReturn( batch );
 
         NodeShardAllocation approximation =
-                new NodeShardAllocationImpl( edgeShardSerialization, edgeShardCounterSerialization, timeService,
+                new NodeShardAllocationImpl( edgeShardSerialization, NodeShardCounterSerialization, timeService,
                         graphFig, keyspace );
 
         final Id nodeId = createId( "test" );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
index 5eb760c..6475ca6 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationTest.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
 import java.beans.PropertyChangeListener;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -39,10 +40,10 @@ import org.safehaus.guicyfig.Bypass;
 import org.safehaus.guicyfig.OptionState;
 import org.safehaus.guicyfig.Overrides;
 
+import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
 import org.apache.usergrid.persistence.core.consistency.TimeService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardCounterSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
@@ -70,7 +71,7 @@ public class NodeShardApproximationTest {
 
     private GraphFig graphFig;
 
-    private EdgeShardCounterSerialization ser;
+    private NodeShardCounterSerialization ser;
     private NodeShardCounterSerialization nodeShardCounterSerialization;
     private TimeService timeService;
 
@@ -93,7 +94,7 @@ public class NodeShardApproximationTest {
         when( graphFig.getShardCacheSize() ).thenReturn( 10000l );
         when( graphFig.getShardSize() ).thenReturn( 250000l );
 
-        ser = mock( EdgeShardCounterSerialization.class );
+        ser = mock( NodeShardCounterSerialization.class );
         nodeShardCounterSerialization = mock( NodeShardCounterSerialization.class );
 
         when(nodeShardCounterSerialization.flush( any(Counter.class) )).thenReturn( mock( MutationBatch.class) );
@@ -273,6 +274,12 @@ public class NodeShardApproximationTest {
         public long getCount( final ShardKey key ) {
             return copy.get( key );
         }
+
+
+        @Override
+        public Collection<MultiTennantColumnFamilyDefinition> getColumnFamilies() {
+            return null;  //To change body of implemented methods use File | Settings | File Templates.
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a8fc5a56/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
new file mode 100644
index 0000000..9968f67
--- /dev/null
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.count;
+
+
+import org.jukito.UseModules;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
+import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.core.cassandra.ITRunner;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.GraphFig;
+import org.apache.usergrid.persistence.graph.guice.TestGraphModule;
+import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
+import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.google.inject.Inject;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+
+import static org.apache.usergrid.persistence.graph.test.util.EdgeTestUtils.createId;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@RunWith(ITRunner.class)
+@UseModules({ TestGraphModule.class })
+public class NodeShardCounterSerializationTest {
+
+    private static final Logger log = LoggerFactory.getLogger( NodeShardCounterSerializationTest.class );
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    protected EdgeSerialization serialization;
+
+    @Inject
+    protected GraphFig graphFig;
+
+    @Inject
+    protected Keyspace keyspace;
+
+    @Inject
+    protected NodeShardCounterSerialization nodeShardCounterSerialization;
+
+    protected ApplicationScope scope;
+
+
+    @Before
+    public void setup() {
+        scope = mock( ApplicationScope.class );
+
+        Id orgId = mock( Id.class );
+
+        when( orgId.getType() ).thenReturn( "organization" );
+        when( orgId.getUuid() ).thenReturn( UUIDGenerator.newTimeUUID() );
+
+        when( scope.getApplication() ).thenReturn( orgId );
+    }
+
+
+    @Test
+    public void testWritesRead() throws ConnectionException {
+
+
+        final Id id = createId( "test" );
+
+        ShardKey key1 = new ShardKey( scope, id, 0, "type1" );
+
+        ShardKey key2 = new ShardKey( scope, id, 0, "type2" );
+
+        ShardKey key3 = new ShardKey( scope, id, 1, "type1" );
+
+
+        Counter counter = new Counter();
+        counter.add( key1, 1000 );
+        counter.add( key2, 2000 );
+        counter.add( key3, 3000 );
+
+        nodeShardCounterSerialization.flush( counter ).execute();
+
+
+        final long time1 = nodeShardCounterSerialization.getCount( key1 );
+
+        assertEquals( 1000, time1 );
+
+        final long time2 = nodeShardCounterSerialization.getCount( key2 );
+
+        assertEquals( 2000, time2 );
+
+        final long time3 = nodeShardCounterSerialization.getCount( key3 );
+
+        assertEquals( 3000, time3 );
+    }
+}


Mime
View raw message