usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [06/20] usergrid git commit: Add 'smart' shard seeking into the multi row column iterator. This leverages a stored 'end' edge marked on a shard at the end of compacting and balancing a shard's (row's) edges (columns).
Date Wed, 23 Mar 2016 17:34:34 GMT
Add 'smart' shard seeking into the multi row column iterator.  This leverages a stored 'end' edge marked on a shard at the end of compacting and balancing a shard's (row's) edges (columns).


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4e407ff6
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4e407ff6
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4e407ff6

Branch: refs/heads/release-2.1.1
Commit: 4e407ff690f220ee04c535a5ce91ca5a3a07ad1d
Parents: b112488
Author: Michael Russo <mrusso@apigee.com>
Authored: Thu Mar 17 22:12:51 2016 -0700
Committer: Michael Russo <mrusso@apigee.com>
Committed: Thu Mar 17 22:12:51 2016 -0700

----------------------------------------------------------------------
 .../persistence/core/astyanax/ColumnSearch.java |   7 +-
 .../core/astyanax/MultiRowColumnIterator.java   | 146 ++++++++++---------
 .../persistence/core/shard/SmartShard.java      |   8 +-
 .../astyanax/MultiRowColumnIteratorTest.java    |  14 +-
 .../impl/EdgeMetadataSerializationV2Impl.java   |   4 +-
 .../graph/serialization/impl/shard/Shard.java   |   8 +-
 .../impl/shard/impl/EdgeSearcher.java           |  38 ++---
 .../shard/impl/EdgeShardSerializationImpl.java  |  44 +++++-
 .../shard/impl/NodeShardAllocationImpl.java     |  13 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  88 +++--------
 .../impl/ShardedEdgeSerializationImpl.java      |   2 +-
 .../shard/impl/serialize/ShardSerializer.java   |  99 +++++++++++++
 .../graph/GraphManagerShardConsistencyIT.java   |  42 +++---
 .../graph/src/test/resources/log4j.properties   |   1 +
 14 files changed, 315 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
