usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [18/25] incubator-usergrid git commit: Updated the data migration to also migrate version logs
Date Wed, 01 Apr 2015 20:44:53 GMT
Updated the data migration to also migrate version logs

Updated the batch to use lambdas


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/72da01d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/72da01d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/72da01d5

Branch: refs/heads/two-dot-o-dev
Commit: 72da01d52da94cfff6600e4a6dbc26f11e37d4b4
Parents: d145eb4
Author: Todd Nine <tnine@apigee.com>
Authored: Tue Mar 31 15:04:47 2015 -0600
Committer: Todd Nine <tnine@apigee.com>
Committed: Tue Mar 31 15:04:47 2015 -0600

----------------------------------------------------------------------
 .../migration/MvccEntityDataMigrationImpl.java  | 42 +++++++++---
 .../index/impl/EsIndexBufferConsumerImpl.java   | 72 ++++++--------------
 2 files changed, 52 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72da01d5/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
index 4551d5f..3168817 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/serialization/impl/migration/MvccEntityDataMigrationImpl.java
@@ -30,13 +30,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.collection.MvccEntity;
+import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.impl.EntityVersionCleanupTask;
 import org.apache.usergrid.persistence.collection.impl.EntityVersionTaskFactory;
 import org.apache.usergrid.persistence.collection.serialization.MvccEntitySerializationStrategy;
+import org.apache.usergrid.persistence.collection.serialization.MvccLogEntrySerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValue;
 import org.apache.usergrid.persistence.collection.serialization.UniqueValueSerializationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.MvccEntitySerializationStrategyV3Impl;
+import org.apache.usergrid.persistence.collection.serialization.impl.MvccLogEntrySerializationStrategyV2Impl;
 import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueImpl;
+import org.apache.usergrid.persistence.collection.serialization.impl.UniqueValueSerializationStrategyV2Impl;
 import org.apache.usergrid.persistence.core.migration.data.DataMigration;
 import org.apache.usergrid.persistence.core.migration.data.DataMigrationException;
 import org.apache.usergrid.persistence.core.migration.data.MigrationDataProvider;
@@ -72,25 +76,25 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
 
     private final Keyspace keyspace;
     private final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions;
-    private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
     private final EntityVersionTaskFactory entityVersionCleanupFactory;
     private final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3;
-
+    private final UniqueValueSerializationStrategy uniqueValueSerializationStrategy;
+    private final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy;
 
 
     @Inject
     public MvccEntityDataMigrationImpl( final Keyspace keyspace,
                                         final VersionedMigrationSet<MvccEntitySerializationStrategy> allVersions,
-                                        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
                                         final EntityVersionTaskFactory entityVersionCleanupFactory,
-                                        final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3
-                                      ) {
-
+                                        final MvccEntitySerializationStrategyV3Impl mvccEntitySerializationStrategyV3,
+                                        final UniqueValueSerializationStrategy uniqueValueSerializationStrategy,
+                                        final MvccLogEntrySerializationStrategy mvccLogEntrySerializationStrategy ) {
         this.keyspace = keyspace;
         this.allVersions = allVersions;
-        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
         this.entityVersionCleanupFactory = entityVersionCleanupFactory;
         this.mvccEntitySerializationStrategyV3 = mvccEntitySerializationStrategyV3;
+        this.uniqueValueSerializationStrategy = uniqueValueSerializationStrategy;
+        this.mvccLogEntrySerializationStrategy = mvccLogEntrySerializationStrategy;
     }
 
 
@@ -189,11 +193,11 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
                             // time with
                             // no TTL so that cleanup can clean up
                             // older values
-                            for ( Field field : EntityUtils.getUniqueFields( message.entity.getEntity().get() ) ) {
+                            for (final Field field : EntityUtils.getUniqueFields( message.entity.getEntity().get() ) ) {
 
-                                UniqueValue written = new UniqueValueImpl( field, entityId, version );
+                                final UniqueValue written = new UniqueValueImpl( field, entityId, version );
 
-                                MutationBatch mb = uniqueValueSerializationStrategy.write( message.scope, written );
+                                final MutationBatch mb = uniqueValueSerializationStrategy.write( message.scope, written );
 
 
                                 // merge into our
@@ -202,6 +206,24 @@ public class MvccEntityDataMigrationImpl implements DataMigration<EntityIdScope>
                                 totalBatch.mergeShallow( mb );
                             }
 
+
+                            //add all our log entries
+                            final List<MvccLogEntry> logEntries = mvccLogEntrySerializationStrategy.load( message.scope,
+                                message.entity.getId(), version, 1000 );
+
+                            /**
+                             * Migrate the log entry to the new format
+                             */
+                            for(final MvccLogEntry entry: logEntries){
+
+                                final MutationBatch mb = mvccLogEntrySerializationStrategy.write( message.scope, entry );
+
+                                totalBatch.mergeShallow( mb );
+                            }
+
+
+
+                            //schedule our cleanup task to clean up all the data
                             final EntityVersionCleanupTask task = entityVersionCleanupFactory
                                 .getCleanupTask( message.scope, message.entity.getId(), version, false );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/72da01d5/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 6d5a4d8..7e8d8f4 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -21,7 +21,6 @@ package org.apache.usergrid.persistence.index.impl;
 
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -50,8 +49,6 @@ import rx.Observable;
 import rx.Subscriber;
 import rx.Subscription;
 import rx.functions.Action1;
-import rx.functions.Func1;
-import rx.functions.Func2;
 import rx.schedulers.Schedulers;
 
 
@@ -200,9 +197,7 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                         }
                         while ( true );
                     }
