usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [07/52] [abbrv] Updated OrderedMerge to use a faster implementation at runtime. After initialization, it's an O(1) emit operation as long as our produces are fast enough.
Date Wed, 03 Sep 2014 22:11:02 GMT
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
index d434db7..0a6ecfa 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/GraphFig.java
@@ -35,8 +35,15 @@ public interface GraphFig extends GuicyFig {
 
     public static final String REPAIR_CONCURRENT_SIZE = "usergrid.graph.repair.concurrent.size";
 
+    /**
+     * The size of the shards.  This is approximate, and should be set lower than what you would like your max to be
+     */
     public static final String SHARD_SIZE = "usergrid.graph.shard.size";
 
+
+    /**
+     * Number of shards we can cache.
+     */
     public static final String SHARD_CACHE_SIZE = "usergrid.graph.shard.cache.size";
 
 
@@ -45,17 +52,33 @@ public interface GraphFig extends GuicyFig {
      */
     public static final String SHARD_CACHE_TIMEOUT = "usergrid.graph.shard.cache.timeout";
 
+    /**
+     * Number of worker threads to refresh the cache
+     */
     public static final String SHARD_CACHE_REFRESH_WORKERS = "usergrid.graph.shard.refresh.worker.count";
 
-    public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
 
+    /**
+     * The size of the worker count for shard auditing
+     */
+    public static final String SHARD_AUDIT_QUEUE_SIZE = "usergrid.graph.shard.audit.worker.queue.size";
 
 
     /**
-     * The minimum amount of time than can occur (in millis) between shard allocation.  Must be at least 2x the cache timeout.
+     * The size of the worker count for shard auditing
+     */
+    public static final String SHARD_AUDIT_WORKERS = "usergrid.graph.shard.audit.worker.count";
+
+
+    public static final String SHARD_REPAIR_CHANCE = "usergrid.graph.shard.repair.chance";
+
+
+    /**
+     * The minimum amount of time than can occur (in millis) between shard allocation and compaction.  Must be at least 2x the cache
+     * timeout. Set to 2.5x the cache timeout to be safe
      *
-     * Note that you should also pad this for node clock drift.  A good value for this would be 2x the shard cache timeout + 30 seconds,
-     * assuming you have NTP and allow a max drift of 30 seconds
+     * Note that you should also pad this for node clock drift.  A good value for this would be 2x the shard cache
+     * timeout + 30 seconds, assuming you have NTP and allow a max drift of 30 seconds
      */
     public static final String SHARD_MIN_DELTA = "usergrid.graph.shard.min.delta";
 
@@ -67,26 +90,23 @@ public interface GraphFig extends GuicyFig {
     public static final String COUNTER_WRITE_FLUSH_QUEUE_SIZE = "usergrid.graph.shard.counter.queue.size";
 
 
-
-
     @Default("1000")
     @Key(SCAN_PAGE_SIZE)
     int getScanPageSize();
 
 
-
     @Default("5")
     @Key(REPAIR_CONCURRENT_SIZE)
     int getRepairConcurrentSize();
 
 
     @Default( ".10" )
-    @Key( SHARD_REPAIR_CHANCE  )
+    @Key( SHARD_REPAIR_CHANCE )
     double getShardRepairChance();
 
 
-    @Default("500000")
-    @Key(SHARD_SIZE)
+    @Default( "500000" )
+    @Key( SHARD_SIZE )
     long getShardSize();
 
 
@@ -95,31 +115,40 @@ public interface GraphFig extends GuicyFig {
     long getShardCacheTimeout();
 
     @Default("60000")
-    @Key( SHARD_MIN_DELTA )
+    @Key(SHARD_MIN_DELTA)
     long getShardMinDelta();
 
 
-    @Default( "250000" )
-    @Key( SHARD_CACHE_SIZE )
+    @Default("250000")
+    @Key(SHARD_CACHE_SIZE)
     long getShardCacheSize();
 
 
-    @Default( "2" )
-    @Key( SHARD_CACHE_REFRESH_WORKERS )
+    @Default("2")
+    @Key(SHARD_CACHE_REFRESH_WORKERS)
     int getShardCacheRefreshWorkerCount();
 
 
-    @Default( "10000" )
-    @Key( COUNTER_WRITE_FLUSH_COUNT )
+    @Default( "10" )
+    @Key( SHARD_AUDIT_WORKERS )
+    int getShardAuditWorkerCount();
+
+    @Default( "1000" )
+    @Key( SHARD_AUDIT_QUEUE_SIZE )
+    int getShardAuditWorkerQueueSize();
+
+
+    @Default("10000")
+    @Key(COUNTER_WRITE_FLUSH_COUNT)
     long getCounterFlushCount();
 
 
-    @Default( "30000" )
-    @Key( COUNTER_WRITE_FLUSH_INTERVAL )
+    @Default("30000")
+    @Key(COUNTER_WRITE_FLUSH_INTERVAL)
     long getCounterFlushInterval();
 
-    @Default( "1000" )
-    @Key(COUNTER_WRITE_FLUSH_QUEUE_SIZE  )
+    @Default("1000")
+    @Key(COUNTER_WRITE_FLUSH_QUEUE_SIZE)
     int getCounterFlushQueueSize();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
index 7e589f2..114440f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdge.java
@@ -65,4 +65,10 @@ public interface SearchByEdge {
      */
     Optional<Edge> last();
 
+    /**
+     * Get the sort order
+     * @return
+     */
+    SearchByEdgeType.Order getOrder();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
index 29cc3f5..749130b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/SearchByEdgeType.java
@@ -59,4 +59,20 @@ public interface SearchByEdgeType {
      */
     Optional<Edge> last();
 
+    /**
+     * Get the direction we're seeking
+     * @return
+     */
+    Order getOrder();
+
+
+    /**
+     * Options for ordering.  By default, we want to perform descending for common use cases and read speed.  This is our our data
+     * is optimized in cassandra
+     */
+    public enum Order {
+        DESCENDING,
+        ASCENDING
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
index 64f0fbb..f0e954b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/GraphModule.java
@@ -48,6 +48,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardS
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardCache;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardApproximationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.NodeShardCounterSerialization;
@@ -55,6 +56,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.count.Node
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardCacheImpl;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupCompactionImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardedEdgeSerializationImpl;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeColumnFamilies;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.SizebasedEdgeShardStrategy;
@@ -119,6 +121,8 @@ public class GraphModule extends AbstractModule {
 
         bind( EdgeColumnFamilies.class ).to( SizebasedEdgeColumnFamilies.class );
 
+        bind( ShardGroupCompaction.class).to( ShardGroupCompactionImpl.class);
+
 
         /**
          * Bind our implementation

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/MergedProxy.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/MergedProxy.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/MergedProxy.java
deleted file mode 100644
index 20bc637..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/guice/MergedProxy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.persistence.graph.guice;
-
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import com.google.inject.BindingAnnotation;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
-@BindingAnnotation
-public @interface MergedProxy {}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
index 12192fc..9fcb816 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleMarkedEdge.java
@@ -70,7 +70,6 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
             return false;
         }
 
-
         return true;
     }
 
@@ -90,3 +89,4 @@ public class SimpleMarkedEdge extends  SimpleEdge implements MarkedEdge {
                 "} " + super.toString();
     }
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
index e8971f6..d40efc0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdge.java
@@ -23,10 +23,12 @@ package org.apache.usergrid.persistence.graph.impl;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.SearchByEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 
 
 /**
@@ -40,6 +42,7 @@ public class SimpleSearchByEdge implements SearchByEdge {
     private final String type;
     private final long maxTimestamp;
     private final Optional<Edge> last;
+    private final SearchByEdgeType.Order order;
 
 
     /**
@@ -50,17 +53,20 @@ public class SimpleSearchByEdge implements SearchByEdge {
      * @param maxTimestamp The maximum timestamp to seek from
      * @param last The value to start seeking from.  Must be >= this value
      */
-    public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final Edge last ) {
+    public SimpleSearchByEdge( final Id sourceNode, final String type, final Id targetNode, final long maxTimestamp, final SearchByEdgeType.Order order, final Edge last ) {
+
         ValidationUtils.verifyIdentity(sourceNode);
         ValidationUtils.verifyIdentity(targetNode);
         ValidationUtils.verifyString( type, "type" );
         GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" );
+        Preconditions.checkNotNull(order, "order must not be null");
 
 
         this.sourceNode = sourceNode;
         this.targetNode = targetNode;
         this.type = type;
         this.maxTimestamp = maxTimestamp;
+        this.order = order;
         this.last = Optional.fromNullable(last);
     }
 
@@ -93,4 +99,10 @@ public class SimpleSearchByEdge implements SearchByEdge {
     public Optional<Edge> last() {
         return last;
     }
+
+
+    @Override
+    public SearchByEdgeType.Order getOrder() {
+        return order;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
index 9e7dcde..6bc8b1b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByEdgeType.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 
 
 /**
@@ -39,6 +40,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
     private final String type;
     private final long maxTimestamp;
     private final Optional<Edge> last;
+    private final Order order;
 
 
     /**
@@ -46,9 +48,14 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
      * @param node The node to search from
      * @param type The edge type
      * @param maxTimestamp The maximum timestamp to return
+     * @param order The order order.  Descending is most efficient
      * @param last The value to start seeking from.  Must be >= this value
+     * @param order
      */
-    public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Edge last ) {
+    public SimpleSearchByEdgeType( final Id node, final String type, final long maxTimestamp, final Order order, final Edge last
+                                   ) {
+
+        Preconditions.checkNotNull( order, "order is required");
         ValidationUtils.verifyIdentity(node);
         ValidationUtils.verifyString( type, "type" );
         GraphValidation.validateTimestamp( maxTimestamp, "maxTimestamp" );
@@ -57,6 +64,7 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
         this.node = node;
         this.type = type;
         this.maxTimestamp = maxTimestamp;
+        this.order = order;
         this.last = Optional.fromNullable(last);
     }
 
@@ -86,6 +94,12 @@ public class SimpleSearchByEdgeType implements SearchByEdgeType{
 
 
     @Override
+    public Order getOrder() {
+        return order;
+    }
+
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
index 4249ae7..4b73347 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/SimpleSearchByIdType.java
@@ -44,8 +44,8 @@ public class SimpleSearchByIdType extends SimpleSearchByEdgeType implements Sear
      * @param last The value to start seeking from.  Must be >= this value
 
      */
-    public SimpleSearchByIdType( final Id node, final String type, final long maxTimestamp, final String idType, final Edge last  ) {
-        super( node, type, maxTimestamp, last );
+    public SimpleSearchByIdType( final Id node, final String type, final long maxTimestamp, final Order order, final String idType, final Edge last  ) {
+        super( node, type, maxTimestamp, order, last );
 
         ValidationUtils.verifyString( idType, "idType" );
         this.idType = idType;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
index 5be3541..0137ba4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeDeleteRepairImpl.java
@@ -32,14 +32,13 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdge;
 import org.apache.usergrid.persistence.graph.serialization.EdgeSerialization;
 
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action1;
@@ -116,7 +115,7 @@ public class EdgeDeleteRepairImpl implements EdgeDeleteRepair {
 
                 final SimpleSearchByEdge search =
                         new SimpleSearchByEdge( edge.getSourceNode(), edge.getType(), edge.getTargetNode(),
-                                edge.getTimestamp(), null );
+                                edge.getTimestamp(), SearchByEdgeType.Order.DESCENDING, null );
 
                 return serialization.getEdgeVersions( scope, search );
             }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
index 055b867..7e09eca 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/EdgeMetaRepairImpl.java
@@ -33,7 +33,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchIdType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -46,7 +46,6 @@ import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action1;
@@ -286,7 +285,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesToTargetBySourceType( scope,
-                            new SimpleSearchByIdType( nodeId, edgeType, maxTimestamp, subType, null ) );
+                            new SimpleSearchByIdType( nodeId, edgeType, maxTimestamp, SearchByEdgeType.Order.DESCENDING, subType,   null ) );
                 }
             } );
         }
@@ -332,7 +331,7 @@ public class EdgeMetaRepairImpl implements EdgeMetaRepair {
                 @Override
                 protected Iterator<MarkedEdge> getIterator() {
                     return storageEdgeSerialization.getEdgesFromSourceByTargetType( scope,
-                            new SimpleSearchByIdType( nodeId, edgeType, maxTimestamp, subType, null ) );
+                            new SimpleSearchByIdType( nodeId, edgeType, maxTimestamp, SearchByEdgeType.Order.DESCENDING, subType, null ) );
                 }
             } );
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
index 962da21..2be6c55 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/stage/NodeDeleteListenerImpl.java
@@ -33,8 +33,8 @@ import org.apache.usergrid.persistence.core.rx.ObservableIterator;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.SearchEdgeType;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.EdgeMetadataSerialization;
@@ -46,7 +46,6 @@ import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 
 import rx.Observable;
 import rx.functions.Action0;
@@ -160,7 +159,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesToTarget( scope,
-                                        new SimpleSearchByEdgeType( node, edgeType, maxVersion, null ) );
+                                        new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, null ) );
                             }
                         } );
                     }
