usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From snoopd...@apache.org
Subject [3/4] git commit: Fixed concurrent calls. Removed concurrent helper, it's interfering more than helping.
Date Thu, 13 Feb 2014 04:20:54 GMT
Fixed concurrent calls.  Removed concurrent helper, it's interfering more than helping.

Tests need fixed.  Bug in zip found


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fc355560
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fc355560
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fc355560

Branch: refs/heads/two-dot-o
Commit: fc3555609ce9bb89111e05eee2c313ebda0e9f8b
Parents: d312b31
Author: Todd Nine <tnine@apigee.com>
Authored: Wed Feb 12 20:20:59 2014 -0700
Committer: Todd Nine <tnine@apigee.com>
Committed: Wed Feb 12 20:20:59 2014 -0700

----------------------------------------------------------------------
 .../collection/guice/CollectionModule.java      |  14 +-
 .../impl/EntityCollectionManagerImpl.java       |  78 +++++++--
 .../collection/mvcc/stage/write/WriteFig.java   |  42 -----
 .../mvcc/stage/write/WriteUniqueVerify.java     | 158 +++++++++++--------
 .../persistence/collection/rx/Concurrent.java   |  80 ----------
 .../mvcc/stage/write/WriteUniqueVerifyTest.java |  68 ++++++--
 .../collection/rx/ConcurrentTest.java           | 149 -----------------
 7 files changed, 218 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fc355560/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
index 95bc760..e0744c3 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/guice/CollectionModule.java
@@ -17,19 +17,18 @@
  */
 package org.apache.usergrid.persistence.collection.guice;
 
-import java.io.IOException;
 
-import org.safehaus.guicyfig.Env;
 import org.safehaus.guicyfig.GuicyFigModule;
 
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerSync;
 import org.apache.usergrid.persistence.collection.astyanax.CassandraFig;
-import org.apache.usergrid.persistence.collection.cassandra.AvailablePortFinder;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerImpl;
 import org.apache.usergrid.persistence.collection.impl.EntityCollectionManagerSyncImpl;
 import org.apache.usergrid.persistence.collection.migration.MigrationManagerFig;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
+import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
 import org.apache.usergrid.persistence.collection.rx.CassandraThreadScheduler;
 import org.apache.usergrid.persistence.collection.rx.RxFig;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
@@ -37,13 +36,7 @@ import org.apache.usergrid.persistence.collection.serialization.impl.Serializati
 import org.apache.usergrid.persistence.collection.service.impl.ServiceModule;
 
 import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.netflix.config.ConfigurationManager;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategy;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.UniqueValueSerializationStrategyImpl;
-import org.apache.usergrid.persistence.collection.mvcc.stage.write.WriteFig;
 
 import rx.Scheduler;
 
@@ -63,8 +56,7 @@ public class CollectionModule extends AbstractModule {
                 RxFig.class, 
                 MigrationManagerFig.class,
                 CassandraFig.class, 
-                SerializationFig.class,
-                WriteFig.class ) );
+                SerializationFig.class ) );
 
         install( new SerializationModule() );
         install( new ServiceModule() );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fc355560/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
index 5698080..bc456dc 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/impl/EntityCollectionManagerImpl.java
@@ -24,11 +24,8 @@ import java.util.UUID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.thrift.Cassandra;
-
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
-import org.apache.usergrid.persistence.collection.hystrix.CassandraCommand;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
@@ -51,6 +48,8 @@ import com.google.inject.assistedinject.Assisted;
 
 import rx.Observable;
 import rx.Scheduler;
+import rx.util.functions.Func1;
+import rx.util.functions.Func2;
 import rx.util.functions.FuncN;
 
 
@@ -86,8 +85,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
 
 
     @Inject