-                } ).doOnNext( new Action1<List<IndexOperationMessage>>() {
-                @Override
-                public void call( List<IndexOperationMessage> containerList ) {
+                } ).doOnNext( containerList -> {
                     if ( containerList.size() == 0 ) {
                         return;
                     }
@@ -214,16 +209,12 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
                     execute( containerList );
 
                     time.stop();
-                }
-            } )
+                } )
                 //ack after we process
-                .doOnNext( new Action1<List<IndexOperationMessage>>() {
-                    @Override
-                    public void call( final List<IndexOperationMessage> indexOperationMessages ) {
-                        bufferQueue.ack( indexOperationMessages );
-                        //release  so we know we've done processing
-                        inFlight.addAndGet( -1 * indexOperationMessages.size() );
-                    }
+                .doOnNext( indexOperationMessages -> {
+                    bufferQueue.ack( indexOperationMessages );
+                    //release  so we know we've done processing
+                    inFlight.addAndGet( -1 * indexOperationMessages.size() );
                 } )
 
                 .subscribeOn( Schedulers.newThread() );
@@ -240,54 +231,31 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
     /**
      * Execute the request, check for errors, then re-init the batch for future use
      */
-    private void execute(final List<IndexOperationMessage> operationMessages) {
+    private void execute( final List<IndexOperationMessage> operationMessages ) {
 
-        if (operationMessages == null || operationMessages.size() == 0) {
+        if ( operationMessages == null || operationMessages.size() == 0 ) {
             return;
         }
 
         //process and flatten all the messages to builder requests
         //batch shard operations into a bulk request
-        Observable.from( operationMessages ).flatMap( new Func1<IndexOperationMessage, Observable<BatchRequest>>() {
-            @Override
-            public Observable<BatchRequest> call( final IndexOperationMessage indexOperationMessage ) {
-                final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() );
-                final Observable<DeIndexRequest> deIndex =
-                    Observable.from( indexOperationMessage.getDeIndexRequests() );
+        Observable.from( operationMessages ).flatMap( indexOperationMessage -> {
+            final Observable<IndexRequest> index = Observable.from( indexOperationMessage.getIndexRequests() );
+            final Observable<DeIndexRequest> deIndex = Observable.from( indexOperationMessage.getDeIndexRequests() );
 
-                indexSizeCounter.dec( indexOperationMessage.getDeIndexRequests().size() );
-                indexSizeCounter.dec( indexOperationMessage.getIndexRequests().size() );
+            indexSizeCounter.dec( indexOperationMessage.getDeIndexRequests().size() );
+            indexSizeCounter.dec( indexOperationMessage.getIndexRequests().size() );
 
-                return Observable.merge( index, deIndex );
-            }
+            return Observable.merge( index, deIndex );
         } )
-      //collection all the operations into a single stream
-       .reduce( initRequest(), new Func2<BulkRequestBuilder, BatchRequest, BulkRequestBuilder>() {
-           @Override
-           public BulkRequestBuilder call( final BulkRequestBuilder bulkRequestBuilder,
-                                           final BatchRequest batchRequest ) {
-               batchRequest.doOperation( client, bulkRequestBuilder );
-
-               return bulkRequestBuilder;
-           }
-       } )
-        //send the request off to ES
-        .doOnNext( new Action1<BulkRequestBuilder>() {
-            @Override
-            public void call( final BulkRequestBuilder bulkRequestBuilder ) {
-                sendRequest( bulkRequestBuilder );
-            }
-        } ).toBlocking().lastOrDefault(null);
+            //collection all the operations into a single stream
+            .collect( () -> initRequest(), ( bulkRequestBuilder, batchRequest ) -> {
+                batchRequest.doOperation( client, bulkRequestBuilder );
+            } )  //send the request off to ES
+            .doOnNext( bulkRequestBuilder -> sendRequest( bulkRequestBuilder ) ).toBlocking().lastOrDefault( null );
 
         //call back all futures
-        Observable.from(operationMessages)
-            .doOnNext(new Action1<IndexOperationMessage>() {
-                @Override
-                public void call(IndexOperationMessage operationMessage) {
-                    operationMessage.getFuture().done();
-                }
-            })
-            .toBlocking().lastOrDefault(null);
+        Observable.from( operationMessages ).doOnNext( operationMessage -> operationMessage.getFuture().done() ).toBlocking().lastOrDefault( null );
     }
 
 


Mime
View raw message