usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [04/20] usergrid git commit: Clean up the logging, ensure the order of shard iterator within MultiRowColumnIterator is correct. Restore NodeShardCache logic.
Date Wed, 23 Mar 2016 17:34:32 GMT
Clean up the logging, ensure the order of shard iterator within MultiRowColumnIterator is correct.
 Restore NodeShardCache logic.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/4bbebc5f
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/4bbebc5f
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/4bbebc5f

Branch: refs/heads/release-2.1.1
Commit: 4bbebc5fd759efe59bae612c9f47e36589750982
Parents: 92fae0d
Author: Michael Russo <mrusso@apigee.com>
Authored: Tue Mar 15 21:59:21 2016 -0700
Committer: Michael Russo <mrusso@apigee.com>
Committed: Tue Mar 15 21:59:21 2016 -0700

----------------------------------------------------------------------
 .../core/astyanax/MultiRowColumnIterator.java   | 201 ++++++-------------
 .../impl/shard/impl/EdgeSearcher.java           |  36 +---
 .../impl/shard/impl/NodeShardCacheImpl.java     |  19 +-
 .../shard/impl/ShardEntryGroupIterator.java     |  12 ++
 .../impl/shard/impl/ShardsColumnIterator.java   |  10 +-
 .../graph/GraphManagerShardConsistencyIT.java   |  54 ++---
 .../graph/src/test/resources/log4j.properties   |   4 +
 7 files changed, 111 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index c384899..10786f7 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -22,7 +22,6 @@ package org.apache.usergrid.persistence.core.astyanax;
 
 import java.util.*;
 
-import org.apache.avro.generic.GenericData;
 import org.apache.usergrid.persistence.core.shard.SmartShard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -81,6 +80,10 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
 
     private List<T> resultsTracking;
 