index 112f4aa..43654ad 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
@@ -33,17 +33,18 @@ public interface ColumnSearch<T> {
      * Set the start value supplied and the user supplied end value (if present)
      *
      * @param value The value to set in the start
+     * @param end
      */
-    public void buildRange( final RangeBuilder rangeBuilder, final T value );
+    void buildRange(final RangeBuilder rangeBuilder, final T start, T end);
 
     /**
      * Set the range builder with the user supplied start and finish
      */
-    public void buildRange( final RangeBuilder rangeBuilder );
+    void buildRange( final RangeBuilder rangeBuilder );
 
     /**
      * Return true if we should skip the first result
      * @return
      */
-    public boolean skipFirst(final T first);
+    boolean skipFirst(final T first);
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 6c91aca..d8b9097 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -227,14 +227,14 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         if(currentShard == null){
 
             if(logger.isTraceEnabled()){
-                logger.trace(Thread.currentThread().getName()+" - currentShard: {}", currentShard);
+                logger.trace("currentShard: {}", currentShard);
             }
 
             currentShard = currentShardIterator.next();
 
             if(logger.isTraceEnabled()){
-                logger.trace(Thread.currentThread().getName()+" - all shards when starting: {}", rowKeysWithShardEnd);
-                logger.trace(Thread.currentThread().getName()+" - initializing iterator with shard: {}", currentShard);
+                logger.trace("all shards when starting: {}", rowKeysWithShardEnd);
+                logger.trace("initializing iterator with shard: {}", currentShard);
             }
 
 
@@ -242,21 +242,85 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
 
 
+        // initial request, build the range with no start and no end
+        if ( startColumn == null && currentShard.getShardEnd() == null ){
 
+            columnSearch.buildRange( rangeBuilder );
+
+            if(logger.isTraceEnabled()){
+                logger.trace("initial search (no start or shard end)");
+            }
 
-        //set the range into the search
-        if(logger.isTraceEnabled()){
-            logger.trace(Thread.currentThread().getName()+" - startColumn={}", startColumn);
         }
+        // if there's only a startColumn set the range start startColumn always
+        else if ( startColumn != null && currentShard.getShardEnd() == null ){
+
+            columnSearch.buildRange( rangeBuilder, startColumn, null );
+
+            if(logger.isTraceEnabled()){
+                logger.trace("search (no shard end) with start: {}", startColumn);
+            }
 
-        if ( startColumn == null ) {
-            columnSearch.buildRange( rangeBuilder );
         }
-        else {
-            columnSearch.buildRange( rangeBuilder, startColumn );
+        // if there's only a shardEnd, set the start/end according based on the search order
+        else if ( startColumn == null && currentShard.getShardEnd() != null ){
+
+            T shardEnd = (T) currentShard.getShardEnd();
+
+            // if we have a shardEnd and it's not an ascending search, use the shardEnd as a start
+            if(!ascending) {
+
+                columnSearch.buildRange(rangeBuilder, shardEnd, null);
+
+                if(logger.isTraceEnabled()){
+                    logger.trace("search descending with start: {}", shardEnd);
+                }
+
+            }
+            // if we have a shardEnd and it is an ascending search, use the shardEnd as the end
+            else{
+
+                columnSearch.buildRange( rangeBuilder, null, shardEnd );
+
+                if(logger.isTraceEnabled()){
+                    logger.trace("search ascending with end: {}", shardEnd);
+                }
+
+            }
+
         }
+        // if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order
+        else if ( startColumn != null && currentShard.getShardEnd() != null) {
+
+            T shardEnd = (T) currentShard.getShardEnd();
+
+
+            // if the search is not ascending, set the start to be the older edge
+            if(!ascending){
+
+                T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? shardEnd : startColumn;
+                columnSearch.buildRange( rangeBuilder, searchStart, null);
+
+                if(logger.isTraceEnabled()){
+                    logger.trace("search descending with start: {} in shard", searchStart, currentShard);
+                }
+
+            }
+            // if the search is ascending, then always use the startColumn for the start and shardEnd for the range end
+            else{
+
+                columnSearch.buildRange( rangeBuilder, startColumn , shardEnd);
+
+                if(logger.isTraceEnabled()){
+                    logger.trace("search with start: {}, end: {}", startColumn, shardEnd);
+                }
 
 
+
+            }
+
+        }
+
         rangeBuilder.setLimit( selectSize );
 
         if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard );
@@ -296,8 +360,8 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
 
         if(logger.isTraceEnabled()){
-            logger.trace(Thread.currentThread().getName()+" - current shard: {}, retrieved size: {}", currentShard, size);
-            logger.trace(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
+            logger.trace("current shard: {}, retrieved size: {}", currentShard, size);
+            logger.trace("selectSize={}, size={}, ", selectSize, size);
 
 
         }
@@ -348,8 +412,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         }
 
        if(logger.isTraceEnabled()){
-           logger.trace(
-               Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}, " +
+           logger.trace("currentColumnIterator.hasNext()={}, " +
                    "moreToReturn={}, currentShardIterator.hasNext()={}",
                currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
        }
@@ -359,61 +422,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
 
     /**
-     * Return true if we have < 2 rows with columns, false otherwise
-     */
-    private boolean containsSingleRowOnly( final Rows<R, C> result ) {
-
-        int count = 0;
-
-        for ( R key : result.getKeys() ) {
-            if ( result.getRow( key ).getColumns().size() > 0 ) {
-                count++;
-
-                //we have more than 1 row with values, return them
-                if ( count > 1 ) {
-                    return false;
-                }
-            }
-        }
-
-        return true;
-    }
-
-
-    /**
-     * A single row is present, only parse the single row
-     * @param result
-     * @return
-     */
-    private List<T> singleRowResult( final Rows<R, C> result ) {
-
-        if (logger.isTraceEnabled()) logger.trace( "Only a single row has columns.  Parsing directly" );
-
-        for ( R key : result.getKeys() ) {
-            final ColumnList<C> columnList = result.getRow( key ).getColumns();
-
-            final int size = columnList.size();
-
-            if ( size > 0 ) {
-
-                final List<T> results = new ArrayList<>(size);
-
-                for(Column<C> column: columnList){
-                    results.add(columnParser.parseColumn( column ));
-                }
-
-                return results;
-
-
-            }
-        }
-
-        //we didn't have any results, just return nothing
-        return Collections.<T>emptyList();
-    }
-
-
-    /**
      * Process the result set and filter any duplicates that may have already been seen in previous shards.  During
      * a shard transition, there could be the same columns in multiple shards (rows).  This will also allow for
      * filtering the startColumn (the seek starting point) when paging a row in Cassandra.

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
index 8a1bee8..b60cb59 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
@@ -20,13 +20,13 @@ package org.apache.usergrid.persistence.core.shard;
 
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 
-public class SmartShard<R, C> {
+public class SmartShard<R, T> {
 
     final ScopedRowKey<R> rowKey;
-    final C shardEnd;
+    final T shardEnd;
 
 
-    public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){
+    public SmartShard(final ScopedRowKey<R> rowKey, final T shardEnd){
 
         this.rowKey = rowKey;
         this.shardEnd = shardEnd;
@@ -37,7 +37,7 @@ public class SmartShard<R, C> {
         return rowKey;
     }
 
-    public C getShardEnd(){
+    public T getShardEnd(){
         return shardEnd;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
index 9f5741b..b6ee7fe 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
 import java.util.concurrent.CountDownLatch;
 
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -53,7 +52,6 @@ import com.netflix.astyanax.util.RangeBuilder;
 import rx.Observable;
 import rx.Observer;
 import rx.functions.Action1;
-import rx.functions.Func1;
 import rx.schedulers.Schedulers;
 
 import static org.junit.Assert.assertEquals;
@@ -156,7 +154,7 @@ public class MultiRowColumnIteratorTest {
 
         final ColumnSearch<Long> ascendingSearch = new ColumnSearch<Long>() {
             @Override
-            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+            public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
                 rangeBuilder.setStart( value );
             }
 
@@ -201,7 +199,7 @@ public class MultiRowColumnIteratorTest {
 
         final ColumnSearch<Long> descendingSearch = new ColumnSearch<Long>() {
             @Override
-            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+            public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
                 rangeBuilder.setStart( value );
                 buildRange( rangeBuilder );
             }
@@ -276,7 +274,7 @@ public class MultiRowColumnIteratorTest {
 
         final ColumnSearch<Long> ascendingSearch = new ColumnSearch<Long>() {
             @Override
-            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+            public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
                 rangeBuilder.setStart( value );
             }
 
@@ -325,7 +323,7 @@ public class MultiRowColumnIteratorTest {
 
         final ColumnSearch<Long> descendingSearch = new ColumnSearch<Long>() {
             @Override
-            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+            public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
                 rangeBuilder.setStart( value );
                 buildRange( rangeBuilder );
             }
@@ -414,7 +412,7 @@ public class MultiRowColumnIteratorTest {
 
         final ColumnSearch<Long> ascendingSearch = new ColumnSearch<Long>() {
             @Override
-            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+            public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
                 rangeBuilder.setStart( value );
             }
 
@@ -459,7 +457,7 @@ public class MultiRowColumnIteratorTest {
 
         final ColumnSearch<Long> descendingSearch = new ColumnSearch<Long>() {
             @Override
-            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+            public void buildRange(final RangeBuilder rangeBuilder, final Long value, Long end) {
                 rangeBuilder.setStart( value );
                 buildRange( rangeBuilder );
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
index 2af62a8..9b0257f 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgeMetadataSerializationV2Impl.java
@@ -489,7 +489,7 @@ public class EdgeMetadataSerializationV2Impl implements EdgeMetadataSerializatio
         //resume from the last if specified.  Also set the range
         return new ColumnSearch<String>() {
             @Override
-            public void buildRange( final RangeBuilder rangeBuilder, final String value ) {
+            public void buildRange(final RangeBuilder rangeBuilder, final String value, String end) {
                 rangeBuilder.setLimit( graphFig.getScanPageSize() );
 
 
@@ -517,7 +517,7 @@ public class EdgeMetadataSerializationV2Impl implements EdgeMetadataSerializatio
 
             @Override
             public void buildRange( final RangeBuilder rangeBuilder ) {
-                buildRange( rangeBuilder, null );
+                buildRange( rangeBuilder, null, null);
             }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 92793cb..6394703 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -33,7 +33,7 @@ public class Shard implements Comparable<Shard> {
     private final long shardIndex;
     private final long createdTime;
     private final boolean compacted;
-    private Optional<Edge> shardEnd;
+    private Optional<DirectedEdge> shardEnd;
 
 
     public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
@@ -76,11 +76,11 @@ public class Shard implements Comparable<Shard> {
         return shardIndex == MIN_SHARD.shardIndex;
     }
 
-    public void setShardEnd(final Optional<Edge> shardEnd) {
+    public void setShardEnd(final Optional<DirectedEdge> shardEnd) {
         this.shardEnd = shardEnd;
     }
 
-    public Optional<Edge> getShardEnd() {
+    public Optional<DirectedEdge> getShardEnd() {
         return shardEnd;
     }
 
@@ -170,7 +170,7 @@ public class Shard implements Comparable<Shard> {
         string.append(", compacted=").append(compacted);
         string.append(", shardEndTimestamp=");
         if(shardEnd.isPresent()){
-            string.append(shardEnd.get().getTimestamp());
+            string.append(shardEnd.get().timestamp);
         }else{
             string.append("null");
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 2f5817d..6d8ddd4 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -19,20 +19,14 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
-import java.nio.ByteBuffer;
 import java.util.*;
 
-import org.apache.http.cookie.SM;
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.ColumnSearch;
-import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.shard.SmartShard;
-import org.apache.usergrid.persistence.graph.Edge;
-import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 
 import com.google.common.base.Optional;
@@ -43,8 +37,6 @@ import com.netflix.astyanax.util.RangeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator.*;
-
 
 /**
  * Searcher to be used when performing the search.  Performs I/O transformation as well as parsing for the iterator. If
@@ -56,6 +48,9 @@ import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterat
  */
 public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{
 
+    private static final Logger logger = LoggerFactory.getLogger( EdgeSearcher.class );
+
+
     protected final Optional<T> last;
     protected final long maxTimestamp;
     protected final ApplicationScope scope;
@@ -84,7 +79,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
         List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size());
 
         for(Shard shard : shards){
-
             final ScopedRowKey< R> rowKey = ScopedRowKey
                     .fromKey( scope.getApplication(), generateRowKey(shard.getShardIndex() ) );
 
@@ -105,16 +99,14 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
             final ScopedRowKey< R> rowKey = ScopedRowKey
                 .fromKey( scope.getApplication(), generateRowKey(shard.getShardIndex() ) );
 
-            final C shardEnd;
-            if(shard.getShardEnd().isPresent()){
-                shardEnd = createColumn((T) shard.getShardEnd().get());
 
+            final T shardEnd;
+            if(shard.getShardEnd().isPresent()){
+                shardEnd = createEdge((C) shard.getShardEnd().get(), false); // convert DirectedEdge to Edge
             }else{
                 shardEnd = null;
             }
 
-
-
             rowKeysWithShardEnd.add(new SmartShard(rowKey, shardEnd));
         }
 
@@ -142,11 +134,23 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
 
 
     @Override
-    public void buildRange( final RangeBuilder rangeBuilder, final T value ) {
+    public void buildRange(final RangeBuilder rangeBuilder, final T start, T end) {
 
-        C edge = createColumn( value );
+        if ( start != null){
 
-        rangeBuilder.setStart( edge, getSerializer() );
+            C startEdge = createColumn( start );
+            rangeBuilder.setStart( startEdge, getSerializer() );
+        }else{
+
+            setTimeScan( rangeBuilder );
+        }
+
+        if( end != null){
+
+            C endEdge = createColumn( end );
+            rangeBuilder.setEnd( endEdge, getSerializer() );
+
+        }
 
         setRangeOptions( rangeBuilder );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index 120a15c..d22f472 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEd
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
 import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.EdgeShardRowKeySerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize.ShardSerializer;
 import org.apache.usergrid.persistence.graph.serialization.util.GraphValidation;
 
 import com.google.common.base.Optional;
@@ -53,11 +54,16 @@ import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.query.RowQuery;
 import com.netflix.astyanax.serializers.LongSerializer;
 import com.netflix.astyanax.util.RangeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 @Singleton
 public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
+    private static final Logger logger = LoggerFactory.getLogger( EdgeShardSerializationImpl.class );
+
+
     /**
      * Edge shards
      */
@@ -67,6 +73,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
 
     private static final ShardColumnParser COLUMN_PARSER = new ShardColumnParser();
+    private static final ShardSerializer SHARD_SERIALIZER = ShardSerializer.INSTANCE;
 
 
     protected final Keyspace keyspace;
@@ -101,7 +108,7 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
         batch.withTimestamp( shard.getCreatedTime() ).withRow( EDGE_SHARDS, rowKey )
-             .putColumn( shard.getShardIndex(), shard.isCompacted() );
+             .putColumn( shard.getShardIndex(), SHARD_SERIALIZER.toByteBuffer(shard));
 
         return batch;
     }
@@ -180,9 +187,42 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
     private static class ShardColumnParser implements ColumnParser<Long, Shard> {
 
+        /** Example CQL schema for this table
+         *
+         * CREATE TABLE "Usergrid_Applications"."Edge_Shards" (
+         *    key blob,
+         *    column1 bigint,
+         *    value blob,
+         *    PRIMARY KEY (key, column1)
+         *    ) WITH COMPACT STORAGE
+         *    AND CLUSTERING ORDER BY (column1 DESC)
+         *
+         *
+         *
+         */
+
+
         @Override
         public Shard parseColumn( final Column<Long> column ) {
-            return new Shard( column.getName(), column.getTimestamp(), column.getBooleanValue() );
+
+            // A custom serializer was introduced to handle parsing multiple column formats without re-writing the data.
+            // The column can be stored as a legacy, single boolean, value OR a new, composite, value which contains
+            // every item in the shard. If the legacy value is seen, we return a shard with Long.MIN for index and
+            // createdTime so it can be identified later and handled.
+
+
+            Shard shard =  column.getValue(SHARD_SERIALIZER);
+
+            if (shard.getShardIndex() == Long.MIN_VALUE && shard.getCreatedTime() == Long.MIN_VALUE){
+
+                // this was deserialized as a legacy column format, use the column name and timestamp for the shard
+                return new Shard(column.getName(), column.getTimestamp(), shard.isCompacted());
+
+            } else {
+
+                return shard;
+            }
+
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index 6f95cf5..6b190a1 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -105,7 +105,6 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
              */
             if ( existingShards == null || !existingShards.hasNext() ) {
 
-                //logger.info("writing min shard");
                 final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
                 try {
                     batch.execute();
@@ -160,7 +159,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
 
         if ( shard.getCreatedTime() >= minTime ) {
-            if (logger.isTraceEnabled()) logger.trace( "Shard entry group {}  and shard {} is before the minimum created time of {}.  Not allocating.does not have 1 entry, not allocating", shardEntryGroup, shard, minTime );
+            if (logger.isTraceEnabled()) logger.trace( "Shard entry group {}  and shard {} is before the minimum created time of {}.  Not allocating", shardEntryGroup, shard, minTime );
             return false;
         }
 
@@ -196,7 +195,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          */
 
         final Iterator<MarkedEdge> edges = directedEdgeMeta
-            .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, shardEntryGroup.getReadShards(), 0,
+            .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singletonList(shard),0,
                 SearchByEdgeType.Order.ASCENDING );
 
 
@@ -217,7 +216,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          * element will suffice.
          */
 
-
+        long edgeCount = 0;
         for ( long i = 1; edges.hasNext(); i++ ) {
             //we hit a pivot shard, set it since it could be the last one we encounter
             if ( i % shardSize == 0 ) {
@@ -226,6 +225,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             else {
                 edges.next();
             }
+            edgeCount++;
         }
 
 
@@ -233,7 +233,10 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
          * Sanity check in case we audit before we have a full shard
          */
         if ( marked == null ) {
-            if (logger.isTraceEnabled()) logger.trace( "Shard {} in shard group {} not full, not splitting",  shard, shardEntryGroup );
+            if (logger.isTraceEnabled()){
+                logger.trace( "Shard {} in shard group {} not full, " +
+                    "not splitting. Edge count: {}",  shard, shardEntryGroup, edgeCount );
+            }
             return false;
         }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index f644380..1890d53 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
 import com.google.common.base.Optional;
 import com.netflix.astyanax.connectionpool.OperationResult;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,15 +44,6 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.GraphFig;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.AsyncTaskExecutor;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdgeMeta;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeColumnFamilies;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.EdgeShardSerialization;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.NodeShardAllocation;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroupCompaction;
-import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardedEdgeSerialization;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
@@ -189,7 +181,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 final MarkedEdge edge = edges.next();
 
                 final long edgeTimestamp = edge.getTimestamp();
-                shardEnd = edge;
+
 
                 /**
                  * The edge is within a different shard, break
@@ -208,9 +200,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
                 edgeCount++;
+                shardEnd = edge;
 
-                //if we're at our count, execute the mutation of writing the edges to the new row, then remove them
-                //from the old rows
+                // if we're at our count, execute the mutation of writing the edges to the new row, then remove them
+                // from the old rows
                 if ( edgeCount % maxWorkSize == 0 ) {
 
                     try {
@@ -218,30 +211,10 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                         // write the edges into the new shard atomically so we know they all succeed
                         newRowBatch.withAtomicBatch(true).execute();
 
-                        List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
-                        {{
-                            add(deleteRowBatch);
-                        }};
-
-                        // fire the mutation in the background after 1 second delay
-                        if(logger.isTraceEnabled()){
-                            logger.trace("scheduling shard compaction delete");
-
-                        }
-
-                        // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
-                        Observable.from(deleteMutations)
-                            .delay(1000, TimeUnit.MILLISECONDS)
-                            .map(deleteRowBatchSingle -> {
-                                try {
-                                    return deleteRowBatchSingle.execute();
-                                } catch (ConnectionException e) {
-                                    logger.error("Unable to remove edges from old shards");
-                                    throw new RuntimeException("Unable to remove edges from old shards");
-                                }
-                            })
-                            .subscribeOn(Schedulers.io())
-                            .toBlocking().last();
+                        // on purpose block this thread before deleting the old edges to be sure there are no gaps
+                        // duplicates are filtered on graph seeking so this is OK
+                        Thread.sleep(1000);
+                        deleteRowBatch.execute();
 
                     }
                     catch ( Throwable t ) {
@@ -250,10 +223,16 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 }
             }
 
-            Shard updatedShard = new Shard( sourceShard.getShardIndex(), sourceShard.getCreatedTime(), sourceShard.isCompacted() );
-            updatedShard.setShardEnd(Optional.fromNullable(shardEnd));
-            logger.info("updating with shard end: {}", shardEnd );
-            updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, updatedShard, edgeMeta));
+            if (shardEnd != null){
+
+                sourceShard.setShardEnd(
+                    Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
+                );
+
+
+                logger.info("Updating shard {} with shardEnd: {}", sourceShard, shardEnd );
+                updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
+            }
 
         }
 
@@ -265,31 +244,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
             // write the edges into the new shard atomically so we know they all succeed
             newRowBatch.withAtomicBatch(true).execute();
 
-            List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
-            {{
-                add(deleteRowBatch);
-            }};
-
-
-            if(logger.isTraceEnabled()) {
-                logger.trace("scheduling shard compaction delete");
-            }
-
-            // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
-            Observable.from(deleteMutations)
-                .delay(1000, TimeUnit.MILLISECONDS)
-                .map(deleteRowBatchSingle -> {
-                    try {
-                        return deleteRowBatchSingle.execute();
-                    } catch (ConnectionException e) {
-                        logger.error("Unable to remove edges from old shards");
-                        throw new RuntimeException("Unable to remove edges from old shards");
-                    }
-                })
-                .subscribeOn(Schedulers.io())
-                .toBlocking().last();
+            // on purpose block this thread before deleting the old edges to be sure there are no gaps
+            // duplicates are filtered on graph seeking so this is OK
+            Thread.sleep(1000);
+            deleteRowBatch.execute();
 
-            //updateShardMetaBatch.execute();
+            updateShardMetaBatch.execute();
         }
         catch ( Throwable t ) {
             logger.error( "Unable to move edges to target shard {}", targetShard );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index c3e0cc0..c7028aa 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -407,7 +407,7 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         GraphValidation.validateSearchByEdgeType( search );
 
         if(logger.isTraceEnabled()){
-            logger.info("getEdgesFromSource shards: {}", shards);
+            logger.trace("getEdgesFromSource shards: {}", shards);
         }
 
         final Id sourceId = search.getNode();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
new file mode 100644
index 0000000..58276fe
--- /dev/null
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/serialize/ShardSerializer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.serialize;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.netflix.astyanax.model.DynamicComposite;
+import com.netflix.astyanax.serializers.AbstractSerializer;
+import com.netflix.astyanax.serializers.BooleanSerializer;
+import com.netflix.astyanax.serializers.ByteSerializer;
+import com.netflix.astyanax.serializers.LongSerializer;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.DirectedEdge;
+import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
+
+import java.nio.ByteBuffer;
+
+
+public class ShardSerializer extends AbstractSerializer<Shard> {
+
+    private static final BooleanSerializer BOOLEAN_SERIALIZER = BooleanSerializer.get();
+    private static final LongSerializer LONG_SERIALIZER = LongSerializer.get();
+    private static final EdgeSerializer EDGE_SERIALIZER = EdgeSerializer.INSTANCE;
+    private static final ByteSerializer BYTE_SERIALIZER = ByteSerializer.get();
+
+
+    public static final ShardSerializer INSTANCE = new ShardSerializer();
+
+
+    @Override
+    public ByteBuffer toByteBuffer(final Shard shard ) {
+
+        DynamicComposite composite = new DynamicComposite();
+
+        composite.addComponent( (byte) 2 , BYTE_SERIALIZER);
+        composite.addComponent( shard.getShardIndex(), LONG_SERIALIZER);
+        composite.addComponent( shard.getCreatedTime(), LONG_SERIALIZER);
+
+        if(shard.getShardEnd().isPresent()) {
+            composite.addComponent(shard.getShardEnd().get(), EDGE_SERIALIZER);
+        }else{
+            composite.addComponent(null, EDGE_SERIALIZER);
+        }
+
+        composite.addComponent( shard.isCompacted(), BOOLEAN_SERIALIZER);
+
+        return composite.serialize();
+    }
+
+
+    @Override
+    public Shard fromByteBuffer( final ByteBuffer byteBuffer ) {
+        DynamicComposite composite = DynamicComposite.fromByteBuffer( byteBuffer );
+
+        Preconditions.checkArgument( composite.size() == 1 || composite.size() == 5,
+            "Composite should have 1 or 5 elements" );
+
+        // this is the legacy column format, return a shard with identifiable values so the column name and timestamp
+        // can be used
+        if( composite.size() == 1){
+
+            final boolean isCompacted = composite.get( 0, BOOLEAN_SERIALIZER);
+            return new Shard(Long.MIN_VALUE, Long.MIN_VALUE, isCompacted);
+
+        }
+        // This is the new format which contains all the information about a Shard.  Include a byte version of 2 if it's
+        // needed later for any reason.
+        else{
+
+            final byte version = composite.get(0, BYTE_SERIALIZER);
+            final long shardIndex = composite.get( 1, LONG_SERIALIZER );
+            final long shardCreated = composite.get( 2, LONG_SERIALIZER );
+            final DirectedEdge shardEnd = composite.get( 3, EDGE_SERIALIZER);
+            final boolean isCompacted = composite.get( 4, BOOLEAN_SERIALIZER);
+
+
+            final Shard shard = new Shard(shardIndex, shardCreated, isCompacted);
+            shard.setShardEnd(Optional.fromNullable(shardEnd));
+            return shard;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 439553c..8fd7cea 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -115,10 +115,10 @@ public class GraphManagerShardConsistencyIT {
         originalShardDelta = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
 
 
-        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 5000 );
+        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 10000 );
 
 
-        final long cacheTimeout = 2000;
+        final long cacheTimeout = 1000;
         //set our cache timeout to the above value
         ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_CACHE_TIMEOUT, cacheTimeout );
 
@@ -128,7 +128,7 @@ public class GraphManagerShardConsistencyIT {
         ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_MIN_DELTA, minDelta );
 
 
-        //get the system property of the UUID to use.  If one is not set, use the defualt
+        // get the system property of the UUID to use.  If one is not set, use the defualt
         String uuidString = System.getProperty( "org.id", "80a42760-b699-11e3-a5e2-0800200c9a66" );
 
         scope = new ApplicationScopeImpl( IdGenerator.createId( UUID.fromString( uuidString ), "test" ) );
@@ -196,8 +196,7 @@ public class GraphManagerShardConsistencyIT {
         };
 
 
-        //final int numInjectors = 2;
-        final int numInjectors = 1;
+        final int numInjectors = 2;
 
         /**
          * create injectors.  This way all the caches are independent of one another.  This is the same as
@@ -280,7 +279,7 @@ public class GraphManagerShardConsistencyIT {
         final List<Throwable> failures = new ArrayList<>();
         Thread.sleep(3000); // let's make sure everything is written
 
-        for(int i = 0; i < 1; i ++) {
+        for(int i = 0; i < 2; i ++) {
 
 
             /**
@@ -312,7 +311,7 @@ public class GraphManagerShardConsistencyIT {
         int compactedCount;
 
 
-        //now start our readers
+        // now start the compaction watcher
 
         while ( true ) {
 
@@ -336,10 +335,10 @@ public class GraphManagerShardConsistencyIT {
                 fail( builder.toString() );
             }
 
-            //reset our count.  Ultimately we'll have 4 groups once our compaction completes
+            // reset our count.  Ultimately we'll have 4 groups once our compaction completes
             compactedCount = 0;
 
-            //we have to get it from the cache, because this will trigger the compaction process
+            // we have to get it from the cache, because this will trigger the compaction process
             final Iterator<ShardEntryGroup> groups = cache.getReadShardGroup( scope, Long.MAX_VALUE, directedEdgeMeta );
             final Set<ShardEntryGroup> shardEntryGroups = new HashSet<>();
 
@@ -433,7 +432,7 @@ public class GraphManagerShardConsistencyIT {
         };
 
 
-        final int numInjectors = 1;
+        final int numInjectors = 2;
 
         /**
          * create injectors.  This way all the caches are independent of one another.  This is the same as
@@ -498,12 +497,11 @@ public class GraphManagerShardConsistencyIT {
             future.get();
         }
 
-        //now get all our shards
+        // now get all our shards
         final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
 
         final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, deleteEdgeType );
 
-        //now submit the readers.
         final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
 
 
@@ -527,7 +525,7 @@ public class GraphManagerShardConsistencyIT {
         }
 
 
-        logger.info( "found {} shard groups", shardCount );
+        logger.info( "Found {} shard groups", shardCount );
 
 
         //now mark and delete all the edges
@@ -543,6 +541,7 @@ public class GraphManagerShardConsistencyIT {
 
         long totalDeleted = 0;
 
+        // now do the deletes
         while(count != 0) {
 
             logger.info("total deleted: {}", totalDeleted);
@@ -565,7 +564,7 @@ public class GraphManagerShardConsistencyIT {
         }
 
 
-        //now loop until with a reader until our shards are gone
+        // loop with a reader until our shards are gone
 
 
         /**
@@ -582,7 +581,7 @@ public class GraphManagerShardConsistencyIT {
             @Override
             public void onSuccess( @Nullable final Long result ) {
                 logger.info( "Successfully ran the read, re-running" );
-                deleteExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+                deleteExecutor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
             }
 
 
@@ -593,9 +592,9 @@ public class GraphManagerShardConsistencyIT {
             }
         } );
 
+        Thread.sleep(3000); //  let the edge readers start
 
-        //now start our readers
-
+        // now loop check the shard count
         while ( true ) {
 
             if ( !failures.isEmpty() ) {
@@ -647,9 +646,12 @@ public class GraphManagerShardConsistencyIT {
             Thread.sleep( 2000 );
         }
 
-        //now that we have finished expanding s
+        future.cancel(true); // stop the read future
 
+        //now that we have finished deleting and shards are removed, shutdown
         deleteExecutor.shutdownNow();
+
+        Thread.sleep( 3000 ); // sleep before the next test
     }
 
 
@@ -695,7 +697,7 @@ public class GraphManagerShardConsistencyIT {
 
 
                 if ( i % 100 == 0 ) {
-                    logger.info( Thread.currentThread().getName()+" wrote: " + i );
+                    logger.info( "wrote: " + i );
                 }
             }
 
@@ -741,7 +743,7 @@ public class GraphManagerShardConsistencyIT {
                 logger.info( "Completed reading {} edges", returnedEdgeCount );
 
                 if ( writeCount != returnedEdgeCount ) {
-                    logger.warn( Thread.currentThread().getName()+" - Unexpected edge count returned!!!  Expected {} but was {}", writeCount,
+                    logger.warn( "Unexpected edge count returned!!!  Expected {} but was {}", writeCount,
                         returnedEdgeCount );
                 }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4e407ff6/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index 5afc288..e7f7524 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -42,4 +42,5 @@ log4j.logger.cassandra.db=ERROR
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.NodeShardAllocationImpl=TRACE
 


Mime
View raw message