usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mru...@apache.org
Subject [01/20] usergrid git commit: Initial work to iterate over the shards with more context so we don't always fetch all rows (shards) at the same time.
Date Wed, 23 Mar 2016 17:34:29 GMT
Repository: usergrid
Updated Branches:
  refs/heads/release-2.1.1 843578310 -> e64fa3503


Initial work to iterate over the shards with more context so we don't always fetch all rows (shards) at the same time.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/8c725f19
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/8c725f19
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/8c725f19

Branch: refs/heads/release-2.1.1
Commit: 8c725f19aa30a1cca5b71017c8a43586b6e4d544
Parents: 8435783
Author: Michael Russo <mrusso@apigee.com>
Authored: Mon Mar 14 16:07:19 2016 -0700
Committer: Michael Russo <mrusso@apigee.com>
Committed: Mon Mar 14 16:07:19 2016 -0700

----------------------------------------------------------------------
 .../asyncevents/AsyncEventServiceImpl.java      |  12 +-
 .../usergrid/corepersistence/index/RxTest.java  | 129 ++++++++++
 .../core/astyanax/MultiRowColumnIterator.java   | 253 +++++++++++++++----
 .../persistence/core/shard/SmartShard.java      |  52 ++++
 .../graph/serialization/impl/shard/Shard.java   |  33 ++-
 .../impl/shard/impl/EdgeSearcher.java           |  86 ++++++-
 .../shard/impl/NodeShardAllocationImpl.java     |   4 +-
 .../impl/shard/impl/NodeShardCacheImpl.java     |  19 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  19 ++
 .../impl/ShardedEdgeSerializationImpl.java      |   9 +
 .../impl/shard/impl/ShardsColumnIterator.java   |  30 ++-
 .../graph/GraphManagerShardConsistencyIT.java   |  17 +-
 12 files changed, 583 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
index 4d78340..7e368c7 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.avro.generic.GenericData;
 import org.apache.usergrid.persistence.index.impl.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -519,6 +520,13 @@ public class AsyncEventServiceImpl implements AsyncEventService {
         }
 
 
+        // don't let this continue if there's nothing to index
+        if (indexOperationMessage == null ||  indexOperationMessage.isEmpty()){
+            throw new RuntimeException(
+                "IndexOperationMessage cannot be null or empty after retrieving from map persistence");
+        }
+
+
         // always do a check to ensure the indexes are initialized for the index requests
         initializeEntityIndexes(indexOperationMessage);
 
