usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [42/50] git commit: Added test as proof of concept
Date Wed, 12 Feb 2014 13:21:50 GMT
Added test as proof of concept


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/af4d9d0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/af4d9d0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/af4d9d0a

Branch: refs/heads/optimistic-tx-semantics
Commit: af4d9d0a061ba1e7e9dd851f61ff1614366306bf
Parents: 75814fc
Author: Todd Nine <tnine@apigee.com>
Authored: Thu Feb 6 18:34:27 2014 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Thu Feb 6 18:34:27 2014 -0700

----------------------------------------------------------------------
 .../collection/hystrix/CassandraCommand.java    |  24 ++-
 .../collection/hystrix/CommandUtils.java        |  28 +++
 .../impl/EntityCollectionManagerImpl.java       |  23 ++-
 .../persistence/collection/rx/Concurrent.java   |  29 ++-
 .../collection/rx/ConcurrentTest.java           | 128 +++++++++-----
 .../persistence/collection/rx/ParallelTest.java | 177 +++++++++++++++----
 6 files changed, 298 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af4d9d0a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
index 3a3563b..f31cef4 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
@@ -24,21 +24,28 @@ import com.netflix.hystrix.HystrixCommand;
 import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.Observable;
+import rx.concurrency.Schedulers;
 
 
 /**
- * Default command that just returns the value handed to it.  Useful for creating observables
- * that are subscribed on the correct underlying Hystrix thread pool
- *
+ * Default command that just returns the value handed to it.  Useful for creating observables
that are subscribed on the
+ * correct underlying Hystrix thread pool
  */
 public class CassandraCommand<R> extends HystrixCommand<R> {
 
-    public static final HystrixCommandGroupKey GROUP_KEY =HystrixCommandGroupKey.Factory.asKey(
"CassandraCommand" );
+    public static final String NAME = "CassandraCommand";
+
+    public static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey(
NAME );
+
+    public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( NAME
);
+
+    public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize(
NAME );
 
 
     private final R value;
 
-    public CassandraCommand(final R value) {
+
+    public CassandraCommand( final R value ) {
         super( GROUP_KEY );
         this.value = value;
     }
@@ -54,10 +61,11 @@ public class CassandraCommand<R> extends HystrixCommand<R>
{
      * Get the write command
      *
      * @param readValue The value to observe on
+     *
      * @return The value wrapped in a Hystrix observable
      */
-    public static <R> Observable<R> toObservable( R readValue ){
-         return new CassandraCommand<R>(readValue).toObservable();
+    public static <R> Observable<R> toObservable( R readValue ) {
+        //create a new command and ensure it's observed on the correct thread scheduler
+        return new CassandraCommand<R>( readValue ).toObservable( Schedulers.threadPoolForIO()
);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af4d9d0a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
new file mode 100644
index 0000000..b81f79b
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CommandUtils.java
@@ -0,0 +1,28 @@
+package org.apache.usergrid.persistence.collection.hystrix;
+
+
+/**
+ *
+ *
+ */
+public class CommandUtils {
+
+    /**
+     * Get the name of the archiaus property for the core thread pool size
+     * @param threadPoolName
+     * @return
+     */
+    public static String getThreadPoolCoreSize(String threadPoolName){
+        return "hystrix.threadpool."+ threadPoolName + ".coreSize";
+    }
+
+    /**
+       * Get the name of the archiaus property for the max thread pool size
+       * @param threadPoolName
+       * @return
+       */
+      public static String getThreadPoolMaxQueueSize(String threadPoolName){
+          return "hystrix.threadpool."+ threadPoolName + ".maxQueueSize";
+      }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af4d9d0a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 5301a06..70b9b65 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -141,11 +141,13 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
         //these 3 lines could be done in a single line, but they are on multiple lines for
clarity
 
         //create our observable and start the write
-        Observable<CollectionIoEvent<MvccEntity>> observable = CassandraCommand.toObservable(
new CollectionIoEvent<Entity>( collectionScope, entity ) ).map( writeStart );
+        CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(
collectionScope, entity );
+
+        Observable<CollectionIoEvent<MvccEntity>> observable = CassandraCommand.toObservable(
writeData ).map( writeStart );
 
 
         //execute all validation stages concurrently.  Needs refactored when this is done.
 https://github.com/Netflix/RxJava/issues/627
-        observable = Concurrent.concurrent(observable, new WaitZip( observable ), writeVerifyUnique,
writeOptimisticVerify);
+        observable = Concurrent.concurrent(observable, new WaitZip( ), writeVerifyUnique,
writeOptimisticVerify);
 
         //return the commit result.
         return observable.map( writeCommit );
@@ -175,18 +177,27 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
         return CassandraCommand.toObservable( new CollectionIoEvent<Id>( collectionScope,
entityId ) ).map( load );
     }
 
+
+    /**
+     * Class that validates all results are equal then proceeds
+     * @param <R>
+     */
     private static class WaitZip<R> implements FuncN<R>{
 
-        private final R value;
 
 
-        private WaitZip( final R value ) {this.value = value;}
+        private WaitZip() {
+        }
 
 
         @Override
         public R call( final Object... args ) {
-            //no op, just here to require a join before proceeding
-            return value;
+
+            for(int i = 0; i < args.length-1; i ++){
+                assert args[i] == args[i+1];
+            }
+
+            return ( R ) args[0];
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af4d9d0a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
index ddc1ff7..2a997d7 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
@@ -39,23 +39,20 @@ import rx.util.functions.FuncN;
  * This is useful when you want to perform operations on a single initial observable, 
  * then combine the result into a single observable to continue the sequence
  */
-public class Concurrent<T, R> implements Func1<T, Observable<R>> {
+public class Concurrent<T, R, Z> implements Func1<T, Observable<Z>> {
 
     private final Func1<T, R>[] concurrent;
-    private final FuncN<R> zip;
+    private final FuncN<Z> zip;
 
-    private Concurrent( final FuncN<R> zip, final Func1<T, R>[] concurrent ){
+    private Concurrent( final FuncN<Z> zip, final Func1<T, R>[] concurrent ){
         this.concurrent = concurrent;
         this.zip = zip;
     }
 
     @Override
-      public Observable<R> call( final T input ) {
+      public Observable<Z> call( final T input ) {
 
-
-        //TODO T.N Is this resetting the timeouts in hystrix?
-
-        List<Observable<R>> observables = new ArrayList<Observable<R>>(concurrent.length);
+         List<Observable<R>> observables = new ArrayList<Observable<R>>(concurrent.length);
 
         //Create multiple observables for each function.  They simply emit the input value.
         //this is the "fork" step of the concurrent processing
@@ -65,11 +62,11 @@ public class Concurrent<T, R> implements Func1<T, Observable<R>>
{
             observables.add( observable );
         }
 
-        final Observable<R> zipped = Observable.zip( observables, zip );
+        final Observable<Z> zipped = Observable.zip( observables, zip );
 
 
 
-        //return an observable that
+        //return an observable that is hte result of the zip
         return zipped;
 
       }
@@ -80,14 +77,14 @@ public class Concurrent<T, R> implements Func1<T, Observable<R>>
{
      * in the list are invoked in parallel. The results are then "zipped" 
      * into a single observable which is returned  with the specified function
      *
-     * @param observable The observable we're invoking on
-     * @param zipFunction The zip function to aggregate the results
+     * @param observable The observable we're invoking many concurrent operations on
+     * @param zipFunction The zip function to aggregate the results. And return the observable
      * @param concurrent The concurrent operations we're invoking
-     * @return The observable emitted from the zipped function
+     * @return The observable emitted from the zipped function of type Z
      */
-    public static <T, R> Observable<R> concurrent( 
-            final Observable<T> observable, final FuncN zipFunction, final Func1<T,
R>... concurrent ){
-        return observable.mapMany( new Concurrent<T, R>( zipFunction, concurrent ));
+    public static <T, R, Z> Observable<Z> concurrent(
+            final Observable<T> observable, final FuncN<Z> zipFunction, final
Func1<T, R>... concurrent ){
+        return observable.mapMany( new Concurrent<T, R, Z>( zipFunction, concurrent
));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af4d9d0a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
index 0e38276..29f11d9 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
@@ -4,6 +4,8 @@ package org.apache.usergrid.persistence.collection.rx;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -12,13 +14,15 @@ import org.apache.usergrid.persistence.collection.hystrix.CassandraCommand;
 
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
+import com.netflix.config.ConfigurationManager;
 
 import rx.Observable;
-import rx.concurrency.Schedulers;
 import rx.util.functions.Func1;
 import rx.util.functions.FuncN;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 
 /**
@@ -27,115 +31,147 @@ import static org.junit.Assert.assertEquals;
  */
 public class ConcurrentTest {
 
-    private static final Logger logger = LoggerFactory.getLogger(ConcurrentTest.class);
+    private static final Logger logger = LoggerFactory.getLogger( ConcurrentTest.class );
 
-    @Test
-    public void concurrent(){
+
+    private int preRunThreadCount = -1;
+    private int queueDepth = -1;
+
+
+    @Before
+    public void getSettings() {
+
+        //        preRunThreadCount = ConfigurationManager.getConfigInstance().getInt( CassandraCommand
+        // .THREAD_POOL_SIZE );
+        //
+        //
+        //        //reject requests we have to queue
+        //        queueDepth = ConfigurationManager.getConfigInstance().getInt( CassandraCommand.THREAD_POOL_QUEUE
);
+    }
+
+
+    @After
+    public void restoreSettings() {
+        ConfigurationManager.getConfigInstance().setProperty( CassandraCommand.THREAD_POOL_SIZE,
preRunThreadCount );
+
+
+        //reject requests we have to queue
+        ConfigurationManager.getConfigInstance().setProperty( CassandraCommand.THREAD_POOL_QUEUE,
queueDepth );
+    }
+
+
+    @Test( timeout = 5000 )
+    public void concurrent() {
 
         final String source = "test";
         Observable<String> observable = CassandraCommand.toObservable( source );
 
-        //we could theoretically use the same instance 3 times.  I just want to use
-        //3 actual instances, since this is closer to the real use case.
+        //we could theoretically use the same instance over and over
 
-        final CountDownLatch latch = new CountDownLatch( 3 );
-        TestConcurrent instance1 = new TestConcurrent( latch );
-        TestConcurrent instance2 = new TestConcurrent( latch );
-        TestConcurrent instance3 = new TestConcurrent( latch );
+        final int size = 100;
+
+        //set the size of concurrent threads to our requests/2.  Should be more than sufficient
for this test
+        ConfigurationManager.getConfigInstance().setProperty( CassandraCommand.THREAD_POOL_SIZE,
size / 2 );
 
-        Zip zip = new Zip();
 
+        //reject requests we have to queue
+        ConfigurationManager.getConfigInstance().setProperty( CassandraCommand.THREAD_POOL_QUEUE,
-1 );
+
+
+        final CountDownLatch latch = new CountDownLatch( size );
+
+        TestConcurrent[] concurrentFunctions = new TestConcurrent[size];
+
+        for ( int i = 0; i < size; i++ ) {
+            concurrentFunctions[i] = new TestConcurrent( latch, i );
+        }
+
+        Zip zip = new Zip();
 
 
         //concurrent inherits thread pool from it's observable, set it's thread pool
-        Observable<String> result = Concurrent
-                .concurrent(observable, zip, instance1, instance2, instance3 );
+        Observable<Integer> result = Concurrent.concurrent( observable, zip, concurrentFunctions
);
 
         assertEquals( "No invocation yet", 0, set.size() );
 
 
         //now invoke it
-        String response = result.toBlockingObservable().single();
+        Integer response = result.toBlockingObservable().single();
 
 
-        assertEquals( "Same value emitted", source, response );
+        assertEquals( "Same value emitted", size - 1, response.intValue() );
 
         //verify each function executed in it's own thread
 
-        assertEquals( "3 thread invoked", 3, set.size() );
+        assertEquals( size + " threads invoked", size, set.size() );
 
         //print them out just for giggles
-        for(Multiset.Entry<String> entry: set.entrySet()){
+        for ( Multiset.Entry<String> entry : set.entrySet() ) {
             System.out.println( entry.getElement() );
             assertEquals( "1 Thread per invocation", 1, entry.getCount() );
         }
-
-
-
     }
 
 
-    private Multiset<String> set  = HashMultiset.create();
-
+    private Multiset<String> set = HashMultiset.create();
 
-    public class Zip implements FuncN<String> {
 
+    public class Zip implements FuncN<Integer> {
 
 
         @Override
-        public String call( final Object... args ) {
-            //no op, just a blocker for joining
-
-
-            for(int i = 0; i < args.length; i ++){
-
-                String current = ( String ) args[i];
-
-                for(int j = i+1; j< args.length; j++){
-
-                    assertEquals(current, args[j]);
-                }
+        public Integer call( final Object... args ) {
 
+            //validate our arguments come in order
+            for ( int i = 0; i < args.length; i++ ) {
 
+                assertEquals( i, args[i] );
             }
 
-            return ( String ) args[0];
+            return ( Integer ) args[args.length - 1];
         }
     }
 
+
     /**
      * Simple function that just adds data to our multiset for later evaluation
      */
-    public class TestConcurrent implements Func1<String, String>{
+    public class TestConcurrent implements Func1<String, Integer> {
 
-        private final  CountDownLatch latch;
+        private final CountDownLatch latch;
+        private final int index;
 
 
-        public TestConcurrent( final  CountDownLatch latch) {
-            this.latch = latch;}
+        public TestConcurrent( final CountDownLatch latch, int index ) {
+            this.latch = latch;
+            this.index = index;
+        }
 
 
         @Override
-        public String call( final String s ) {
+        public Integer call( final String s ) {
             final String threadName = Thread.currentThread().getName();
 
-            logger.info("Function executing on thread {}", threadName);
+            logger.info( "Function executing on thread {}", threadName );
 
             set.add( threadName );
 
             //we want to make sure each thread blocks until they all have passed the latch
             //this way we can ensure they're all running concurrently
             try {
-               latch.countDown();
-               latch.await( 30, TimeUnit.SECONDS );
+                latch.countDown();
+
 
+                boolean waited = latch.await( 5, TimeUnit.SECONDS );
+
+                //validate everything ran concurrently
+                assertTrue( "Latch released", waited );
             }
             catch ( InterruptedException e ) {
                 logger.error( "Runner interrupted", e );
-
             }
 
-            return s;
+            return index;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/af4d9d0a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index b7f4ef7..b097b60 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -21,88 +21,195 @@ package org.apache.usergrid.persistence.collection.rx;
 
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.persistence.collection.hystrix.CassandraCommand;
+import org.apache.usergrid.persistence.collection.hystrix.CommandUtils;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import com.netflix.config.ConfigurationManager;
+import com.netflix.hystrix.HystrixCommand;
+import com.netflix.hystrix.HystrixCommandGroupKey;
 
 import rx.Observable;
+import rx.Scheduler;
+import rx.concurrency.Schedulers;
 import rx.util.functions.Func1;
 import rx.util.functions.FuncN;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 
 /**
- * Simple tests that provides examples of how to perform common operations in RX
+ * Tests that provides examples of how to perform more complex RX operations
  */
 public class ParallelTest {
 
     private static final Logger logger = LoggerFactory.getLogger( ParallelTest.class );
 
 
-//    @Test( timeout = 5000 )
-    @Test
+    private static final HystrixCommandGroupKey GROUP_KEY = HystrixCommandGroupKey.Factory.asKey(
"TEST_KEY" );
+
+
+    public static final String THREAD_POOL_SIZE = CommandUtils.getThreadPoolCoreSize( GROUP_KEY.name()
);
+
+    public static final String THREAD_POOL_QUEUE = CommandUtils.getThreadPoolMaxQueueSize(
GROUP_KEY.name() );
+
+
+    /**
+     * An example of how an observable that requires a "fan out" then join should execute.
+     */
+    @Test( timeout = 5000 )
     public void concurrentFunctions() {
         final String input = "input";
 
-        final int size = 9;
+        final int size = 100;
+        //since we start at index 0
+        final int expected = size - 1;
 
-        //TODO Tweak our thread pool size beyond 10.
+
+        /**
+         * QUESTION Using this thread blocks indefinitely.  The execution of the Hystrix
command happens on the
+         * computation
+         * Thread if this is used
+         */
+        //        final Scheduler scheduler = Schedulers.threadPoolForComputation();
+
+        //use the I/O scheduler to allow enough thread, otherwise our pool will be the same
size as the # of cores
+        final Scheduler scheduler = Schedulers.threadPoolForIO();
+
+        //set our size equal
+        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE, size );
+        //        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_SIZE,
10 );
+
+        //reject requests we have to queue
+        ConfigurationManager.getConfigInstance().setProperty( THREAD_POOL_QUEUE, -1 );
 
         //latch used to make each thread block to prove correctness
         final CountDownLatch latch = new CountDownLatch( size );
 
-        final List<Observable<String>> observables = new ArrayList<Observable<String>>(
size );
 
+        final Multiset<String> set = HashMultiset.create();
 
-        //this is not using a hystrix thread pool as I expected but rather the Rx computation
thread pool.  Am I doing this
-        //incorrectly?
-        for ( int i = 0; i < size; i++ ) {
-            observables.add( new CassandraCommand<String>( input ).toObservable().map(
new Func1<String, String>() {
-                @Override
-                public String call( final String s ) {
 
-                    final String threadName = Thread.currentThread().getName();
+        //create our observable and execute it in the I/O pool since we'll be doing I/O operations
 
-                    latch.countDown();
+        /**
+         *  QUESTION: Should this use the computation scheduler since all operations (except
the hystrix command) are
+         *  non blocking?
+         */
 
-                    logger.info( "Function executing on thread {} with latch value {}",
-                            threadName, latch.getCount() );
+        final Observable<String> observable = Observable.from( input ).observeOn( scheduler
);
 
 
-                    try {
-                        latch.await();
-                    }
-                    catch ( InterruptedException e ) {
-                        throw new RuntimeException( e );
-                    }
+        Observable<Integer> thing = observable.mapMany( new Func1<String, Observable<Integer>>()
{
+
+            @Override
+            public Observable<Integer> call( final String s ) {
+                List<Observable<Integer>> functions = new ArrayList<Observable<Integer>>();
+
+                logger.info( "Creating new set of observables in thread {}", Thread.currentThread().getName()
);
+
+                for ( int i = 0; i < size; i++ ) {
+
+
+                    final int index = i;
+
+                    //create a new observable and execute the function on it.  These should
happen in parallel when
+                    // a subscription occurs
+
+                    /**
+                     * QUESTION: Should this again be the process thread, not the I/O
+                     */
+                    Observable<String> newObservable = Observable.from( input ).subscribeOn(
scheduler );
+
+                    Observable<Integer> transformed = newObservable.map( new Func1<String,
Integer>() {
+
+                        @Override
+                        public Integer call( final String s ) {
 
-                    return s;
+                            final String threadName = Thread.currentThread().getName();
+
+                            logger.info( "Invoking parallel task in thread {}", threadName
);
+
+                            //Simulate a Hystrix command making a call to an external resource.
 Invokes
+                            //the Hystrix command immediately as the function is invoked
+
+                            return new HystrixCommand<Integer>( GROUP_KEY ) {
+                                @Override
+                                protected Integer run() throws Exception {
+
+                                    final String threadName = Thread.currentThread().getName();
+
+                                    logger.info( "Invoking hystrix task in thread {}", threadName
);
+
+
+                                    set.add( threadName );
+
+                                    latch.countDown();
+
+                                    try {
+                                        latch.await();
+                                    }
+                                    catch ( InterruptedException e ) {
+                                        throw new RuntimeException( "Interrupted", e );
+                                    }
+
+                                    assertTrue( isExecutedInThread() );
+
+                                    return index;
+                                }
+                            }.execute();
+                        }
+                    } );
+
+                    functions.add( transformed );
                 }
-            } ) );
-        }
 
+                /**
+                 * Execute the functions above and zip the results together
+                 */
+                Observable<Integer> zipped = Observable.zip( functions, new FuncN<Integer>()
{
 
-        Observable<String> zipped = Observable.zip( observables, new FuncN<String>()
{
+                    @Override
+                    public Integer call( final Object... args ) {
 
-            @Override
-            public String call( final Object... args ) {
-                assertEquals( size, args.length );
-                return input;
+                        logger.info( "Invoking zip in thread {}", Thread.currentThread().getName()
);
+
+                        assertEquals( size, args.length );
+
+                        for ( int i = 0; i < args.length; i++ ) {
+                            assertEquals( "Indexes are returned in order", i, args[i] );
+                        }
+
+                        //just return our string
+                        return ( Integer ) args[args.length - 1];
+                    }
+                } );
+
+                return zipped;
             }
         } );
 
 
-        String last = zipped.toBlockingObservable().last();
+        final Integer last = thing.toBlockingObservable().last();
+
 
+        assertEquals( expected, last.intValue() );
 
-        assertEquals( input, last );
+        assertEquals( size, set.size() );
+
+        /**
+         * Ensure only 1 entry per thread
+         */
+        for ( String entry : set.elementSet() ) {
+            assertEquals( 1, set.count( entry ) );
+        }
     }
 }


Mime
View raw message