+    private int skipSize = 0; // used for determining if we've skipped a whole page during
shard transition
+
+    private boolean ascending = false;
+
 
     /**
      * Remove after finding bug
@@ -114,14 +117,15 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
         this.moreToReturn = true;
         this.resultsTracking = new ArrayList<>();
 
-        //        seenResults = new HashMap<>( pageSize * 10 );
     }
 
+    // temporarily use a new constructor for specific searches until we update each caller
of this class
     public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C>
cf,
                                    final ConsistencyLevel consistencyLevel, final ColumnParser<C,
T> columnParser,
                                    final ColumnSearch<T> columnSearch, final Comparator<T>
comparator,
                                    final Collection<R> rowKeys, final int pageSize,
-                                   final List<SmartShard> rowKeysWithShardEnd) {
+                                   final List<SmartShard> rowKeysWithShardEnd,
+                                   final boolean ascending) {
         this.cf = cf;
         this.pageSize = pageSize;
         this.columnParser = columnParser;
@@ -133,54 +137,45 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
         this.moreToReturn = true;
         this.rowKeysWithShardEnd = rowKeysWithShardEnd;
         this.resultsTracking = new ArrayList<>();
+        this.ascending = ascending;
 
-
-        //        seenResults = new HashMap<>( pageSize * 10 );
     }
 
 
     @Override
     public boolean hasNext() {
 
-        if( currentColumnIterator != null && !currentColumnIterator.hasNext() &&
!moreToReturn){
-            if(currentShardIterator.hasNext()) {
+        // if column iterator is null, initialize with first call to advance()
+        // advance if we know more columns exist in the current shard but we've exhausted
this page fetch from c*
+        if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() &&
moreToReturn ) ) {
+            advance();
+        }
+
+        // when there are no more columns, nothing reported to return, but more shards available,
go to the next shard
+        if( currentColumnIterator != null && !currentColumnIterator.hasNext() &&
+            !moreToReturn && currentShardIterator.hasNext()){
 
                 if(logger.isTraceEnabled()){
-                    logger.trace(Thread.currentThread().getName()+" - advancing shard iterator");
-                    logger.trace(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
-                    logger.trace(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
-                    logger.trace(Thread.currentThread().getName()+" - current shard: {}",
currentShard);
+                    logger.trace("Advancing shard iterator");
+                    logger.trace("Shard before advance: {}", currentShard);
                 }
 
 
+                // advance to the next shard
                 currentShard = currentShardIterator.next();
 
                 if(logger.isTraceEnabled()){
-                    logger.trace(Thread.currentThread().getName()+" - current shard: {}",
currentShard);
+                    logger.trace("Shard after advance: {}", currentShard);
 
                 }
 
+                // reset the start column as we'll be seeking a new row, any duplicates will
be filtered out
                 startColumn = null;
 
                 advance();
-            }
-        }
-
-        if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() &&
moreToReturn ) ) {
-            if(currentColumnIterator != null) {
-                if(logger.isTraceEnabled()){
-                    logger.trace(Thread.currentThread().getName() + " - currentColumnIterator.hasNext()={}",
currentColumnIterator.hasNext());
-
-                }
-            }
 
-            if(logger.isTraceEnabled()){
-                logger.trace(Thread.currentThread().getName()+" - going into advance()");
-
-            }
-
-            advance();
         }
+
         return currentColumnIterator.hasNext();
     }
 
@@ -214,24 +209,25 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
 
         final boolean skipFirstColumn = startColumn != null;
 
-
-
         final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
 
-        //final int selectSize = pageSize;
-
         final RangeBuilder rangeBuilder = new RangeBuilder();
 
 
 
 
         if(currentShardIterator == null){
+
+            // flip the order of our shards if ascending
+            if(ascending){
+                Collections.reverse(rowKeysWithShardEnd);
+            }
+
             currentShardIterator = rowKeysWithShardEnd.iterator();
 
         }
 
         if(currentShard == null){
-            Collections.reverse(rowKeysWithShardEnd); // ranges are ascending
 
             if(logger.isTraceEnabled()){
                 logger.trace(Thread.currentThread().getName()+" - currentShard: {}", currentShard);
@@ -266,7 +262,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
 
         rangeBuilder.setLimit( selectSize );
 
-        if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query" );
+        if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard
{}", currentShard );
 
         /**
          * Get our list of slices
@@ -285,65 +281,17 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
 
 
 
-//        List<RowSliceQuery<R, C>> queries = new ArrayList<>();
-//
-//        rowKeys.forEach( rowkey -> {
-//
-//            queries.add(keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel
).getKeySlice( rowKeys )
-//                .withColumnRange( rangeBuilder.build() ));
-//
-//        });
-//
-//
-//        final List<Rows<R,C>> combinedResults = new ArrayList<>();
-//
-//        queries.forEach(query ->{
-//
-//            try {
-//                combinedResults.add(query.execute().getResult());
-//            }
-//            catch ( ConnectionException e ) {
-//                throw new RuntimeException( "Unable to connect to casandra", e );
-//            }
-//
-//        });
-
-
-
-
-        //now aggregate them together
-
-        //this is an optimization.  It's faster to see if we only have values for one row,
-        // then return the iterator of those columns than
-        //do a merge if only one row has data.
-
-
 
         final List<T> mergedResults;
 
-        mergedResults = mergeResults( result, selectSize );
-
-//        if ( containsSingleRowOnly( result ) ) {
-//            mergedResults = singleRowResult( result );
-//        }
-//        else {
-//            mergedResults = mergeResults( result, selectSize );
-//        }
+        skipSize = 0;
 
+        mergedResults = processResults( result, selectSize );
 
+        if(logger.isTraceEnabled()){
+            logger.trace("skipped amount: {}", skipSize);
+        }
 
-//        final List<T> mergedResults = new ArrayList<>();
-//
-//        combinedResults.forEach(rows -> {
-//
-//            if ( containsSingleRowOnly( rows ) ) {
-//                mergedResults.addAll(singleRowResult( rows ));
-//            }
-//            else {
-//                mergedResults.addAll(mergeResults( rows, selectSize ));
-//            }
-//
-//        });
 
 
         final int size = mergedResults.size();
@@ -363,6 +311,12 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
             moreToReturn = true;
         }
 
+
+        // if a whole page is skipped, this is likely during a shard transition and we should
assume there is more to read
+        if( skipSize == selectSize || skipSize == selectSize - 1){
+            moreToReturn = true;
+        }
+
        //we have a first column to check
         if( size > 0) {
 
@@ -380,21 +334,20 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
         }
 
 
+        // set the start column for the next query
         if(moreToReturn && mergedResults.size() > 0){
             startColumn = mergedResults.get( mergedResults.size()  - 1 );
 
         }
 
-        if(logger.isTraceEnabled()){
-            logger.trace(Thread.currentThread().getName()+" - current shard: {}", currentShard);
-            logger.trace(Thread.currentThread().getName()+" - selectSize={}, size={}, ",
selectSize, size);
-        }
-
-
 
+        currentColumnIterator = mergedResults.iterator();
 
 
-        currentColumnIterator = mergedResults.iterator();
+        //force an advance of this iterator when there are still shards to read but result
set on current shard is 0
+        if(size == 0 && currentShardIterator.hasNext()){
+            hasNext();
+        }
 
        if(logger.isTraceEnabled()){
            logger.trace(
@@ -404,7 +357,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
        }
 
 
-        if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results",
rowKeys.size() );
     }
 
 
@@ -464,20 +416,17 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
 
 
     /**
-     * Multiple rows are present, merge them into a single result set
+     * Process the result set and filter any duplicates that may have already been seen in
previous shards.  During
+     * a shard transition, there could be the same columns in multiple shards (rows).  This
will also allow for
+     * filtering the startColumn (the seek starting point) when paging a row in Cassandra.
+     *
      * @param result
      * @return
      */