@@ -739,9 +747,9 @@ public class AsyncEventServiceImpl implements AsyncEventService {
      */
     private List<QueueMessage> submitToIndex(List<IndexEventResult> indexEventResults) {
 
-        // if nothing came back then return null
+        // if nothing came back then return empty list
         if(indexEventResults==null){
-            return null;
+            return new ArrayList<>(0);
         }
 
         IndexOperationMessage combined = new IndexOperationMessage();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
index 6bb8947..f44c028 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/RxTest.java
@@ -20,14 +20,21 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import org.apache.avro.generic.GenericData;
 import org.apache.usergrid.ExperimentalTest;
 import org.junit.Ignore;
 import org.junit.Test;
 
 import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import rx.Observable;
 import rx.Subscription;
 import rx.observables.ConnectableObservable;
@@ -42,6 +49,9 @@ import static org.junit.Assert.assertTrue;
  */
 public class RxTest {
 
+    private static final Logger logger = LoggerFactory.getLogger(RxTest.class);
+
+
     @Test
     @Category(ExperimentalTest.class )
     public void testPublish() throws InterruptedException {
@@ -107,5 +117,124 @@ public class RxTest {
         assertEquals(0, result);
     }
 
+    @Test
+    public void testStreamWithinObservable(){
+
+        List<Integer> numbers = new ArrayList<Integer>(5){{
+            add(1);
+            add(2);
+            add(3);
+            add(4);
+            add(5);
+        }};
+
+        Observable.just(numbers).map( integers -> {
+
+            try{
+
+                logger.info("Starting size: {}", String.valueOf(numbers.size()));
+
+                List<StreamResult> results = callStream(integers);
+
+                logger.info("In process size: {}", String.valueOf(results.size()));
+
+                List<Integer> checked = checkResults(results);
+
+                logger.info("Resulting Size: {}", String.valueOf(checked.size()));
+
+                return results;
+
+            }
+            catch(Exception e){
+
+                logger.info("Caught exception in observable: {}", e.getMessage());
+                return null;
+
+
+            }
+
+        }).subscribe();
+
+
+
+
+
+
+
+    }
+
+    private List<StreamResult> callStream (final List<Integer> input){
+
+        Stream<StreamResult> results = input.stream().map(integer -> {
+
+            try{
+
+
+
+                if(integer.equals(1) || integer.equals(2)){
+                    throwSomeException("Ah integer not what we want!");
+                }
+
+                return new StreamResult(integer);
+
+            }
+            catch(Exception e){
+
+                logger.info("Caught exception in stream: '{}'", e.getMessage());
+                return new StreamResult(0);
+
+            }
+
+        });
+
+        return results.collect(Collectors.toList());
+
+    }
+
+
+    private List<Integer> checkResults(final List<StreamResult> streamResults){
+
+        List<Integer> combined = new ArrayList<>();
+        List<Integer> integers = streamResults.stream().filter( streamResult -> streamResult.getNumber() > 0)
+            .map(streamResult -> {
+
+                combined.add(streamResult.getNumber());
+
+                return streamResult.getNumber();
+            })
+            .collect(Collectors.toList());
+
+        Observable.from(combined).map( s -> {
+            logger.info("Doing work in another observable with Integer: {}", s);
+            return s;
+        }).toBlocking().last();
+
+
+        return integers;
+
+    }
+
+
+    public class StreamResult {
+
+        private int number;
+
+        public StreamResult( final int number){
+
+            this.number = number;
+        }
+
+        public int getNumber(){
+            return number;
+        }
+
+
+    }
+
+    public void throwSomeException(String message){
+
+        throw new RuntimeException(message);
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index a120fda..9971fba 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -1,35 +1,29 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing,
- *  * software distributed under the License is distributed on an
- *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- *  * KIND, either express or implied.  See the License for the
- *  * specific language governing permissions and limitations
- *  * under the License.
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
 
 package org.apache.usergrid.persistence.core.astyanax;
 
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
 
+import org.apache.avro.generic.GenericData;
+import org.apache.usergrid.persistence.core.shard.SmartShard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,6 +73,14 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
     private Iterator<T> currentColumnIterator;
 
+    private Iterator<SmartShard> currentShardIterator;
+
+    private List<SmartShard> rowKeysWithShardEnd;
+
+    private SmartShard currentShard;
+
+    private List<T> resultsTracking;
+
 
     /**
      * Remove after finding bug
@@ -110,6 +112,28 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         this.keyspace = keyspace;
         this.consistencyLevel = consistencyLevel;
         this.moreToReturn = true;
+        this.resultsTracking = new ArrayList<>();
+
+        //        seenResults = new HashMap<>( pageSize * 10 );
+    }
+
+    public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
+                                   final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
+                                   final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
+                                   final Collection<R> rowKeys, final int pageSize,
+                                   final List<SmartShard> rowKeysWithShardEnd) {
+        this.cf = cf;
+        this.pageSize = pageSize;
+        this.columnParser = columnParser;
+        this.columnSearch = columnSearch;
+        this.comparator = comparator;
+        this.rowKeys = rowKeys;
+        this.keyspace = keyspace;
+        this.consistencyLevel = consistencyLevel;
+        this.moreToReturn = true;
+        this.rowKeysWithShardEnd = rowKeysWithShardEnd;
+        this.resultsTracking = new ArrayList<>();
+
 
         //        seenResults = new HashMap<>( pageSize * 10 );
     }
@@ -117,12 +141,34 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
     @Override
     public boolean hasNext() {
+        //logger.info(Thread.currentThread().getName()+" - calling hasNext()");
+        if( currentColumnIterator != null && !currentColumnIterator.hasNext() && !moreToReturn){
+            if(currentShardIterator.hasNext()) {
+                logger.info(Thread.currentThread().getName()+" - advancing shard iterator");
+                //logger.info(Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext());
+                logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
+                //Collections.reverse(rowKeysWithShardEnd);
+                logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
+
+                logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+                currentShard = currentShardIterator.next();
+                logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+                startColumn = null;
+
+                advance();
+            }
+        }
 
         if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToReturn ) ) {
-            advance();
-        }
+            if(currentColumnIterator != null) {
+                logger.info(Thread.currentThread().getName() + " - currentColumnIterator.hasNext()={}", currentColumnIterator.hasNext());
+            }
+            logger.info(Thread.currentThread().getName()+" - moreToReturn={}", moreToReturn);
 
+            logger.info(Thread.currentThread().getName()+" - going into advance()");
 
+            advance();
+        }
         return currentColumnIterator.hasNext();
     }
 
@@ -148,7 +194,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
     public void advance() {
 
-
+        logger.info( "Advancing multi row column iterator" );
         if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" );
 
         /**
@@ -161,11 +207,33 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
         final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
 
+        //final int selectSize = pageSize;
+
         final RangeBuilder rangeBuilder = new RangeBuilder();
 
 
-        //set the range into the search
 
+
+        if(currentShardIterator == null){
+            currentShardIterator = rowKeysWithShardEnd.iterator();
+
+        }
+
+        if(currentShard == null){
+            Collections.reverse(rowKeysWithShardEnd); // ranges are ascending
+            logger.info(Thread.currentThread().getName()+" - currentShard: {}", currentShard);
+            currentShard = currentShardIterator.next();
+            logger.info(Thread.currentThread().getName()+" - all shards when starting: {}", rowKeysWithShardEnd);
+            logger.info(Thread.currentThread().getName()+" - initializing iterator with shard: {}", currentShard);
+
+        }
+
+
+
+
+
+        //set the range into the search
+        logger.info(Thread.currentThread().getName()+" - startColumn={}", startColumn);
         if ( startColumn == null ) {
             columnSearch.buildRange( rangeBuilder );
         }
@@ -181,9 +249,10 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         /**
          * Get our list of slices
          */
+        //logger.info("shard: {}, end: {}",currentShard.getRowKey().getKey(), currentShard.getShardEnd());
         final RowSliceQuery<R, C> query =
-                keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
-                        .withColumnRange( rangeBuilder.build() );
+            keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() )
+                .withColumnRange( rangeBuilder.build() );
 
         final Rows<R, C> result;
         try {
@@ -194,6 +263,33 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         }
 
 
+
+//        List<RowSliceQuery<R, C>> queries = new ArrayList<>();
+//
+//        rowKeys.forEach( rowkey -> {
+//
+//            queries.add(keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
+//                .withColumnRange( rangeBuilder.build() ));
+//
+//        });
+//
+//
+//        final List<Rows<R,C>> combinedResults = new ArrayList<>();
+//
+//        queries.forEach(query ->{
+//
+//            try {
+//                combinedResults.add(query.execute().getResult());
+//            }
+//            catch ( ConnectionException e ) {
+//                throw new RuntimeException( "Unable to connect to casandra", e );
+//            }
+//
+//        });
+
+
+
+
         //now aggregate them together
 
         //this is an optimization.  It's faster to see if we only have values for one row,
@@ -201,14 +297,34 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         //do a merge if only one row has data.
 
 
+
         final List<T> mergedResults;
 
-        if ( containsSingleRowOnly( result ) ) {
-            mergedResults = singleRowResult( result );
-        }
-        else {
-            mergedResults = mergeResults( result, selectSize );
-        }
+        mergedResults = mergeResults( result, selectSize );
+
+//        if ( containsSingleRowOnly( result ) ) {
+//            mergedResults = singleRowResult( result );
+//        }
+//        else {
+//            mergedResults = mergeResults( result, selectSize );
+//        }
+
+
+
+//        final List<T> mergedResults = new ArrayList<>();
+//
+//        combinedResults.forEach(rows -> {
+//
+//            if ( containsSingleRowOnly( rows ) ) {
+//                mergedResults.addAll(singleRowResult( rows ));
+//            }
+//            else {
+//                mergedResults.addAll(mergeResults( rows, selectSize ));
+//            }
+//
+//        });
+
+
 
 
 
@@ -223,8 +339,20 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
         final int size = mergedResults.size();
 
+
+
+        if(logger.isTraceEnabled()){
+            logger.trace(Thread.currentThread().getName()+" - current shard: {}, retrieved size: {}", currentShard, size);
+
+        }
+
+        logger.info(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
         moreToReturn = size == selectSize;
 
+//        if(selectSize == 1001 && mergedResults.size() == 1000){
+//            moreToReturn = true;
+//        }
+
         //we have a first column to to check
         if( size > 0) {
 
@@ -232,6 +360,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
             //The search has either told us to skip the first element, or it matches our last, therefore we disregard it
             if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
+                logger.info("removing an entry");
                 mergedResults.remove( 0 );
             }
 
@@ -240,10 +369,25 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
         if(moreToReturn && mergedResults.size() > 0){
             startColumn = mergedResults.get( mergedResults.size()  - 1 );
+
         }
 
+        logger.info(Thread.currentThread().getName()+" - current shard: {}", currentShard);
+        logger.info(Thread.currentThread().getName()+" - selectSize={}, size={}, ", selectSize, size);
+
+
+//        if(mergedResults.size() == 0 && currentShardIterator.hasNext()){
+//                //currentShard = currentShardIterator.next();
+//
+//        }
+
 
         currentColumnIterator = mergedResults.iterator();
+        //logger.info(Thread.currentThread().getName()+" - shards: {}",rowKeysWithShardEnd);
+        logger.info(
+            Thread.currentThread().getName()+" - currentColumnIterator.hasNext()={}, " +
+                "moreToReturn={}, currentShardIterator.hasNext()={}",
+            currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
 
         if (logger.isTraceEnabled()) logger.trace( "Finished parsing {} rows for results", rowKeys.size() );
     }
@@ -328,7 +472,7 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
                 final T returnedValue = columnParser.parseColumn( column );
 
                 //Use an O(log n) search, same as a tree, but with fast access to indexes for later operations
-                int searchIndex = Collections.binarySearch( mergedResults, returnedValue, comparator );
+                int searchIndex = Collections.binarySearch( resultsTracking, returnedValue, comparator );
 
                 /**
                  * DO NOT remove this section of code. If you're seeing inconsistent results during shard transition,
@@ -350,29 +494,37 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
                 //we've already seen it, no-op
                 if(searchIndex > -1){
+                    logger.info("skipping column as it was already retrieved before");
                     continue;
                 }
 
-                final int insertIndex = (searchIndex+1)*-1;
+//                final int insertIndex = (searchIndex+1)*-1;
+//
+//                //it's at the end of the list, don't bother inserting just to remove it
+//                if(insertIndex >= maxSize){
+//                    logger.info("skipping column as it was at the end of the list");
+//                    continue;
+//                }
 
-                //it's at the end of the list, don't bother inserting just to remove it
-                if(insertIndex >= maxSize){
-                    continue;
-                }
+                resultsTracking.add(returnedValue);
 
-                if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex );
+                //if (logger.isTraceEnabled()) logger.trace( "Adding value {} to merged set at index {}", returnedValue, insertIndex );
 
-                mergedResults.add( insertIndex, returnedValue );
+                //mergedResults.add( insertIndex, returnedValue );
+                mergedResults.add(returnedValue );
 
 
-                //prune the mergedResults
-                while ( mergedResults.size() > maxSize ) {
 
-                    if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize );
-
-                    //just remove from our tail until the size falls to the correct value
-                    mergedResults.remove(mergedResults.size()-1);
-                }
+                //prune the mergedResults
+//                while ( mergedResults.size() > maxSize ) {
+//
+//                    if (logger.isTraceEnabled()) logger.trace( "Trimming results to size {}", maxSize );
+//
+//                    //just remove from our tail until the size falls to the correct value
+//                    mergedResults.remove(mergedResults.size()-1);
+//                    resultsTracking.remove(resultsTracking.size()-1);
+//
+//                }
             }
 
             if (logger.isTraceEnabled()) logger.trace( "Candidate result set size is {}", mergedResults.size() );
@@ -381,7 +533,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         return mergedResults;
     }
 
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
new file mode 100644
index 0000000..8a1bee8
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/shard/SmartShard.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.core.shard;
+
+import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
+
+public class SmartShard<R, C> {
+
+    final ScopedRowKey<R> rowKey;
+    final C shardEnd;
+
+
+    public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){
+
+        this.rowKey = rowKey;
+        this.shardEnd = shardEnd;
+    }
+
+
+    public ScopedRowKey<R> getRowKey(){
+        return rowKey;
+    }
+
+    public C getShardEnd(){
+        return shardEnd;
+    }
+
+    @Override
+    public String toString(){
+
+        return "Shard { rowKey="+rowKey + ", shardEnd="+shardEnd+" }";
+
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
index 472e0a2..92793cb 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/Shard.java
@@ -19,6 +19,9 @@
 package org.apache.usergrid.persistence.graph.serialization.impl.shard;
 
 
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.graph.Edge;
+
 public class Shard implements Comparable<Shard> {
 
 
@@ -30,12 +33,14 @@ public class Shard implements Comparable<Shard> {
     private final long shardIndex;
     private final long createdTime;
     private final boolean compacted;
+    private Optional<Edge> shardEnd;
 
 
     public Shard( final long shardIndex, final long createdTime, final boolean compacted ) {
         this.shardIndex = shardIndex;
         this.createdTime = createdTime;
         this.compacted = compacted;
+        this.shardEnd = Optional.absent();
     }
 
 
@@ -71,6 +76,14 @@ public class Shard implements Comparable<Shard> {
         return shardIndex == MIN_SHARD.shardIndex;
     }
 
+    public void setShardEnd(final Optional<Edge> shardEnd) {
+        this.shardEnd = shardEnd;
+    }
+
+    public Optional<Edge> getShardEnd() {
+        return shardEnd;
+    }
+
 
     /**
      * Compare the shards based on the timestamp first, then the created time second
@@ -149,10 +162,20 @@ public class Shard implements Comparable<Shard> {
 
     @Override
     public String toString() {
-        return "Shard{" +
-                "shardIndex=" + shardIndex +
-                ", createdTime=" + createdTime +
-                ", compacted=" + compacted +
-                '}';
+
+        StringBuilder string = new StringBuilder();
+        string.append("Shard{ ");
+        string.append("shardIndex=").append(shardIndex);
+        string.append(", createdTime=").append(createdTime);
+        string.append(", compacted=").append(compacted);
+        string.append(", shardEndTimestamp=");
+        if(shardEnd.isPresent()){
+            string.append(shardEnd.get().getTimestamp());
+        }else{
+            string.append("null");
+        }
+        string.append(" }");
+
+        return string.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
index 4d02ba9..e0ba3ec 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/EdgeSearcher.java
@@ -1,16 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
+import java.util.*;
 
+import org.apache.http.cookie.SM;
 import org.apache.usergrid.persistence.core.astyanax.ColumnParser;
 import org.apache.usergrid.persistence.core.astyanax.ColumnSearch;
+import org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.core.shard.SmartShard;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.MarkedEdge;
 import org.apache.usergrid.persistence.graph.SearchByEdgeType;
@@ -22,6 +40,10 @@ import com.google.common.base.Preconditions;
 import com.netflix.astyanax.Serializer;
 import com.netflix.astyanax.model.Column;
 import com.netflix.astyanax.util.RangeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator.*;
 
 
 /**
@@ -34,6 +56,9 @@ import com.netflix.astyanax.util.RangeBuilder;
  */
 public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, ColumnSearch<T>{
 
+    private static final Logger logger = LoggerFactory.getLogger( EdgeSearcher.class );
+
+
     protected final Optional<T> last;
     protected final long maxTimestamp;
     protected final ApplicationScope scope;
@@ -52,6 +77,8 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
         this.shards = shards;
         this.last = last;
         this.comparator = comparator;
+
+        //logger.info("initializing with shards: {}", shards);
     }
 
 
@@ -59,6 +86,7 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
     public List<ScopedRowKey<R>> getRowKeys() {
 
         List<ScopedRowKey<R>> rowKeys = new ArrayList<>(shards.size());
+        //logger.info("shards: {}", shards);
 
         for(Shard shard : shards){
 
@@ -72,6 +100,33 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
         return rowKeys;
     }
 
+    public List<SmartShard> getRowKeysWithShardEnd(){
+        // Builds one SmartShard per entry in `shards`: the application-scoped row key for
+        // that shard's index, paired with the shard-end column (null when no end is set).
+        final List<SmartShard> rowKeysWithShardEnd = new ArrayList<>(shards.size());
+
+        for(Shard shard : shards){
+            // Row key = application scope + the key generated from this shard's index.
+            final ScopedRowKey< R> rowKey = ScopedRowKey
+                .fromKey( scope.getApplication(), generateRowKey(shard.getShardIndex() ) );
+            // Turn the optional shard end into a column value via createColumn.
+            final C shardEnd;
+            if(shard.getShardEnd().isPresent()){
+                shardEnd = createColumn((T) shard.getShardEnd().get());
+                // NOTE(review): the cast to type parameter T is not checked at runtime.
+            }else{
+                shardEnd = null;
+            }
+
+            // NOTE(review): a null shardEnd means the shard has no end recorded; presumably
+            // consumers of SmartShard must tolerate null here -- TODO confirm.
+            rowKeysWithShardEnd.add(new SmartShard(rowKey, shardEnd));
+        }
+
+        return rowKeysWithShardEnd;
+
+    }
+
 
     @Override
     public boolean skipFirst( final T first ) {
@@ -127,6 +182,29 @@ public abstract class EdgeSearcher<R, C, T> implements ColumnParser<C, T>, Colum
 
     }
 
+//    public class SmartShard {
+//
+//        final ScopedRowKey<R> rowKey;
+//        final C shardEnd;
+//
+//
+//        public SmartShard(final ScopedRowKey<R> rowKey, final C shardEnd){
+//
+//            this.rowKey = rowKey;
+//            this.shardEnd = shardEnd;
+//        }
+//
+//
+//        public ScopedRowKey<R> getRowKey(){
+//            return rowKey;
+//        }
+//
+//        public C getShardEnd(){
+//            return shardEnd;
+//        }
+//
+//    }
+
 
     /**
      * Get the comparator

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
index a79b91a..6f95cf5 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardAllocationImpl.java
@@ -98,13 +98,14 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
 
         else {
             existingShards = edgeShardSerialization.getShardMetaData( scope, maxShardId, directedEdgeMeta );
+            //logger.info("existing shards has something: {}", existingShards.hasNext());
 
             /**
              * We didn't get anything out of cassandra, so we need to create the minumum shard
              */
             if ( existingShards == null || !existingShards.hasNext() ) {
 
-
+                //logger.info("writing min shard");
                 final MutationBatch batch = edgeShardSerialization.writeShardMeta( scope, Shard.MIN_SHARD, directedEdgeMeta );
                 try {
                     batch.execute();
@@ -117,6 +118,7 @@ public class NodeShardAllocationImpl implements NodeShardAllocation {
             }
         }
 
+        //logger.info("getShards existing shards: {}", existingShards);
 
         return new ShardEntryGroupIterator( existingShards, graphFig.getShardMinDelta(), shardGroupCompaction, scope,
             directedEdgeMeta );

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
index 1a88ebb..3ff9d47 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/NodeShardCacheImpl.java
@@ -164,14 +164,19 @@ public class NodeShardCacheImpl implements NodeShardCache {
         final CacheKey key = new CacheKey( scope, directedEdgeMeta );
         CacheEntry entry;
 
-        try {
-            entry = this.graphs.get( key );
-        }
-        catch ( ExecutionException e ) {
-            throw new GraphRuntimeException( "Unable to load shard key for graph", e );
-        }
+//        try {
+//            entry = this.graphs.get( key );
+//        }
+//        catch ( ExecutionException e ) {
+//            throw new GraphRuntimeException( "Unable to load shard key for graph", e );
+//        }
+
+        final Iterator<ShardEntryGroup> edges =
+            nodeShardAllocation.getShards( key.scope, Optional.<Shard>absent(), key.directedEdgeMeta );
+
+        final CacheEntry cacheEntry = new CacheEntry( edges );
 
-        Iterator<ShardEntryGroup> iterator = entry.getShards( maxTimestamp );
+        Iterator<ShardEntryGroup> iterator = cacheEntry.getShards( maxTimestamp );
 
         if ( iterator == null ) {
             return Collections.<ShardEntryGroup>emptyList().iterator();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index f0b0ac9..80b63ec 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
+import com.google.common.base.Optional;
+import org.apache.usergrid.persistence.graph.Edge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -169,6 +171,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
         final MutationBatch newRowBatch = keyspace.prepareMutationBatch();
         final MutationBatch deleteRowBatch = keyspace.prepareMutationBatch();
+        final MutationBatch updateShardMetaBatch = keyspace.prepareMutationBatch();
 
         /**
          * As we move edges, we want to keep track of it
@@ -181,10 +184,13 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 .loadEdges( shardedEdgeSerialization, edgeColumnFamilies, scope, Collections.singleton( sourceShard ),
                     Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING );
 
+            MarkedEdge shardEnd = null;
+
             while ( edges.hasNext() ) {
                 final MarkedEdge edge = edges.next();
 
                 final long edgeTimestamp = edge.getTimestamp();
+                shardEnd = edge;
 
                 /**
                  * The edge is within a different shard, break
@@ -202,6 +208,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                         .deleteEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, sourceShard, edge,
                             timestamp ) );
 
+
                 edgeCount++;
 
                 //if we're at our count, execute the mutation of writing the edges to the new row, then remove them
@@ -217,12 +224,21 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                     }
                 }
             }
+
+            Shard updatedShard = new Shard( sourceShard.getShardIndex(), sourceShard.getCreatedTime(), sourceShard.isCompacted() );
+            updatedShard.setShardEnd(Optional.fromNullable(shardEnd));
+            logger.info("updating with shard end: {}", shardEnd );
+            updateShardMetaBatch.mergeShallow( edgeShardSerialization.writeShardMeta(scope, updatedShard, edgeMeta));
+
         }
 
 
+
+
         try {
             newRowBatch.execute();
             deleteRowBatch.execute();
+            updateShardMetaBatch.execute();
         }
         catch ( Throwable t ) {
             logger.error( "Unable to move edges to target shard {}", targetShard );
@@ -232,6 +248,8 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
         if (logger.isTraceEnabled()) {
             logger.trace("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount);
         }
+        logger.info("Finished compacting {} shards and moved {} edges", sourceShards, edgeCount);
+
 
         resultBuilder.withCopiedEdges( edgeCount ).withSourceShards( sourceShards ).withTargetShard( targetShard );
 
@@ -276,6 +294,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
             //Overwrite our shard index with a newly created one that has been marked as compacted
             Shard compactedShard = new Shard( targetShard.getShardIndex(), timeService.getCurrentTime(), true );
+
             final MutationBatch updateMark = edgeShardSerialization.writeShardMeta( scope, compactedShard, edgeMeta );
             try {
                 updateMark.execute();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
index ce0953c..c3e0cc0 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardedEdgeSerializationImpl.java
@@ -67,6 +67,8 @@ import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.Serializer;
 import com.netflix.astyanax.util.RangeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -77,6 +79,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
 @Singleton
 public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
 
+    private static final Logger logger = LoggerFactory.getLogger( ShardedEdgeSerializationImpl.class );
+
+
     protected final Keyspace keyspace;
     protected final CassandraConfig cassandraConfig;
     protected final GraphFig graphFig;
@@ -401,6 +406,10 @@ public class ShardedEdgeSerializationImpl implements ShardedEdgeSerialization {
         ValidationUtils.validateApplicationScope( scope );
         GraphValidation.validateSearchByEdgeType( search );
 
+        if(logger.isTraceEnabled()){
+            logger.info("getEdgesFromSource shards: {}", shards);
+        }
+
         final Id sourceId = search.getNode();
         final String type = search.getType();
         final long maxTimestamp = search.getMaxTimestamp();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
index d1000fb..af9d979 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardsColumnIterator.java
@@ -1,10 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
+import java.util.*;
 
+import org.apache.usergrid.persistence.core.shard.SmartShard;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,14 +127,19 @@ public class ShardsColumnIterator<R, C, T> implements Iterator<T> {
          */
         final List<ScopedRowKey<R>> rowKeys = searcher.getRowKeys();
 
+        final List<SmartShard> rowKeysWithShardEnd = searcher.getRowKeysWithShardEnd();
+
         if (logger.isTraceEnabled()) {
             logger.trace("Searching with row keys {}", rowKeys);
         }
 
-        currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf,  consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
+        //currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf,  consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize);
+        currentColumnIterator = new MultiRowColumnIterator<>( keyspace, cf,  consistencyLevel, searcher, searcher, searcher.getComparator(), rowKeys, pageSize, rowKeysWithShardEnd);
+
 
 
 
 
     }
+
 }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/8c725f19/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 3ae3ff1..b131e95 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -203,7 +203,10 @@ public class GraphManagerShardConsistencyIT {
         // power for writes
         final int numProcessors = Runtime.getRuntime().availableProcessors() / 2;
 
-        final int numWorkersPerInjector = numProcessors / numInjectors;
+        //final int numWorkersPerInjector = numProcessors / numInjectors;
+
+        final int numWorkersPerInjector = 1;
+
 
 
         /**
@@ -268,7 +271,9 @@ public class GraphManagerShardConsistencyIT {
 
         final List<Throwable> failures = new ArrayList<>();
 
-        for(int i = 0; i < 2; i ++) {
+        Thread.sleep(5000);
+
+        for(int i = 0; i < 1; i ++) {
 
 
             /**
@@ -656,7 +661,7 @@ public class GraphManagerShardConsistencyIT {
             final long startTime = System.currentTimeMillis();
 
 
-            for ( long i = 0; i < writeLimit || System.currentTimeMillis() - startTime < minExecutionTime; i++ ) {
+            for ( long i = 1; i < writeLimit +1 && System.currentTimeMillis() - startTime < minExecutionTime; i++ ) {
 
                 Edge edge = generator.newEdge();
 
@@ -671,8 +676,8 @@ public class GraphManagerShardConsistencyIT {
                 writeCounter.incrementAndGet();
 
 
-                if ( i % 1000 == 0 ) {
-                    logger.info( "   Wrote: " + i );
+                if ( i % 100 == 0 ) {
+                    logger.info( Thread.currentThread().getName()+" wrote: " + i );
                 }
             }
 
@@ -718,7 +723,7 @@ public class GraphManagerShardConsistencyIT {
                 logger.info( "Completed reading {} edges", returnedEdgeCount );
 
                 if ( writeCount != returnedEdgeCount ) {
-                    logger.warn( "Unexpected edge count returned!!!  Expected {} but was {}", writeCount,
+                    logger.warn( Thread.currentThread().getName()+" - Unexpected edge count returned!!!  Expected {} but was {}", writeCount,
                         returnedEdgeCount );
                 }
 


Mime
View raw message