usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [37/50] git commit: Fixes RX issues and updates concurrency
Date Wed, 12 Feb 2014 13:21:45 GMT
Fixes RX issues and updates concurrency


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/19b1cfb7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/19b1cfb7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/19b1cfb7

Branch: refs/heads/optimistic-tx-semantics
Commit: 19b1cfb7a55c07acc3f2badd76ed65ca3faff336
Parents: b7e9969
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Feb 5 16:42:43 2014 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed Feb 5 16:42:43 2014 -0700

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |   7 +-
 .../collection/hystrix/CassandraCommand.java    |   4 +-
 .../impl/EntityCollectionManagerImpl.java       |  18 +++-
 .../collection/rx/CassandraThreadScheduler.java | 107 -------------------
 .../persistence/collection/rx/Concurrent.java   |  37 ++++---
 .../persistence/collection/rx/RxFig.java        |  40 -------
 .../collection/rx/ConcurrentTest.java           |  39 ++++++-
 .../persistence/collection/rx/ParallelTest.java | 102 ++++++++++++++++++
 ...MvccEntitySerializationStrategyImplTest.java |  10 --
 .../src/test/resources/log4j.properties         |   2 +-
 10 files changed, 179 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 95bc760..150111e 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -30,8 +30,6 @@ import org.apache.usergrid.persistence.collection.cassandra.AvailablePortFinder;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSyncImpl;
 import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