-    private List<T> mergeResults( final Rows<R, C> result, final int maxSize
) {
-
-        if (logger.isTraceEnabled()) logger.trace( "Multiple rows have columns.  Merging"
);
-
+    private List<T> processResults(final Rows<R, C> result, final int maxSize
) {
 
         final List<T> mergedResults = new ArrayList<>(maxSize);
 
-
-
-
         for ( final R key : result.getKeys() ) {
             final ColumnList<C> columns = result.getRow( key ).getColumns();
 
@@ -486,62 +435,24 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T>
{
 
                 final T returnedValue = columnParser.parseColumn( column );
 
-                //Use an O(log n) search, same as a tree, but with fast access to indexes
for later operations
+                // use an O(log n) search, same as a tree, but with fast access to indexes
for later operations
                 int searchIndex = Collections.binarySearch( resultsTracking, returnedValue,
comparator );
 
-                /**
-                 * DO NOT remove this section of code. If you're seeing inconsistent results
during shard transition,
-                 * you'll
-                 * need to enable this
-                 */
-                //
-                //                if ( previous != null && comparator.compare( previous,
returnedValue ) == 0 ) {
-                //                    throw new RuntimeException( String.format(
-                //                            "Cassandra returned 2 unique columns,
-                // but your comparator marked them as equal.  This " +
-                //                                    "indicates a bug in your comparator.
 Previous value was %s and
-                // current value is " +
-                //                                    "%s",
-                //                            previous, returnedValue ) );
-                //                }
-                //
-                //                previous = returnedValue;
-
-                //we've already seen it, no-op
+
+                //we've already seen the column, filter it out as we might be in a shard
transition or our start column
                 if(searchIndex > -1){
                     if(logger.isTraceEnabled()){
                         logger.trace("skipping column as it was already retrieved before");
                     }
+                    skipSize++;
                     continue;
                 }
 
-//                final int insertIndex = (searchIndex+1)*-1;
-//
-//                //it's at the end of the list, don't bother inserting just to remove it
-//                if(insertIndex >= maxSize){
-//                    logger.info("skipping column as it was at the end of the list");
-//                    continue;
-//                }
 
                 resultsTracking.add(returnedValue);
-
-                //if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set
at index {}", returnedValue, insertIndex );
-
-                //mergedResults.add( insertIndex, returnedValue );
                 mergedResults.add(returnedValue );
 
 
-
-                //prune the mergedResults
-//                while ( mergedResults.size() > maxSize ) {
-//
-//                    if (logger.isTraceEnabled()) logger.trace( "Trimming results to size
{}", maxSize );
-//
-//                    //just remove from our tail until the size falls to the correct value
-//                    mergedResults.remove(mergedResults.size()-1);
-//                    resultsTracking.remove(resultsTracking.size()-1);
-//
-//                }
             }
 
             if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}",
mergedResults.size() );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index e0ba3ec..2f5817d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -56,9 +56,6 @@ import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterat
  */
 public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{
 
-    private static final Logger logger = LoggerFactory.getLogger( EdgeSearcher.class );
-
-
     protected final Optional<T> last;
     protected final long maxTimestamp;
     protected final ApplicationScope scope;
@@ -78,7 +75,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C,
T>, Colum
         this.last = last;
         this.comparator = comparator;
 
-        //logger.info("initializing with shards: {}", shards);
     }
 
 
