usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [38/50] git commit: Removed redundant scheduler
Date Wed, 12 Feb 2014 13:21:46 GMT
Removed redundant scheduler


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dee786b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dee786b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dee786b3

Branch: refs/heads/optimistic-tx-semantics
Commit: dee786b38ca163505086b577659a479c3b3ac10b
Parents: df41a94
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Feb 5 13:58:28 2014 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed Feb 5 16:46:50 2014 -0700

----------------------------------------------------------------------
 stack/corepersistence/collection/pom.xml        |   9 +-
 .../collection/guice/CollectionModule.java      |  20 +---
 .../collection/rx/CassandraThreadScheduler.java | 107 -------------------
 .../persistence/collection/rx/RxFig.java        |  40 -------
 ...MvccEntitySerializationStrategyImplTest.java |  11 +-
 .../impl/parse/ObservableIterator.java          |   5 +-
 stack/corepersistence/pom.xml                   |   3 +-
 7 files changed, 17 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dee786b3/stack/corepersistence/collection/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/pom.xml b/stack/corepersistence/collection/pom.xml
index 4555264..cfe5028 100644
--- a/stack/corepersistence/collection/pom.xml
+++ b/stack/corepersistence/collection/pom.xml
@@ -161,7 +161,14 @@
     <dependency>
       <groupId>com.netflix.rxjava</groupId>
       <artifactId>rxjava-core</artifactId>
-      <version>0.15.1</version>
+      <version>0.16.1</version>
+    </dependency>
+
+
+    <dependency>
+      <groupId>com.netflix.rxjava</groupId>
+      <artifactId>rxjava-contrib</artifactId>
+      <version>0.14.6</version>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dee786b3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 95bc760..e27da55 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -17,35 +17,25 @@
  */
 package org.apache.usergrid.persistence.collection.guice;
 
-import java.io.IOException;
 
-import org.safehaus.guicyfig.Env;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
 import org.apache.usergrid.persistence.collection.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.collection.cassandra.AvailablePortFinder;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSyncImpl;
 import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