-    public EntityCollectionManagerImpl( final UUIDService uuidService, final WriteStart writeStart,
final Scheduler scheduler,
-                                        final WriteUniqueVerify writeVerifyUnique,
+    public EntityCollectionManagerImpl( final UUIDService uuidService, final WriteStart writeStart,
+                                        final Scheduler scheduler, final WriteUniqueVerify
writeVerifyUnique,
                                         final WriteOptimisticVerify writeOptimisticVerify,
                                         final WriteCommit writeCommit, final Load load, final
DeleteStart deleteStart,
                                         final DeleteCommit deleteCommit,
@@ -146,11 +145,60 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
         //create our observable and start the write
         CollectionIoEvent<Entity> writeData = new CollectionIoEvent<Entity>(
collectionScope, entity );
 
-        Observable<CollectionIoEvent<MvccEntity>> observable =  Observable.from(
writeData ).subscribeOn( scheduler ).map( writeStart );
+        Observable<CollectionIoEvent<MvccEntity>> observable =
+                Observable.from( writeData ).subscribeOn( scheduler ).map( writeStart ).flatMap(
+                        new Func1<CollectionIoEvent<MvccEntity>, Observable<CollectionIoEvent<MvccEntity>>>()
{
+
+
+                            @Override
+                            public Observable<CollectionIoEvent<MvccEntity>>
call(
+                                    final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent
) {
+
+                                //do the unique and optimistic steps in parallel
+
+                                /**
+                                 *unique function.  Since there can be more than 1 unique
value in this entity
+                                 * the unique verification step itself is multiple parallel
executions.
+                                 * This is why we use "flatMap" instead of "map", which allows
the
+                                 * WriteVerifyUnique stage to execute multiple verification
steps in parallel and
+                                 * zip the results
+                                 */
+
+
+                                Observable<CollectionIoEvent<MvccEntity>> unique
=
+                                        Observable.from( mvccEntityCollectionIoEvent ).subscribeOn(
scheduler )
+                                                  .flatMap( writeVerifyUnique);
 
 
-        //execute all validation stages concurrently.  Needs refactored when this is done.
 https://github.com/Netflix/RxJava/issues/627
-        observable = Concurrent.concurrent(observable, scheduler, new WaitZip( ), writeVerifyUnique,
writeOptimisticVerify);
+                                //optimistic verification
+                                Observable<CollectionIoEvent<MvccEntity>> optimistic
=
+                                        Observable.from( mvccEntityCollectionIoEvent ).subscribeOn(
scheduler )
+                                                  .map( writeOptimisticVerify );
+
+
+                                //zip the results
+                                /**
+                                 * TODO: Should the zip only return errors here, and if errors
are present, we throw during the zip phase?  I couldn't find "
+                                 */
+
+                               return Observable.zip( unique, optimistic, new Func2<CollectionIoEvent<MvccEntity>,
+                                       CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>>()
{
+                                   @Override
+                                   public CollectionIoEvent<MvccEntity> call(
+                                           final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent,
+                                           final CollectionIoEvent<MvccEntity> mvccEntityCollectionIoEvent2
) {
+
+                                       return mvccEntityCollectionIoEvent;
+                                   }
+                               } );
+                            }
+                        } );
+
+
+//        //execute all validation stages concurrently.  Needs refactored when this is done.
 https://github
+//        // .com/Netflix/RxJava/issues/627
+//        observable =
+//                Concurrent.concurrent( observable, scheduler, new WaitZip(), writeVerifyUnique,
writeOptimisticVerify );
 
         //return the commit result.
         return observable.map( writeCommit );
@@ -166,7 +214,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
         Preconditions.checkNotNull( entityId.getType(), "Entity type is required in this
stage" );
 
 
-        return Observable.from(new CollectionIoEvent<Id>( collectionScope, entityId
) ).subscribeOn( scheduler ).map( deleteStart ).map( deleteCommit );
+        return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId
) ).subscribeOn( scheduler )
+                         .map( deleteStart ).map( deleteCommit );
     }
 
 
@@ -177,16 +226,15 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
         Preconditions.checkNotNull( entityId.getUuid(), "Entity id uuid required in the load
stage" );
         Preconditions.checkNotNull( entityId.getType(), "Entity id type required in the load
stage" );
 
-        return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId
) ).subscribeOn( scheduler ).map( load );
+        return Observable.from( new CollectionIoEvent<Id>( collectionScope, entityId
) ).subscribeOn( scheduler )
+                         .map( load );
     }
 
 
     /**
      * Class that validates all results are equal then proceeds
-     * @param <R>
      */