@@ -86,7 +82,6 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C,
T>, Colum
     public List<ScopedRowKey<R>> getRowKeys() {
 
         List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size());
-        //logger.info("shards: {}", shards);
 
         for(Shard shard : shards){
 
@@ -175,37 +170,14 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C,
T>, Colum
     }
 
     private void setRangeOptions(final RangeBuilder rangeBuilder){
-            //if we're ascending, this is opposite what cassandra sorts, so set the reversed
flag
+
+        //if we're ascending, this is opposite what cassandra sorts, so set the reversed
flag
         final boolean reversed = order == SearchByEdgeType.Order.ASCENDING;
 
         rangeBuilder.setReversed( reversed );
 
     }
 
-//    public class SmartShard {
-//
-//        final ScopedRowKey<R> rowKey;
-//        final C shardEnd;
-//
-//
-//        public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){
-//
-//            this.rowKey = rowKey;
-//            this.shardEnd = shardEnd;
-//        }
-//
-//
-//        public ScopedRowKey<R> getRowKey(){
-//            return rowKey;
-//        }
-//
-//        public C getShardEnd(){
-//            return shardEnd;
-//        }
-//
-//    }
-
-
     /**
      * Get the comparator
      * @return
@@ -214,6 +186,10 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C,
T>, Colum
         return comparator;
     }
 
+    public SearchByEdgeType.Order getOrder(){
+        return order;
+    }
+
 
     /**
      * Get the column's serializer

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 3ff9d47..1a88ebb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -164,19 +164,14 @@ public class NodeShardCacheImpl implements NodeShardCache {
         final CacheKey key = new CacheKey( scope, directedEdgeMeta );
         CacheEntry entry;
 
-//        try {
-//            entry = this.graphs.get( key );
-//        }
-//        catch ( ExecutionException e ) {
-//            throw new GraphRuntimeException( "Unable to load shard key for graph", e );
-//        }
-
-        final Iterator<ShardEntryGroup> edges =
-            nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta
);
-
-        final CacheEntry cacheEntry = new CacheEntry( edges );
+        try {
+            entry = this.graphs.get( key );
+        }
+        catch ( ExecutionException e ) {
+            throw new GraphRuntimeException( "Unable to load shard key for graph", e );
+        }
 
-        Iterator<ShardEntryGroup> iterator = cacheEntry.getShards( maxTimestamp );
+        Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp );
 
         if ( iterator == null ) {
             return Collections.<ShardEntryGroup>emptyList().iterator();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
index f1b5108..b64bb58 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardEntryGroupIterator.java
@@ -14,6 +14,8 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardGroup
 
 import com.google.common.base.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import rx.schedulers.Schedulers;
 
 
@@ -23,6 +25,9 @@ import rx.schedulers.Schedulers;
  */
 public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup> {
 
+    private static final Logger logger = LoggerFactory.getLogger( ShardEntryGroupIterator.class
);
+
+
 
     private final ShardGroupCompaction shardGroupCompaction;
     private final PushbackIterator<Shard> sourceIterator;
@@ -106,11 +111,18 @@ public class ShardEntryGroupIterator implements Iterator<ShardEntryGroup>
{
 
             //we can't add this one to the entries, it doesn't fit within the delta, allocate
a new one and break
             if ( next.addShard( shard ) ) {
+
+                if(logger.isTraceEnabled()) {
+                    logger.trace("adding shard: {}", shard);
+                }
                 continue;
             }
 
 
             sourceIterator.pushback( shard );
+            if(logger.isTraceEnabled()) {
+                logger.trace("unable to add shard: {}, pushing back and stopping", shard);
+            }
 
             break;
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index af9d979..e609d33 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 import java.util.*;
 
 import org.apache.usergrid.persistence.core.shard.SmartShard;
+import org.apache.usergrid.persistence.graph.SearchByEdgeType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,15 +130,14 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T>
{
 
         final List<SmartShard> rowKeysWithShardEnd = searcher.getRowKeysWithShardEnd();
 
+        final boolean ascending = searcher.getOrder() == SearchByEdgeType.Order.ASCENDING;
+
         if (logger.isTraceEnabled()) {
             logger.trace("Searching with row keys {}", rowKeys);
         }
 
-        //currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf,  consistencyLevel,
searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
-        currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf,  consistencyLevel,
searcher, searcher, searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd);
-
-
-
+        currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf,  consistencyLevel,
searcher, searcher,
+            searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd, ascending);
 
 
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 2602e88..82b0879 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -1,22 +1,20 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 package org.apache.usergrid.persistence.graph;
 
@@ -205,8 +203,6 @@ public class GraphManagerShardConsistencyIT {
 
         final int numWorkersPerInjector = numProcessors / numInjectors;
 
-        //final int numWorkersPerInjector = 1;
-
 
 
         /**
@@ -218,16 +214,13 @@ public class GraphManagerShardConsistencyIT {
         final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
 
 
-        final long expectedShardCount = numberOfEdges / shardSize;
-
-
         createExecutor( numWorkersPerInjector );
 
         final AtomicLong writeCounter = new AtomicLong();
 
 
         //min stop time the min delta + 1 cache cycle timeout
-        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout();
+        final long minExecutionTime = graphFig.getShardMinDelta() + graphFig.getShardCacheTimeout()
+ 60000;
 
 
         logger.info( "Writing {} edges per worker on {} workers in {} injectors", workerWriteLimit,
numWorkersPerInjector,
@@ -266,14 +259,14 @@ public class GraphManagerShardConsistencyIT {
 
 
         final long writeCount = writeCounter.get();
+        final long expectedShardCount = writeCount / shardSize;
         final Meter readMeter = registry.meter( "readThroughput" );
 
 
         final List<Throwable> failures = new ArrayList<>();
+        //Thread.sleep(5000);
 
-        Thread.sleep(5000);
-
-        for(int i = 0; i < 1; i ++) {
+        for(int i = 0; i < 2; i ++) {
 
 
             /**
@@ -351,14 +344,9 @@ public class GraphManagerShardConsistencyIT {
 
             //we're done
             if ( compactedCount >= expectedShardCount ) {
-                logger.info( "All compactions complete, sleeping" );
-
-                //                final Object mutex = new Object();
-                //
-                //                synchronized ( mutex ){
-                //
-                //                    mutex.wait();
-                //                }
+
+                logger.info( "All compactions complete, sleeping. Compacted shard count={},
expected shard count={}",
+                    compactedCount, expectedShardCount );
 
                 break;
             }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/4bbebc5f/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index 608ee03..79401c3 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -37,4 +37,8 @@ log4j.logger.cassandra.db=ERROR
 #log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE
 #log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
+#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=INFO
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=INFO
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=INFO
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=INFO
 


Mime
View raw message