-import org.apache.usergrid.persistence.collection.rx.CassandraThreadScheduler;
-import org.apache.usergrid.persistence.collection.rx.RxFig;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteFig;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.netflix.config.ConfigurationManager;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteFig;
-
-import rx.Scheduler;
 
 
 /**
@@ -59,8 +49,7 @@ public class CollectionModule extends AbstractModule {
     @Override
     protected void configure() {
         //noinspection unchecked
-        install( new GuicyFigModule( 
-                RxFig.class, 
+        install( new GuicyFigModule(
                 MigrationManagerFig.class,
                 CassandraFig.class, 
                 SerializationFig.class,
@@ -75,7 +64,6 @@ public class CollectionModule extends AbstractModule {
                 .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class )
                 .build( EntityCollectionManagerFactory.class ) );
 
-        bind( Scheduler.class ).toProvider( CassandraThreadScheduler.class );
 
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dee786b3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
deleted file mode 100644
index 512f0ec..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.beans.PropertyChangeEvent;
-import java.beans.PropertyChangeListener;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.name.Named;
-
-import rx.Scheduler;
-import rx.concurrency.Schedulers;
-
-
-public class CassandraThreadScheduler implements Provider<Scheduler> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraThreadScheduler.class);
-
-    private final RxFig rxFig;
-
-
-    @Inject
-    public CassandraThreadScheduler( final RxFig rxFig ) {
-        this.rxFig = rxFig;
-    }
-
-
-    @Override
-    @Named( "cassandraScheduler" )
-    public Scheduler get() {
-
-        //create our thread factory so we can label our threads in case we need to dump them
-        final ThreadFactory factory = new ThreadFactory() {
-
-            private final AtomicLong counter = new AtomicLong();
-
-            @Override
-            public Thread newThread( final Runnable r ) {
-
-               final String threadName = "RxCassandraIOThreadPool-" + counter.incrementAndGet();
-
-                LOG.info( "Allocating new IO thread with name {}", threadName );
-
-                Thread t = new Thread( r, threadName );
-                t.setDaemon( true );
-                return t;
-            }
-        };
-
-
-        /**
-         * Create a threadpool that will reclaim unused threads after 60 seconds.  
-         * It uses the max thread count set here. It intentionally uses the 
-         * DynamicProperty, so that when it is updated, the listener updates the 
-         * pool size. Additional allocation is trivial.  Shrinking the size 
-         * will require all currently executing threads to run to completion, 
-         * without allowing additional tasks to be queued.
-         */
-        final ThreadPoolExecutor pool = new ThreadPoolExecutor( 
-                0, rxFig.getMaxThreadCount(), 60L, TimeUnit.SECONDS,
-                new SynchronousQueue<Runnable>(), factory, new ThreadPoolExecutor.AbortPolicy() );
-
-
-        // if our max thread count is updated, we want to immediately update the pool.  
-        // Per the javadoc if the size is smaller, existing threads will continue to run
-        // until they become idle and time out
-        rxFig.addPropertyChangeListener( new PropertyChangeListener() {
-            @Override
-            public void propertyChange( final PropertyChangeEvent evt ) {
-            if ( evt.getPropertyName().equals( rxFig.getKeyByMethod( "getMaxThreadCount" ) ) ) {
-                LOG.debug( "Getting update to property: rxFig.getMaxThreadCount() old = {}, new = {} ",
-                        evt.getOldValue(), evt.getNewValue() );
-                pool.setMaximumPoolSize( ( Integer ) evt.getNewValue() );
-            }
-            }
-        } );
-
-        return Schedulers.executor( pool );
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dee786b3/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
deleted file mode 100644
index b7c7b84..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.FigSingleton;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-
-/**
- * Configuration interface for RxJava classes.
- */
-@FigSingleton
-public interface RxFig extends GuicyFig {
-
-    /**
-     * Max number of threads a pool can allocate.  Can be dynamically changed after starting
-     */
-    @Key( "rx.cassandra.io.threads" )
-    @Default( "100" )
-    int getMaxThreadCount();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dee786b3/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
index b0ee061..f90b47a 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/serialization/impl/MvccEntitySerializationStrategyImplTest.java
@@ -30,7 +30,6 @@ import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
 import org.apache.usergrid.persistence.collection.mvcc.MvccEntitySerializationStrategy;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.impl.MvccEntityImpl;
-import org.apache.usergrid.persistence.collection.rx.RxFig;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.util.EntityUtils;
 import org.apache.usergrid.persistence.model.entity.Entity;
@@ -92,14 +91,7 @@ public class MvccEntitySerializationStrategyImplTest {
     )
     public CassandraFig cassandraFig;
 
-    @Inject
-    @Overrides( name = "unit-test",
-        environments = Env.UNIT,
-        options = {
-            @Option( method = "getMaxThreadCount", override = CONNECTION_COUNT )
-        }
-    )
-    public RxFig rxFig;
+
 
     @Inject
     public SerializationFig serializationFig;
@@ -111,7 +103,6 @@ public class MvccEntitySerializationStrategyImplTest {
     @Before
     public void setup() {
         assertNotNull( cassandraFig );
-        assertNotNull( rxFig );
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dee786b3/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
index f820aa9..33efecc 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/parse/ObservableIterator.java
@@ -10,8 +10,9 @@ import rx.subscriptions.Subscriptions;
 
 
 /**
- * Converts an iterator to an observable.  Subclasses need to only implement getting the iterator from the data source
- * .
+ * Converts an iterator to an observable.  Subclasses need to only implement getting the iterator from the data source.
+ * This is used in favor of "Observable.just" when the initial fetch of the iterator will require I/O.  This allows
+ * us to wrap the iterator in a deferred invocation to avoid the blocking on construction.
  */
 public abstract class ObservableIterator<T> implements Observable.OnSubscribeFunc<T> {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dee786b3/stack/corepersistence/pom.xml
----------------------------------------------------------------------
diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml
index 4cc845a..4a9db64 100644
--- a/stack/corepersistence/pom.xml
+++ b/stack/corepersistence/pom.xml
@@ -20,10 +20,9 @@
     <archaius.version>0.5.12</archaius.version>
     <slf4j.version>1.7.2</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
-    <rx.version>0.15.1</rx.version>
     <hystrix.version>1.3.8</hystrix.version>
     <guicyfig.version>3.2</guicyfig.version>
-    <chop.version>1.0</chop.version>
+    <chop.version>1.0.1-SNAPSHOT</chop.version>
   </properties>
 
   <modules>


Mime
View raw message