-import org.apache.usergrid.persistence.collection.rx.CassandraThreadScheduler;
-import org.apache.usergrid.persistence.collection.rx.RxFig;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
@@ -59,8 +57,7 @@ public class CollectionModule extends AbstractModule {
     @Override
     protected void configure() {
         //noinspection unchecked
-        install( new GuicyFigModule( 
-                RxFig.class, 
+        install( new GuicyFigModule(
                 MigrationManagerFig.class,
                 CassandraFig.class, 
                 SerializationFig.class,
@@ -75,8 +72,6 @@ public class CollectionModule extends AbstractModule {
                 .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
                 .build( EntityCollectionManagerFactory.class ) );
 
-        bind( Scheduler.class ).toProvider( CassandraThreadScheduler.class );
-
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
index a7a0860..3a3563b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
@@ -33,12 +33,12 @@ import rx.Observable;
  */
 public class CassandraCommand<R> extends HystrixCommand<R> {
 
-    private static final HystrixCommandGroupKey GROUP_KEY =HystrixCommandGroupKey.Factory.asKey( "CassandraCommand" );
+    public static final HystrixCommandGroupKey GROUP_KEY =HystrixCommandGroupKey.Factory.asKey( "CassandraCommand" );
 
 
     private final R value;
 
-    protected CassandraCommand(final R value) {
+    public CassandraCommand(final R value) {
         super( GROUP_KEY );
         this.value = value;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 6925538..5301a06 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -50,6 +50,7 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
 import rx.Observable;
+import rx.util.functions.FuncN;
 
 
 /**
@@ -144,7 +145,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
 
         //execute all validation stages concurrently.  Needs refactored when this is done.  https://github.com/Netflix/RxJava/issues/627
-        observable = Concurrent.concurrent(observable, writeVerifyUnique, writeOptimisticVerify);
+        observable = Concurrent.concurrent(observable, new WaitZip( observable ), writeVerifyUnique, writeOptimisticVerify);
 
         //return the commit result.
         return observable.map( writeCommit );
@@ -173,4 +174,19 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager {
 
         return CassandraCommand.toObservable( new CollectionIoEvent<Id>( collectionScope, entityId ) ).map( load );
     }
+
+    private static class WaitZip<R> implements FuncN<R>{
+
+        private final R value;
+
+
+        private WaitZip( final R value ) {this.value = value;}
+
+
+        @Override
+        public R call( final Object... args ) {
+            //no op, just here to require a join before proceeding
+            return value;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
deleted file mode 100644
index 512f0ec..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.beans.PropertyChangeEvent;
-import java.beans.PropertyChangeListener;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.name.Named;
-
-import rx.Scheduler;
-import rx.concurrency.Schedulers;
-
-
-public class CassandraThreadScheduler implements Provider<Scheduler> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraThreadScheduler.class);
-
-    private final RxFig rxFig;
-
-
-    @Inject
-    public CassandraThreadScheduler( final RxFig rxFig ) {
-        this.rxFig = rxFig;
-    }
-
-
-    @Override
-    @Named( "cassandraScheduler" )
-    public Scheduler get() {
-
-        //create our thread factory so we can label our threads in case we need to dump them
-        final ThreadFactory factory = new ThreadFactory() {
-
-            private final AtomicLong counter = new AtomicLong();
-
-            @Override
-            public Thread newThread( final Runnable r ) {
-
-               final String threadName = "RxCassandraIOThreadPool-" + counter.incrementAndGet();
-
-                LOG.info( "Allocating new IO thread with name {}", threadName );
-
-                Thread t = new Thread( r, threadName );
-                t.setDaemon( true );
-                return t;
-            }
-        };
-
-
-        /**
-         * Create a threadpool that will reclaim unused threads after 60 seconds.  
-         * It uses the max thread count set here. It intentionally uses the 
-         * DynamicProperty, so that when it is updated, the listener updates the 
-         * pool size. Additional allocation is trivial.  Shrinking the size 
-         * will require all currently executing threads to run to completion, 
-         * without allowing additional tasks to be queued.
-         */
-        final ThreadPoolExecutor pool = new ThreadPoolExecutor( 
-                0, rxFig.getMaxThreadCount(), 60L, TimeUnit.SECONDS,
-                new SynchronousQueue<Runnable>(), factory, new ThreadPoolExecutor.AbortPolicy() );
-
-
-        // if our max thread count is updated, we want to immediately update the pool.  
-        // Per the javadoc if the size is smaller, existing threads will continue to run
-        // until they become idle and time out
-        rxFig.addPropertyChangeListener( new PropertyChangeListener() {
-            @Override
-            public void propertyChange( final PropertyChangeEvent evt ) {
-            if ( evt.getPropertyName().equals( rxFig.getKeyByMethod( "getMaxThreadCount" ) ) ) {
-                LOG.debug( "Getting update to property: rxFig.getMaxThreadCount() old = {}, new = {} ",
-                        evt.getOldValue(), evt.getNewValue() );
-                pool.setMaximumPoolSize( ( Integer ) evt.getNewValue() );
-            }
-            }
-        } );
-
-        return Schedulers.executor( pool );
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
index f1c4ac2..ddc1ff7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
@@ -22,10 +22,15 @@ package org.apache.usergrid.persistence.collection.rx;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.usergrid.persistence.collection.hystrix.CassandraCommand;
+
+import com.netflix.hystrix.HystrixCommand;
+
 import rx.Observable;
 import rx.concurrency.Schedulers;
 import rx.operators.OperationMerge;
 import rx.util.functions.Func1;
+import rx.util.functions.FuncN;
 
 
 /**
@@ -37,34 +42,35 @@ import rx.util.functions.Func1;
 public class Concurrent<T, R> implements Func1<T, Observable<R>> {
 
     private final Func1<T, R>[] concurrent;
+    private final FuncN<R> zip;
 
-    private Concurrent( final Func1<T, R>[] concurrent ){
+    private Concurrent( final FuncN<R> zip, final Func1<T, R>[] concurrent ){
         this.concurrent = concurrent;
+        this.zip = zip;
     }
 
     @Override
       public Observable<R> call( final T input ) {
 
+
+        //TODO T.N Is this resetting the timeouts in hystrix?
+
         List<Observable<R>> observables = new ArrayList<Observable<R>>(concurrent.length);
 
-        //put all our observables together for concurrency
+        //Create multiple observables for each function.  They simply emit the input value.
+        //this is the "fork" step of the concurrent processing
         for( Func1<T, R> funct: concurrent){
-            final Observable<R> observable = 
-                    Observable.from(input).subscribeOn(  
-                            Schedulers.threadPoolForIO() ).map( funct );
+            final Observable<R> observable = CassandraCommand.toObservable( input ).map( funct );
 
             observables.add( observable );
         }
 
+        final Observable<R> zipped = Observable.zip( observables, zip );
 
 
 
-        final Observable.OnSubscribeFunc<R> merge = OperationMerge.merge( observables );
-        final Observable<R> newObservable = Observable.create( merge );
-
-
-        //wait until the last operation completes to proceed
-        return newObservable.takeLast( 1 );
+        //return an observable that
+        return zipped;
 
       }
 
@@ -72,15 +78,16 @@ public class Concurrent<T, R> implements Func1<T, Observable<R>> {
     /**
      * Create an instance of concurrent execution.  All functions specified 
      * in the list are invoked in parallel. The results are then "zipped" 
-     * into a single observable which is returned
+     * into a single observable which is returned  with the specified function
      *
      * @param observable The observable we're invoking on
+     * @param zipFunction The zip function to aggregate the results
      * @param concurrent The concurrent operations we're invoking
-     * @return
+     * @return The observable emitted from the zipped function
      */
     public static <T, R> Observable<R> concurrent( 
-            final Observable<T> observable, final Func1<T, R>... concurrent ){
-        return observable.mapMany( new Concurrent<T, R>( concurrent ));
+            final Observable<T> observable, final FuncN zipFunction, final Func1<T, R>... concurrent ){
+        return observable.mapMany( new Concurrent<T, R>( zipFunction, concurrent ));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
deleted file mode 100644
index b7c7b84..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-
-/**
- * Configuration interface for RxJava classes.
- */
-@FigSingleton
-public interface RxFig extends GuicyFig {
-
-    /**
-     * Max number of threads a pool can allocate.  Can be dynamically changed after starting
-     */
-    @Key( "rx.cassandra.io.threads" )
-    @Default( "100" )
-    int getMaxThreadCount();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
index ab41155..0e38276 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
@@ -8,12 +8,15 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.persistence.collection.hystrix.CassandraCommand;
+
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 
 import rx.Observable;
 import rx.concurrency.Schedulers;
 import rx.util.functions.Func1;
+import rx.util.functions.FuncN;
 
 import static org.junit.Assert.assertEquals;
 
@@ -30,22 +33,23 @@ public class ConcurrentTest {
     public void concurrent(){
 
         final String source = "test";
-        Observable<String> observable = Observable.from(source);
+        Observable<String> observable = CassandraCommand.toObservable( source );
 
         //we could theoretically use the same instance 3 times.  I just want to use
         //3 actual instances, since this is closer to the real use case.
 
-        final CountDownLatch latch = new CountDownLatch( 0 );
+        final CountDownLatch latch = new CountDownLatch( 3 );
         TestConcurrent instance1 = new TestConcurrent( latch );
         TestConcurrent instance2 = new TestConcurrent( latch );
         TestConcurrent instance3 = new TestConcurrent( latch );
 
-        //concurrent inherits thread pool from it's observable, set it's thread pool
+        Zip zip = new Zip();
+
 
-        observable = observable.subscribeOn( Schedulers.threadPoolForIO() );
 
+        //concurrent inherits thread pool from it's observable, set it's thread pool
         Observable<String> result = Concurrent
-                .concurrent(observable, instance1, instance2, instance3 );
+                .concurrent(observable, zip, instance1, instance2, instance3 );
 
         assertEquals( "No invocation yet", 0, set.size() );
 
@@ -74,6 +78,31 @@ public class ConcurrentTest {
     private Multiset<String> set  = HashMultiset.create();
 
 
+    public class Zip implements FuncN<String> {
+
+
+
+        @Override
+        public String call( final Object... args ) {
+            //no op, just a blocker for joining
+
+
+            for(int i = 0; i < args.length; i ++){
+
+                String current = ( String ) args[i];
+
+                for(int j = i+1; j< args.length; j++){
+
+                    assertEquals(current, args[j]);
+                }
+
+
+            }
+
+            return ( String ) args[0];
+        }
+    }
+
     /**
      * Simple function that just adds data to our multiset for later evaluation
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
new file mode 100644
index 0000000..526f7c7
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.collection.rx;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.hystrix.CassandraCommand;
+
+import rx.Observable;
+import rx.util.functions.Func1;
+import rx.util.functions.FuncN;
+
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Simple tests that provides examples of how to perform common operations in RX
+ */
+public class ParallelTest {
+
+    private static final Logger logger = LoggerFactory.getLogger( ParallelTest.class );
+
+
+    @Test( timeout = 5000 )
+    public void concurrentFunctions() {
+        final String input = "input";
+
+        final int size = 9;
+
+        //TODO Tweak our thread pool size beyond 10.
+
+        //latch used to make each thread block to prove correctness
+        final CountDownLatch latch = new CountDownLatch( size );
+
+        List<Observable<String>> observables = new ArrayList<Observable<String>>( size );
+
+        for ( int i = 0; i < size; i++ ) {
+            observables.add( new CassandraCommand<String>( input ).toObservable().map( new Func1<String, String>() {
+                @Override
+                public String call( final String s ) {
+
+//                    logger.info( "Function executing on thread {} with latch value {}", Thread.currentThread().getName(), latch.getCount() );
+
+                    latch.countDown();
+
+                    logger.info( "Function executing on thread {} with latch value {}", Thread.currentThread().getName(), latch.getCount() );
+
+
+
+                    try {
+                        latch.await();
+                    }
+                    catch ( InterruptedException e ) {
+                        throw new RuntimeException( e );
+                    }
+
+                    return s;
+                }
+            } ) );
+        }
+
+
+        Observable<String> zipped = Observable.zip( observables, new FuncN<String>() {
+
+            @Override
+            public String call( final Object... args ) {
+                assertEquals( size, args.length );
+                return input;
+            }
+        } );
+
+
+        String last = zipped.toBlockingObservable().last();
+
+
+        assertEquals( input, last );
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
index b0ee061..ed7c827 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
@@ -30,7 +30,6 @@ import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.rx.RxFig;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -92,14 +91,6 @@ public class MvccEntitySerializationStrategyImplTest {
     )
     public CassandraFig cassandraFig;
 
-    @Inject
-    @Overrides( name = "unit-test",
-        environments = Env.UNIT,
-        options = {
-            @Option( method = "getMaxThreadCount", override = CONNECTION_COUNT )
-        }
-    )
-    public RxFig rxFig;
 
     @Inject
     public SerializationFig serializationFig;
@@ -111,7 +102,6 @@ public class MvccEntitySerializationStrategyImplTest {
     @Before
     public void setup() {
         assertNotNull( cassandraFig );
-        assertNotNull( rxFig );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/19b1cfb7/stack/corepersistence/collection/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/resources/log4j.properties b/stack/corepersistence/collection/src/test/resources/log4j.properties
index eaf889c..f6f50ef 100644
--- a/stack/corepersistence/collection/src/test/resources/log4j.properties
+++ b/stack/corepersistence/collection/src/test/resources/log4j.properties
@@ -1,5 +1,5 @@
 # suppress inspection "UnusedProperty" for whole file
-log4j.rootLogger=INFO,standard
+log4j.rootLogger=INFO,stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout


Mime
View raw message