@@ -177,7 +176,7 @@ public class NodeDeleteListenerImpl implements NodeDeleteListener {
                             @Override
                             protected Iterator<MarkedEdge> getIterator() {
                                 return storageSerialization.getEdgesFromSource( scope,
-                                        new SimpleSearchByEdgeType( node, edgeType, maxVersion, null ) );
+                                        new SimpleSearchByEdgeType( node, edgeType, maxVersion, SearchByEdgeType.Order.DESCENDING, null ) );
                             }
                         } );
                     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
index 92f2548..6bb467f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/DirectedEdgeMeta.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
@@ -38,6 +39,8 @@ import org.apache.usergrid.persistence.graph.impl.SimpleSearchByEdgeType;
 import org.apache.usergrid.persistence.graph.impl.SimpleSearchByIdType;
 import org.apache.usergrid.persistence.model.entity.Id;
 
+import com.netflix.astyanax.MutationBatch;
+
 
 /**
  * A bean to define directed edge meta data.  This is used to encapsulate the meta data around a source or target node,
@@ -151,14 +154,52 @@ public abstract class DirectedEdgeMeta {
     }
 
 
+    @Override
+    public String toString() {
+        return "DirectedEdgeMeta{" +
+                "nodes=" + Arrays.toString( nodes ) +
+                ", types=" + Arrays.toString( types ) +
+                '}';
+    }
+
+
     /**
      * Given the edge serialization, load all shard in the shard group
      */
     public abstract Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                     final EdgeColumnFamilies edgeColumnFamilies,
                                                     final ApplicationScope scope, final Collection<Shard> shards,
