usergrid-commits mailing list archives

From mru...@apache.org
Subject [08/20] usergrid git commit: Reset the original multi-row column iterator and use a new class for smart iteration over edge shards. Change to system time for 'last write wins' in Cassandra instead of a shard's 'createdTime'.
Date Wed, 23 Mar 2016 17:34:36 GMT
Reset the original multi-row column iterator and use a new class for smart iteration over edge shards.  Change to system time for 'last write wins' in Cassandra instead of a shard's 'createdTime'.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/bec50939
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/bec50939
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/bec50939

Branch: refs/heads/release-2.1.1
Commit: bec5093978175c87b7d76f66c8a503f062275ead
Parents: 58ae197
Author: Michael Russo <mrusso@apigee.com>
Authored: Sun Mar 20 17:49:10 2016 -0700
Committer: Michael Russo <mrusso@apigee.com>
Committed: Sun Mar 20 17:49:10 2016 -0700

----------------------------------------------------------------------
 .../core/astyanax/MultiRowColumnIterator.java   | 346 ++++++--------
 .../astyanax/MultiRowShardColumnIterator.java   | 455 +++++++++++++++++++
 .../shard/impl/EdgeShardSerializationImpl.java  |  14 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  99 ++--
 .../impl/shard/impl/ShardsColumnIterator.java   |   3 +-
 .../graph/GraphManagerShardConsistencyIT.java   |   3 +-
 .../impl/shard/EdgeShardSerializationTest.java  |  12 +-
 7 files changed, 665 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 6049c1f..c071d53 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -20,9 +20,14 @@
 package org.apache.usergrid.persistence.core.astyanax;
 
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
 
-import org.apache.usergrid.persistence.core.shard.SmartShard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,18 +77,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
     private Iterator<T> currentColumnIterator;
 
-    private Iterator<SmartShard> currentShardIterator;
-
-    private List<SmartShard> rowKeysWithShardEnd;
-
-    private SmartShard currentShard;
-
-    private List<T> resultsTracking;
-
-    private int skipSize = 0; // used for determining if we've skipped a whole page during shard transition
-
-    private boolean ascending = false;
-
 
     /**
      * Remove after finding bug
@@ -115,63 +108,18 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         this.keyspace = keyspace;
         this.consistencyLevel = consistencyLevel;
         this.moreToReturn = true;
-        this.resultsTracking = new ArrayList<>();
-
-    }
-
-    // temporarily use a new constructor for specific searches until we update each caller of this class
-    public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
-                                   final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
-                                   final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
-                                   final Collection<R> rowKeys, final int pageSize,
-                                   final List<SmartShard> rowKeysWithShardEnd,
-                                   final boolean ascending) {
-        this.cf = cf;
-        this.pageSize = pageSize;
-        this.columnParser = columnParser;
-        this.columnSearch = columnSearch;
-        this.comparator = comparator;
-        this.rowKeys = rowKeys;
-        this.keyspace = keyspace;
-        this.consistencyLevel = consistencyLevel;
-        this.moreToReturn = true;
-        this.rowKeysWithShardEnd = rowKeysWithShardEnd;
-        this.resultsTracking = new ArrayList<>();
-        this.ascending = ascending;
 
+        //        seenResults = new HashMap<>( pageSize * 10 );
     }
 
 
     @Override
     public boolean hasNext() {
 
-        // if column iterator is null, initialize with first call to advance()
-        // advance if we know there more columns exist in the current shard but we've exhausted this page fetch from c*
         if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
             advance();
         }
 
-        // when there are no more columns, nothing reported to return, but more shards available, go to the next shard
-        if( currentColumnIterator != null && !currentColumnIterator.hasNext() &&
-            !moreToReturn && currentShardIterator.hasNext()){
-
-                if(logger.isTraceEnabled()){
-                    logger.trace("Advancing shard iterator");
-                    logger.trace("Shard before advance: {}", currentShard);
-                }
-
-
-                // advance to the next shard
-                currentShard = currentShardIterator.next();
-
-                if(logger.isTraceEnabled()){
-                    logger.trace("Shard after advance: {}", currentShard);
-
-                }
-
-                advance();
-
-        }
 
         return currentColumnIterator.hasNext();
     }
@@ -198,6 +146,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
     public void advance() {
 
+
         if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" );
 
         /**
@@ -206,130 +155,32 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
         final boolean skipFirstColumn = startColumn != null;
 
-        final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
-
-        final RangeBuilder rangeBuilder = new RangeBuilder();
-
-
 
 
-        if(currentShardIterator == null){
-
-            // flip the order of our shards if ascending
-            if(ascending){
-                Collections.reverse(rowKeysWithShardEnd);
-            }
-
-            currentShardIterator = rowKeysWithShardEnd.iterator();
-
-        }
-
-        if(currentShard == null){
-
-            if(logger.isTraceEnabled()){
-                logger.trace("currentShard: {}", currentShard);
-            }
-
-            currentShard = currentShardIterator.next();
-
-            if(logger.isTraceEnabled()){
-                logger.trace("all shards when starting: {}", rowKeysWithShardEnd);
-                logger.trace("initializing iterator with shard: {}", currentShard);
-            }
-
-
-        }
+        final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
 
+        final RangeBuilder rangeBuilder = new RangeBuilder();
 
 
-        // initial request, build the range with no start and no end
-        if ( startColumn == null && currentShard.getShardEnd() == null ){
+        //set the range into the search
 
+        if ( startColumn == null ) {
             columnSearch.buildRange( rangeBuilder );
-
-            if(logger.isTraceEnabled()){
-                logger.trace("initial search (no start or shard end)");
-            }
-
         }
-        // if there's only a startColumn set the range start startColumn always
-        else if ( startColumn != null && currentShard.getShardEnd() == null ){
-
+        else {
             columnSearch.buildRange( rangeBuilder, startColumn, null );
-
-            if(logger.isTraceEnabled()){
-                logger.trace("search (no shard end) with start: {}", startColumn);
-            }
-
-        }
-        // if there's only a shardEnd, set the start/end according based on the search order
-        else if ( startColumn == null && currentShard.getShardEnd() != null ){
-
-            T shardEnd = (T) currentShard.getShardEnd();
-
-            // if we have a shardEnd and it's not an ascending search, use the shardEnd as a start
-            if(!ascending) {
-
-                columnSearch.buildRange(rangeBuilder, shardEnd, null);
-
-                if(logger.isTraceEnabled()){
-                    logger.trace("search descending with start: {}", shardEnd);
-                }
-
-            }
-            // if we have a shardEnd and it is an ascending search, use the shardEnd as the end
-            else{
-
-                columnSearch.buildRange( rangeBuilder, null, shardEnd );
-
-                if(logger.isTraceEnabled()){
-                    logger.trace("search ascending with end: {}", shardEnd);
-                }
-
-            }
-
         }
-        // if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order
-        else if ( startColumn != null && currentShard.getShardEnd() != null) {
 
-            T shardEnd = (T) currentShard.getShardEnd();
-
-
-            // if the search is not ascending, set the start to be the older edge
-            if(!ascending){
-
-                T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? shardEnd : startColumn;
-                columnSearch.buildRange( rangeBuilder, searchStart, null);
-
-                if(logger.isTraceEnabled()){
-                    logger.trace("search descending with start: {} in shard", searchStart, currentShard);
-                }
-
-            }
-            // if the search is ascending, then always use the startColumn for the start and shardEnd for the range end
-            else{
-
-                columnSearch.buildRange( rangeBuilder, startColumn , shardEnd);
-
-                if(logger.isTraceEnabled()){
-                    logger.trace("search with start: {}, end: {}", startColumn, shardEnd);
-                }
-
-
-
-            }
-
-        }
 
         rangeBuilder.setLimit( selectSize );
 
-        if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard );
+        if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query" );
 
         /**
          * Get our list of slices
          */
         final RowSliceQuery<R, C> query =
-            keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() )
+            keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
                 .withColumnRange( rangeBuilder.build() );
 
         final Rows<R, C> result;
@@ -341,43 +192,36 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         }
 
 
+        //now aggregate them together
 
+        //this is an optimization: it's faster, when only one row has values, to
+        //return the iterator of those columns directly than to run a merge.
 
-        final List<T> mergedResults;
-
-        skipSize = 0;
 
-        mergedResults = processResults( result, selectSize );
+        final List<T> mergedResults;
 
-        if(logger.isTraceEnabled()){
-            logger.trace("skipped amount: {}", skipSize);
+        if ( containsSingleRowOnly( result ) ) {
+            mergedResults = singleRowResult( result );
+        }
+        else {
+            mergedResults = mergeResults( result, selectSize );
         }
 
 
 
-        final int size = mergedResults.size();
 
 
+        //we've parsed everything; truncate to the first pageSize, since that's all we
+        //can ensure is correct without another trip back to cassandra
 
-        if(logger.isTraceEnabled()){
-            logger.trace("current shard: {}, retrieved size: {}", currentShard, size);
-            logger.trace("selectSize={}, size={}, ", selectSize, size);
+        //discard our first element (maybe)
 
 
-        }
-
-        moreToReturn = size == selectSize;
-
-        if(selectSize == 1001 && mergedResults.size() == 1000){
-            moreToReturn = true;
-        }
 
+        final int size = mergedResults.size();
 
-        // if a whole page is skipped OR the result size equals the the difference of what's skipped,
-        // it is likely during a shard transition and we should assume there is more to read
-        if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){
-            moreToReturn = true;
-        }
+        moreToReturn = size == selectSize;
 
         //we have a first column to to check
         if( size > 0) {
@@ -386,53 +230,93 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
             //The search has either told us to skip the first element, or it matches our last, therefore we disregard it
             if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
-                if(logger.isTraceEnabled()){
-                    logger.trace("removing an entry");
-
-                }
                 mergedResults.remove( 0 );
             }
 
         }
 
 
-        // set the start column for the enxt query
         if(moreToReturn && mergedResults.size() > 0){
             startColumn = mergedResults.get( mergedResults.size()  - 1 );
-
         }
 
 
         currentColumnIterator = mergedResults.iterator();
 
+        if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results", rowKeys.size() );
+    }
+
+
+    /**
+     * Return true if we have < 2 rows with columns, false otherwise
+     */
+    private boolean containsSingleRowOnly( final Rows<R, C> result ) {
 
-        //force an advance of this iterator when there are still shards to read but result set on current shard is 0
-        if(size == 0 && currentShardIterator.hasNext()){
-            hasNext();
+        int count = 0;
+
+        for ( R key : result.getKeys() ) {
+            if ( result.getRow( key ).getColumns().size() > 0 ) {
+                count++;
+
+                //we have more than 1 row with values, return them
+                if ( count > 1 ) {
+                    return false;
+                }
+            }
         }
 
-       if(logger.isTraceEnabled()){
-           logger.trace("currentColumnIterator.hasNext()={}, " +
-                   "moreToReturn={}, currentShardIterator.hasNext()={}",
-               currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
-       }
+        return true;
+    }
+
+
+    /**
+     * A single row is present, only parse the single row
+     * @param result
+     * @return
+     */
+    private List<T> singleRowResult( final Rows<R, C> result ) {
+
+        if (logger.isTraceEnabled()) logger.trace( "Only a single row has columns.  Parsing directly" );
+
+        for ( R key : result.getKeys() ) {
+            final ColumnList<C> columnList = result.getRow( key ).getColumns();
+
+            final int size = columnList.size();
+
+            if ( size > 0 ) {
+
+                final List<T> results = new ArrayList<>(size);
+
+                for(Column<C> column: columnList){
+                    results.add(columnParser.parseColumn( column ));
+                }
+
+                return results;
 
 
+            }
+        }
+
+        //we didn't have any results, just return nothing
+        return Collections.<T>emptyList();
     }
 
 
     /**
-     * Process the result set and filter any duplicates that may have already been seen in previous shards.  During
-     * a shard transition, there could be the same columns in multiple shards (rows).  This will also allow for
-     * filtering the startColumn (the seek starting point) when paging a row in Cassandra.
-     *
+     * Multiple rows are present, merge them into a single result set
      * @param result
      * @return
      */
-    private List<T> processResults(final Rows<R, C> result, final int maxSize ) {
+    private List<T> mergeResults( final Rows<R, C> result, final int maxSize ) {
+
+        if (logger.isTraceEnabled()) logger.trace( "Multiple rows have columns.  Merging" );
+
 
         final List<T> mergedResults = new ArrayList<>(maxSize);
 
+
+
+
         for ( final R key : result.getKeys() ) {
             final ColumnList<C> columns = result.getRow( key ).getColumns();
 
@@ -441,24 +325,52 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
                 final T returnedValue = columnParser.parseColumn( column );
 
-                // use an O(log n) search, same as a tree, but with fast access to indexes for later operations
-                int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator );
+                //Use an O(log n) search, same as a tree, but with fast access to indexes for later operations
+                int searchIndex = Collections.binarySearch( mergedResults, returnedValue, comparator );
+
+                /**
+                 * DO NOT remove this section of code. If you're seeing inconsistent results during shard transition,
+                 * you'll
+                 * need to enable this
+                 */
+                //
+                //                if ( previous != null && comparator.compare( previous, returnedValue ) == 0 ) {
+                //                    throw new RuntimeException( String.format(
+                //                            "Cassandra returned 2 unique columns,
+                // but your comparator marked them as equal.  This " +
+                //                                    "indicates a bug in your comparator.  Previous value was %s and
+                // current value is " +
+                //                                    "%s",
+                //                            previous, returnedValue ) );
+                //                }
+                //
+                //                previous = returnedValue;
+
+                //we've already seen it, no-op
+                if(searchIndex > -1){
+                    continue;
+                }
 
+                final int insertIndex = (searchIndex+1)*-1;
 
-                //we've already seen the column, filter it out as we might be in a shard transition or our start column
-                if(searchIndex > -1){
-                    if(logger.isTraceEnabled()){
-                        logger.trace("skipping column as it was already retrieved before");
-                    }
-                    skipSize++;
+                //it's at the end of the list, don't bother inserting just to remove it
+                if(insertIndex >= maxSize){
                     continue;
                 }
 
+                if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex );
+
+                mergedResults.add( insertIndex, returnedValue );
 
-                resultsTracking.add(returnedValue);
-                mergedResults.add(returnedValue );
 
+                //prune the mergedResults
+                while ( mergedResults.size() > maxSize ) {
 
+                    if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize );
+
+                    //just remove from our tail until the size falls to the correct value
+                    mergedResults.remove(mergedResults.size()-1);
+                }
             }
 
             if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() );
@@ -467,6 +379,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         return mergedResults;
     }
 
-}
 
+}
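
For reference, the heart of the restored mergeResults() above is an insert-or-skip
pattern built on Collections.binarySearch: a non-negative result means the comparator
matched an existing value, a negative result encodes the insertion point as
(-(insertionPoint) - 1), and the list is pruned from the tail so it never exceeds the
page size. A minimal, self-contained sketch of just that pattern (the Integer element
type and natural-order comparator are placeholders, not part of the commit):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SortedMergeSketch {

    // Merge a batch of parsed column values into a bounded, sorted result list.
    static <T> void mergeInto(List<T> merged, Iterable<T> batch,
                              Comparator<T> comparator, int maxSize) {
        for (T value : batch) {
            // O(log n) search; >= 0 means the comparator matched an existing value
            int searchIndex = Collections.binarySearch(merged, value, comparator);
            if (searchIndex > -1) {
                continue; // already seen, no-op
            }
            // binarySearch returns (-(insertion point) - 1) on a miss
            int insertIndex = (searchIndex + 1) * -1;
            if (insertIndex >= maxSize) {
                continue; // would land past the page limit, don't bother inserting
            }
            merged.add(insertIndex, value);
            // trim from the tail until the size falls back to the page limit
            while (merged.size() > maxSize) {
                merged.remove(merged.size() - 1);
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> merged = new ArrayList<>();
        mergeInto(merged, Arrays.asList(5, 1, 3, 3, 9, 7),
                  Comparator.<Integer>naturalOrder(), 4);
        System.out.println(merged); // prints [1, 3, 5, 7]
    }
}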
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
new file mode 100644
index 0000000..bfc04c4
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import java.util.*;
+
+import org.apache.usergrid.persistence.core.shard.SmartShard;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.model.Rows;
+import com.netflix.astyanax.query.RowSliceQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ * Column iterator that pages across multiple shard rows, using each shard's
+ * shardEnd to scope range queries and filtering duplicates seen across shards.
+ */
+public class MultiRowShardColumnIterator<R, C, T> implements Iterator<T> {
+
+    private static final Logger logger = LoggerFactory.getLogger( MultiRowShardColumnIterator.class );
+
+    private final int pageSize;
+
+    private final ColumnFamily<R, C> cf;
+
+
+    private final ColumnParser<C, T> columnParser;
+
+    private final ColumnSearch<T> columnSearch;
+
+    private final Comparator<T> comparator;
+
+
+    private final Collection<R> rowKeys;
+
+    private final Keyspace keyspace;
+
+    private final ConsistencyLevel consistencyLevel;
+
+
+    private T startColumn;
+
+
+    private boolean moreToReturn;
+
+
+    private Iterator<T> currentColumnIterator;
+
+    private Iterator<SmartShard> currentShardIterator;
+
+    private List<SmartShard> rowKeysWithShardEnd;
+
+    private SmartShard currentShard;
+
+    private List<T> resultsTracking;
+
+    private int skipSize = 0; // used for determining if we've skipped a whole page during shard transition
+
+    private boolean ascending = false;
+
+
+    /**
+     * Remove after finding bug
+     */
+
+
+    //    private int advanceCount;
+    //
+    //    private final HashMap<T, SeekPosition> seenResults;
+
+    /**
+     * Complete Remove
+     */
+
+
+    /**
+     * Create the iterator
+     */
+    // temporarily use a new constructor for specific searches until we update each caller of this class
+    public MultiRowShardColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
+                                   final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
+                                   final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
+                                   final Collection<R> rowKeys, final int pageSize,
+                                   final List<SmartShard> rowKeysWithShardEnd,
+                                   final boolean ascending) {
+        this.cf = cf;
+        this.pageSize = pageSize;
+        this.columnParser = columnParser;
+        this.columnSearch = columnSearch;
+        this.comparator = comparator;
+        this.rowKeys = rowKeys;
+        this.keyspace = keyspace;
+        this.consistencyLevel = consistencyLevel;
+        this.moreToReturn = true;
+        this.rowKeysWithShardEnd = rowKeysWithShardEnd;
+        this.resultsTracking = new ArrayList<>();
+        this.ascending = ascending;
+
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+        // if column iterator is null, initialize with first call to advance()
+        // advance if we know more columns exist in the current shard but we've exhausted this page fetch from c*
+        if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
+            advance();
+        }
+
+        // when there are no more columns and nothing left to return, but more shards remain, go to the next shard
+        if( currentColumnIterator != null && !currentColumnIterator.hasNext() &&
+            !moreToReturn && currentShardIterator.hasNext()){
+
+            if(logger.isTraceEnabled()){
+                logger.trace("Advancing shard iterator");
+                logger.trace("Shard before advance: {}", currentShard);
+            }
+
+
+            // advance to the next shard
+            currentShard = currentShardIterator.next();
+
+            if(logger.isTraceEnabled()){
+                logger.trace("Shard after advance: {}", currentShard);
+
+            }
+
+            advance();
+
+        }
+
+        return currentColumnIterator.hasNext();
+    }
+
+
+    @Override
+    public T next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No new element exists" );
+        }
+
+        final T next = currentColumnIterator.next();
+
+
+        return next;
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported, this is a read-only iterator" );
+    }
+
+
+    public void advance() {
+
+        if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" );
+
+        /**
+     * If the edge is present, we need to begin seeking from this
+         */
+
+        final boolean skipFirstColumn = startColumn != null;
+
+        final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
+
+        final RangeBuilder rangeBuilder = new RangeBuilder();
+
+
+
+
+        if(currentShardIterator == null){
+
+            // flip the order of our shards if ascending
+            if(ascending){
+                Collections.reverse(rowKeysWithShardEnd);
+            }
+
+            currentShardIterator = rowKeysWithShardEnd.iterator();
+
+        }
+
+        if(currentShard == null){
+
+            if(logger.isTraceEnabled()){
+                logger.trace("currentShard: {}", currentShard);
+            }
+
+            currentShard = currentShardIterator.next();
+
+            if(logger.isTraceEnabled()){
+                logger.trace("all shards when starting: {}", rowKeysWithShardEnd);
+                logger.trace("initializing iterator with shard: {}", currentShard);
+            }
+
+
+        }
+
+
+
+        // initial request, build the range with no start and no end
+        if ( startColumn == null && currentShard.getShardEnd() == null ){
+
+            columnSearch.buildRange( rangeBuilder );
+
+            if(logger.isTraceEnabled()){
+                logger.trace("initial search (no start or shard end)");
+            }
+
+        }
+        // if there's only a startColumn, always set the range start to startColumn
+        else if ( startColumn != null && currentShard.getShardEnd() == null ){
+
+            columnSearch.buildRange( rangeBuilder, startColumn, null );
+
+            if(logger.isTraceEnabled()){
+                logger.trace("search (no shard end) with start: {}", startColumn);
+            }
+
+        }
+        // if there's only a shardEnd, set the start/end based on the search order
+        else if ( startColumn == null && currentShard.getShardEnd() != null ){
+
+            T shardEnd = (T) currentShard.getShardEnd();
+
+            // if we have a shardEnd and it's not an ascending search, use the shardEnd as a start
+            if(!ascending) {
+
+                columnSearch.buildRange(rangeBuilder, shardEnd, null);
+
+                if(logger.isTraceEnabled()){
+                    logger.trace("search descending with start: {}", shardEnd);
+                }
+
+            }
+            // if we have a shardEnd and it is an ascending search, use the shardEnd as the end
+            else{
+
+                columnSearch.buildRange( rangeBuilder, null, shardEnd );
+
+                if(logger.isTraceEnabled()){
+                    logger.trace("search ascending with end: {}", shardEnd);
+                }
+
+            }
+
+        }
+        // if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order
+        else if ( startColumn != null && currentShard.getShardEnd() != null) {
+
+            T shardEnd = (T) currentShard.getShardEnd();
+
+
+            // if the search is not ascending, set the start to be the older edge
+            if(!ascending){
+
+                T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? shardEnd : startColumn;
+                columnSearch.buildRange( rangeBuilder, searchStart, null);
+
+                if(logger.isTraceEnabled()){
+                    logger.trace("search descending with start: {} in shard", searchStart, currentShard);
+                }
+
+            }
+            // if the search is ascending, then always use the startColumn for the start and shardEnd for the range end
+            else{
+
+                columnSearch.buildRange( rangeBuilder, startColumn , shardEnd);
+
+                if(logger.isTraceEnabled()){
+                    logger.trace("search with start: {}, end: {}", startColumn, shardEnd);
+                }
+
+
+
+            }
+
+        }
+
+        rangeBuilder.setLimit( selectSize );
+
+        if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard );
+
+        /**
+         * Get our list of slices
+         */
+        final RowSliceQuery<R, C> query =
+            keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() )
+                .withColumnRange( rangeBuilder.build() );
+
+        final Rows<R, C> result;
+        try {
+            result = query.execute().getResult();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( "Unable to connect to cassandra", e );
+        }
+
+
+
+
+        final List<T> mergedResults;
+
+        skipSize = 0;
+
+        mergedResults = processResults( result, selectSize );
+
+        if(logger.isTraceEnabled()){
+            logger.trace("skipped amount: {}", skipSize);
+        }
+
+
+
+        final int size = mergedResults.size();
+
+
+
+        if(logger.isTraceEnabled()){
+            logger.trace("current shard: {}, retrieved size: {}", currentShard, size);
+            logger.trace("selectSize={}, size={}, ", selectSize, size);
+
+
+        }
+
+        moreToReturn = size == selectSize;
+
+        if(selectSize == 1001 && mergedResults.size() == 1000){
+            moreToReturn = true;
+        }
+
+
+        // if a whole page is skipped OR the result size equals the difference of what's skipped,
+        // it is likely during a shard transition and we should assume there is more to read
+        if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){
+            moreToReturn = true;
+        }
+
+        //we have a first column to check
+        if( size > 0) {
+
+            final T firstResult = mergedResults.get( 0 );
+
+            //The search has either told us to skip the first element, or it matches our last, therefore we disregard it
+            if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
+                if(logger.isTraceEnabled()){
+                    logger.trace("removing an entry");
+
+                }
+                mergedResults.remove( 0 );
+            }
+
+        }
+
+
+        // set the start column for the next query
+        if(moreToReturn && mergedResults.size() > 0){
+            startColumn = mergedResults.get( mergedResults.size()  - 1 );
+
+        }
+
+
+        currentColumnIterator = mergedResults.iterator();
+
+
+        //force an advance of this iterator when there are still shards to read but result set on current shard is 0
+        if(size == 0 && currentShardIterator.hasNext()){
+            hasNext();
+        }
+
+        if(logger.isTraceEnabled()){
+            logger.trace("currentColumnIterator.hasNext()={}, " +
+                    "moreToReturn={}, currentShardIterator.hasNext()={}",
+                currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
+        }
+
+
+    }
+
+
+    /**
+     * Process the result set and filter any duplicates that may have already been seen in previous shards.  During
+     * a shard transition, there could be the same columns in multiple shards (rows).  This will also allow for
+     * filtering the startColumn (the seek starting point) when paging a row in Cassandra.
+     *
+     * @param result
+     * @return
+     */
+    private List<T> processResults(final Rows<R, C> result, final int maxSize ) {
+
+        final List<T> mergedResults = new ArrayList<>(maxSize);
+
+        for ( final R key : result.getKeys() ) {
+            final ColumnList<C> columns = result.getRow( key ).getColumns();
+
+
+            for (final Column<C> column :columns  ) {
+
+                final T returnedValue = columnParser.parseColumn( column );
+
+                // use an O(log n) search, same as a tree, but with fast access to indexes for later operations
+                int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator );
+
+
+                //we've already seen the column, filter it out as we might be in a shard transition or our start column
+                if(searchIndex > -1){
+                    if(logger.isTraceEnabled()){
+                        logger.trace("skipping column as it was already retrieved before");
+                    }
+                    skipSize++;
+                    continue;
+                }
+
+
+                resultsTracking.add(returnedValue);
+                mergedResults.add(returnedValue );
+
+
+            }
+
+            if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() );
+
+        }
+        return mergedResults;
+    }
+
+}
+
+
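
Two details of the new class above are easy to miss. Duplicates are filtered against
resultsTracking, which is never cleared, because the same column can legitimately
appear in two shard rows while a compaction is in flight. And moreToReturn is
deliberately pessimistic around a shard transition: if a whole page (or a page minus
the skipped start column) was filtered out, the iterator assumes more data remains
rather than terminating early. A condensed, runnable restatement of that decision
(the standalone method and its int arguments are illustrative, not part of the
commit):

public class MoreToReturnSketch {

    // size: results kept from this fetch; skipSize: duplicates filtered out;
    // selectSize: the page size requested from Cassandra.
    static boolean moreToReturn(int size, int skipSize, int selectSize) {
        // a full page implies the shard may have more columns
        boolean more = (size == selectSize);

        // page-size edge case kept from the code above
        if (selectSize == 1001 && size == 1000) {
            more = true;
        }

        // a whole page skipped, or kept results exactly accounting for what was
        // skipped, likely means a shard transition: assume there is more to read
        if (skipSize == selectSize || skipSize == selectSize - 1
                || size == selectSize - skipSize
                || size == (selectSize - 1) - skipSize) {
            more = true;
        }
        return more;
    }

    public static void main(String[] args) {
        System.out.println(moreToReturn(1000, 0, 1001)); // true: page minus skipped start
        System.out.println(moreToReturn(0, 1001, 1001)); // true: whole page was duplicates
        System.out.println(moreToReturn(500, 0, 1001));  // false: genuinely short page
    }
}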

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
index d22f472..5eeeae0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeShardSerializationImpl.java
@@ -107,8 +107,11 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
-        batch.withTimestamp( shard.getCreatedTime() ).withRow( EDGE_SHARDS, rowKey )
-             .putColumn( shard.getShardIndex(), SHARD_SERIALIZER.toByteBuffer(shard));
+        // write the row with a current timestamp so we can ensure that it's persisted with updated shard meta
+        long batchTimestamp = System.currentTimeMillis();
+
+        batch.withTimestamp( batchTimestamp ).withRow( EDGE_SHARDS, rowKey )
+             .putColumn( shard.getShardIndex(), SHARD_SERIALIZER.toByteBuffer(shard)).setTimestamp(batchTimestamp);
 
         return batch;
     }
@@ -163,8 +166,11 @@ public class EdgeShardSerializationImpl implements EdgeShardSerialization {
 
         final MutationBatch batch = keyspace.prepareMutationBatch();
 
-        batch.withTimestamp(shard.getCreatedTime()).withRow( EDGE_SHARDS, rowKey )
-            .deleteColumn( shard.getShardIndex() );
+        // write the row with a current timestamp so we can ensure that it's persisted with updated shard meta
+        long batchTimestamp = System.currentTimeMillis();
+
+        batch.withTimestamp(batchTimestamp).withRow( EDGE_SHARDS, rowKey )
+            .deleteColumn( shard.getShardIndex() ).setTimestamp(batchTimestamp);
 
         return batch;
     }
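
Both hunks above implement the 'last write wins' change from the commit message. A
shard's createdTime never changes, so a metadata rewrite (for example after
setShardEnd) carried exactly the same timestamp as the original write and could lose
the tie, and a tombstone stamped with an old createdTime cannot remove a column
written later. Stamping each mutation with the current system time makes every
successive write strictly newer. A toy model of the resolution rule (plain Java, no
Cassandra; note that real Cassandra breaks equal timestamps by comparing values, not
arrival order, which is why same-timestamp updates are unreliable):

import java.util.HashMap;
import java.util.Map;

public class LastWriteWinsSketch {

    // column name -> {value, timestamp}
    static final Map<String, long[]> store = new HashMap<>();

    // apply a write only if it is strictly newer than what is stored
    static void write(String column, long value, long timestamp) {
        long[] current = store.get(column);
        if (current == null || timestamp > current[1]) {
            store.put(column, new long[] { value, timestamp });
        }
    }

    public static void main(String[] args) {
        long createdTime = 1000L; // immutable, like Shard.getCreatedTime()
        write("shard-meta", 1, createdTime);            // initial shard metadata
        write("shard-meta", 2, createdTime);            // update at createdTime: lost
        System.out.println(store.get("shard-meta")[0]); // 1 -- stale metadata survives

        write("shard-meta", 3, System.currentTimeMillis()); // current-time stamp wins
        System.out.println(store.get("shard-meta")[0]);     // 3
    }
}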

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 8728c6c..e63db46 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -161,17 +161,22 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         final int maxWorkSize = graphFig.getScanPageSize();
 
 
-        final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
-        final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
-        final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();
+
 
         /**
          * As we move edges, we want to keep track of it
          */
-        long edgeCount = 0;
+        long totalEdgeCount = 0;
 
 
         for ( Shard sourceShard : sourceShards ) {
+
+            final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
+            final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
+            final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();
+
+            long edgeCount = 0;
+
             Iterator<MarkedEdge> edges = edgeMeta
                 .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ),
                     Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
@@ -183,6 +188,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
                 final long edgeTimestamp = edge.getTimestamp();
 
+                shardEnd = edge;
 
                 /**
                  * The edge is within a different shard, break
@@ -203,6 +209,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 edgeCount++;
 
 
+
                 // if we're at our count, execute the mutation of writing the edges to the new row, then remove them
                 // from the old rows
                 if ( edgeCount % maxWorkSize == 0 ) {
@@ -214,15 +221,15 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                         // write the edges into the new shard atomically so we know they all succeed
                         newRowBatch.withAtomicBatch(true).execute();
 
-                        // set the shardEnd after the write is known to be successful
-                        shardEnd = edge;
 
                         // Update the shard end after each batch so any reads during transition stay as close to current
                         sourceShard.setShardEnd(
                             Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
                         );
 
-                        logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, edgeMeta.getNodes(), shardEnd );
+                        if(logger.isTraceEnabled()) {
+                            logger.trace("Updating shard {} during batch removal with shardEnd {}", sourceShard, shardEnd);
+                        }
                         updateShardMetaBatch.mergeShallow(
                             edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
 
@@ -231,74 +238,85 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                         // on purpose block this thread before deleting the old edges to be sure there are no gaps
                         // duplicates are filtered on graph seeking so this is OK
                         Thread.sleep(1000);
-                        logger.info("Deleting batch of {} from old shard", maxWorkSize);
-                        deleteRowBatch.execute();
+
+                        if(logger.isTraceEnabled()) {
+                            logger.trace("Deleting batch of {} from old shard", maxWorkSize);
+                        }
+                        deleteRowBatch.withAtomicBatch(true).execute();
+
+                        updateShardMetaBatch.execute();
 
 
                     }
                     catch ( Throwable t ) {
                         logger.error( "Unable to move edges from shard {} to shard {}", sourceShard, targetShard );
                     }
-                }else {
-
-                    shardEnd = edge;
 
+                    totalEdgeCount += edgeCount;
+                    edgeCount = 0;
                 }
 
 
 
             }
 
-            if (shardEnd != null && edgeCount > 0){
+            totalEdgeCount += edgeCount;
 
-                sourceShard.setShardEnd(
-                    Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
-                );
+            try {
 
-                logger.info("Updating shard {} for nodes {} with shardEnd {}", sourceShard, shardEnd );
-                updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
-            }
+                // write the edges into the new shard atomically so we know they all succeed
+                newRowBatch.withAtomicBatch(true).execute();
 
-        }
+                // on purpose block this thread before deleting the old edges to be sure there are no gaps
+                // duplicates are filtered on graph seeking so this is OK
+                Thread.sleep(1000);
 
+                if(logger.isTraceEnabled()) {
+                    logger.trace("Deleting remaining {} edges from old shard", edgeCount);
+                }
+                deleteRowBatch.withAtomicBatch(true).execute();
 
+                if (shardEnd != null){
 
+                    sourceShard.setShardEnd(
+                        Optional.of(new DirectedEdge(shardEnd.getTargetNode(), shardEnd.getTimestamp()))
+                    );
 
-        try {
+                    if(logger.isTraceEnabled()) {
+                        logger.trace("Updating for last time shard {} with shardEnd {}", sourceShard, shardEnd);
+                    }
+                    updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, sourceShard, edgeMeta));
+                    updateShardMetaBatch.execute();
+                }
 
-            // write the edges into the new shard atomically so we know they all succeed
-            newRowBatch.withAtomicBatch(true).execute();
 
-            // on purpose block this thread before deleting the old edges to be sure there are no gaps
-            // duplicates are filtered on graph seeking so this is OK
-            Thread.sleep(1000);
+            }
+            catch ( Throwable t ) {
+                logger.error( "Unable to move edges to target shard {}", targetShard );
+            }
 
-            logger.info("Deleting remaining edges from old shard");
-            deleteRowBatch.execute();
 
-            // now update with our shard end
-            updateShardMetaBatch.execute();
 
         }
-        catch ( Throwable t ) {
-            logger.error( "Unable to move edges to target shard {}", targetShard );
-        }
+
+
 
 
         if (logger.isTraceEnabled()) {
-            logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount);
+            logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
         }
-        logger.info("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount);
+
+        logger.info("Finished compacting {} shards and moved {} edges", sourceShards, totalEdgeCount);
 
 
-        resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
+        resultBuilder.withCopiedEdges( totalEdgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
 
         /**
          * We didn't move anything this pass, mark the shard as compacted.  If we move something,
          * it means that we missed it on the first pass
          * or someone is still not writing to the target shard only.
          */
-        if ( edgeCount == 0 ) {
+        if ( totalEdgeCount == 0 ) {
 
 
             //now that we've marked our target as compacted, we can successfully remove any shards that are not
@@ -329,11 +347,12 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 throw new RuntimeException( "Unable to connect to cassandra", e );
             }
 
-
-            logger.info( "Shard has been fully compacted.  Marking shard {} as compacted in Cassandra", targetShard );
-
             //Overwrite our shard index with a newly created one that has been marked as compacted
             Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
+            compactedShard.setShardEnd(targetShard.getShardEnd());
+
+            logger.info( "Shard has been fully compacted.  Marking shard {} as compacted in Cassandra", compactedShard );
+
 
             final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
             try {
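
The compaction refactor above is mostly a change of batch lifetime: the three
MutationBatch objects and the edge counter now live per source shard, shardEnd is
recorded on every edge rather than only after a successful write, the batches are
flushed every maxWorkSize edges (copy atomically, persist shardEnd progress, pause,
then delete atomically), and one final flush drains the remainder for that shard
before its count is rolled into totalEdgeCount. A runnable abstraction of just that
control flow, with the Usergrid batch types reduced to string lists and the flush
callback standing in for the execute/sleep/execute sequence:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChunkedMoveSketch {

    interface Flush { void apply(List<String> copied, List<String> deleted); }

    static long move(List<List<String>> sourceShards, int maxWorkSize, Flush flush) {
        long totalEdgeCount = 0;
        for (List<String> shard : sourceShards) {
            List<String> newRowBatch = new ArrayList<>();    // copies into the target
            List<String> deleteRowBatch = new ArrayList<>(); // removals from the source
            long edgeCount = 0;
            for (String edge : shard) {
                newRowBatch.add(edge);
                deleteRowBatch.add(edge);
                edgeCount++;
                if (edgeCount % maxWorkSize == 0) {
                    flush.apply(newRowBatch, deleteRowBatch); // copy first, then delete
                    newRowBatch.clear();
                    deleteRowBatch.clear();
                    totalEdgeCount += edgeCount;
                    edgeCount = 0;
                }
            }
            flush.apply(newRowBatch, deleteRowBatch);         // drain the remainder
            totalEdgeCount += edgeCount;
        }
        return totalEdgeCount;
    }

    public static void main(String[] args) {
        long moved = move(
            Arrays.asList(Arrays.asList("e1", "e2", "e3"), Arrays.asList("e4")), 2,
            (copied, deleted) -> System.out.println("flushed " + copied));
        System.out.println(moved + " edges moved"); // 4 edges moved
    }
}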

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index e609d33..e2dd549 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 import java.util.*;
 
+import org.apache.usergrid.persistence.core.astyanax.MultiRowShardColumnIterator;
 import org.apache.usergrid.persistence.core.shard.SmartShard;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.slf4j.Logger;
@@ -136,7 +137,7 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
             logger.trace("Searching with row keys {}", rowKeys);
         }
 
-        currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf,  consistencyLevel, searcher, searcher,
+        currentColumnIterator = new MultiRowShardColumnIterator<>( keyspace, cf,  consistencyLevel, searcher, searcher,
             searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd, ascending);
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 9e6996d..e7027f4 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -82,6 +82,7 @@ import static org.junit.Assert.fail;
 public class GraphManagerShardConsistencyIT {
     private static final Logger logger = LoggerFactory.getLogger( GraphManagerShardConsistencyIT.class );
 
+
     private static final MetricRegistry registry = new MetricRegistry();
 
     private static final Meter writeMeter = registry.meter( "writeThroughput" );
@@ -102,7 +103,7 @@ public class GraphManagerShardConsistencyIT {
 
     protected ListeningExecutorService deleteExecutor;
 
-    protected int TARGET_NUM_SHARDS = 6;
+    protected int TARGET_NUM_SHARDS = 5;
 
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/bec50939/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
index 1f8bfa9..145aa03 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/EdgeShardSerializationTest.java
@@ -209,7 +209,7 @@ public class EdgeShardSerializationTest {
     }
 
     @Test
-    public void sameShardIndexRemoval() throws ConnectionException {
+    public void testShardDelete() throws ConnectionException {
 
         final Id now = IdGenerator.createId( "test" );
 
@@ -217,11 +217,15 @@ public class EdgeShardSerializationTest {
 
         final Shard shard1 = new Shard( 1000L, timestamp, false );
         final Shard shard2 = new Shard( shard1.getShardIndex(), timestamp * 2, true );
+        final Shard shard3 = new Shard( shard2.getShardIndex() * 2, timestamp * 3, true );
+
 
 
         final DirectedEdgeMeta sourceEdgeMeta = DirectedEdgeMeta.fromSourceNodeTargetType( now, "edgeType", "subType" );
         MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, shard1, sourceEdgeMeta );
         batch.mergeShallow( edgeShardSerialization.writeShardMeta( scope, shard2, sourceEdgeMeta ) );
+        batch.mergeShallow( edgeShardSerialization.writeShardMeta( scope, shard3, sourceEdgeMeta ) );
+
         batch.execute();
 
 
@@ -229,16 +233,16 @@ public class EdgeShardSerializationTest {
             edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
 
         // Latest timestamp  comes first
-        assertEquals( shard2, results.next() );
+        assertEquals( shard3, results.next() );
 
         // Removing shard3 should now succeed, since the tombstone carries a current timestamp
-        edgeShardSerialization.removeShardMeta( scope, shard1, sourceEdgeMeta ).execute();
+        edgeShardSerialization.removeShardMeta( scope, shard3, sourceEdgeMeta ).execute();
 
 
         // Get iterator again
         results = edgeShardSerialization.getShardMetaData( scope, Optional.<Shard>absent(), sourceEdgeMeta );
 
-        // We should still have shard3 stored
+        // We should still have shard2 stored
         assertEquals( shard2, results.next() );
 
 

