usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [08/52] [abbrv] git commit: Updated OrderedMerge to use a faster implementation at runtime. After initialization, it's an O(1) emit operation as long as our producers are fast enough.
Date Wed, 03 Sep 2014 22:11:03 GMT
Updated OrderedMerge to use a faster implementation at runtime.  After initialization, it's an O(1) emit operation as long as our producers are fast enough.

Updated to fix comparator issues

Fixed tests

Changed algorithm.  Counters not decrementing as expected on shard balancing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/e040fdf4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/e040fdf4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/e040fdf4

Branch: refs/heads/two-dot-o-push-notifications
Commit: e040fdf4c2846188455b8cbcbac812e8a44ea69e
Parents: c69c197
Author: Todd Nine <toddnine@apache.org>
Authored: Mon Aug 18 17:19:11 2014 -0600
Committer: Todd Nine <toddnine@apache.org>
Committed: Wed Aug 27 21:35:49 2014 -0600

----------------------------------------------------------------------
 .../src/test/resources/usergrid-UNIT.properties |   1 +
 .../core/astyanax/ColumnNameIterator.java       |   3 -
 .../persistence/core/astyanax/ColumnSearch.java |  46 ++
 .../astyanax/MultiKeyColumnNameIterator.java    |  48 +-
 .../core/astyanax/MultiRowColumnIterator.java   | 251 +++++++++
 .../persistence/core/rx/ObservableIterator.java |   2 +-
 .../persistence/core/rx/OrderedMerge.java       | 193 +++----
 .../core/astyanax/ColumnNameIteratorTest.java   | 205 ++++++++
 .../MultiKeyColumnNameIteratorTest.java         | 330 ++++++++++++
 .../astyanax/MultiRowColumnIteratorTest.java    | 387 ++++++++++++++
 .../persistence/core/astyanax/TestUtils.java    |  76 +++
 .../core/cassandra/CassandraRule.java           |   9 +
 .../persistence/core/rx/OrderedMergeTest.java   | 146 +++++-
 .../common/src/test/resources/log4j.properties  |  39 ++
 .../src/test/resources/usergrid.properties      |   1 +
 .../usergrid/persistence/graph/GraphFig.java    |  71 ++-
 .../persistence/graph/SearchByEdge.java         |   6 +
 .../persistence/graph/SearchByEdgeType.java     |  16 +
 .../persistence/graph/guice/GraphModule.java    |   4 +
 .../persistence/graph/guice/MergedProxy.java    |  34 --
 .../graph/impl/SimpleMarkedEdge.java            |   2 +-
 .../graph/impl/SimpleSearchByEdge.java          |  14 +-
 .../graph/impl/SimpleSearchByEdgeType.java      |  16 +-
 .../graph/impl/SimpleSearchByIdType.java        |   4 +-
 .../graph/impl/stage/EdgeDeleteRepairImpl.java  |   5 +-
 .../graph/impl/stage/EdgeMetaRepairImpl.java    |   7 +-
 .../impl/stage/NodeDeleteListenerImpl.java      |   7 +-
 .../impl/shard/DirectedEdgeMeta.java            | 189 ++++++-
 .../graph/serialization/impl/shard/RowKey.java  |  10 +
 .../serialization/impl/shard/RowKeyType.java    |   7 +-
 .../graph/serialization/impl/shard/Shard.java   |   3 +-
 .../impl/shard/ShardEntryGroup.java             |  62 ++-
 .../impl/shard/ShardGroupCompaction.java        |  31 +-
 .../NodeShardCounterSerializationImpl.java      |   9 +-
 .../impl/shard/impl/EdgeRowKeySerializer.java   |  63 ---
 .../impl/shard/impl/EdgeSearcher.java           |  79 ++-
 .../impl/shard/impl/EdgeSerializer.java         |  77 ---
 .../shard/impl/EdgeShardRowKeySerializer.java   | 103 ----
 .../shard/impl/EdgeShardSerializationImpl.java  |   1 +
 .../shard/impl/NodeShardAllocationImpl.java     |  85 ++-
 .../impl/shard/impl/NodeShardCacheImpl.java     |   4 +-
 .../impl/shard/impl/RowSerializer.java          |  63 ---
 .../impl/shard/impl/RowTypeSerializer.java      |  62 ---
 .../shard/impl/ShardEntryGroupIterator.java     |  39 +-
 .../shard/impl/ShardGroupCompactionImpl.java    | 519 +++++++++++++++----
 .../impl/ShardedEdgeSerializationImpl.java      | 187 +++----
 .../impl/shard/impl/ShardsColumnIterator.java   |  27 +-
 .../shard/impl/SizebasedEdgeColumnFamilies.java |   4 +
 .../DescendingTimestampComparator.java          |  43 ++
 .../DirectedEdgeDescendingComparator.java       |  69 +++
 .../impl/comparators/OrderedComparator.java     |  52 ++
 .../SourceDirectedEdgeDescendingComparator.java |  42 ++
 .../TargetDirectedEdgeDescendingComparator.java |  42 ++
 .../impl/serialize/EdgeRowKeySerializer.java    |  63 +++
 .../shard/impl/serialize/EdgeSerializer.java    |  77 +++
 .../serialize/EdgeShardRowKeySerializer.java    | 103 ++++
 .../shard/impl/serialize/RowSerializer.java     |  63 +++
 .../shard/impl/serialize/RowTypeSerializer.java |  62 +++
 .../persistence/graph/GraphManagerLoadTest.java |   4 +-
 .../graph/GraphManagerShardConsistencyIT.java   | 372 +++++++++----
 .../graph/GraphManagerStressTest.java           |   6 +-
 .../graph/impl/stage/EdgeDeleteRepairTest.java  |   5 +-
 .../impl/shard/NodeShardAllocationTest.java     | 249 ++++++++-
 .../impl/shard/ShardEntryGroupTest.java         |  28 +-
 .../impl/shard/ShardGroupCompactionTest.java    | 226 ++++++++
 .../shard/count/NodeShardApproximationTest.java |  12 +
 .../shard/impl/ShardEntryGroupIteratorTest.java | 180 ++++---
 ...rceDirectedEdgeDescendingComparatorTest.java | 136 +++++
 ...getDirectedEdgeDescendingComparatorTest.java | 136 +++++
 .../graph/test/util/EdgeTestUtils.java          |   6 +-
 .../graph/src/test/resources/log4j.properties   |   1 +
 .../src/test/resources/usergrid-UNIT.properties |   2 +
 .../persistence/model/entity/SimpleIdTest.java  |  73 +++
 stack/corepersistence/pom.xml                   |   2 +-
 74 files changed, 4510 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties b/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
index 9785b25..d038a4a 100644
--- a/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
+++ b/stack/corepersistence/collection/src/test/resources/usergrid-UNIT.properties
@@ -6,6 +6,7 @@ cassandra.hosts=localhost
 cassandra.cluster_name=Usergrid
 collections.keyspace=Usergrid_Collections
 cassandra.timeout=5000
+cassandra.embedded=true
 
 
 collections.keyspace.strategy.options=replication_factor:1

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
index af4e1f9..6256e9c 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIterator.java
@@ -35,9 +35,6 @@ import com.netflix.hystrix.HystrixCommandGroupKey;
 public class ColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
 
 
-    private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey( "CassRead" );
-
-
     private final RowQuery<?, C> rowQuery;
     private final ColumnParser<C, T> parser;
     private final boolean skipFirst;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
