usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sfeld...@apache.org
Subject [1/2] incubator-usergrid git commit: Add migration system plugin into the script. Reformat logging, ensure appinfos are migrated only when they can be.
Date Thu, 13 Aug 2015 20:58:28 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 4f9650b93 -> 35430a59d


Add migration system plugin into the script.  Reformat logging, ensure appinfos are migrated
only when they can be.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5803d58c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5803d58c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5803d58c

Branch: refs/heads/two-dot-o-dev
Commit: 5803d58c972562567645a8871c0d36c1f5ebf133
Parents: e3a4a95
Author: Michael Russo <michaelarusso@gmail.com>
Authored: Thu Aug 13 13:12:33 2015 -0700
Committer: Michael Russo <michaelarusso@gmail.com>
Committed: Thu Aug 13 13:12:33 2015 -0700

----------------------------------------------------------------------
 stack/scripts/migrate_entity_data.py | 155 +++++++++++++++++++++---------
 1 file changed, 112 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5803d58c/stack/scripts/migrate_entity_data.py
----------------------------------------------------------------------
diff --git a/stack/scripts/migrate_entity_data.py b/stack/scripts/migrate_entity_data.py
index fd4d936..13c1b41 100644
--- a/stack/scripts/migrate_entity_data.py
+++ b/stack/scripts/migrate_entity_data.py
@@ -15,13 +15,30 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-# Usage from a machine running Usergrid:
 #
-# python migrate_entity_data.py -u adminuser:adminpass    (standard data migration and reindex)
-# python migrate_entity_data.py -u adminuser:adminpass -f    (force a re-migration )
-# python migrate_entity_data.py -u adminuser:adminpass -d <timestamp>    (re-index
only from the timestamp specified)
 #
+# Usage from a machine running Usergrid with the new Usergrid version:
 #
+# ######################################################
+# STEP 1 - BEFORE SWITCHING TRAFFIC TO NEW UG VERSION
+# ######################################################
+#
+# python migrate_entity_data.py --user adminuser:adminpass
+#
+# The above command performs an appinfo migration and system re-index only.  This creates indices in Elasticsearch with
+# the updated indexing strategy in the new Usergrid version.
+#
+# ######################################################
+# STEP 2 - AFTER SWITCHING TRAFFIC TO NEW UG VERSION
+# ######################################################
+#
+# python migrate_entity_data.py --user adminuser:adminpass --delta --date <timestamp>
+#
+# The above command performs an appinfo migration, system re-index using a start date, and full data migration which
+# includes entity data.  This step is necessary to ensure Usergrid starts reading and writing data from the latest
+# entity version, including delta indexing of any documents created during the time between STEP 1 and STEP 2.  If
+# all data has already been migrated (running this a 2nd, 3rd, etc. time), then the appinfo migration will be skipped.
+
 
 
 import sys
@@ -35,30 +52,37 @@ import json
 
 # Version expected in status response post-migration for entity and app-info data
 TARGET_VERSION = 2
+TARGET_MIGRATION_SYSTEM_VERSION = 1
 
 # Set an interval (in seconds) for checking if re-index and/or migration has finished
 STATUS_INTERVAL_SECONDS = 2
 
+# Set plugin names
+PLUGIN_MIGRATION_SYSTEM = 'migration-system'
+PLUGIN_APPINFO = 'appinfo-migration'
+PLUGIN_ENTITYDATA = 'collections-entity-data'
+
+
 
 def parse_args():
     parser = argparse.ArgumentParser(description='Usergrid Migration Tool')
 