-                                                    final long maxValue );
+                                                    final long maxValue, final SearchByEdgeType.Order order );
+
+
+    /**
+     * Write the edge for this meta data to the target edge
+     * @param shardedEdgeSerialization
+     * @param edgeColumnFamilies
+     * @param scope
+     * @param targetShard
+     * @param edge
+     * @param timestamp The timestamp on the operation
+     * @return
+     */
+    public abstract MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                             final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                             final Shard targetShard, final MarkedEdge edge, final UUID timestamp );
+
 
+    /**
+     * Delete the edge for this meta data from the shard
+     * @param shardedEdgeSerialization
+     * @param edgeColumnFamilies
+     * @param scope
+     * @param sourceShard
+     * @param edge
+     * @param timestamp The timestamp on the operation
+     * @return
+     */
+    public abstract MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                              final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                              final Shard sourceShard, final MarkedEdge edge, final UUID timestamp );
 
 
     /**
@@ -225,19 +266,39 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final Collection<Shard>  shards,
-                                                   final long maxValue ) {
+                                                   final ApplicationScope scope, final Collection<Shard> shards,
+                                                   final long maxValue, final SearchByEdgeType.Order order ) {
 
                 final Id sourceId = nodes[0].id;
                 final String edgeType = types[0];
 
-                final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, null );
+                final SearchByEdgeType search = new SimpleSearchByEdgeType( sourceId, edgeType, maxValue, order, null);
 
                 return serialization.getEdgesFromSource( edgeColumnFamilies, scope, search, shards );
             }
 
 
             @Override
+            public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                            final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                            final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization
+                        .writeEdgeFromSource( edgeColumnFamilies, scope, edge, Collections.singleton( targetShard ),
+                                this, timestamp );
+            }
+
+
+            @Override
+            public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                             final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                             final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization
+                        .deleteEdgeFromSource( edgeColumnFamilies, scope, edge, Collections.singleton( sourceShard ),
+                                this, timestamp );
+            }
+
+
+            @Override
             public MetaType getType() {
                 return MetaType.SOURCE;
             }
@@ -264,19 +325,39 @@ public abstract class DirectedEdgeMeta {
 
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
-                                                   final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final Collection<Shard>shards,
-                                                   final long maxValue ) {
-//
+                                                             final EdgeColumnFamilies edgeColumnFamilies,
+                                                             final ApplicationScope scope, final Collection<Shard> shards,
+                                                             final long maxValue, final SearchByEdgeType.Order order ) {
+                //
                 final Id sourceId = nodes[0].id;
                 final String edgeType = types[0];
                 final String targetType = types[1];
 
                 final SearchByIdType search =
-                        new SimpleSearchByIdType( sourceId, edgeType, maxValue, targetType, null );
+                        new SimpleSearchByIdType( sourceId, edgeType, maxValue, order, targetType,  null );
+
+                return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search, shards );
+            }
 
-                return serialization.getEdgesFromSourceByTargetType( edgeColumnFamilies, scope, search, shards);
 
+
+
+
+            @Override
+            public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                            final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                            final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization.writeEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, edge,
+                        Collections.singleton( targetShard ), this, timestamp );
+            }
+
+
+            @Override
+            public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                             final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                             final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization.deleteEdgeFromSourceWithTargetType( edgeColumnFamilies, scope, edge,
+                        Collections.singleton( sourceShard ), this, timestamp );
             }
 
 
@@ -304,20 +385,40 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final Collection<Shard>  shards,
-                                                   final long maxValue ) {
+                                                   final ApplicationScope scope, final Collection<Shard> shards,
+                                                   final long maxValue, final SearchByEdgeType.Order order ) {
 
 
                 final Id targetId = nodes[0].id;
                 final String edgeType = types[0];
 
-                final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, null );
+                final SearchByEdgeType search = new SimpleSearchByEdgeType( targetId, edgeType, maxValue, order, null);
 
                 return serialization.getEdgesToTarget( edgeColumnFamilies, scope, search, shards );
             }
 
 
             @Override
+            public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                            final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                            final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization
+                        .writeEdgeToTarget( edgeColumnFamilies, scope, edge, Collections.singleton( targetShard ),
+                                this, timestamp );
+            }
+
+
+            @Override
+            public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                             final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                             final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization
+                        .deleteEdgeToTarget( edgeColumnFamilies, scope, edge, Collections.singleton( sourceShard ),
+                                this, timestamp );
+            }
+
+
+            @Override
             public MetaType getType() {
                 return MetaType.TARGET;
             }
@@ -339,11 +440,13 @@ public abstract class DirectedEdgeMeta {
     private static DirectedEdgeMeta fromTargetNodeSourceType( final NodeMeta[] nodes, final String[] types ) {
         return new DirectedEdgeMeta( nodes, types ) {
 
+
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
                                                    final ApplicationScope scope, final Collection<Shard> shards,
-                                                   final long maxValue ) {
+                                                   final long maxValue, final SearchByEdgeType.Order order ) {
+
 
                 final Id targetId = nodes[0].id;
                 final String edgeType = types[0];
@@ -351,9 +454,27 @@ public abstract class DirectedEdgeMeta {
 
 
                 final SearchByIdType search =
-                        new SimpleSearchByIdType( targetId, edgeType, maxValue, sourceType, null );
+                        new SimpleSearchByIdType( targetId, edgeType, maxValue, order, sourceType,  null );
+
+                return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search, shards );
+            }
+
 
-                return serialization.getEdgesToTargetBySourceType( edgeColumnFamilies, scope, search, shards);
+            @Override
+            public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                            final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                            final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization.writeEdgeToTargetWithSourceType( edgeColumnFamilies, scope, edge,
+                        Collections.singleton( targetShard ), this, timestamp );
+            }
+
+
+            @Override
+            public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                             final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                             final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization.deleteEdgeToTargetWithSourceType( edgeColumnFamilies, scope, edge,
+                        Collections.singleton( sourceShard ), this, timestamp );
             }
 
 
@@ -385,18 +506,37 @@ public abstract class DirectedEdgeMeta {
             @Override
             public Iterator<MarkedEdge> loadEdges( final ShardedEdgeSerialization serialization,
                                                    final EdgeColumnFamilies edgeColumnFamilies,
-                                                   final ApplicationScope scope, final Collection<Shard>  shards,
-                                                   final long maxValue ) {
+                                                   final ApplicationScope scope, final Collection<Shard> shards,
+                                                   final long maxValue, final SearchByEdgeType.Order order ) {
 
                 final Id sourceId = nodes[0].id;
                 final Id targetId = nodes[1].id;
                 final String edgeType = types[0];
 
                 final SimpleSearchByEdge search =
-                        new SimpleSearchByEdge( sourceId, edgeType, targetId, maxValue, null );
+                        new SimpleSearchByEdge( sourceId, edgeType, targetId, maxValue, order, null );
+
+                return serialization.getEdgeVersions( edgeColumnFamilies, scope, search, shards );
+            }
 
-                return serialization.getEdgeVersions( edgeColumnFamilies, scope, search, shards);
 
+            @Override
+            public MutationBatch writeEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                            final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                            final Shard targetShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization
+                        .writeEdgeVersions( edgeColumnFamilies, scope, edge, Collections.singleton( targetShard ),
+                                this, timestamp );
+            }
+
+
+            @Override
+            public MutationBatch deleteEdge( final ShardedEdgeSerialization shardedEdgeSerialization,
+                                             final EdgeColumnFamilies edgeColumnFamilies, final ApplicationScope scope,
+                                             final Shard sourceShard, final MarkedEdge edge, final UUID timestamp ) {
+                return shardedEdgeSerialization
+                        .deleteEdgeVersions( edgeColumnFamilies, scope, edge, Collections.singleton( sourceShard ),
+                                this, timestamp );
             }
 
 
@@ -408,16 +548,15 @@ public abstract class DirectedEdgeMeta {
     }
 
 
-
     /**
      * Create a directed edge from the stored meta data
+     *
      * @param metaType The meta type stored
      * @param nodes The metadata of the nodes
      * @param types The types in the meta data
-     *
-     *
      */