new file mode 100644
index 0000000..589cb72
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/ColumnSearch.java
@@ -0,0 +1,46 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ *
+ *
+ */
+public interface ColumnSearch<T> {
+
+    /**
+     * Set the start value supplied and the user supplied end value (if present)
+     * @param rangeBuilder
+     * @param value The value to set in the start
+     */
+    public  void   buildRange(final RangeBuilder rangeBuilder, final T value);
+
+    /**
+     * Set the range builder with the user supplied start and finish
+     * @param rangeBuilder
+     */
+    public void buildRange(final RangeBuilder rangeBuilder);
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
index c5a8c95..16ae97a 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIterator.java
@@ -24,22 +24,21 @@ package org.apache.usergrid.persistence.core.astyanax;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.NoSuchElementException;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Exchanger;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.rx.OrderedMerge;
 
 import com.amazonaws.services.redshift.model.UnsupportedOptionException;
-import com.google.common.base.Preconditions;
 
+import rx.Notification;
 import rx.Observable;
 import rx.Subscriber;
+import rx.functions.Action1;
 import rx.schedulers.Schedulers;
 
 
@@ -54,13 +53,26 @@ import rx.schedulers.Schedulers;
 public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T> {
 
 
-    private InnerIterator<T> iterator;
+    private static final Logger LOG = LoggerFactory.getLogger( MultiKeyColumnNameIterator.class );
+
+    private Iterator<T> iterator;
 
 
     public MultiKeyColumnNameIterator( final Collection<ColumnNameIterator<C, T>> columnNameIterators,
                                        final Comparator<T> comparator, final int bufferSize ) {
 
 
+        //optimization for single use case
+        if ( columnNameIterators.size() == 1 ) {
+            iterator = columnNameIterators.iterator().next();
+            return;
+        }
+
+
+        /**
+         * We have more than 1 iterator, subscribe to all of them on their own thread so they can
+         * produce in parallel.  This way our inner iterator will be filled and processed the fastest
+         */
         Observable<T>[] observables = new Observable[columnNameIterators.size()];
 
         int i = 0;
@@ -77,9 +89,11 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
         Observable<T> merged = OrderedMerge.orderedMerge( comparator, bufferSize, observables ).distinctUntilChanged();
 
 
-        iterator = new InnerIterator(bufferSize);
+        InnerIterator innerIterator = new InnerIterator( bufferSize );
+
+        merged.subscribe( innerIterator );
 
-        merged.subscribe( iterator );
+        iterator = innerIterator;
     }
 
 
@@ -114,9 +128,12 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
      */
     private final class InnerIterator<T> extends Subscriber<T> implements Iterator<T> {
 
-        private CountDownLatch startLatch = new CountDownLatch( 1 );
+        private final CountDownLatch startLatch = new CountDownLatch( 1 );
 
-        private final LinkedBlockingQueue<T> queue;
+        /**
+         * Use an ArrayBlockingQueue for faster access since our upper bound is static
+         */
+        private final ArrayBlockingQueue<T> queue;
 
 
         private Throwable error;
@@ -126,7 +143,7 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
 
 
         private InnerIterator( int maxSize ) {
-            queue = new LinkedBlockingQueue<>( maxSize );
+            queue = new ArrayBlockingQueue<>( maxSize );
         }
 
 
@@ -148,7 +165,6 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
             }
 
 
-
             //this is almost a busy wait, and is intentional, if we have nothing to poll, we want to get it as soon
             //as it's available.  We generally only hit this once
             do {
@@ -204,11 +220,13 @@ public class MultiKeyColumnNameIterator<C, T> implements Iterable<T>, Iterator<T
         public void onNext( final T t ) {
 
             //may block if we get full, that's expected behavior
+
             try {
+                LOG.trace( "Received element {}" , t );
                 queue.put( t );
             }
             catch ( InterruptedException e ) {
-                throw new RuntimeException( "Unable to take from queue" );
+                throw new RuntimeException( "Unable to insert to queue" );
             }
 
             startLatch.countDown();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
new file mode 100644
index 0000000..155ce84
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -0,0 +1,251 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.core.hystrix.HystrixCassandra;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ColumnList;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.model.Rows;
+import com.netflix.astyanax.query.RowSliceQuery;
+import com.netflix.astyanax.util.RangeBuilder;
+
+
+/**
+ *
+ *
+ */
+public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger( MultiRowColumnIterator.class );
+
+    private final int pageSize;
+
+    private final ColumnFamily<R, C> cf;
+
+
+    private final ColumnParser<C, T> columnParser;
+
+    private final ColumnSearch<T> columnSearch;
+
+    private final Comparator<T> comparator;
+
+
+    private final Collection<R> rowKeys;
+
+    private final Keyspace keyspace;
+
+    private final ConsistencyLevel consistencyLevel;
+
+
+    private T startColumn;
+
+    private boolean moreToFollow;
+
+    private Iterator<T> currentColumnIterator;
+
+
+    /**
+     * Remove after finding bug
+     */
+
+
+    //    private int advanceCount;
+    //
+    //    private final HashMap<T, SeekPosition> seenResults;
+
+    /**
+     * Complete Remove
+     */
+
+
+    /**
+     * Create the iterator
+     */
+    public MultiRowColumnIterator( final Keyspace keyspace, final ColumnFamily<R, C> cf,
+                                   final ConsistencyLevel consistencyLevel, final ColumnParser<C, T> columnParser,
+                                   final ColumnSearch<T> columnSearch, final Comparator<T> comparator,
+                                   final Collection<R> rowKeys, final int pageSize ) {
+        this.cf = cf;
+        this.pageSize = pageSize;
+        this.columnParser = columnParser;
+        this.columnSearch = columnSearch;
+        this.comparator = comparator;
+        this.rowKeys = rowKeys;
+        this.keyspace = keyspace;
+        this.consistencyLevel = consistencyLevel;
+        this.moreToFollow = false;
+
+        //        seenResults = new HashMap<>( pageSize * 10 );
+    }
+
+
+    @Override
+    public boolean hasNext() {
+
+        if ( currentColumnIterator == null || ( !currentColumnIterator.hasNext() && moreToFollow ) ) {
+            advance();
+        }
+
+
+        return currentColumnIterator.hasNext();
+    }
+
+
+    @Override
+    public T next() {
+        if ( !hasNext() ) {
+            throw new NoSuchElementException( "No new element exists" );
+        }
+
+        final T next = currentColumnIterator.next();
+
+        //        LOG.trace( "Emitting {}", next );
+
+        return next;
+    }
+
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException( "Remove is unsupported this is a read only iterator" );
+    }
+
+
+    public void advance() {
+
+
+        /**
+         * If the edge is present, we need to begin seeking from this
+         */
+
+
+        //TODO, finalize why this isn't working as expected
+        final int selectSize = startColumn == null ? pageSize : pageSize + 1;
+
+        final RangeBuilder rangeBuilder = new RangeBuilder();
+
+
+        //set the range into the search
+
+        if ( startColumn == null ) {
+            columnSearch.buildRange( rangeBuilder );
+        }
+        else {
+            columnSearch.buildRange( rangeBuilder, startColumn );
+        }
+
+
+        rangeBuilder.setLimit( selectSize );
+
+
+        /**
+         * Get our list of slices
+         */
+        final RowSliceQuery<R, C> query =
+                keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( rowKeys )
+                        .withColumnRange( rangeBuilder.build() );
+
+        final Rows<R, C> result = HystrixCassandra.user( query ).getResult();
+
+        final TreeSet<T> mergedResults = new TreeSet<>( comparator );
+
+
+        //now aggregate them together
+
+        for ( final R key : result.getKeys() ) {
+            final ColumnList<C> columns = result.getRow( key ).getColumns();
+            final int size = columns.size();
+
+            int readIndex = 0;
+
+            //skip the first since it's equal and has been set
+            if ( startColumn != null && size > 0 ) {
+                final T returnedValue = columnParser.parseColumn( columns.getColumnByIndex( 0 ) );
+
+                if ( comparator.compare( returnedValue, startColumn ) == 0 ) {
+                    readIndex++;
+                }
+            }
+
+
+//            T previous = null;
+
+            for (; readIndex < size; readIndex++ ) {
+                final Column<C> column = columns.getColumnByIndex( readIndex );
+                final T returnedValue = columnParser.parseColumn( column );
+
+                /**
+                 * DO NOT remove this section of code. If you're seeing inconsistent results during shard transition, you'll
+                 * need to enable this
+                 */
+//
+//                if ( previous != null && comparator.compare( previous, returnedValue ) == 0 ) {
+//                    throw new RuntimeException( String.format(
+//                            "Cassandra returned 2 unique columns, but your comparator marked them as equal.  This " +
+//                                    "indicates a bug in your comparator.  Previous value was %s and current value is " +
+//                                    "%s",
+//                            previous, returnedValue ) );
+//                }
+//
+//                previous = returnedValue;
+
+                mergedResults.add( returnedValue );
+
+                //prune the mergedResults
+                while ( mergedResults.size() > pageSize ) {
+                    mergedResults.pollLast();
+                }
+            }
+
+            LOG.trace( "Read {} columns from row key {}", readIndex, key );
+            LOG.trace( "Candidate result set size is {}", mergedResults.size() );
+        }
+
+
+        //we've parsed everything truncate to the first pageSize, it's all we can ensure is correct without another
+        //trip back to cassandra
+
+        startColumn = mergedResults.last();
+
+        moreToFollow = mergedResults.size() == pageSize;
+
+        currentColumnIterator = mergedResults.iterator();
+
+        LOG.trace( "Finished parsing {} rows for a total of {} results", rowKeys.size(), mergedResults.size() );
+    }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
index a6c3aa9..2bd1edb 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
@@ -64,7 +64,7 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
             while ( itr.hasNext() && !subscriber.isUnsubscribed() ) {
                 final T next = itr.next();
 
-                log.trace( "Iterator '{}' emitting item '{}'", name, next );
+//                log.trace( "Iterator '{}' emitting item '{}'", name, next );
 
                 subscriber.onNext( next );
             }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
index 4032176..cdad0d1 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/OrderedMerge.java
@@ -20,17 +20,21 @@
 package org.apache.usergrid.persistence.core.rx;
 
 
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.NavigableSet;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.TreeMultimap;
+
 import rx.Observable;
 import rx.Subscriber;
 import rx.Subscription;
@@ -40,9 +44,9 @@ import rx.subscriptions.CompositeSubscription;
 
 /**
  * Produces a single Observable from multiple ordered source observables.  The same as the "merge" step in a merge sort.
- * Ensure that your comparator matches the ordering of your inputs, or you may get strange results.
- * The current implementation requires each Observable to be running in it's own thread.  Once backpressure in RX is
- * implemented, this requirement can be removed.
+ * Ensure that your comparator matches the ordering of your inputs, or you may get strange results. The current
+ * implementation requires each Observable to be running in its own thread.  Once backpressure in RX is implemented,
+ * this requirement can be removed.
  */
 public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
 
@@ -74,7 +78,7 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
 
 
         //when a subscription is received, we need to subscribe on each observable
-        SubscriberCoordinator coordinator = new SubscriberCoordinator( comparator, outerOperation );
+        SubscriberCoordinator coordinator = new SubscriberCoordinator( comparator, outerOperation, observables.length );
 
         InnerObserver<T>[] innerObservers = new InnerObserver[observables.length];
 
@@ -84,7 +88,7 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
         for ( int i = 0; i < observables.length; i++ ) {
             //subscribe to each one and add it to the composite
             //create a new inner and subscribe
-            final InnerObserver<T> inner = new InnerObserver<T>( coordinator, maxBufferSize );
+            final InnerObserver<T> inner = new InnerObserver<T>( coordinator, maxBufferSize, i );
 
             coordinator.add( inner );
 
@@ -117,16 +121,19 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
         private volatile boolean readyToProduce = false;
 
 
-        private final Comparator<T> comparator;
         private final Subscriber<? super T> subscriber;
+        private final TreeMultimap<T, InnerObserver<T>> nextValues;
         private final List<InnerObserver<T>> innerSubscribers;
+        private final ArrayDeque<InnerObserver<T>> toProduce;
 
 
-        private SubscriberCoordinator( final Comparator<T> comparator, final Subscriber<? super T> subscriber ) {
+        private SubscriberCoordinator( final Comparator<T> comparator, final Subscriber<? super T> subscriber,
+                                       final int innerSize ) {
             //we only want to emit events serially
             this.subscriber = new SerializedSubscriber( subscriber );
-            this.innerSubscribers = new ArrayList<InnerObserver<T>>();
-            this.comparator = comparator;
+            this.innerSubscribers = new ArrayList<>( innerSize );
+            this.nextValues = TreeMultimap.create( comparator, InnerObserverComparator.INSTANCE );
+            this.toProduce = new ArrayDeque<>( innerSize );
         }
 
 
@@ -146,7 +153,7 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
                 log.trace( "Completing Observable.  Draining elements from the subscribers", innerSubscribers.size() );
 
                 //Drain the queues
-                while ( !subscriber.isUnsubscribed() && !drained() ) {
+                while ( !subscriber.isUnsubscribed() && (!nextValues.isEmpty() || !toProduce.isEmpty()) ) {
                     next();
                 }
 
@@ -158,6 +165,7 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
 
         public void add( InnerObserver<T> inner ) {
             this.innerSubscribers.add( inner );
+            this.toProduce.add( inner );
         }
 
 
@@ -168,100 +176,108 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
 
         public void next() {
 
-            //nothing to do, we haven't started emitting values yet
-            if ( !readyToProduce() ) {
-                return;
-            }
-
             //we want to emit items in order, so we synchronize our next
             synchronized ( this ) {
+                /**
+                 * Init before our loop
+                 */
+                while ( !toProduce.isEmpty() ) {
+
+                    InnerObserver<T> inner = toProduce.pop();
+
+                    //This has nothing left to produce, skip it
+                    if ( inner.drained ) {
+                        continue;
+                    }
+
+                    final T nextKey = inner.peek();
+
+                    //we can't produce, not everything has an element to inspect, leave it in the set to produce next
+                    // time
+                    if ( nextKey == null ) {
+                        toProduce.push( inner );
+                        return;
+                    }
+
+                    //add it to our fast access set
+                    nextValues.put( nextKey, inner );
+                }
+
+
+                //take as many elements as we can until we hit a case where we can't take anymore
+                while ( !nextValues.isEmpty() ) {
 
-                //take as many elements as we can until we hit the completed case
-                while ( true ) {
-                    InnerObserver<T> maxObserver = null;
-                    T max = null;
 
                     /**
-                     * TODO T.N. change this to be an 0(1) for min and O(log n) to update after pop rather than O(n*inner)
+                     * Get our lowest key and begin producing until we can't produce any longer
                      */
-                    for ( InnerObserver<T> inner : innerSubscribers ) {
+                    final T lowestKey = nextValues.keySet().first();
 
-                        //nothing to do, this inner
 
-                        //we're done skip it
-                        if ( inner.drained ) {
-                            continue;
-                        }
+                    //we need to create a copy, otherwise we receive errors. We use ArrayDeque
 
+                    NavigableSet<InnerObserver<T>> nextObservers = nextValues.get( lowestKey );
 
-                        final T current = inner.peek();
+                    while ( !nextObservers.isEmpty() ) {
 
-                        /**
-                         * Our current is null but we're not drained (I.E we haven't finished and completed consuming)
-                         * This means the producer is slow, and we don't have a complete set to compare,
-                         * we can't produce.  Bail and try again on the next event.
-                         */
-                        if ( current == null ) {
-                            return;
-                        }
+                        final InnerObserver<T> inner = nextObservers.pollFirst();
 
+                        nextValues.remove( lowestKey, inner );
 
-                        if ( max == null || ( current != null
-                                && comparator.compare( current, max ) > 0 ) ) {
-                            maxObserver = inner;
-                            max = current;
-                        }
-                    }
+                        final T value = inner.pop();
 
-                    //No max observer was ever assigned, meaning all our inners are drained, break from loop
-                    if ( maxObserver == null ) {
-                        return;
-                    }
+                        log.trace( "Emitting value {}", value );
 
-                    log.trace( "Max element is item {}", max );
+                        subscriber.onNext( value );
 
-                    subscriber.onNext( maxObserver.pop() );
-                }
-            }
-        }
+                        final T nextKey = inner.peek();
 
+                        //nothing to peek, it's either drained or slow
+                        if ( nextKey == null ) {
 
-        /**
-         * Return true if we're ready to produce
-         */
-        private boolean readyToProduce() {
-            if ( readyToProduce ) {
-                return true;
-            }
+                            //it's drained, nothing left to do
+                            if ( inner.drained ) {
+                                continue;
+                            }
 
+                            //it's slow, we can't process because we don't know if this is another min value without
+                            // inspecting it. Stop emitting and try again next pass through
+                            toProduce.push( inner );
+                            return;
+                        }
 
-            //perform an audit
-            for ( InnerObserver<T> inner : innerSubscribers ) {
-                if ( !inner.started ) {
-                    readyToProduce = false;
-                    return false;
+                        //we have a next value, insert it and keep running
+                        nextValues.put( nextKey, inner );
+                    }
                 }
             }
+        }
 
-            readyToProduce = true;
 
-            //we'll try again next time
-            return false;
-        }
+//        /**
+//         * Return true if every inner observer has been drained
+//         */
+//        private boolean drained() {
+//            //perform an audit
+//            for ( InnerObserver<T> inner : innerSubscribers ) {
+//                if ( !inner.drained ) {
+//                    return false;
+//                }
+//            }
+//
+//            return true;
+//        }
+    }
 
 
-        /**
-         * Return true if every inner observer has been drained
-         */
-        private boolean drained() {
-            //perform an audit
-            for ( InnerObserver<T> inner : innerSubscribers ) {
-                if ( !inner.drained ) {
-                    return false;
-                }
-            }
+    private static final class InnerObserverComparator implements Comparator<InnerObserver> {
 
-            return true;
+        private static final InnerObserverComparator INSTANCE = new InnerObserverComparator();
+
+
+        @Override
+        public int compare( final InnerObserver o1, final InnerObserver o2 ) {
+            return Integer.compare( o1.id, o2.id );
         }
     }
 
@@ -269,13 +285,18 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
     private static final class InnerObserver<T> extends Subscriber<T> {
 
         private final SubscriberCoordinator<T> coordinator;
-        private final Deque<T> items = new LinkedList<T>();
+        private final Deque<T> items = new LinkedList<>();
         private final int maxQueueSize;
         /**
          * TODO: T.N. Once backpressure makes it into RX Java, this needs to be remove and should use backpressure
          */
         private final Semaphore semaphore;
 
+        /**
+         * Our id so we have something unique to compare in the multimap
+         */
+        public final int id;
+
 
         /**
          * Flags for synchronization with coordinator. Multiple threads may be used, so volatile is required
@@ -285,9 +306,11 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
         private volatile boolean drained = false;
 
 
-        public InnerObserver( final SubscriberCoordinator<T> coordinator, final int maxQueueSize ) {
+        public InnerObserver( final SubscriberCoordinator<T> coordinator, final int maxQueueSize, final int id ) {
             this.coordinator = coordinator;
             this.maxQueueSize = maxQueueSize;
+            this.id = id;
+
             this.semaphore = new Semaphore( maxQueueSize );
         }
 
@@ -302,8 +325,7 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
              * release this semaphore and invoke next.  Both these calls can be removed when backpressure is added.
              * We need the next to force removal of other inner consumers
              */
-             coordinator.onCompleted();
-
+            coordinator.onCompleted();
         }
 
 
@@ -316,21 +338,14 @@ public final class OrderedMerge<T> implements Observable.OnSubscribe<T> {
         @Override
         public void onNext( T a ) {
 
-            log.trace( "Received {}", a );
-
             try {
                 this.semaphore.acquire();
             }
             catch ( InterruptedException e ) {
-                onError(e);
-            }
-
-            if ( items.size() == maxQueueSize ) {
-                RuntimeException e =
-                        new RuntimeException( "The maximum queue size of " + maxQueueSize + " has been reached" );
                 onError( e );
             }
 
+
             items.add( a );
 
             started = true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
new file mode 100644
index 0000000..aabab24
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/ColumnNameIteratorTest.java
@@ -0,0 +1,205 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.serializers.LongSerializer;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.util.RangeBuilder;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class ColumnNameIteratorTest {
+
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+    protected static Keyspace keyspace;
+
+    protected ApplicationScope scope;
+
+    protected static ColumnFamily<String, Long> COLUMN_FAMILY =
+            new ColumnFamily<>( "LongTests", StringSerializer.get(), LongSerializer.get() );
+
+    protected static final boolean TRUE = true;
+
+
+    @BeforeClass
+    public static void setup() throws ConnectionException {
+
+
+        final CassandraConfig cassandraConfig = new CassandraConfig() {
+            @Override
+            public ConsistencyLevel getReadCL() {
+                return ConsistencyLevel.CL_QUORUM;
+            }
+
+
+            @Override
+            public ConsistencyLevel getWriteCL() {
+                return ConsistencyLevel.CL_QUORUM;
+            }
+        };
+
+
+        AstyanaxKeyspaceProvider astyanaxKeyspaceProvider =
+                new AstyanaxKeyspaceProvider( rule.getCassandraFig(), cassandraConfig );
+
+        keyspace = astyanaxKeyspaceProvider.get();
+
+        TestUtils.createKeyspace( keyspace );
+
+        TestUtils.createColumnFamiliy( keyspace, COLUMN_FAMILY, new HashMap<String, Object>() );
+    }
+
+
+    @Test
+    public void testSingleIterator() {
+
+        String rowKey1 = UUIDGenerator.newTimeUUID().toString();
+
+
+        final long maxValue = 10000;
+
+
+        /**
+         * Write all of the columns to our single row, flushing the batch as we go
+         */
+
+
+        final MutationBatch batch = keyspace.prepareMutationBatch();
+
+        for ( long i = 0; i < maxValue; i++ ) {
+            batch.withRow( COLUMN_FAMILY, rowKey1 ).putColumn( i, TRUE );
+
+            if ( i % 1000 == 0 ) {
+                try {
+                    batch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( e );
+                }
+            }
+        }
+
+        try {
+            batch.execute();
+        }
+        catch ( ConnectionException e ) {
+            throw new RuntimeException( e );
+        }
+
+
+        //now read them back, we should get an iterator that returns every value from 0 to 9999 in order
+
+        final ColumnParser<Long, Long> longParser = new ColumnParser<Long, Long>() {
+            @Override
+            public Long parseColumn( final Column<Long> column ) {
+                return column.getName();
+            }
+        };
+
+
+        //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the
+        // trips required
+        final RangeBuilder forwardRange = new RangeBuilder().setLimit( 720 );
+
+
+        final RowQuery<String, Long> forwardQuery =
+                keyspace.prepareQuery( COLUMN_FAMILY ).getKey( rowKey1 ).withColumnRange( forwardRange.build() );
+
+
+        ColumnNameIterator<Long, Long> itr = new ColumnNameIterator<>( forwardQuery, longParser, false );
+
+        for ( long i = 0; i < maxValue; i++ ) {
+            assertEquals( i, itr.next().longValue() );
+        }
+
+        //now test it in reverse
+
+
+        final RangeBuilder reverseRange = new RangeBuilder().setLimit( 720 ).setReversed( true );
+
+
+        final RowQuery<String, Long> reverseQuery =
+                keyspace.prepareQuery( COLUMN_FAMILY ).getKey( rowKey1 ).withColumnRange( reverseRange.build() );
+
+
+        ColumnNameIterator<Long, Long> reverseItr = new ColumnNameIterator<>( reverseQuery, longParser, false );
+
+        for ( long i = maxValue - 1; i > -1; i-- ) {
+            assertEquals( i, reverseItr.next().longValue() );
+        }
+    }
+
+
+    //    /**
+    //             * Write to both rows in parallel
+    //             */
+    //            Observable.from( new String[]{rowKey1, rowKey2} ).parallel( new Func1<Observable<String>,
+    // Observable<String>>() {
+    //                @Override
+    //                public Observable<String> call( final Observable<String> stringObservable ) {
+    //                   return stringObservable.doOnNext( new Action1<String>() {
+    //                       @Override
+    //                       public void call( final String key ) {
+    //
+    //                           final MutationBatch batch = keyspace.prepareMutationBatch();
+    //
+    //                           for(long i = 0; i < maxValue; i ++){
+    //                               batch.withRow( COLUMN_FAMILY, key).putColumn( i, TRUE );
+    //
+    //                               if(i % 1000 == 0){
+    //                                   try {
+    //                                       batch.execute();
+    //                                   }
+    //                                   catch ( ConnectionException e ) {
+    //                                       throw new RuntimeException(e);
+    //                                   }
+    //                               }
+    //
+    //                           }
+    //
+    //                       }
+    //                   } );
+    //                }
+    //            } ).toBlocking().last();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
new file mode 100644
index 0000000..6762588
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiKeyColumnNameIteratorTest.java
@@ -0,0 +1,330 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.query.RowQuery;
+import com.netflix.astyanax.serializers.LongSerializer;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.util.RangeBuilder;
+
+import rx.Observable;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class MultiKeyColumnNameIteratorTest {
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+    protected static Keyspace keyspace;
+
+    protected ApplicationScope scope;
+
+    protected static ColumnFamily<String, Long> COLUMN_FAMILY =
+            new ColumnFamily<>( "MultiKeyLongTests", StringSerializer.get(), LongSerializer.get() );
+
+    protected static final boolean TRUE = true;
+
+
+    @BeforeClass
+    public static void setup() throws ConnectionException {
+
+
+        final CassandraConfig cassandraConfig = new CassandraConfig() {
+            @Override
+            public ConsistencyLevel getReadCL() {
+                return ConsistencyLevel.CL_QUORUM;
+            }
+
+
+            @Override
+            public ConsistencyLevel getWriteCL() {
+                return ConsistencyLevel.CL_QUORUM;
+            }
+        };
+
+
+        AstyanaxKeyspaceProvider astyanaxKeyspaceProvider =
+                new AstyanaxKeyspaceProvider( rule.getCassandraFig(), cassandraConfig );
+
+        keyspace = astyanaxKeyspaceProvider.get();
+
+        TestUtils.createKeyspace( keyspace );
+
+        TestUtils.createColumnFamiliy( keyspace, COLUMN_FAMILY, new HashMap<String, Object>() );
+    }
+
+
+    @Test
+    public void multiIterator() {
+
+        final String rowKey1 = UUIDGenerator.newTimeUUID().toString();
+
+        final String rowKey2 = UUIDGenerator.newTimeUUID().toString();
+
+        final String rowKey3 = UUIDGenerator.newTimeUUID().toString();
+
+
+        final long maxValue = 10000;
+
+        /**
+         * Write to all three rows in parallel
+         */
+        Observable.from( new String[] { rowKey1, rowKey2, rowKey3 } )
+                  .parallel( new Func1<Observable<String>, Observable<String>>() {
+                      @Override
+                      public Observable<String> call( final Observable<String> stringObservable ) {
+                          return stringObservable.doOnNext( new Action1<String>() {
+                              @Override
+                              public void call( final String key ) {
+
+                                  final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                  for ( long i = 0; i < maxValue; i++ ) {
+                                      batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+                                      if ( i % 1000 == 0 ) {
+                                          try {
+                                              batch.execute();
+                                          }
+                                          catch ( ConnectionException e ) {
+                                              throw new RuntimeException( e );
+                                          }
+                                      }
+                                  }
+
+                                  try {
+                                      batch.execute();
+                                  }
+                                  catch ( ConnectionException e ) {
+                                      throw new RuntimeException( e );
+                                  }
+                              }
+                          } );
+                      }
+                  } ).toBlocking().last();
+
+
+        //create 3 iterators
+
+        ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false );
+        ColumnNameIterator<Long, Long> row2Iterator = createIterator( rowKey2, false );
+        ColumnNameIterator<Long, Long> row3Iterator = createIterator( rowKey3, false );
+
+        final Comparator<Long> ascendingComparator = new Comparator<Long>() {
+
+            @Override
+            public int compare( final Long o1, final Long o2 ) {
+                return Long.compare( o1, o2 );
+            }
+        };
+
+        /**
+         * Again, an arbitrary buffer size to ensure we buffer at some point
+         */
+        final MultiKeyColumnNameIterator<Long, Long> ascendingItr =
+                new MultiKeyColumnNameIterator<>( Arrays.asList( row1Iterator, row2Iterator, row3Iterator ),
+                        ascendingComparator, 900 );
+
+
+        //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the
+        // trips required
+
+
+        for ( long i = 0; i < maxValue; i++ ) {
+            assertEquals( i, ascendingItr.next().longValue() );
+        }
+
+        //now test it in reverse
+
+        ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true );
+        ColumnNameIterator<Long, Long> row2IteratorDesc = createIterator( rowKey2, true );
+        ColumnNameIterator<Long, Long> row3IteratorDesc = createIterator( rowKey3, true );
+
+        final Comparator<Long> descendingComparator = new Comparator<Long>() {
+
+            @Override
+            public int compare( final Long o1, final Long o2 ) {
+                return ascendingComparator.compare( o1, o2 ) * -1;
+            }
+        };
+
+        /**
+         * Again, an arbitrary buffer size to ensure we buffer at some point
+         */
+        final MultiKeyColumnNameIterator<Long, Long> descendingItr =
+                new MultiKeyColumnNameIterator<>( Arrays.asList( row1IteratorDesc, row2IteratorDesc, row3IteratorDesc ),
+                        descendingComparator, 900 );
+
+
+        for ( long i = maxValue - 1; i > -1; i-- ) {
+            assertEquals( i, descendingItr.next().longValue() );
+        }
+    }
+
+
+    @Test
+       public void singleIterator() {
+
+           final String rowKey1 = UUIDGenerator.newTimeUUID().toString();
+
+
+
+           final long maxValue = 10000;
+
+           /**
+            * Write to our single row
+            */
+           Observable.just( rowKey1  )
+                     .parallel( new Func1<Observable<String>, Observable<String>>() {
+                         @Override
+                         public Observable<String> call( final Observable<String> stringObservable ) {
+                             return stringObservable.doOnNext( new Action1<String>() {
+                                 @Override
+                                 public void call( final String key ) {
+
+                                     final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                                     for ( long i = 0; i < maxValue; i++ ) {
+                                         batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+
+                                         if ( i % 1000 == 0 ) {
+                                             try {
+                                                 batch.execute();
+                                             }
+                                             catch ( ConnectionException e ) {
+                                                 throw new RuntimeException( e );
+                                             }
+                                         }
+                                     }
+
+                                     try {
+                                         batch.execute();
+                                     }
+                                     catch ( ConnectionException e ) {
+                                         throw new RuntimeException( e );
+                                     }
+                                 }
+                             } );
+                         }
+                     } ).toBlocking().last();
+
+
+           //create a single iterator
+
+           ColumnNameIterator<Long, Long> row1Iterator = createIterator( rowKey1, false );
+
+           final Comparator<Long> ascendingComparator = new Comparator<Long>() {
+
+               @Override
+               public int compare( final Long o1, final Long o2 ) {
+                   return Long.compare( o1, o2 );
+               }
+           };
+
+           /**
+            * Again, an arbitrary buffer size to ensure we buffer at some point
+            */
+           final MultiKeyColumnNameIterator<Long, Long> ascendingItr =
+                   new MultiKeyColumnNameIterator<>( Arrays.asList( row1Iterator ),
+                           ascendingComparator, 900 );
+
+
+           //ensure we have to make several trips, purposefully set to a nonsensical value to ensure we make all the
+           // trips required
+
+
+           for ( long i = 0; i < maxValue; i++ ) {
+               //we only have 1 iterator, so we should get each value once, in order
+               assertEquals( i, ascendingItr.next().longValue() );
+           }
+
+           //now test it in reverse
+
+           ColumnNameIterator<Long, Long> row1IteratorDesc = createIterator( rowKey1, true );
+
+           final Comparator<Long> descendingComparator = new Comparator<Long>() {
+
+               @Override
+               public int compare( final Long o1, final Long o2 ) {
+                   return ascendingComparator.compare( o1, o2 ) * -1;
+               }
+           };
+
+           /**
+            * Again, an arbitrary buffer size to ensure we buffer at some point
+            */
+           final MultiKeyColumnNameIterator<Long, Long> descendingItr =
+                   new MultiKeyColumnNameIterator<>( Arrays.asList( row1IteratorDesc),
+                           descendingComparator, 900 );
+
+
+           for ( long i = maxValue - 1; i > -1; i-- ) {
+               assertEquals( i, descendingItr.next().longValue() );
+           }
+       }
+
+
+    private static ColumnNameIterator<Long, Long> createIterator( final String rowKey, final boolean reversed ) {
+
+
+        final ColumnParser<Long, Long> longParser = new ColumnParser<Long, Long>() {
+            @Override
+            public Long parseColumn( final Column<Long> column ) {
+                return column.getName();
+            }
+        };
+
+        final RangeBuilder forwardRange = new RangeBuilder().setLimit( 720 ).setReversed( reversed );
+
+
+        final RowQuery<String, Long> forwardQuery =
+                keyspace.prepareQuery( COLUMN_FAMILY ).getKey( rowKey ).withColumnRange( forwardRange.build() );
+
+
+        ColumnNameIterator<Long, Long> itr = new ColumnNameIterator<>( forwardQuery, longParser, false );
+
+        return itr;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
new file mode 100644
index 0000000..8a8d0b0
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIteratorTest.java
@@ -0,0 +1,387 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.usergrid.persistence.core.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.MutationBatch;
+import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import com.netflix.astyanax.model.Column;
+import com.netflix.astyanax.model.ColumnFamily;
+import com.netflix.astyanax.model.ConsistencyLevel;
+import com.netflix.astyanax.serializers.LongSerializer;
+import com.netflix.astyanax.serializers.StringSerializer;
+import com.netflix.astyanax.util.RangeBuilder;
+
+import rx.Observable;
+import rx.Observer;
+import rx.Subscription;
+import rx.functions.Action1;
+import rx.functions.Func1;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class MultiRowColumnIteratorTest {
+
+    @ClassRule
+    public static CassandraRule rule = new CassandraRule();
+
+    protected static Keyspace keyspace;
+
+    protected ApplicationScope scope;
+
+    protected static ColumnFamily<String, Long> COLUMN_FAMILY =
+            new ColumnFamily<>( "MultiRowLongTests", StringSerializer.get(), LongSerializer.get() );
+
+    protected static final boolean TRUE = true;
+
+
+    @BeforeClass
+    public static void setup() throws ConnectionException {
+
+        // Reads and writes both use QUORUM consistency for the keyspace under test
+        final CassandraConfig quorumConfig = new CassandraConfig() {
+            @Override
+            public ConsistencyLevel getWriteCL() {
+                return ConsistencyLevel.CL_QUORUM;
+            }
+
+
+            @Override
+            public ConsistencyLevel getReadCL() {
+                return ConsistencyLevel.CL_QUORUM;
+            }
+        };
+
+        // The keyspace is built from the class rule's CassandraFig and the config above
+        keyspace = new AstyanaxKeyspaceProvider( rule.getCassandraFig(), quorumConfig ).get();
+
+        // Both helpers log and ignore failures, e.g. when the schema already exists
+        TestUtils.createKeyspace( keyspace );
+
+        final HashMap<String, Object> noOptions = new HashMap<String, Object>();
+
+        TestUtils.createColumnFamiliy( keyspace, COLUMN_FAMILY, noOptions );
+    }
+
+
+    @Test
+    public void multiIterator() throws InterruptedException {
+        // Verifies that one iterator over three overlapping rows yields each value exactly once, ascending then descending
+        final String rowKey1 = UUIDGenerator.newTimeUUID().toString();
+
+        final String rowKey2 = UUIDGenerator.newTimeUUID().toString();
+
+        final String rowKey3 = UUIDGenerator.newTimeUUID().toString();
+
+
+        final long maxValue = 10000;
+
+        final CountDownLatch latch = new CountDownLatch( 3 );
+
+        // Row 1 receives every value, row 2 every 2nd value and row 3 every 10th value, so the rows overlap
+        writeData( latch, rowKey1, maxValue, 1 );
+        writeData( latch, rowKey2, maxValue, 2 );
+        writeData( latch, rowKey3, maxValue, 10 );
+
+        // Wait until all three writes have reported in
+        latch.await();
+
+
+        // Parser that returns each column's name as its value
+
+
+        final ColumnParser<Long, Long> longParser = new ColumnParser<Long, Long>() {
+            @Override
+            public Long parseColumn( final Column<Long> column ) {
+                return column.getName();
+            }
+        };
+
+        // Ascending search: only sets the range start when a value is supplied, otherwise leaves the range untouched
+        final ColumnSearch<Long> ascendingSearch = new ColumnSearch<Long>() {
+            @Override
+            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+                rangeBuilder.setStart( value );
+            }
+
+
+            @Override
+            public void buildRange( final RangeBuilder rangeBuilder ) {
+
+            }
+        };
+
+
+        final Comparator<Long> ascendingComparator = new Comparator<Long>() {
+
+            @Override
+            public int compare( final Long o1, final Long o2 ) {
+                return Long.compare( o1, o2 );
+            }
+        };
+
+
+        final Collection<String> rowKeys = Arrays.asList( rowKey1, rowKey2, rowKey3 );
+
+        MultiRowColumnIterator<String, Long, Long> ascendingItr =
+                new MultiRowColumnIterator<>( keyspace, COLUMN_FAMILY, ConsistencyLevel.CL_QUORUM, longParser,
+                        ascendingSearch, ascendingComparator, rowKeys, 852 );
+
+
+        // 852 (presumably the page size - TODO confirm) is a deliberately odd value far smaller than maxValue, so
+        // draining the iterator should require several trips
+
+
+        for ( long i = 0; i < maxValue; i++ ) {
+            assertEquals( i, ascendingItr.next().longValue() );
+        }
+
+        // Descending search: same start handling as above, but always reads the range in reverse
+        final ColumnSearch<Long> descendingSearch = new ColumnSearch<Long>() {
+            @Override
+            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+                rangeBuilder.setStart( value );
+                buildRange( rangeBuilder );
+            }
+
+
+            @Override
+            public void buildRange( final RangeBuilder rangeBuilder ) {
+                rangeBuilder.setReversed( true );
+            }
+        };
+
+        // Negates the ascending comparator to obtain descending order
+        final Comparator<Long> descendingComparator = new Comparator<Long>() {
+
+            @Override
+            public int compare( final Long o1, final Long o2 ) {
+                return ascendingComparator.compare( o1, o2 ) * -1;
+            }
+        };
+
+
+        MultiRowColumnIterator<String, Long, Long> descendingItr =
+                new MultiRowColumnIterator<>( keyspace, COLUMN_FAMILY, ConsistencyLevel.CL_QUORUM, longParser,
+                        descendingSearch, descendingComparator, rowKeys, 712 );
+
+        for ( long i = maxValue - 1; i > -1; i-- ) {
+            assertEquals( i, descendingItr.next().longValue() );
+        }
+    }
+
+
+    @Test
+    public void singleIterator() {
+        // Verifies that an iterator over a single row yields every written value, ascending and then descending
+        final String rowKey1 = UUIDGenerator.newTimeUUID().toString();
+
+
+        final long maxValue = 10000;
+
+        /**
+         * Write every value in [0, maxValue) to the single row
+         */
+        Observable.just( rowKey1 ).parallel( new Func1<Observable<String>, Observable<String>>() {
+            @Override
+            public Observable<String> call( final Observable<String> stringObservable ) {
+                return stringObservable.doOnNext( new Action1<String>() {
+                    @Override
+                    public void call( final String key ) {
+
+                        final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                        for ( long i = 0; i < maxValue; i++ ) {
+                            batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+                            // Execute the batch every 1000 values (including i == 0)
+                            if ( i % 1000 == 0 ) {
+                                try {
+                                    batch.execute();
+                                }
+                                catch ( ConnectionException e ) {
+                                    throw new RuntimeException( e );
+                                }
+                            }
+                        }
+                        // Final execute for whatever was added after the last in-loop execute
+                        try {
+                            batch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( e );
+                        }
+                    }
+                } );
+            }
+        } ).toBlocking().last();
+
+
+        // Parser that returns each column's name as its value
+
+        final ColumnParser<Long, Long> longParser = new ColumnParser<Long, Long>() {
+            @Override
+            public Long parseColumn( final Column<Long> column ) {
+                return column.getName();
+            }
+        };
+
+        // Ascending search: only sets the range start when a value is supplied, otherwise leaves the range untouched
+        final ColumnSearch<Long> ascendingSearch = new ColumnSearch<Long>() {
+            @Override
+            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+                rangeBuilder.setStart( value );
+            }
+
+
+            @Override
+            public void buildRange( final RangeBuilder rangeBuilder ) {
+
+            }
+        };
+
+
+        final Comparator<Long> ascendingComparator = new Comparator<Long>() {
+
+            @Override
+            public int compare( final Long o1, final Long o2 ) {
+                return Long.compare( o1, o2 );
+            }
+        };
+
+
+        final Collection<String> rowKeys = Arrays.asList( rowKey1 );
+
+        MultiRowColumnIterator<String, Long, Long> ascendingItr =
+                new MultiRowColumnIterator<>( keyspace, COLUMN_FAMILY, ConsistencyLevel.CL_QUORUM, longParser,
+                        ascendingSearch, ascendingComparator, rowKeys, 712 );
+
+
+        // 712 (presumably the page size - TODO confirm) is a deliberately odd value far smaller than maxValue, so
+        // draining the iterator should require several trips
+
+
+        for ( long i = 0; i < maxValue; i++ ) {
+            assertEquals( i, ascendingItr.next().longValue() );
+        }
+
+        // Descending search: same start handling as above, but always reads the range in reverse
+        final ColumnSearch<Long> descendingSearch = new ColumnSearch<Long>() {
+            @Override
+            public void buildRange( final RangeBuilder rangeBuilder, final Long value ) {
+                rangeBuilder.setStart( value );
+                buildRange( rangeBuilder );
+            }
+
+
+            @Override
+            public void buildRange( final RangeBuilder rangeBuilder ) {
+                rangeBuilder.setReversed( true );
+            }
+        };
+
+        // Negates the ascending comparator to obtain descending order
+        final Comparator<Long> descendingComparator = new Comparator<Long>() {
+
+            @Override
+            public int compare( final Long o1, final Long o2 ) {
+                return ascendingComparator.compare( o1, o2 ) * -1;
+            }
+        };
+
+
+        MultiRowColumnIterator<String, Long, Long> descendingItr =
+                new MultiRowColumnIterator<>( keyspace, COLUMN_FAMILY, ConsistencyLevel.CL_QUORUM, longParser,
+                        descendingSearch, descendingComparator, rowKeys, 712 );
+
+        for ( long i = maxValue - 1; i > -1; i-- ) {
+            assertEquals( i, descendingItr.next().longValue() );
+        }
+    }
+
+
+    /**
+     * Writes every value in [0, maxValue) that is divisible by mod as a column of rowKey, then counts the
+     * latch down once the write has either completed or failed.
+     */
+    private void writeData(final CountDownLatch latch, final String rowKey, final long maxValue, final long mod){
+        Observable.just( rowKey ).doOnNext( new Action1<String>() {
+            @Override
+            public void call( final String key ) {
+                final MutationBatch batch = keyspace.prepareMutationBatch();
+
+                for ( long i = 0; i < maxValue; i++ ) {
+                    if ( i % mod == 0 ) {
+                        batch.withRow( COLUMN_FAMILY, key ).putColumn( i, TRUE );
+                    }
+
+                    if ( i % 1000 == 0 ) {
+                        try {
+                            batch.execute();
+                        }
+                        catch ( ConnectionException e ) {
+                            throw new RuntimeException( e );
+                        }
+                    }
+                }
+
+                try {
+                    batch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( e );
+                }
+            }
+        } ).subscribe( new Observer<String>() {
+            @Override
+            public void onCompleted() {
+                latch.countDown();
+            }
+
+            @Override
+            public void onError( final Throwable e ) {
+                // Release the latch on failure as well; otherwise the caller's latch.await() would block forever
+                latch.countDown();
+            }
+
+            @Override
+            public void onNext( final String s ) {
+
+            }
+        } );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
new file mode 100644
index 0000000..1ede643
--- /dev/null
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/astyanax/TestUtils.java
@@ -0,0 +1,76 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.usergrid.persistence.core.astyanax;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import com.google.common.collect.ImmutableMap;
+import com.netflix.astyanax.Keyspace;
+import com.netflix.astyanax.model.ColumnFamily;
+
+
+/**
+ * Utilities for Cassandra tests
+ */
+public class TestUtils {
+
+    private static final Logger log = LoggerFactory.getLogger( TestUtils.class );
+
+    /**
+     * Create the keyspace (SimpleStrategy, replication factor 1); any failure, e.g. it already exists, is logged and ignored
+     * @param keyspace The keyspace to create
+     */
+    public static void createKeyspace(final Keyspace keyspace){
+        ImmutableMap.Builder<Object, Object> strategyOptions = ImmutableMap.builder().put( "replication_factor", "1" );
+
+        ImmutableMap<String, Object> options = ImmutableMap.<String, Object>builder().put( "strategy_class",
+                "org.apache.cassandra.locator.SimpleStrategy" ).put( "strategy_options", strategyOptions.build() )
+                                                           .build();
+
+        try {
+            keyspace.createKeyspace( options );
+        }
+        catch ( Throwable t ) {
+          log.error( "Error on creating keyspace, ignoring", t );
+        }
+    }
+
+    /**
+     * Create the column family with the given options; any failure, e.g. it already exists, is logged and ignored
+     * @param keyspace The keyspace to create the column family in
+     * @param columnFamily The column family to create
+     * @param options The creation options to apply; null is treated as no options
+     */
+    public static <K, C> void createColumnFamiliy(final Keyspace keyspace, final ColumnFamily<K, C> columnFamily, final Map<String, Object> options){
+        try{
+            keyspace.createColumnFamily( columnFamily, options != null ? options : new HashMap<String, Object>() );
+        }catch(Exception e){
+           log.error( "Error on creating column family, ignoring" , e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
index ee89e0f..43f5a0c 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/cassandra/CassandraRule.java
@@ -51,6 +51,15 @@ public class CassandraRule extends EnvironResource {
         cassandraFig = injector.getInstance( CassandraFig.class );
     }
 
+
+    /**
+     * Get the cassandra fig
+     * @return The CassandraFig instance obtained from the injector
+     */
+    public CassandraFig getCassandraFig(){
+        return cassandraFig;
+    }
+
     @Override
     protected void before() throws Throwable {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index 72e49f7..07e2e58 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -107,7 +107,7 @@ public class OrderedMergeTest {
 
 
         Observable<Integer> ordered =
-                OrderedMerge.orderedMerge( new IntegerComparator(), 10, expected1, expected2, expected3 );
+                OrderedMerge.orderedMerge( new ReverseIntegerComparator(), 10, expected1, expected2, expected3 );
 
         final CountDownLatch latch = new CountDownLatch( 1 );
         final List<Integer> results = new ArrayList();
@@ -165,7 +165,7 @@ public class OrderedMergeTest {
         //set our buffer size to 2.  We should easily exceed this since every observable has more than 2 elements
 
         Observable<Integer> ordered =
-                OrderedMerge.orderedMerge( new IntegerComparator(), 2, expected1, expected2, expected3 );
+                OrderedMerge.orderedMerge( new ReverseIntegerComparator(), 2, expected1, expected2, expected3 );
 
         final CountDownLatch latch = new CountDownLatch( 1 );
         final List<Integer> results = new ArrayList();
@@ -228,7 +228,7 @@ public class OrderedMergeTest {
 
 
         Observable<Integer> ordered =
-                OrderedMerge.orderedMerge( new IntegerComparator(), 10, expected1, expected2, expected3 );
+                OrderedMerge.orderedMerge( new ReverseIntegerComparator(), 10, expected1, expected2, expected3 );
 
         final CountDownLatch latch = new CountDownLatch( 1 );
         final List<Integer> results = new ArrayList();
@@ -359,7 +359,7 @@ public class OrderedMergeTest {
          * proceed
          */
         Observable<Integer> ordered =
-                OrderedMerge.orderedMerge( new IntegerComparator(), 2, expected1, expected2, expected3 );
+                OrderedMerge.orderedMerge( new ReverseIntegerComparator(), 2, expected1, expected2, expected3 );
 
 
         final CountDownLatch latch = new CountDownLatch( 1 );
@@ -399,6 +399,144 @@ public class OrderedMergeTest {
     }
 
 
+    /**
+       * Tests that values duplicated across the source observables are all emitted, and that the merged output
+       * follows the comparator's ordering (descending here), with a buffer size smaller than every input.  A failure
+       * releases the latch so that the test cannot hang.
+       *
+       * @throws InterruptedException
+       */
+      @Test
+      public void testDuplicateOrderingCorrect() throws InterruptedException {
+
+          List<Integer> expected1List = Arrays.asList( 10, 5, 4,  3, 2, 1 );
+
+          Observable<Integer> expected1 = Observable.from( expected1List ).subscribeOn( Schedulers.io() );
+
+          List<Integer> expected2List = Arrays.asList( 9, 8, 7, 6, 5 );
+
+          Observable<Integer> expected2 = Observable.from( expected2List ).subscribeOn( Schedulers.io() );
+
+
+          List<Integer> expected3List = Arrays.asList( 9, 6, 5, 3, 2, 1, 0 );
+
+          Observable<Integer> expected3 = Observable.from( expected3List ).subscribeOn( Schedulers.io() );
+
+
+          /**
+           * The buffer size of 2 is smaller than every input above, and the inputs share the values
+           * 9, 6, 5, 3, 2 and 1; each occurrence must still appear in the merged output
+           */
+          Observable<Integer> ordered =
+                  OrderedMerge.orderedMerge( new ReverseIntegerComparator(), 2, expected1, expected2, expected3 );
+
+
+          final CountDownLatch latch = new CountDownLatch( 1 );
+          final List<Integer> results = new ArrayList<>();
+
+          ordered.subscribe( new Subscriber<Integer>() {
+              @Override
+              public void onCompleted() {
+                  latch.countDown();
+              }
+
+              @Override
+              public void onError( final Throwable e ) {
+                  e.printStackTrace();
+                  // Release the latch before failing; otherwise latch.await() below would block forever
+                  latch.countDown();
+                  fail( "An error was thrown " );
+              }
+
+              @Override
+              public void onNext( final Integer integer ) {
+                  log.info( "onNext invoked with {}", integer );
+                  results.add( integer );
+              }
+          } );
+
+          latch.await();
+
+          List<Integer> expected = Arrays.asList( 10, 9, 9,  8, 7, 6,  6, 5, 5, 5, 4, 3, 3, 2, 2, 1, 1, 0);
+
+          assertEquals( expected.size(), results.size() );
+
+
+          for ( int i = 0; i < expected.size(); i++ ) {
+              assertEquals( "Same element expected", expected.get( i ), results.get( i ) );
+          }
+      }
+
+
+    /**
+       * Tests that values duplicated across the source observables are all emitted, and that the merged output
+       * follows the comparator's ordering (ascending here), with a buffer size smaller than every input.  A failure
+       * releases the latch so that the test cannot hang.
+       *
+       * @throws InterruptedException
+       */
+      @Test
+      public void testDuplicateOrderingCorrectComparator() throws InterruptedException {
+
+          List<Integer> expected1List = Arrays.asList( 1, 2, 3, 4, 5, 10 );
+
+          Observable<Integer> expected1 = Observable.from( expected1List ).subscribeOn( Schedulers.io() );
+
+          List<Integer> expected2List = Arrays.asList( 5, 6, 7, 8, 9 );
+
+          Observable<Integer> expected2 = Observable.from( expected2List ).subscribeOn( Schedulers.io() );
+
+
+          List<Integer> expected3List = Arrays.asList( 0, 1, 2, 3, 5, 6, 9 );
+
+          Observable<Integer> expected3 = Observable.from( expected3List ).subscribeOn( Schedulers.io() );
+
+
+          /**
+           * The buffer size of 2 is smaller than every input above, and the inputs share the values
+           * 1, 2, 3, 5, 6 and 9; each occurrence must still appear in the merged output
+           */
+          Observable<Integer> ordered =
+                  OrderedMerge.orderedMerge( new IntegerComparator(), 2, expected1, expected2, expected3 );
+
+
+          final CountDownLatch latch = new CountDownLatch( 1 );
+          final List<Integer> results = new ArrayList<>();
+
+          ordered.subscribe( new Subscriber<Integer>() {
+              @Override
+              public void onCompleted() {
+                  latch.countDown();
+              }
+
+              @Override
+              public void onError( final Throwable e ) {
+                  e.printStackTrace();
+                  // Release the latch before failing; otherwise latch.await() below would block forever
+                  latch.countDown();
+                  fail( "An error was thrown " );
+              }
+
+              @Override
+              public void onNext( final Integer integer ) {
+                  log.info( "onNext invoked with {}", integer );
+                  results.add( integer );
+              }
+          } );
+
+          latch.await();
+
+          List<Integer> expected = Arrays.asList(  0, 1, 1,2, 2, 3, 3,4,  5, 5, 5,  6,  6, 7,8,  9, 9,10 );
+
+          assertEquals( expected.size(), results.size() );
+
+
+          for ( int i = 0; i < expected.size(); i++ ) {
+              assertEquals( "Same element expected", expected.get( i ), results.get( i ) );
+          }
+      }
+
+
     private static class IntegerComparator implements Comparator<Integer> {
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/resources/log4j.properties b/stack/corepersistence/common/src/test/resources/log4j.properties
new file mode 100644
index 0000000..08d897c
--- /dev/null
+++ b/stack/corepersistence/common/src/test/resources/log4j.properties
@@ -0,0 +1,39 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# suppress inspection "UnusedProperty" for whole file
+log4j.rootLogger=INFO,stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{3}.%M(%L)<%t>- %m%n
+
+log4j.logger.org.safehaus.chop.plugin=DEBUG
+log4j.logger.org.safehaus.guicyfig=ERROR
+log4j.logger.org.safehaus.chop.api.store.amazon=DEBUG
+log4j.logger.org.apache.http=ERROR
+log4j.logger.com.amazonaws.request=ERROR
+log4j.logger.cassandra.db=ERROR
+
+#log4j.logger.org.apache.usergrid=DEBUG
+
+log4j.logger.org.apache.usergrid.persistence.graph=TRACE
+log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/e040fdf4/stack/corepersistence/common/src/test/resources/usergrid.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/resources/usergrid.properties b/stack/corepersistence/common/src/test/resources/usergrid.properties
new file mode 100644
index 0000000..febda88
--- /dev/null
+++ b/stack/corepersistence/common/src/test/resources/usergrid.properties
@@ -0,0 +1 @@
+# No properties in our test env


Mime
View raw message