-    private static class WaitZip<R> implements FuncN<R>{
-
+    private static class WaitZip<R> implements FuncN<R> {
 
 
         private WaitZip() {
@@ -196,8 +244,8 @@ public class EntityCollectionManagerImpl implements EntityCollectionManager
{
         @Override
         public R call( final Object... args ) {
 
-            for(int i = 0; i < args.length-1; i ++){
-                assert args[i] == args[i+1];
+            for ( int i = 0; i < args.length - 1; i++ ) {
+                assert args[i] == args[i + 1];
             }
 
             return ( R ) args[0];

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fc355560/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteFig.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteFig.java
deleted file mode 100644
index d2282a8..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteFig.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  The ASF licenses this file to You
- * under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.  For additional information regarding
- * copyright in this work, please see the NOTICE file in the top level
- * directory of this distribution.
- */
-package org.apache.usergrid.persistence.collection.mvcc.stage.write;
-
-import org.safehaus.guicyfig.Default;
-import org.safehaus.guicyfig.GuicyFig;
-import org.safehaus.guicyfig.Key;
-
-/**
- * Configuration for Write stage classes.
- */
-public interface WriteFig extends GuicyFig {
-
-    /**
-     * Max number of threads the uniqueness verification pool can allocate.  Can be dynamically
changed after starting
-     */
-    @Key( "collection.stage.write.verification.threads" )
-    @Default( "20" )
-    int getMaxThreadCount();
-
-    /**
-     * Time to Live for Unique Values before commit.
-     */
-    @Key( "collection.stage.write.verification.ttl.seconds" )
-    @Default( "10" )
-    int getUniqueValueTimeToLive();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fc355560/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index ad65820..370fd0a 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -17,66 +17,65 @@
  */
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.entity.ValidationUtils;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
+import org.apache.usergrid.persistence.model.entity.Entity;
+import org.apache.usergrid.persistence.model.field.Field;
 
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import org.apache.usergrid.persistence.collection.exception.CollectionRuntimeException;
-import org.apache.usergrid.persistence.model.entity.Entity;
-import org.apache.usergrid.persistence.model.field.Field;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+import rx.Observable;
+import rx.Scheduler;
 import rx.util.functions.Func1;
+import rx.util.functions.FuncN;
+
 
 /**
  * This phase execute all unique value verification on the MvccEntity.
  */
 @Singleton
-public class WriteUniqueVerify
-        implements Func1<CollectionIoEvent<MvccEntity>, CollectionIoEvent<MvccEntity>>
{
+public class WriteUniqueVerify implements Func1<CollectionIoEvent<MvccEntity>, Observable<?
extends CollectionIoEvent<MvccEntity>>> {
 
     private static final Logger LOG = LoggerFactory.getLogger( WriteUniqueVerify.class );
 
     private final UniqueValueSerializationStrategy uniqueValueStrat;
 
-    //TODO Dave: we don't want to use our own thread pool.  Use the Concurrent class to create
new observables
-    private final ExecutorService threadPool;
 
-    private final int MAX_THREAD_COUNT;
+    private final Scheduler scheduler;
 
-    private final int UNIQUE_VALUE_TTL;
+    protected final SerializationFig serializationFig;
 
-    @Inject
-    public WriteUniqueVerify( WriteFig writeFig, 
-            UniqueValueSerializationStrategy uniqueValueSerializiationStrategy ) {
 
-        this.uniqueValueStrat = uniqueValueSerializiationStrategy;
+    @Inject
+    public WriteUniqueVerify( final UniqueValueSerializationStrategy uniqueValueSerializiationStrategy,
+                              final Scheduler scheduler, final SerializationFig serializationFig
) {
 
-        if ( writeFig == null) {
-            MAX_THREAD_COUNT = 100;
-            UNIQUE_VALUE_TTL = 30;
-        } else {
-            MAX_THREAD_COUNT = writeFig.getMaxThreadCount();
-            UNIQUE_VALUE_TTL = writeFig.getUniqueValueTimeToLive();
-        }
+        Preconditions.checkNotNull( uniqueValueSerializiationStrategy, "uniqueValueSerializationStrategy
is required" );
+        Preconditions.checkNotNull( scheduler, "scheduler is required" );
+        Preconditions.checkNotNull( serializationFig, "serializationFig is required" );
 
-        this.threadPool = Executors.newFixedThreadPool( MAX_THREAD_COUNT );
+        this.uniqueValueStrat = uniqueValueSerializiationStrategy;
+        this.scheduler = scheduler;
+        this.serializationFig = serializationFig;
     }
 
+
     @Override
-    public CollectionIoEvent<MvccEntity> call( final CollectionIoEvent<MvccEntity>
ioevent ) {
+    public Observable<? extends CollectionIoEvent<MvccEntity>> call(final CollectionIoEvent<MvccEntity>
ioevent ) {
 
         ValidationUtils.verifyMvccEntityWithEntity( ioevent.getEvent() );
 
@@ -88,90 +87,121 @@ public class WriteUniqueVerify
         // We want to use concurrent to fork all validations this way they're wrapped by
timeouts and
         // Hystrix thread pools for JMX operations.  See the WriteCommand in the EntityCollectionManagerImpl
         // I think it still needs added to the Concurrent utility class
-        final List<Future<FieldUniquenessResult>> results = 
-                new ArrayList<Future<FieldUniquenessResult>>();
 
-        for ( final Field field : entity.getFields() ) {
 
-            if ( field.isUnique() ) {
+        final List<Observable<FieldUniquenessResult>> fields =
+                new ArrayList<Observable<FieldUniquenessResult>>();
+
 
-                results.add( threadPool.submit( new Callable<FieldUniquenessResult>()
{
 
-                    public FieldUniquenessResult call() throws Exception {
+        /**
+         * Construct all the functions for verifying we're unique
+         */
+        for ( final Field field : entity.getFields() ) {
+
+            //if it's unique, create a function to validate it and add it to the list of
concurrent validations
+            if ( !field.isUnique() ) {
 
-                        // use write-first then read strategy 
-                        UniqueValue written = new UniqueValueImpl( ioevent.getEntityCollection(),

-                                field, entity.getId(), entity.getVersion() );
+                Observable<FieldUniquenessResult> result =  Observable.from( field
).subscribeOn( scheduler ).map(new Func1<Field,  FieldUniquenessResult>() {
+                    @Override
+                    public FieldUniquenessResult call(Field field ) {
+
+                        // use write-first then read strategy
+                        UniqueValue written = new UniqueValueImpl( ioevent.getEntityCollection(),
field, entity.getId(),
+                                entity.getVersion() );
 
                         // use TTL in case something goes wrong before entity is finally
committed
-                        MutationBatch mb = uniqueValueStrat.write( written, UNIQUE_VALUE_TTL
);
+                        MutationBatch mb = uniqueValueStrat.write( written, serializationFig.getTimeout()
);
 
                         try {
                             mb.execute();
-                        } catch ( ConnectionException ex ) {
-                            throw new CollectionRuntimeException(
-                                    "Error writing unique value " + field.toString(), ex
);
+                        }
+                        catch ( ConnectionException ex ) {
+                            throw new CollectionRuntimeException( "Error writing unique value
" + field.toString(),
+                                    ex );
                         }
 
                         // does the database value match what we wrote?
                         UniqueValue loaded;
                         try {
-                            loaded = uniqueValueStrat.load(
-                                    ioevent.getEntityCollection(), field );
-
-                        } catch ( ConnectionException ex ) {
+                            loaded = uniqueValueStrat.load( ioevent.getEntityCollection(),
field );
+                        }
+                        catch ( ConnectionException ex ) {
                             throw new CollectionRuntimeException( ex );
                         }
 
-                        return new FieldUniquenessResult( field, loaded.equals( written ));
+                        return new FieldUniquenessResult( field, loaded.equals( written )
);
                     }
+                } );
 
-                } ) );
-
+                fields.add(result);
             }
         }
 
-        for ( Future<FieldUniquenessResult> result : results ) {
-            try {
-                if ( !result.get().isUnique() ) {
-                    Field field = result.get().getField();
-                    throw new CollectionRuntimeException( "Duplicate field value " 
-                            + field.getName() + " = " + field.getValue().toString());
+        //short circuit.  If we zip up nothing, we block forever.
+        if(fields.size() == 0){
+            return Observable.from(ioevent ).subscribeOn( scheduler );
+        }
+
+        /**
+         * Zip the results up
+         */
+        final FuncN<CollectionIoEvent<MvccEntity>> zipFunction = new FuncN<CollectionIoEvent<MvccEntity>>()
{
+            @Override
+            public CollectionIoEvent<MvccEntity> call( final Object... args ) {
+
+
+
+                for ( Object resultObj : args ) {
+
+                    FieldUniquenessResult result = ( FieldUniquenessResult ) resultObj;
+
+                    if ( !result.isUnique() ) {
+                        Field field = result.getField();
+                        throw new CollectionRuntimeException(
+                                "Duplicate field value " + field.getName() + " = " + field.getValue().toString()
);
+                    }
                 }
-            } catch ( InterruptedException ex ) {
-                LOG.error( "Error verifing uniqueness", ex );
-            } catch ( ExecutionException ex ) {
-                LOG.error( "Error verifing uniqueness", ex );
+
+                //return the original event
+                return ioevent;
             }
-        }
+        };
+
+
+        return Observable.zip( fields, zipFunction );
 
-        return ioevent;
     }
 
+
     static class FieldUniquenessResult {
         private Field field;
         private Boolean unique;
 
+
         public FieldUniquenessResult( Field f, Boolean u ) {
             this.field = f;
             this.unique = u;
         }
 
+
         public Boolean isUnique() {
             return unique;
         }
 
+
         public void setUnique( Boolean isUnique ) {
             this.unique = isUnique;
         }
 
+
         public Field getField() {
             return field;
         }
 
+
         public void setField( Field field ) {
             this.field = field;
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fc355560/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
deleted file mode 100644
index 868c73b..0000000
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/rx/Concurrent.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.usergrid.persistence.collection.hystrix.CassandraCommand;
-
-import com.netflix.hystrix.HystrixCommand;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.concurrency.Schedulers;
-import rx.operators.OperationMerge;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
-
-
-/**
- * A utility class that encapsulates possible combinations of functions.
- * These functions are executed in parallel, then "zipped" into a single response.  
- * This is useful when you want to perform operations on a single initial observable, 
- * then combine the result of multiple parallel operations on that object into a single observable
to continue the sequence
- */
-public class Concurrent {
-
-
-    /**
-     * Create an instance of concurrent execution.  All functions specified 
-     * in the list are invoked in parallel. The results are then "zipped" 
-     * into a single observable which is returned  with the specified function
-     *
-     * @param observable The observable we're invoking many concurrent operations on
-     * @param scheduler The scheduler to use when invoking the parallel operations
-     * @param zipFunction The zip function to aggregate the results. And return the observable
-     * @param concurrent The concurrent operations we're invoking
-     * @return The observable emitted from the zipped function of type Z
-     */
-    public static <T, R, Z> Observable<Z> concurrent(
-            final Observable<T> observable, final Scheduler scheduler, final FuncN<Z>
zipFunction, final Func1<T, R>... concurrent ){
-
-       return  observable.mapMany( new Func1<T, Observable<Z>>() {
-            @Override
-            public Observable<Z> call( final T input ) {
-
-               List<Observable<R>> observables = new ArrayList<Observable<R>>(concurrent.length);
-
-               //Create multiple observables for each function.  They simply emit the input
value.
-               //this is the "fork" step of the concurrent processing
-               for( Func1<T, R> funct: concurrent){
-                   final Observable<R> observable = Observable.from( input ).subscribeOn(
scheduler ).map( funct );
-
-                   observables.add( observable );
-               }
-
-                return Observable.zip( observables, zipFunction );
-            }
-        } );
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fc355560/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
index d247127..da16a21 100644
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
+++ b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerifyTest.java
@@ -18,29 +18,39 @@
 package org.apache.usergrid.persistence.collection.mvcc.stage.write;
 
 
+import org.jukito.JukitoRunner;
 import org.jukito.UseModules;
 import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import org.apache.usergrid.persistence.collection.CollectionScope;
 import org.apache.usergrid.persistence.collection.cassandra.CassandraRule;
+import org.apache.usergrid.persistence.collection.guice.MigrationManagerRule;
 import org.apache.usergrid.persistence.collection.guice.TestCollectionModule;
 import org.apache.usergrid.persistence.collection.mvcc.entity.MvccEntity;
 import org.apache.usergrid.persistence.collection.mvcc.stage.AbstractMvccEntityStageTest;
 import org.apache.usergrid.persistence.collection.mvcc.stage.CollectionIoEvent;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
 import org.apache.usergrid.persistence.model.entity.Entity;
 
 import com.google.inject.Inject;
 
+import rx.Scheduler;
+
 import static org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator.fromEntity;
 import static org.apache.usergrid.persistence.collection.mvcc.stage.TestEntityGenerator.generateEntity;
 import static org.junit.Assert.assertSame;
 import static org.mockito.Mockito.mock;
 
 
-/** 
- * @author tnine 
+/**
+ * TODO: Update the test to correctly test for detecting more than 1 duplicate and exception
handling correctly
+ *
+ * @author tnine
  */
+@RunWith( JukitoRunner.class )
 @UseModules( TestCollectionModule.class )
 public class WriteUniqueVerifyTest extends AbstractMvccEntityStageTest {
 
@@ -50,24 +60,41 @@ public class WriteUniqueVerifyTest extends AbstractMvccEntityStageTest
{
     @Inject
     private UniqueValueSerializationStrategy uniqueValueSerializiationStrategy;
 
-    /** Standard flow */
-    @Test
+
+    @Inject
+    @Rule
+    public MigrationManagerRule migrationManagerRule;
+
+
+    @Inject
+    private Scheduler scheduler;
+
+    @Inject
+    private SerializationFig fig;
+
+
+    /**
+     * Standard flow
+     */
+    @Test( timeout = 5000 )
     public void testStartStage() throws Exception {
 
         final CollectionScope collectionScope = mock( CollectionScope.class );
 
+
         // set up the mock to return the entity from the start phase
         final Entity entity = generateEntity();
 
         final MvccEntity mvccEntity = fromEntity( entity );
 
         // run the stage
-        WriteUniqueVerify newStage = new WriteUniqueVerify( null, uniqueValueSerializiationStrategy
);
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uniqueValueSerializiationStrategy,
scheduler, fig );
 
-        CollectionIoEvent<MvccEntity>
-            result = newStage.call( new CollectionIoEvent<MvccEntity>( collectionScope,
mvccEntity ) );
+        CollectionIoEvent<MvccEntity> result =
+                newStage.call( new CollectionIoEvent<MvccEntity>( collectionScope,
mvccEntity ) ).toBlockingObservable()
+                        .last();
 
-        assertSame("Context was correct", collectionScope, result.getEntityCollection())
;
+        assertSame( "Context was correct", collectionScope, result.getEntityCollection()
);
 
         // verify the log entry is correct
         MvccEntity entry = result.getEvent();
@@ -80,11 +107,32 @@ public class WriteUniqueVerifyTest extends AbstractMvccEntityStageTest
{
         assertSame( "Entity correct", entity, entry.getEntity().get() );
     }
 
+
+    @Test
+    public void testNoFields() {
+        final CollectionScope collectionScope = mock( CollectionScope.class );
+
+
+        // set up the mock to return the entity from the start phase
+        final Entity entity = generateEntity();
+
+        final MvccEntity mvccEntity = fromEntity( entity );
+
+        // run the stage
+        WriteUniqueVerify newStage = new WriteUniqueVerify( uniqueValueSerializiationStrategy,
scheduler, fig );
+
+        CollectionIoEvent<MvccEntity> result =
+                newStage.call( new CollectionIoEvent<MvccEntity>( collectionScope,
mvccEntity ) ).toBlockingObservable()
+                        .last();
+
+        assertSame( "Context was correct", collectionScope, result.getEntityCollection()
);
+    }
+
+
     @Override
     protected void validateStage( final CollectionIoEvent<MvccEntity> event ) {
-        new WriteUniqueVerify( null, uniqueValueSerializiationStrategy ).call( event );
+        new WriteUniqueVerify( uniqueValueSerializiationStrategy, scheduler, fig ).call(
event );
     }
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fc355560/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
b/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
deleted file mode 100644
index eafe3aa..0000000
--- a/stack/corepersistence/collection/src/test/java/org/apache/usergrid/persistence/collection/rx/ConcurrentTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package org.apache.usergrid.persistence.collection.rx;
-
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.usergrid.persistence.collection.hystrix.CassandraCommand;
-
-import com.google.common.collect.HashMultiset;
-import com.google.common.collect.Multiset;
-import com.netflix.config.ConfigurationManager;
-
-import rx.Observable;
-import rx.Scheduler;
-import rx.concurrency.Schedulers;
-import rx.util.functions.Func1;
-import rx.util.functions.FuncN;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-
-/**
- *
- *
- */
-public class ConcurrentTest {
-
-    private static final Logger logger = LoggerFactory.getLogger( ConcurrentTest.class );
-
-
-
-    @Test( timeout = 5000 )
-    public void concurrent() {
-        //use the I/O scheduler so we always have enough threads
-        final Scheduler scheduler = Schedulers.threadPoolForIO();
-
-        final String source = "test";
-        Observable<String> observable =  Observable.from( source );
-
-        //we could theoretically use the same instance over and over
-
-        final int size = 5;
-
-
-        final CountDownLatch latch = new CountDownLatch( size );
-
-        TestConcurrent[] concurrentFunctions = new TestConcurrent[size];
-
-        for ( int i = 0; i < size; i++ ) {
-            concurrentFunctions[i] = new TestConcurrent( latch, i );
-        }
-
-        Zip zip = new Zip();
-
-
-        //concurrent inherits thread pool from it's observable, set it's thread pool
-        Observable<Integer> result = Concurrent.concurrent( observable, scheduler,
 zip, concurrentFunctions );
-
-        assertEquals( "No invocation yet", 0, set.size() );
-
-
-        //now invoke it
-        Integer response = result.toBlockingObservable().single();
-
-
-        assertEquals( "Same value emitted", size - 1, response.intValue() );
-
-        //verify each function executed in it's own thread
-
-        assertEquals( size + " threads invoked", size, set.size() );
-
-        //print them out just for giggles
-        for ( Multiset.Entry<String> entry : set.entrySet() ) {
-            System.out.println( entry.getElement() );
-            assertEquals( "1 Thread per invocation", 1, entry.getCount() );
-        }
-    }
-
-
-    private Multiset<String> set = HashMultiset.create();
-
-
-    public class Zip implements FuncN<Integer> {
-
-
-        @Override
-        public Integer call( final Object... args ) {
-
-            //validate our arguments come in order
-            for ( int i = 0; i < args.length; i++ ) {
-
-                assertEquals( i, args[i] );
-            }
-
-            return ( Integer ) args[args.length - 1];
-        }
-    }
-
-
-    /**
-     * Simple function that just adds data to our multiset for later evaluation
-     */
-    public class TestConcurrent implements Func1<String, Integer> {
-
-        private final CountDownLatch latch;
-        private final int index;
-
-
-        public TestConcurrent( final CountDownLatch latch, int index ) {
-            this.latch = latch;
-            this.index = index;
-        }
-
-
-        @Override
-        public Integer call( final String s ) {
-            final String threadName = Thread.currentThread().getName();
-
-            logger.info( "Function executing on thread {}", threadName );
-
-            set.add( threadName );
-
-            //we want to make sure each thread blocks until they all have passed the latch
-            //this way we can ensure they're all running concurrently
-            try {
-                latch.countDown();
-
-
-                boolean waited = latch.await( 5, TimeUnit.SECONDS );
-
-                //validate everything ran concurrently
-                assertTrue( "Latch released", waited );
-            }
-            catch ( InterruptedException e ) {
-                logger.error( "Runner interrupted", e );
-            }
-
-            return index;
-        }
-    }
-}


Mime
View raw message