usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/3] incubator-usergrid git commit: Removed inner observable and converted it into regular code that functions. Will convert back in next push.
Date Fri, 22 May 2015 18:21:15 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-608 ed317b295 -> 630cb4a87


Removed inner observable and converted it into regular code that functions. Will convert back
in next push.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/11648ab1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/11648ab1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/11648ab1

Branch: refs/heads/USERGRID-608
Commit: 11648ab1c84be349a45020e0cc4a9cafd60d005b
Parents: ed317b2
Author: GERey <greyes@apigee.com>
Authored: Mon May 18 15:08:15 2015 -0700
Committer: GERey <greyes@apigee.com>
Committed: Mon May 18 15:08:15 2015 -0700

----------------------------------------------------------------------
 .../mvcc/stage/delete/UniqueCleanup.java        | 106 ++++++++++++-------
 1 file changed, 69 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/11648ab1/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
index 0034f03..3e3e531 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/delete/UniqueCleanup.java
@@ -88,44 +88,76 @@ public class UniqueCleanup
                 final ApplicationScope applicationScope = mvccEntityCollectionIoEvent.getEntityCollection();
                 final UUID entityVersion = mvccEntityCollectionIoEvent.getEvent().getVersion();
 
+                Iterator<UniqueValue> uniqueFields = uniqueValueSerializationStrategy.getAllUniqueFields(
+                    applicationScope, entityId );
+
+                final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+
+
+                while(uniqueFields.hasNext()){
+                    UniqueValue uniqueValue = uniqueFields.next();
+                    final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+                    //TODO: should this be equals? That way we clean up the one marked as well
+                    if(UUIDComparator.staticCompare( entityVersion, uniqueValueVersion ) >= 0){
+                        logger
+                            .debug( "Deleting value:{} from application scope: {} ", uniqueValue, applicationScope );
+                        uniqueCleanupBatch.mergeShallow(
+                            uniqueValueSerializationStrategy.delete( applicationScope,uniqueValue ));
+
+                    }
+                }
+
+                try {
+                    uniqueCleanupBatch.execute();
+                }
+                catch ( ConnectionException e ) {
+                    throw new RuntimeException( "Unable to execute batch mutation", e );
+                }
+
+
+
                 //TODO Refactor this logic into a a class that can be invoked from anywhere
-                //iterate all unique values
-                final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
-                    Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
-                        @Override
-                        protected Iterator<UniqueValue> getIterator() {
-                            return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
-                        }
-                    } )
-
-                        //skip  versions > the specified version
-                        .skipWhile( uniqueValue -> {
-
-                            final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
-
-                            return UUIDComparator.staticCompare( uniqueValueVersion, entityVersion ) > 0;
-                        } )
-
-                            //buffer our buffer size, then roll them all up in a single batch mutation
-                        .buffer( serializationFig.getBufferSize() )
-
-                            //roll them up
-                        .doOnNext( uniqueValues -> {
-                            final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
-
-
-                            for ( UniqueValue value : uniqueValues ) {
-                                uniqueCleanupBatch
-                                    .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
-                            }
-
-                            try {
-                                uniqueCleanupBatch.execute();
-                            }
-                            catch ( ConnectionException e ) {
-                                throw new RuntimeException( "Unable to execute batch mutation", e );
-                            }
-                        } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
+//                //iterate all unique values
+//                final Observable<CollectionIoEvent<MvccEntity>> uniqueValueCleanup =
+//                    Observable.create( new ObservableIterator<UniqueValue>( "Unique value load" ) {
+//                        @Override
+//                        protected Iterator<UniqueValue> getIterator() {
+//                            return uniqueValueSerializationStrategy.getAllUniqueFields( applicationScope, entityId );
+//                        }
+//                    } )
+//
+//                        //skip  versions > the specified version
+//                        //TODO: does this emit for every version before the staticComparator?
+//                        .skipWhile( uniqueValue -> {
+//
+//                            logger.debug( "Cleaning up version:{} in UniqueCleanup", entityVersion );
+//                            final UUID uniqueValueVersion = uniqueValue.getEntityVersion();
+//                            //TODO: should this be equals? That way we clean up the one marked as well
+//                            return UUIDComparator.staticCompare( entityVersion,uniqueValueVersion ) > 0;
+//                        } )
+//
+//                            //buffer our buffer size, then roll them all up in a single batch mutation
+//                        .buffer( serializationFig.getBufferSize() )
+//
+//                            //roll them up
+//                        .doOnNext( uniqueValues -> {
+//                            final MutationBatch uniqueCleanupBatch = keyspace.prepareMutationBatch();
+//
+//
+//                            for ( UniqueValue value : uniqueValues ) {
+//                                logger
+//                                    .debug( "Deleting value:{} from application scope: {} ", value, applicationScope );
+//                                uniqueCleanupBatch
+//                                    .mergeShallow( uniqueValueSerializationStrategy.delete( applicationScope, value ) );
+//                            }
+//
+//                            try {
+//                                uniqueCleanupBatch.execute();
+//                            }
+//                            catch ( ConnectionException e ) {
+//                                throw new RuntimeException( "Unable to execute batch mutation", e );
+//                            }
+//                        } ).lastOrDefault( Collections.emptyList() ).map( list -> mvccEntityCollectionIoEvent );
             } );
 
         return ObservableTimer.time( outputObservable, uniqueCleanupTimer );


Mime
View raw message