usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [44/50] git commit: Fixed schedulers and concurrent call wrappers
Date Wed, 12 Feb 2014 13:21:52 GMT
Fixed schedulers and concurrent call wrappers


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/27192cfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/27192cfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/27192cfb

Branch: refs/heads/optimistic-tx-semantics
Commit: 27192cfb4d236ce52f50569c2f0ae0ac0933dc5b
Parents: 6d9232e
Author: Todd Nine <tnine@apigee.com>
Authored: Fri Feb 7 15:49:14 2014 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Fri Feb 7 15:49:14 2014 -0700

----------------------------------------------------------------------
 .../collection/hystrix/CassandraCommand.java    |   7 +-
 .../impl/EntityCollectionManagerImpl.java       |  13 +-
 .../collection/rx/CassandraThreadScheduler.java |  10 +-
 .../persistence/collection/rx/Concurrent.java   |  59 ++--
 .../persistence/collection/rx/RxFig.java        |   4 +-
 .../rx/CassandraThreadSchedulerTest.java        | 327 +++++++++++++++++++
 .../collection/rx/ConcurrentTest.java           |  42 +--
 .../persistence/collection/rx/ParallelTest.java |  12 +-
 stack/corepersistence/graph/pom.xml             |   6 -
 .../persistence/graph/impl/EdgeManagerImpl.java |   2 +-
 10 files changed, 388 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