-    public static DirectedEdgeMeta fromStorage( final  MetaType metaType, final NodeMeta[] nodes, final String[] types ) {
+    public static DirectedEdgeMeta fromStorage( final MetaType metaType, final NodeMeta[] nodes,
+                                                final String[] types ) {
         switch ( metaType ) {
             case SOURCE:
                 return fromSourceNode( nodes, types );
@@ -428,7 +567,7 @@ public abstract class DirectedEdgeMeta {
             case TARGETSOURCE:
                 return fromTargetNodeSourceType( nodes, types );
             case VERSIONS:
-                return fromEdge(nodes, types);
+                return fromEdge( nodes, types );
             default:
                 throw new UnsupportedOperationException( "No supported meta type found" );
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
index 9bd9937..74f7ffc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKey.java
@@ -42,4 +42,14 @@ public class RowKey {
         this.edgeType = edgeType;
         this.shardId = shardId;
     }
+
+
+    @Override
+    public String toString() {
+        return "RowKey{" +
+                "nodeId=" + nodeId +
+                ", edgeType='" + edgeType + '\'' +
+                ", shardId=" + shardId +
+                '}';
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
index 6e69bbf..3368c40 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/RowKeyType.java
@@ -57,5 +57,10 @@ public class RowKeyType extends RowKey {
     }
 
 
-
+    @Override
+    public String toString() {
+        return "RowKeyType{" +
+                "idType='" + idType + '\'' +
+                "} " + super.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 38fe51c..9ca6cbe 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -137,6 +137,7 @@ public class Shard implements Comparable<Shard> {
         return "Shard{" +
                 "shardIndex=" + shardIndex +
                 ", createdTime=" + createdTime +
-                "} ";
+                ", compacted=" + compacted +
+                '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
index 70569fd..11bf7a4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardEntryGroup.java
@@ -20,10 +20,15 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Preconditions;
 
 
@@ -36,6 +41,7 @@ import com.google.common.base.Preconditions;
  */
 public class ShardEntryGroup {
 
+    private static final Logger LOG = LoggerFactory.getLogger( ShardEntryGroup.class );
 
     private List<Shard> shards;
 
@@ -45,6 +51,8 @@ public class ShardEntryGroup {
 
     private Shard compactionTarget;
 
+    private Shard rootShard;
+
 
     /**
      * The max delta we accept in milliseconds for create time to be considered a member of this group
@@ -85,10 +93,13 @@ public class ShardEntryGroup {
         //shard is not compacted, or it's predecessor isn't, we should include it in this group
         if ( !minShard.isCompacted() ) {
             addShardInternal( shard );
+
             return true;
         }
 
 
+
+
         return false;
     }
 
@@ -124,10 +135,26 @@ public class ShardEntryGroup {
      * Get the entries that we should read from.
      */
     public Collection<Shard> getReadShards() {
-        return shards;
+
+
+        final Shard staticShard = getRootShard();
+        final Shard compactionTarget = getCompactionTarget();
+
+
+
+        if(compactionTarget != null){
+            LOG.debug( "Returning shards {} and {} as read shards", compactionTarget, staticShard );
+            return Arrays.asList( compactionTarget, staticShard );
+        }
+
+
+        LOG.debug( "Returning shards {} read shard", staticShard );
+        return  Collections.singleton( staticShard );
     }
 
 
+
+
     /**
      * Get the entries, with the max shard time being first. We write to all shards until they're migrated
      */
@@ -138,11 +165,22 @@ public class ShardEntryGroup {
          * adding data to other shards
          */
         if ( !isTooSmallToCompact() && shouldCompact( currentTime ) ) {
-            return Collections.singleton( getCompactionTarget() );
+
+            final Shard compactionTarget = getCompactionTarget();
+
+            LOG.debug( "Returning shard {} as write shard", compactionTarget);
+
+            return Collections.singleton( compactionTarget  );
+
         }
 
+        final Shard staticShard = getRootShard();
+
+
+        LOG.debug( "Returning shard {} as write shard", staticShard);
+
+        return Collections.singleton( staticShard );
 
-        return shards;
     }
 
 
@@ -155,6 +193,24 @@ public class ShardEntryGroup {
 
 
     /**
+     * Get the root shard that was created in this group
+     * @return
+     */
+    private Shard getRootShard(){
+        if(rootShard != null){
+            return rootShard;
+        }
+
+        final Shard rootCandidate = shards.get( shards.size() -1 );
+
+        if(rootCandidate.isCompacted()){
+            rootShard = rootCandidate;
+        }
+
+        return rootShard;
+    }
+
+    /**
      * Get the shard all compactions should write to.  Null indicates we cannot find a shard that could be used as a
      * compaction target.  Note that this shard may not have surpassed the delta yet You should invoke "shouldCompact"
      * first to ensure all criteria are met before initiating compaction

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
index bf4d3c9..4fe1a63 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/ShardGroupCompaction.java
@@ -22,38 +22,32 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
-import rx.Observable;
+import com.google.common.util.concurrent.ListenableFuture;
 
 
 /**
  * Defines tasks for running compaction
- *
- *
  */
 public interface ShardGroupCompaction {
 
-
-    /**
-     * Execute the compaction task.  Will return the number of edges that have
-     * @param group The shard entry group to compact
-     * @return The shards that were compacted
-     */
-    public Set<Shard> compact(final ApplicationScope scope, final DirectedEdgeMeta edgeMeta, final ShardEntryGroup group);
-
     /**
-     * Possibly audit the shard entry group.  This is asynchronous and returns immediately
-     * @param group
-     * @return
+     * Possibly audit the shard entry group.  This is asynchronous and returns the future that will
+     * report the operations performed (if any) upon completion.
+     *
+     * @return A ListenableFuture with the result.  Note that some
      */
-    public AuditResult evaluateShardGroup( final ApplicationScope scope, final DirectedEdgeMeta edgeMeta,
-                                           final ShardEntryGroup group );
+    public ListenableFuture<AuditResult> evaluateShardGroup( final ApplicationScope scope,
+                                                             final DirectedEdgeMeta edgeMeta,
+                                                             final ShardEntryGroup group );
 
 
-    public enum AuditResult{
+    public enum AuditResult {
         /**
          * We didn't check this shard
          */
@@ -68,11 +62,10 @@ public interface ShardGroupCompaction {
          */
         CHECKED_CREATED,
 
-        /**
+        COMPACTED, /**
          * The shard group is already compacting
          */
         COMPACTING
     }
 
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
index ed3daaf..749416c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardCounterSerializationImpl.java
@@ -39,10 +39,9 @@ import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
 import org.apache.usergrid.persistence.graph.GraphFig;
-import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.EdgeShardRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import com.google.common.base.Preconditions;
@@ -51,7 +50,6 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.OperationResult;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
 import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.model.CompositeBuilder;
@@ -127,7 +125,10 @@ public class NodeShardCounterSerializationImpl implements NodeShardCounterSerial
         }
         //column not found, return 0
         catch ( RuntimeException re ) {
-            if(re.getCause().getCause() instanceof NotFoundException) {
+
+            final Throwable cause = re.getCause();
+
+            if(cause != null && cause.getCause() instanceof NotFoundException) {
                 return 0;
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
deleted file mode 100644
index 90b264c..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeRowKeySerializer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeRowKey;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-
-/**
- * Class to perform serialization for row keys from edges
- */
-
-public class EdgeRowKeySerializer implements CompositeFieldSerializer<EdgeRowKey> {
-
-    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-
-    @Override
-    public void toComposite( final CompositeBuilder builder, final EdgeRowKey key ) {
-
-        //add the row id to the composite
-        ID_SER.toComposite( builder, key.sourceId );
-        builder.addString( key.edgeType );
-        ID_SER.toComposite( builder, key.targetId );
-        builder.addLong( key.shardId );
-    }
-
-
-    @Override
-    public EdgeRowKey fromComposite( final CompositeParser composite ) {
-
-        final Id sourceId = ID_SER.fromComposite( composite );
-        final String edgeType = composite.readString();
-        final Id targetId = ID_SER.fromComposite( composite );
-        final long shard = composite.readLong();
-
-        return new EdgeRowKey( sourceId, edgeType, targetId, shard );
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 27862d0..413c2a3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -4,16 +4,16 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
+import org.apache.usergrid.persistence.core.astyanax.ColumnSearch;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -30,23 +30,26 @@ import com.netflix.astyanax.util.RangeBuilder;
  * @param <C> The column type
  * @param <T> The parsed return type
  */
-public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Comparator<T> {
+public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{
 
-    protected final Optional<Edge> last;
+    protected final Optional<T> last;
     protected final long maxTimestamp;
     protected final ApplicationScope scope;
     protected final Collection<Shard> shards;
+    protected final SearchByEdgeType.Order order;
+    protected final Comparator<T> comparator;
 
 
-    protected EdgeSearcher( final ApplicationScope scope, final long maxTimestamp, final Optional<Edge> last,
-                            final Collection<Shard> shards ) {
+    protected EdgeSearcher( final ApplicationScope scope, final Collection<Shard> shards,  final SearchByEdgeType.Order order, final Comparator<T> comparator,  final long maxTimestamp, final Optional<T> last) {
 
         Preconditions.checkArgument(shards.size() > 0 , "Cannot search with no possible shards");
 
         this.scope = scope;
         this.maxTimestamp = maxTimestamp;
-        this.last = last;
+        this.order = order;
         this.shards = shards;
+        this.last = last;
+        this.comparator = comparator;
     }
 
 
@@ -69,20 +72,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
 
 
 
-    /**
-     * Set the range on a search
-     */
-    public void setRange( final RangeBuilder builder ) {
-
-        //set our start range since it was supplied to us
-        if ( last.isPresent() ) {
-            C sourceEdge = getStartColumn( last.get() );
-
-
-            builder.setStart( sourceEdge, getSerializer() );
-        }
-
-    }
 
 
     public boolean hasPage() {
@@ -98,6 +87,50 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
     }
 
 
+    @Override
+    public void buildRange( final RangeBuilder rangeBuilder, final T value ) {
+
+        C edge = createColumn( value );
+
+        rangeBuilder.setStart( edge, getSerializer() );
+
+        setRangeOptions( rangeBuilder );
+    }
+
+
+    @Override
+    public void buildRange( final RangeBuilder rangeBuilder ) {
+
+        //set our start range since it was supplied to us
+        if ( last.isPresent() ) {
+            C sourceEdge = createColumn( last.get() );
+
+
+            rangeBuilder.setStart( sourceEdge, getSerializer() );
+        }
+
+
+        setRangeOptions(rangeBuilder);
+
+
+    }
+
+    private void setRangeOptions(final RangeBuilder rangeBuilder){
+            //if we're ascending, this is opposite what cassandra sorts, so set the reversed flag
+        final boolean reversed = order == SearchByEdgeType.Order.ASCENDING;
+
+        rangeBuilder.setReversed( reversed );
+    }
+
+
+    /**
+     * Get the comparator
+     * @return
+     */
+    public Comparator<T> getComparator() {
+        return comparator;
+    }
+
 
     /**
      * Get the column's serializer
@@ -116,7 +149,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Compa
     /**
      * Set the start column to begin searching from.  The last is provided
      */
-    protected abstract C getStartColumn( final Edge last );
+    protected abstract C createColumn( final T last );
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
deleted file mode 100644
index d93f679..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSerializer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import java.nio.ByteBuffer;
-
-import org.apache.usergrid.persistence.core.astyanax.IdColDynamicCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.google.common.base.Preconditions;
-import com.netflix.astyanax.model.DynamicComposite;
-import com.netflix.astyanax.serializers.AbstractSerializer;
-import com.netflix.astyanax.serializers.LongSerializer;
-
-
-/**
- * Serializes to a source->target edge Note that we cannot set the edge type on de-serialization.  Only the target
- * Id and version.
- */
-public class EdgeSerializer extends AbstractSerializer<DirectedEdge> {
-
-    private static final IdColDynamicCompositeSerializer ID_COL_SERIALIZER = IdColDynamicCompositeSerializer.get();
-    private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
-
-
-    @Override
-    public ByteBuffer toByteBuffer( final DirectedEdge edge ) {
-
-        DynamicComposite composite = new DynamicComposite();
-
-        composite.addComponent( edge.timestamp, LONG_SERIALIZER );
-
-        ID_COL_SERIALIZER.toComposite( composite, edge.id );
-
-        return composite.serialize();
-    }
-
-
-    @Override
-    public DirectedEdge fromByteBuffer( final ByteBuffer byteBuffer ) {
-        DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
-
-        Preconditions.checkArgument( composite.size() == 3, "Composite should have 3 elements" );
-
-
-        //return the version
-        final long timestamp = composite.get( 0, LONG_SERIALIZER );
-
-
-        //parse our id
-        final Id id = ID_COL_SERIALIZER.fromComposite( composite, 1 );
-
-
-        return new DirectedEdge( id, timestamp );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
deleted file mode 100644
index 0451d68..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardRowKeySerializer.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeType;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-
-public class EdgeShardRowKeySerializer implements CompositeFieldSerializer<DirectedEdgeMeta> {
-
-    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-    public static final EdgeShardRowKeySerializer INSTANCE = new EdgeShardRowKeySerializer();
-
-
-    @Override
-    public void toComposite( final CompositeBuilder builder, final DirectedEdgeMeta meta ) {
-
-
-        final DirectedEdgeMeta.NodeMeta[] nodeMeta = meta.getNodes();
-
-        //add the stored value
-        builder.addInteger( meta.getType().getStorageValue() );
-
-        final int length = nodeMeta.length;
-
-        builder.addInteger( length );
-
-
-        for ( DirectedEdgeMeta.NodeMeta node : nodeMeta ) {
-            ID_SER.toComposite( builder, node.getId() );
-            builder.addInteger( node.getNodeType().getStorageValue() );
-        }
-
-        final String[] edgeTypes = meta.getTypes();
-
-        builder.addInteger( edgeTypes.length );
-
-        for ( String type : edgeTypes ) {
-            builder.addString( type );
-        }
-    }
-
-
-    @Override
-    public DirectedEdgeMeta fromComposite( final CompositeParser composite ) {
-
-
-        final int storageType = composite.readInteger();
-
-        final DirectedEdgeMeta.MetaType metaType = DirectedEdgeMeta.MetaType.fromStorage( storageType );
-
-        final int idLength = composite.readInteger();
-
-        final DirectedEdgeMeta.NodeMeta[] nodePairs = new DirectedEdgeMeta.NodeMeta[idLength];
-
-
-        for ( int i = 0; i < idLength; i++ ) {
-            final Id sourceId = ID_SER.fromComposite( composite );
-
-            final NodeType type = NodeType.get( composite.readInteger() );
-
-            nodePairs[i] = new DirectedEdgeMeta.NodeMeta( sourceId, type );
-        }
-
-
-        final int length = composite.readInteger();
-
-        String[] types = new String[length];
-
-        for ( int i = 0; i < length; i++ ) {
-            types[i] = composite.readString();
-        }
-
-        return  DirectedEdgeMeta.fromStorage( metaType, nodePairs, types );
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 8233d0d..3b1f37a 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -40,6 +40,7 @@ import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 
 import com.google.common.base.Optional;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index b919ad7..832d9b0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -32,6 +32,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.ValidationUtils;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.apache.usergrid.persistence.graph.exception.GraphRuntimeException;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
@@ -40,16 +41,15 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardA
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardApproximation;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
-import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
-import com.fasterxml.uuid.impl.UUIDUtil;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.netflix.astyanax.MutationBatch;
-import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.util.TimeUUIDUtils;
 
 
 /**
@@ -68,6 +68,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
     private final NodeShardApproximation nodeShardApproximation;
     private final TimeService timeService;
     private final GraphFig graphFig;
+    private final ShardGroupCompaction shardGroupCompaction;
 
 
     @Inject
@@ -75,13 +76,14 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
                                     final EdgeColumnFamilies edgeColumnFamilies,
                                     final ShardedEdgeSerialization shardedEdgeSerialization,
                                     final NodeShardApproximation nodeShardApproximation, final TimeService timeService,
-                                    final GraphFig graphFig ) {
+                                    final GraphFig graphFig, final ShardGroupCompaction shardGroupCompaction ) {
         this.edgeShardSerialization = edgeShardSerialization;
         this.edgeColumnFamilies = edgeColumnFamilies;
         this.shardedEdgeSerialization = shardedEdgeSerialization;
         this.nodeShardApproximation = nodeShardApproximation;
         this.timeService = timeService;
         this.graphFig = graphFig;
+        this.shardGroupCompaction = shardGroupCompaction;
     }
 
 
@@ -113,7 +115,8 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             existingShards = Collections.singleton( MIN_SHARD ).iterator();
         }
 
-        return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta() );
+        return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope,
+                directedEdgeMeta );
     }
 
 
@@ -158,18 +161,31 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         final long count = nodeShardApproximation.getCount( scope, shard, directedEdgeMeta );
 
+        final long shardSize = graphFig.getShardSize();
 
-        if ( count < graphFig.getShardSize() ) {
+
+        if ( count < shardSize ) {
             return false;
         }
 
+        /**
+         * We want to allocate a new shard as close to the max value as possible.  This way if we're filling up a shard rapidly, we split it near the head of the values.
+         * Further checks to this group will result in more splits, similar to creating a tree type structure and splitting each node.
+         *
+         * This means that the lower shard can be re-split later if it is still too large.  We do the division to truncate
+         * to a split point < what our current max is that would be approximately be our pivot ultimately if we split from the
+         * lower bound and moved forward.  Doing this will stop the current shard from expanding and avoid a point where we cannot
+         * ultimately compact to the correct shard size.
+         */
+
 
         /**
          * Allocate the shard
          */
 
         final Iterator<MarkedEdge> edges = directedEdgeMeta
-                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), Long.MAX_VALUE );
+                .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
+                        SearchByEdgeType.Order.ASCENDING );
 
 
         if ( !edges.hasNext() ) {
@@ -178,14 +194,41 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             return false;
         }
 
-        //we have a next, allocate it based on the max
 
-        MarkedEdge marked = edges.next();
+        MarkedEdge marked = null;
+
+        /**
+         * Advance to the pivot point we should use.  Once it's compacted, we can split again.
+         * We either want to take the first one (unlikely) or we take our total count - the shard size.
+         * If this is a negative number, we're approaching our max count for this shard, so the first
+         * element will suffice.
+         */
+
+
+        for(long i = 1;  edges.hasNext(); i++){
+            //we hit a pivot shard, set it since it could be the last one we encounter
+            if(i% shardSize == 0){
+                marked = edges.next();
+            }
+            else{
+                edges.next();
+            }
+        }
+
+
+        /**
+         * Sanity check in case our counters become severely out of sync with our edge state in cassandra.
+         */
+        if(marked == null){
+            LOG.warn( "Incorrect shard count for shard group {}", shardEntryGroup );
+            return false;
+        }
 
         final long createTimestamp = timeService.getCurrentTime();
 
         final Shard newShard = new Shard( marked.getTimestamp(), createTimestamp, false );
 
+        LOG.info( "Allocating new shard {} for edge meta {}", newShard, directedEdgeMeta );
 
         final MutationBatch batch = this.edgeShardSerialization.writeShardMeta( scope, newShard, directedEdgeMeta );
 
@@ -220,22 +263,30 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
      */
     private boolean isNewNode( DirectedEdgeMeta directedEdgeMeta ) {
 
+
+        //TODO: TN this is broken....
         //The timeout is in milliseconds.  Time for a time uuid is 1/10000 of a milli, so we need to get the units correct
-        final long uuidDelta =  graphFig.getShardCacheTimeout()  * 10000;
+        final long timeoutDelta = graphFig.getShardCacheTimeout() ;
+
+        final long timeNow = timeService.getCurrentTime();
 
-        final long timeNow = UUIDGenerator.newTimeUUID().timestamp();
+        boolean isNew = true;
 
         for ( DirectedEdgeMeta.NodeMeta node : directedEdgeMeta.getNodes() ) {
 
-            final long uuidTime = node.getId().getUuid().timestamp();
+            //short circuit
+            if(!isNew){
+                return false;
+            }
 
-            final long uuidTimeDelta = uuidTime + uuidDelta;
+            final long uuidTime =   TimeUUIDUtils.getTimeFromUUID( node.getId().getUuid());
 
-            if ( uuidTimeDelta < timeNow ) {
-                return true;
-            }
+            final long newExpirationTimeout = uuidTime + timeoutDelta;
+
+            //our expiration is after our current time, treat it as new
+            isNew = isNew && newExpirationTimeout >  timeNow;
         }
 
-        return false;
+        return isNew;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index d444eec..36a7bc1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -299,13 +299,13 @@ public class NodeShardCacheImpl implements NodeShardCache {
 
 
         /**
-         * Get all shards <= this one in decending order
+         * Get all shards <= this one in descending order
          */
         public Iterator<ShardEntryGroup> getShards( final Long maxShard ) {
 
             final Long firstKey = shards.floorKey( maxShard );
 
-            return shards.headMap( firstKey, true ).descendingMap().values().iterator();
+            return Collections.unmodifiableCollection( shards.headMap( firstKey, true ).descendingMap().values()).iterator();
         }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
deleted file mode 100644
index 1edaf21..0000000
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/RowSerializer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
- *
- */
-
-package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
-
-
-import org.apache.usergrid.persistence.core.astyanax.CompositeFieldSerializer;
-import org.apache.usergrid.persistence.core.astyanax.IdRowCompositeSerializer;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.RowKey;
-import org.apache.usergrid.persistence.model.entity.Id;
-
-import com.netflix.astyanax.model.CompositeBuilder;
-import com.netflix.astyanax.model.CompositeParser;
-
-
-/**
- * Class to perform serialization for row keys from edges
- */
-public class RowSerializer implements CompositeFieldSerializer<RowKey> {
-
-    private static final IdRowCompositeSerializer ID_SER = IdRowCompositeSerializer.get();
-
-
-    @Override
-    public void toComposite( final CompositeBuilder builder, final RowKey key ) {
-
-        //add the row id to the composite
-        ID_SER.toComposite( builder, key.nodeId );
-
-        builder.addString( key.edgeType );
-        builder.addLong( key.shardId );
-    }
-
-
-    @Override
-    public RowKey fromComposite( final CompositeParser composite ) {
-
-        final Id id = ID_SER.fromComposite( composite );
-        final String edgeType = composite.readString();
-        final long shard = composite.readLong();
-
-
-        return new RowKey( id, edgeType, shard );
-    }
-}


Mime
View raw message