-    parser.add_argument('-d', '--date',
+    parser.add_argument('--date',
                         help='A date from which to start the migration',
                         type=str)
 
-    parser.add_argument('-e', '--endpoint',
+    parser.add_argument('--endpoint',
                         help='The endpoint to use for making API requests.',
                         type=str,
                         default='http://localhost:8080')
 
-    parser.add_argument('-u', '--user',
+    parser.add_argument('--user',
                         help='System Admin Credentials used to authenticate with Usergrid
 <user:pass>',
                         type=str,
                         required=True)
 
-    parser.add_argument('-f', '--force',
-                        help='Force a delta migration.',
+    parser.add_argument('--delta',
+                        help='Run a delta migration.',
                         action='store_true',
                         default=False)
 
@@ -91,7 +115,7 @@ class Migrate:
         self.logger = init_logging(self.__class__.__name__)
         self.admin_user = self.args['user']
         self.admin_pass = self.args['pass']
-        self.force_migration = self.args['force']
+        self.delta_migration = self.args['delta']
 
     def run(self):
         self.logger.info('Initializing...')
@@ -104,24 +128,40 @@ class Migrate:
 
         try:
 
-            # Always run an app info migration first
-            if self.is_appinfo_migrated():
-                self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
-                self.reset_appinfo_migration()
-                time.sleep(STATUS_INTERVAL_SECONDS)
+            # We need to check and roll the migration system to 1 if not already
+            migration_system_updated = self.is_migration_system_updated()
 
-            self.start_appinfo_migration()
-            self.logger.info('AppInfo Migration Started.')
-            self.metrics['appinfo_migration_start'] = get_current_time()
+            if not migration_system_updated:
+                self.logger.info('Migration system needs to updated.  Updating migration
system..')
+                self.start_migration_system_update()
+                while not migration_system_updated:
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    migration_system_updated = self.is_migration_system_updated()
+                    if migration_system_updated:
+                        break
 
-            is_appinfo_migrated = False
-            while not is_appinfo_migrated:
-                is_appinfo_migrated = self.is_appinfo_migrated()
-                time.sleep(STATUS_INTERVAL_SECONDS)
-                if is_appinfo_migrated:
-                    self.metrics['appinfo_migration_end'] = get_current_time()
-                    break
-            self.logger.info('AppInfo Migration Ended.')
+            # Run AppInfo migration only when both appinfos and collection entity data have not been migrated
+            if not self.is_data_migrated():
+
+                if self.is_appinfo_migrated():
+                    self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
+                    self.reset_appinfo_migration()
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+
+                self.start_appinfo_migration()
+                self.logger.info('AppInfo Migration Started.')
+                self.metrics['appinfo_migration_start'] = get_current_time()
+
+                is_appinfo_migrated = False
+                while not is_appinfo_migrated:
+                    is_appinfo_migrated = self.is_appinfo_migrated()
+                    time.sleep(STATUS_INTERVAL_SECONDS)
+                    if is_appinfo_migrated:
+                        self.metrics['appinfo_migration_end'] = get_current_time()
+                        break
+                self.logger.info('AppInfo Migration Ended.')
+            else:
+                self.logger.info('Full Data Migration previously ran... skipping AppInfo
migration.')
 
             # Perform system re-index (it will grab date from input if provided)
             job = self.start_reindex()
@@ -137,10 +177,10 @@ class Migrate:
             self.logger.info("Finished Re-index. Job=[%s]", job)
             self.metrics['reindex_end'] = get_current_time()
 
-            # Only when we do a delta (force migration) do we run the full data migration
(includes entity data)
-            if self.force_migration:
+            # Only when we do a delta migration do we run the full data migration (includes appinfo and entity data)
+            if self.delta_migration:
 
-                self.logger.info('Force option provided. Performing full data migration...')
+                self.logger.info('Delta option provided. Performing full data migration...')
                 if self.is_data_migrated():
                     self.reset_data_migration()
                 time.sleep(STATUS_INTERVAL_SECONDS)
@@ -191,9 +231,19 @@ class Migrate:
             self.logger.error('Failed to start migration, %s', e)
             exit_on_error(str(e))
 
+    def start_migration_system_update(self):
+        try:
+            migrateUrl = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM
+            r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
+            response = r.json()
+            return response
+        except requests.exceptions.RequestException as e:
+            self.logger.error('Failed to start migration, %s', e)
+            exit_on_error(str(e))
+
     def start_appinfo_migration(self):
         try:
-            migrateUrl = self.get_migration_url() + '/' + 'appinfo-migration'
+            migrateUrl = self.get_migration_url() + '/' + PLUGIN_APPINFO
             r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
             response = r.json()
             return response
@@ -203,12 +253,12 @@ class Migrate:
 
     def reset_data_migration(self):
         version = TARGET_VERSION - 1
-        body = json.dumps({'collections-entity-data': version, 'appinfo-migration': version})
+        body = json.dumps({PLUGIN_ENTITYDATA: version, PLUGIN_APPINFO: version})
         try:
             r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user,
self.admin_pass))
             response = r.json()
-            self.logger.info('Resetting data migration versions to collections-entity-data=[v%s]
'
-                             'and appinfo-migration=[v%s]', version, version)
+            self.logger.info('Resetting data migration versions to %s=[%s] '
+                             'and %s=[%s]', PLUGIN_ENTITYDATA, version, PLUGIN_APPINFO, version)
             return response
         except requests.exceptions.RequestException as e:
             self.logger.error('Failed to reset full data migration versions, %s', e)
@@ -216,11 +266,11 @@ class Migrate:
 
     def reset_appinfo_migration(self):
         version = TARGET_VERSION - 1
-        body = json.dumps({'appinfo-migration': version})
+        body = json.dumps({PLUGIN_APPINFO: version})
         try:
             r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user,
self.admin_pass))
             response = r.json()
-            self.logger.info('Resetting appinfo migration versions to appinfo-migration=[v%s]',
version)
+            self.logger.info('Resetting appinfo migration versions to %s=[%s]', PLUGIN_APPINFO,
version)
             return response
         except requests.exceptions.RequestException as e:
             self.logger.error('Failed to reset appinfo migration version, %s', e)
@@ -229,13 +279,15 @@ class Migrate:
     def is_data_migrated(self):
         status = self.check_data_migration_status()
         if status is not None:
-            entity_version = status['data']['collections-entity-data']
-            appinfo_version = status['data']['appinfo-migration']
+            entity_version = status['data'][PLUGIN_ENTITYDATA]
+            appinfo_version = status['data'][PLUGIN_APPINFO]
 
             if entity_version == TARGET_VERSION and appinfo_version == TARGET_VERSION:
-                self.logger.info('Full Data Migration status=[COMPLETE], collections-entity-data=[v%s],
'
-                                 'appinfo-migration=[v%s]',
+                self.logger.info('Full Data Migration status=[COMPLETE], %s=[%s], '
+                                 '%s=[%s]',
+                                 PLUGIN_ENTITYDATA,
                                  entity_version,
+                                 PLUGIN_APPINFO,
                                  appinfo_version)
                 return True
             else:
@@ -245,17 +297,34 @@ class Migrate:
     def is_appinfo_migrated(self):
         status = self.check_data_migration_status()
         if status is not None:
-            appinfo_version = status['data']['appinfo-migration']
+            appinfo_version = status['data'][PLUGIN_APPINFO]
 
             if appinfo_version == TARGET_VERSION:
                 self.logger.info('AppInfo Migration status=[COMPLETE],'
-                                 'appinfo-migration=[v%s]',
+                                 '%s=[%s]',
+                                 PLUGIN_APPINFO,
                                  appinfo_version)
                 return True
             else:
                 self.logger.info('AppInfo Migration status=[NOTSTARTED/INPROGRESS]')
         return False
 
+    def is_migration_system_updated(self):
+        status = self.check_data_migration_status()
+        if status is not None:
+            migration_system_version = status['data'][PLUGIN_MIGRATION_SYSTEM]
+
+            if migration_system_version == TARGET_MIGRATION_SYSTEM_VERSION:
+                self.logger.info('Migration System CURRENT, %s=[%s]',
+                                 PLUGIN_MIGRATION_SYSTEM,
+                                 migration_system_version)
+                return True
+            else:
+                self.logger.info('Migration System OLD, %s=[%s]',
+                                 PLUGIN_MIGRATION_SYSTEM,
+                                 migration_system_version)
+        return False
+
     def check_data_migration_status(self):
 
         try:
@@ -348,8 +417,8 @@ def init_logging(name):
 
     logger = logging.getLogger(name)
     log_file_name = './migration.log'
-    log_formatter = logging.Formatter(fmt='%(asctime)s | %(name)s | %(levelname)s | %(message)s',
-                                      datefmt='%m/%d/%Y %I:%M:%S %p')
+    log_formatter = logging.Formatter(fmt='%(asctime)s [%(name)s] %(levelname)s %(message)s',
+                                      datefmt='%Y-%m-%d %H:%M:%S')
 
     rotating_file = logging.handlers.RotatingFileHandler(filename=log_file_name,
                                                          mode='a',


Mime
View raw message