index f31cef4..c87bf33 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/hystrix/CassandraCommand.java
@@ -30,6 +30,9 @@ import rx.concurrency.Schedulers;
 /**
  * Default command that just returns the value handed to it.  Useful for creating observables
that are subscribed on the
  * correct underlying Hystrix thread pool
+ *
+ * TODO change this when this PR makes it into head to wrap our observables
+ * https://github.com/Netflix/Hystrix/pull/209
  */
 public class CassandraCommand<R> extends HystrixCommand<R> {
 
@@ -45,7 +48,7 @@ public class CassandraCommand<R> extends HystrixCommand<R> {
     private final R value;
 
 
-    public CassandraCommand( final R value ) {
+    private CassandraCommand( final R value ) {
         super( GROUP_KEY );
         this.value = value;
     }
@@ -64,7 +67,7 @@ public class CassandraCommand<R> extends HystrixCommand<R> {
      *
      * @return The value wrapped in a Hystrix observable
      */
-    public static <R> Observable<R> toObservable( R readValue ) {
+    private static <R> Observable<R> toObservable( R readValue ) {
         //create a new command and ensure it's observed on the correct thread scheduler
         return new CassandraCommand<R>( readValue ).toObservable( Schedulers.threadPoolForIO()
);
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 70b9b65..5698080 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -50,6 +50,7 @@ import com.google.inject.Inject;
 import com.google.inject.assistedinject.Assisted;
 
 import rx.Observable;
+import rx.Scheduler;
 import rx.util.functions.FuncN;
 
 
@@ -66,6 +67,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
 
     private final CollectionScope collectionScope;
     private final UUIDService uuidService;
+    private final Scheduler scheduler;
 
 
     //start stages
@@ -84,7 +86,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
 
 
     @Inject
-    public EntityCollectionManagerImpl( final UUIDService uuidService, final WriteStart writeStart,
+    public EntityCollectionManagerImpl( final UUIDService uuidService, final WriteStart writeStart,
final Scheduler scheduler,
                                         final WriteUniqueVerify writeVerifyUnique,
                                         final WriteOptimisticVerify writeOptimisticVerify,
                                         final WriteCommit writeCommit, final Load load, final
DeleteStart deleteStart,
@@ -106,6 +108,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
 
 
         this.uuidService = uuidService;
+        this.scheduler = scheduler;
         this.collectionScope = collectionScope;
     }
 
@@ -143,11 +146,11 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
         //create our observable and start the write
         CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(
collectionScope, entity );
 
-        Observable<CollectionIoEvent<MvccEntity>> observable = CassandraCommand.toObservable(
writeData ).map( writeStart );
+        Observable<CollectionIoEvent<MvccEntity>> observable =  Observable.from(
writeData ).subscribeOn( scheduler ).map( writeStart );
 
 
         //execute all validation stages concurrently.  Needs refactored when this is done.
 https://github.com/Netflix/RxJava/issues/627
-        observable = Concurrent.concurrent(observable, new WaitZip( ), writeVerifyUnique,
writeOptimisticVerify);
+        observable = Concurrent.concurrent(observable, scheduler, new WaitZip( ), writeVerifyUnique,
writeOptimisticVerify);
 
         //return the commit result.
         return observable.map( writeCommit );
@@ -163,7 +166,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this
stage" );
 
 
-        return CassandraCommand.toObservable( new CollectionIoEvent<Id>( collectionScope,
entityId ) ).map( deleteStart ).map( deleteCommit );
+        return Observable.from(new CollectionIoEvent<Id>( collectionScope, entityId
) ).subscribeOn( scheduler ).map( deleteStart ).map( deleteCommit );
     }
 
 
@@ -174,7 +177,7 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in the load
stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity id type required in the load
stage" );
 
-        return CassandraCommand.toObservable( new CollectionIoEvent<Id>( collectionScope,
entityId ) ).map( load );
+        return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId
) ).subscribeOn( scheduler ).map( load );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
index 512f0ec..90f3b2d 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
@@ -93,11 +93,11 @@ public class CassandraThreadScheduler implements Provider<Scheduler>
{
         rxFig.addPropertyChangeListener( new PropertyChangeListener() {
             @Override
             public void propertyChange( final PropertyChangeEvent evt ) {
-            if ( evt.getPropertyName().equals( rxFig.getKeyByMethod( "getMaxThreadCount"
) ) ) {
-                LOG.debug( "Getting update to property: rxFig.getMaxThreadCount() old = {},
new = {} ",
-                        evt.getOldValue(), evt.getNewValue() );
-                pool.setMaximumPoolSize( ( Integer ) evt.getNewValue() );
-            }
+                if ( evt.getPropertyName().equals( rxFig.getKeyByMethod( "getMaxThreadCount"
) ) ) {
+                    LOG.debug( "Getting update to property: rxFig.getMaxThreadCount() old
= {}, new = {} ",
+                            evt.getOldValue(), evt.getNewValue() );
+                    pool.setMaximumPoolSize( ( Integer ) evt.getNewValue() );
+                }
             }
         } );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
index 2a997d7..868c73b 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
@@ -27,6 +27,7 @@ import org.apache.usergrid.persistence.collection.hystrix.CassandraCommand;
 import com.netflix.hystrix.HystrixCommand;
 
 import rx.Observable;
+import rx.Scheduler;
 import rx.concurrency.Schedulers;
 import rx.operators.OperationMerge;
 import rx.util.functions.Func1;
@@ -34,42 +35,12 @@ import rx.util.functions.FuncN;
 
 
 /**
- * A utility class that encapsulates many Funct1 operations that receive and emit the same
type.
+ * A utility class that encapsulates possible combinations of functions.
  * These functions are executed in parallel, then "zipped" into a single response.  
  * This is useful when you want to perform operations on a single initial observable, 
- * then combine the result into a single observable to continue the sequence
+ * then combine the result of multiple parallel operations on that object into a single observable
to continue the sequence
  */
-public class Concurrent<T, R, Z> implements Func1<T, Observable<Z>> {
-
-    private final Func1<T, R>[] concurrent;
-    private final FuncN<Z> zip;
-
-    private Concurrent( final FuncN<Z> zip, final Func1<T, R>[] concurrent ){
-        this.concurrent = concurrent;
-        this.zip = zip;
-    }
-
-    @Override
-      public Observable<Z> call( final T input ) {
-
-         List<Observable<R>> observables = new ArrayList<Observable<R>>(concurrent.length);
-
-        //Create multiple observables for each function.  They simply emit the input value.
-        //this is the "fork" step of the concurrent processing
-        for( Func1<T, R> funct: concurrent){
-            final Observable<R> observable = CassandraCommand.toObservable( input ).map(
funct );
-
-            observables.add( observable );
-        }
-
-        final Observable<Z> zipped = Observable.zip( observables, zip );
-
-
-
-        //return an observable that is hte result of the zip
-        return zipped;
-
-      }
+public class Concurrent {
 
 
     /**
@@ -78,13 +49,31 @@ public class Concurrent<T, R, Z> implements Func1<T, Observable<Z>>
{
      * into a single observable which is returned  with the specified function
      *
      * @param observable The observable we're invoking many concurrent operations on
+     * @param scheduler The scheduler to use when invoking the parallel operations
      * @param zipFunction The zip function to aggregate the results. And return the observable
      * @param concurrent The concurrent operations we're invoking
      * @return The observable emitted from the zipped function of type Z
      */
     public static <T, R, Z> Observable<Z> concurrent(
-            final Observable<T> observable, final FuncN<Z> zipFunction, final
Func1<T, R>... concurrent ){
-        return observable.mapMany( new Concurrent<T, R, Z>( zipFunction, concurrent
));
+            final Observable<T> observable, final Scheduler scheduler, final FuncN<Z>
zipFunction, final Func1<T, R>... concurrent ){
+
+       return  observable.mapMany( new Func1<T, Observable<Z>>() {
+            @Override
+            public Observable<Z> call( final T input ) {
+
+               List<Observable<R>> observables = new ArrayList<Observable<R>>(concurrent.length);
+
+               //Create multiple observables for each function.  They simply emit the input
value.
+               //this is the "fork" step of the concurrent processing
+               for( Func1<T, R> funct: concurrent){
+                   final Observable<R> observable = Observable.from( input ).subscribeOn(
scheduler ).map( funct );
+
+                   observables.add( observable );
+               }
+
+                return Observable.zip( observables, zipFunction );
+            }
+        } );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
index b7c7b84..54a5211 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
@@ -31,10 +31,12 @@ import org.safehaus.guicyfig.Key;
 @FigSingleton
 public interface RxFig extends GuicyFig {
 
+    public static final String PROP_THREAD = "rx.cassandra.io.threads";
+
     /**
      * Max number of threads a pool can allocate.  Can be dynamically changed after starting
      */
-    @Key( "rx.cassandra.io.threads" )
+    @Key( PROP_THREAD )
     @Default( "100" )
     int getMaxThreadCount();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
new file mode 100644
index 0000000..45a175e
--- /dev/null
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadSchedulerTest.java
@@ -0,0 +1,327 @@
+package org.apache.usergrid.persistence.collection.rx;
+
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.jukito.JukitoRunner;
+import org.jukito.UseModules;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
+
+import com.google.inject.Inject;
+
+import rx.Scheduler;
+import rx.util.functions.Action0;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+/**
+ * Test for our scheduler
+ */
+@RunWith( JukitoRunner.class )
+@UseModules( TestCollectionModule.class )
+public class CassandraThreadSchedulerTest {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger( CassandraThreadSchedulerTest.class
);
+
+    /**
+     * Number of milliseconds to wait when trying to acquire the semaphore
+     */
+    private static final long TEST_TIMEOUT = 30000;
+
+    @Inject
+    private RxFig rxFig;
+
+
+    @Test
+    public void testMaxLimit() throws InterruptedException {
+
+        final int maxCount = 10;
+
+        rxFig.override( RxFig.PROP_THREAD, ""+ maxCount );
+
+        final CassandraThreadScheduler cassSchedulerSetup = new CassandraThreadScheduler(
rxFig );
+
+        final Scheduler rxScheduler = cassSchedulerSetup.get();
+
+        //we want a fair semaphore so we can release in acquire order
+        final Semaphore semaphore = new Semaphore( 0, true );
+
+        //we should not have maxCount actions running in the scheduler
+        CountDownLatch result = schedule( rxScheduler, rxFig.getMaxThreadCount(), semaphore,
TEST_TIMEOUT );
+
+        //schedule and we should fail
+
+        try {
+
+            rxScheduler.schedule( new Action0() {
+                @Override
+                public void call() {
+                    //no op
+                }
+            } );
+
+            fail( "This should have thrown an exception" );
+        }
+        catch ( RejectedExecutionException ree ) {
+            //swallow, we just want to ensure we catch this to continue the test
+        }
+
+        //now release the semaphore so all 10 can run
+        semaphore.release( rxFig.getMaxThreadCount() );
+
+        //wait for completion
+        boolean completed = result.await( 20, TimeUnit.SECONDS );
+
+        assertTrue( "Completed executing actions", completed );
+
+        //verify we can schedule and execute a new operation
+        result = schedule( rxScheduler, 1, semaphore, TEST_TIMEOUT );
+
+        semaphore.release( 1 );
+
+        completed = result.await( 20, TimeUnit.SECONDS );
+
+        assertTrue( "Completed executing actions", completed );
+    }
+
+
+    /**
+     * Test running from a max limit to a lower limit and fails to schedule new threads
+     */
+    @Test
+    public void testMaxLimitShrink() throws InterruptedException {
+
+        final int maxCount = 10;
+
+        final int half = maxCount / 2;
+
+        //kind of a hack, but necessary with the way properties are singletons.  Otherwise
you get side effects from
+        // other tests
+        rxFig.override( RxFig.PROP_THREAD, "" + maxCount );
+
+        final CassandraThreadScheduler cassSchedulerSetup = new CassandraThreadScheduler(
rxFig );
+
+        final Scheduler rxScheduler = cassSchedulerSetup.get();
+
+        //we want a fair semaphore so we can release in acquire order
+        final Semaphore semaphore = new Semaphore( 0, true );
+
+        //we should not have maxCount actions running in the scheduler
+        CountDownLatch firstHalf = schedule( rxScheduler, half, semaphore, TEST_TIMEOUT );
+
+        //schedule the second half
+        CountDownLatch secondHalf = schedule( rxScheduler, half, semaphore, TEST_TIMEOUT
);
+
+        //schedule and we should fail
+
+        try {
+
+            rxScheduler.schedule( new Action0() {
+                @Override
+                public void call() {
+                    //no op
+                }
+            } );
+
+            fail( "This should have thrown an exception" );
+        }
+        catch ( RejectedExecutionException ree ) {
+            //swallow, we just want to ensure we catch this to continue the test
+        }
+
+
+        //update the property to shrink the size
+        rxFig.override( RxFig.PROP_THREAD, "" + half );
+
+        //now release the first half of executors
+        semaphore.release( half );
+
+
+        //wait for completion
+        boolean completed = firstHalf.await( 20, TimeUnit.SECONDS );
+
+        assertTrue( "Completed executing actions", completed );
+
+        //verify we can't schedule b/c we're still at capacity
+
+
+        try {
+
+            rxScheduler.schedule( new Action0() {
+                @Override
+                public void call() {
+                    //no op
+                }
+            } );
+
+            fail( "This should have thrown an exception.  We still don't have capacity for
new threads" );
+        }
+        catch ( RejectedExecutionException ree ) {
+            //swallow, we just want to ensure we catch this to continue the test
+        }
+
+
+        //now release the rest of the semaphores
+        semaphore.release( maxCount - half );
+
+        completed = secondHalf.await( 20, TimeUnit.SECONDS );
+
+        assertTrue( "Completed executing actions", completed );
+
+        //verify we can schedule and execute a new operation
+        CountDownLatch newJob = schedule( rxScheduler, 1, semaphore, TEST_TIMEOUT  );
+
+        semaphore.release( 1 );
+
+        completed = newJob.await( 20, TimeUnit.SECONDS );
+        assertTrue( "Completed executing actions", completed );
+    }
+
+
+    /**
+     * Test that when we're fully blocked, if we expand we have capacity
+     */
+    @Test
+    public void testExpandLimit() throws InterruptedException {
+
+        final int startCount = 10;
+
+        //kind of a hack, but necessary with the way properties are singletons.  Otherwise
you get side effects from
+        // other tests
+        rxFig.override( RxFig.PROP_THREAD, "" + startCount );
+
+        final CassandraThreadScheduler cassSchedulerSetup = new CassandraThreadScheduler(
rxFig );
+
+        final Scheduler rxScheduler = cassSchedulerSetup.get();
+
+        //we want a fair semaphore so we can release in acquire order
+        final Semaphore semaphore = new Semaphore( 0, true );
+
+        //we should not have maxCount actions running in the scheduler
+        CountDownLatch firstBatch = schedule( rxScheduler, rxFig.getMaxThreadCount(), semaphore,
TEST_TIMEOUT  );
+
+        //schedule and we should fail
+
+        try {
+
+            rxScheduler.schedule( new Action0() {
+                @Override
+                public void call() {
+                    //no op
+                }
+            } );
+
+            fail( "This should have thrown an exception" );
+        }
+        catch ( RejectedExecutionException ree ) {
+            //swallow, we just want to ensure we catch this to continue the test
+        }
+
+
+        //now allocate more capacity
+        final int doubleMaxCount = startCount * 2;
+
+        //update the property to shrink the size
+        rxFig.override( RxFig.PROP_THREAD, "" + doubleMaxCount );
+
+
+        //now schedule 10 more
+
+        CountDownLatch secondBatch = schedule( rxScheduler, rxFig.getMaxThreadCount() - startCount,
semaphore, TEST_TIMEOUT  );
+
+        //this should fail.  We're at capacity
+
+        try {
+
+            rxScheduler.schedule( new Action0() {
+                @Override
+                public void call() {
+                    //no op
+                }
+            } );
+
+            fail( "This should have thrown an exception" );
+        }
+        catch ( RejectedExecutionException ree ) {
+            //swallow, we just want to ensure we catch this to continue the test
+        }
+
+
+        //now release the semaphores so all
+        semaphore.release( rxFig.getMaxThreadCount() );
+
+        //wait for completion
+        boolean completed = firstBatch.await( 20, TimeUnit.SECONDS );
+
+        assertTrue( "Completed executing actions", completed );
+
+        completed = secondBatch.await( 20, TimeUnit.SECONDS );
+
+        assertTrue( "Completed executing actions", completed );
+
+        //verify we can schedule and execute a new operation
+        CountDownLatch result = schedule( rxScheduler, 1, semaphore, TEST_TIMEOUT  );
+
+        semaphore.release( 1 );
+
+        completed = result.await( 20, TimeUnit.SECONDS );
+
+        assertTrue( "Completed executing actions", completed );
+    }
+
+
+    /**
+     * Schedule actions into the semaphore.
+     *
+     * @param rxScheduler the Scheduler to use
+     * @param totalCount The total count of actions to invoke
+     * @param semaphore The semaphore to take on acquire
+     *
+     * @return The latch to block on.  When all jobs have been executed this will be tripped
+     */
+    private CountDownLatch schedule( Scheduler rxScheduler, final int totalCount, final Semaphore
semaphore, final long timeout ) {
+
+        final CountDownLatch latch = new CountDownLatch( totalCount );
+
+        for ( int i = 0; i < totalCount; i++ ) {
+            final Action0 action = new Action0() {
+                @Override
+                public void call() {
+                    try {
+                        final String threadName = Thread.currentThread().getName();
+
+                        LOG.info( "{} trying to acquire semaphore", threadName );
+                        //get and release the lock
+                        semaphore.tryAcquire( timeout, TimeUnit.MILLISECONDS );
+
+                        LOG.info( "{} has acquired sempahore", threadName );
+
+                        //countdown the latch
+                        latch.countDown();
+                    }
+                    catch ( InterruptedException e ) {
+                        throw new RuntimeException( e );
+                    }
+                }
+            };
+            rxScheduler.schedule( action );
+        }
+
+
+        return latch;
+    }
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
index 29f11d9..eafe3aa 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
@@ -17,6 +17,8 @@ import com.google.common.collect.Multiset;
 import com.netflix.config.ConfigurationManager;
 
 import rx.Observable;
+import rx.Scheduler;
+import rx.concurrency.Schedulers;
 import rx.util.functions.Func1;
 import rx.util.functions.FuncN;
 
@@ -34,48 +36,18 @@ public class ConcurrentTest {
     private static final Logger logger = LoggerFactory.getLogger( ConcurrentTest.class );
 
 
-    private int preRunThreadCount = -1;
-    private int queueDepth = -1;
-
-
-    @Before
-    public void getSettings() {
-
-        //        preRunThreadCount = ConfigurationManager.getConfigInstance().getInt( CassandraCommand
-        // .THREAD_POOL_SIZE );
-        //
-        //
-        //        //reject requests we have to queue
-        //        queueDepth = ConfigurationManager.getConfigInstance().getInt( CassandraCommand.THREAD_POOL_QUEUE
);
-    }
-
-
-    @After
-    public void restoreSettings() {
-        ConfigurationManager.getConfigInstance().setProperty( CassandraCommand.THREAD_POOL_SIZE,
preRunThreadCount );
-
-
-        //reject requests we have to queue
-        ConfigurationManager.getConfigInstance().setProperty( CassandraCommand.THREAD_POOL_QUEUE,
queueDepth );
-    }
-
 
     @Test( timeout = 5000 )
     public void concurrent() {
+        //use the I/O scheduler so we always have enough threads
+        final Scheduler scheduler = Schedulers.threadPoolForIO();
 
         final String source = "test";
-        Observable<String> observable = CassandraCommand.toObservable( source );
+        Observable<String> observable =  Observable.from( source );
 
         //we could theoretically use the same instance over and over
 
-        final int size = 100;
-
-        //set the size of concurrent threads to our requests/2.  Should be more than sufficient
for this test
-        ConfigurationManager.getConfigInstance().setProperty( CassandraCommand.THREAD_POOL_SIZE,
size / 2 );
-
-
-        //reject requests we have to queue
-        ConfigurationManager.getConfigInstance().setProperty( CassandraCommand.THREAD_POOL_QUEUE,
-1 );
+        final int size = 5;
 
 
         final CountDownLatch latch = new CountDownLatch( size );
@@ -90,7 +62,7 @@ public class ConcurrentTest {
 
 
         //concurrent inherits thread pool from it's observable, set it's thread pool
-        Observable<Integer> result = Concurrent.concurrent( observable, zip, concurrentFunctions
);
+        Observable<Integer> result = Concurrent.concurrent( observable, scheduler,
 zip, concurrentFunctions );
 
         assertEquals( "No invocation yet", 0, set.size() );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
index b097b60..ac252fb 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ParallelTest.java
@@ -65,7 +65,7 @@ public class ParallelTest {
     /**
      * An example of how an observable that requires a "fan out" then join should execute.
      */
-    @Test( timeout = 5000 )
+    @Test(timeout = 5000)
     public void concurrentFunctions() {
         final String input = "input";
 
@@ -138,9 +138,13 @@ public class ParallelTest {
 
                             logger.info( "Invoking parallel task in thread {}", threadName
);
 
-                            //Simulate a Hystrix command making a call to an external resource.
 Invokes
-                            //the Hystrix command immediately as the function is invoked
-
+                            /**
+                             * Simulate a Hystrix command making a call to an external resource.
 Invokes
+                             * the Hystrix command immediately as the function is invoked.
 This is currently
+                             * how we have to call Cassandra.
+                             *
+                             * TODO This needs to be re-written and evaluated once this PR
is released https://github.com/Netflix/Hystrix/pull/209
+                             */
                             return new HystrixCommand<Integer>( GROUP_KEY ) {
                                 @Override
                                 protected Integer run() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/graph/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/pom.xml b/stack/corepersistence/graph/pom.xml
index e04d92f..069962d 100644
--- a/stack/corepersistence/graph/pom.xml
+++ b/stack/corepersistence/graph/pom.xml
@@ -53,12 +53,6 @@
       <scope>test</scope>
     </dependency>
 
-    <dependency>
-      <groupId>com.netflix.hystrix</groupId>
-      <artifactId>hystrix-core</artifactId>
-      <version>${hystrix.version}</version>
-    </dependency>
-
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/27192cfb/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
index c790ac4..d2aace2 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/impl/EdgeManagerImpl.java
@@ -76,7 +76,7 @@ public class EdgeManagerImpl implements EdgeManager {
 
     @Override
     public Observable<Edge> writeEdge( final Edge edge ) {
-        return CassandraCommand.toObservable( new GraphIoEvent<Edge>( scope, edge )
).map( edgeWriteStage );
+        return Observable.from( new GraphIoEvent<Edge>( scope, edge ) ).map( edgeWriteStage
);
     }
 
 


Mime
View raw message