usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [43/50] git commit: Temp commit
Date Wed, 12 Feb 2014 13:21:51 GMT
Temp commit


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/6d9232e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/6d9232e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/6d9232e8

Branch: refs/heads/optimistic-tx-semantics
Commit: 6d9232e8636565e1e352f18740eb2bbcd8f2cc34
Parents: af4d9d0
Author: Todd Nine <tnine@apigee.com>
Authored: Fri Feb 7 09:19:50 2014 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Fri Feb 7 09:19:50 2014 -0700

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |  20 +++-
 .../collection/rx/CassandraThreadScheduler.java | 107 +++++++++++++++++++
 .../persistence/collection/rx/RxFig.java        |  40 +++++++
 3 files changed, 163 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d9232e8/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index e27da55..95bc760 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -17,25 +17,35 @@
  */
 package org.apache.usergrid.persistence.collection.guice;
 
+import java.io.IOException;
 
+import org.safehaus.guicyfig.Env;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
 import org.apache.usergrid.persistence.collection.astyanax.CassandraFig;
+import org.apache.usergrid.persistence.collection.cassandra.AvailablePortFinder;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSyncImpl;
 import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteFig;
+import org.apache.usergrid.persistence.collection.rx.CassandraThreadScheduler;
+import org.apache.usergrid.persistence.collection.rx.RxFig;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.collection.serialization.impl.SerializationModule;
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.netflix.config.ConfigurationManager;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteFig;
+
+import rx.Scheduler;
 
 
 /**
@@ -49,7 +59,8 @@ public class CollectionModule extends AbstractModule {
     @Override
     protected void configure() {
         //noinspection unchecked
-        install( new GuicyFigModule(
+        install( new GuicyFigModule( 
+                RxFig.class, 
                 MigrationManagerFig.class,
                 CassandraFig.class, 
                 SerializationFig.class,
@@ -64,6 +75,7 @@ public class CollectionModule extends AbstractModule {
                 .implement( EntityCollectionManagerSync.class, EntityCollectionManagerSyncImpl.class
)
                 .build( EntityCollectionManagerFactory.class ) );
 
+        bind( Scheduler.class ).toProvider( CassandraThreadScheduler.class );
 
         bind( UniqueValueSerializationStrategy.class ).to( UniqueValueSerializationStrategyImpl.class
);
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d9232e8/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
new file mode 100644
index 0000000..512f0ec
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/CassandraThreadScheduler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.rx;
+
+
+import java.beans.PropertyChangeEvent;
+import java.beans.PropertyChangeListener;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.name.Named;
+
+import rx.Scheduler;
+import rx.concurrency.Schedulers;
+
+
+public class CassandraThreadScheduler implements Provider<Scheduler> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraThreadScheduler.class);
+
+    private final RxFig rxFig;
+
+
+    @Inject
+    public CassandraThreadScheduler( final RxFig rxFig ) {
+        this.rxFig = rxFig;
+    }
+
+
+    @Override
+    @Named( "cassandraScheduler" )
+    public Scheduler get() {
+
+        //create our thread factory so we can label our threads in case we need to dump them
+        final ThreadFactory factory = new ThreadFactory() {
+
+            private final AtomicLong counter = new AtomicLong();
+
+            @Override
+            public Thread newThread( final Runnable r ) {
+
+               final String threadName = "RxCassandraIOThreadPool-" + counter.incrementAndGet();
+
+                LOG.info( "Allocating new IO thread with name {}", threadName );
+
+                Thread t = new Thread( r, threadName );
+                t.setDaemon( true );
+                return t;
+            }
+        };
+
+
+        /**
+         * Create a threadpool that will reclaim unused threads after 60 seconds.  
+         * It uses the max thread count set here. It intentionally uses the 
+         * DynamicProperty, so that when it is updated, the listener updates the 
+         * pool size. Additional allocation is trivial.  Shrinking the size 
+         * will require all currently executing threads to run to completion, 
+         * without allowing additional tasks to be queued.
+         */
+        final ThreadPoolExecutor pool = new ThreadPoolExecutor( 
+                0, rxFig.getMaxThreadCount(), 60L, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>(), factory, new ThreadPoolExecutor.AbortPolicy()
);
+
+
+        // if our max thread count is updated, we want to immediately update the pool.  
+        // Per the javadoc if the size is smaller, existing threads will continue to run

+        // until they become idle and time out
+        rxFig.addPropertyChangeListener( new PropertyChangeListener() {
+            @Override
+            public void propertyChange( final PropertyChangeEvent evt ) {
+            if ( evt.getPropertyName().equals( rxFig.getKeyByMethod( "getMaxThreadCount"
) ) ) {
+                LOG.debug( "Getting update to property: rxFig.getMaxThreadCount() old = {},
new = {} ",
+                        evt.getOldValue(), evt.getNewValue() );
+                pool.setMaximumPoolSize( ( Integer ) evt.getNewValue() );
+            }
+            }
+        } );
+
+        return Schedulers.executor( pool );
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/6d9232e8/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
new file mode 100644
index 0000000..b7c7b84
--- /dev/null
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/RxFig.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.usergrid.persistence.collection.rx;
+
+
+import org.safehaus.guicyfig.Default;
+import org.safehaus.guicyfig.FigSingleton;
+import org.safehaus.guicyfig.GuicyFig;
+import org.safehaus.guicyfig.Key;
+
+
+/**
+ * Configuration interface for RxJava classes.
+ */
+@FigSingleton
+public interface RxFig extends GuicyFig {
+
+    /**
+     * Max number of threads a pool can allocate.  Can be dynamically changed after starting
+     */
+    @Key( "rx.cassandra.io.threads" )
+    @Default( "100" )
+    int getMaxThreadCount();
+}


Mime
View raw message