| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # |
| # |
| # Usage from a machine running Usergrid with the new Usergrid version: |
| # |
| # ###################################################### |
| # STEP 1 - BEFORE SWITCHING TRAFFIC TO NEW UG VERSION |
| # ###################################################### |
| # |
| # python migrate_entity_data.py --user adminuser:adminpass |
| # |
| # The above command performs an appinfo migration and system re-index only. This creates indices in Elasticsearch with |
| # the updated indexing strategy in the new Usergrid version. |
| # |
| # ###################################################### |
| # STEP 2 - AFTER SWITCHING TRAFFIC TO NEW UG VERSION |
| # ###################################################### |
| # |
| # python migrate_entity_data.py --user adminuser:adminpass --delta --date <timestamp> |
| # |
| # The above command performs an appinfo migration, system re-index using a start date, and full data migration which |
| # includes entity data. This step is necessary to ensure Usergrid starts reading and writing data from the latest |
# entity version, including delta indexing of any documents created during the time between STEP 1 and STEP 2. If
| # all data has already been migrated (running this a 2nd, 3rd, etc. time), then the appinfo migration will be skipped. |
| |
| |
| |
| import sys |
| import logging |
| from logging.handlers import RotatingFileHandler |
| import argparse |
| import time |
| import requests |
| import json |
| |
| |
# Versions expected in the /system/migrate/status response once each plugin's
# migration has completed (assignment spacing normalized to PEP 8).
TARGET_APPINFO_VERSION = 2
TARGET_ENTITY_DATA_VERSION = 2
TARGET_CORE_DATA_VERSION = 2
TARGET_MIGRATION_SYSTEM_VERSION = 1
TARGET_INDEX_MAPPING_VERSION = 2

# Polling interval (in seconds) used while waiting for re-index and/or
# migration jobs to finish.
STATUS_INTERVAL_SECONDS = 2

# Server-side migration plugin names, as addressed by the migration REST API.
PLUGIN_MIGRATION_SYSTEM = 'migration-system'
PLUGIN_APPINFO = 'appinfo-migration'
PLUGIN_ENTITYDATA = 'collections-entity-data'
PLUGIN_INDEX_MAPPING = 'index_mapping_migration'
PLUGIN_CORE_DATA = 'core-data'
| |
| |
| |
def parse_args():
    """Parse command-line options and return them as a dict.

    The required ``--user`` value must be of the form ``<user>:<pass>``;
    it is split into separate ``'user'`` and ``'pass'`` entries in the
    returned dict.  The script exits with an error if the value is
    malformed.
    """
    parser = argparse.ArgumentParser(description='Usergrid Migration Tool')

    parser.add_argument('--date',
                        help='A date from which to start the migration',
                        type=str)

    parser.add_argument('--endpoint',
                        help='The endpoint to use for making API requests.',
                        type=str,
                        default='http://localhost:8080')

    parser.add_argument('--user',
                        help='System Admin Credentials used to authenticate with Usergrid <user:pass>',
                        type=str,
                        required=True)

    parser.add_argument('--delta',
                        help='Run a delta migration.',
                        action='store_true',
                        default=False)

    my_args = parser.parse_args(sys.argv[1:])

    arg_vars = vars(my_args)
    creds = arg_vars['user'].split(':')
    if len(creds) != 2:
        # Fix: the original message referenced a '-u' flag that does not
        # exist; the actual option is --user.
        print('Credentials not properly specified. Must be "--user <user:pass>". Exiting...')
        exit_on_error()
    else:
        arg_vars['user'] = creds[0]
        arg_vars['pass'] = creds[1]

    return arg_vars
| |
| |
| |
class Migrate:
    """Drives the Usergrid data-migration workflow over the admin REST API.

    Orchestrates, in order: database setup, migration-system update, index
    mapping migration, appinfo migration, a system re-index and -- when
    --delta was passed -- a full data migration.  Progress is polled via
    the /system/migrate/status endpoint every STATUS_INTERVAL_SECONDS.
    """

    def __init__(self):
        # Parsed CLI options (see parse_args()).
        self.args = parse_args()
        # Optional start date; forwarded verbatim to the re-index request.
        self.start_date = self.args['date']
        self.endpoint = self.args['endpoint']
        # Millisecond timestamps captured around each phase, emitted by
        # log_metrics() at the end of the run.
        self.metrics = {'reindex_start': '',
                        'reindex_end': '',
                        'appinfo_migration_start': '',
                        'appinfo_migration_end': '',
                        'full_data_migration_start': '',
                        'full_data_migration_end': ''}
        self.logger = init_logging(self.__class__.__name__)
        self.admin_user = self.args['user']
        self.admin_pass = self.args['pass']
        self.delta_migration = self.args['delta']

    def run(self):
        """Execute the full migration sequence end-to-end.

        Blocks until every started phase reports completion; a keyboard
        interrupt logs the metrics gathered so far and asks the operator
        to re-run.
        """
        self.logger.info('Initializing...')

        if not self.is_endpoint_available():
            exit_on_error('Endpoint is not available, aborting')

        if self.start_date is not None:
            self.logger.info("Date Provided. Re-index will run from date=[%s]", self.start_date)

        try:

            self.run_database_setup()

            # We need to check and roll the migration system to 1 if not already
            migration_system_updated = self.is_migration_system_updated()

            if not migration_system_updated:
                self.logger.info('Migration system needs to be updated. Updating migration system..')
                self.start_migration_system_update()
                # Poll until the status endpoint reports the target version.
                while not migration_system_updated:
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    migration_system_updated = self.is_migration_system_updated()
                    if migration_system_updated:
                        break

            index_mapping_updated = self.is_index_mapping_updated()

            if not index_mapping_updated:
                self.logger.info('Index Mapping needs to be updated. Updating index mapping..')
                self.start_index_mapping_migration()
                # Poll until the index mapping reaches the target version.
                while not index_mapping_updated:
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    index_mapping_updated = self.is_index_mapping_updated()
                    if index_mapping_updated:
                        break

            # Run AppInfo migration only when both appinfos and collection entity data have not been migrated
            if not self.is_data_migrated():

                #Migrate app info
                if self.is_appinfo_migrated():
                    self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
                    self.reset_appinfo_migration()
                    time.sleep(STATUS_INTERVAL_SECONDS)

                self.start_appinfo_migration()
                self.logger.info('AppInfo Migration Started.')
                self.metrics['appinfo_migration_start'] = get_current_time()

                is_appinfo_migrated = False
                while not is_appinfo_migrated:
                    is_appinfo_migrated = self.is_appinfo_migrated()
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    if is_appinfo_migrated:
                        self.metrics['appinfo_migration_end'] = get_current_time()
                        break
                self.logger.info('AppInfo Migration Ended.')


            else:
                self.logger.info('Full Data Migration previously ran... skipping AppInfo migration.')



            # We need to check and roll index mapping version to 1 if not already there

            # Perform system re-index (it will grab date from input if provided)
            job = self.start_reindex()
            self.metrics['reindex_start'] = get_current_time()
            self.logger.info('Started Re-index. Job=[%s]', job)
            is_running = True
            while is_running:
                time.sleep(STATUS_INTERVAL_SECONDS)
                is_running = self.is_reindex_running(job)
                if not is_running:
                    break

            self.logger.info("Finished Re-index. Job=[%s]", job)
            self.metrics['reindex_end'] = get_current_time()

            # Only when we do a delta migration do we run the full data migration (includes appinfo and entity data)
            if self.delta_migration:

                self.logger.info('Delta option provided. Performing full data migration...')
                if self.is_data_migrated():
                    self.reset_data_migration()
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    # NOTE(review): return value ignored; presumably called
                    # only to log the post-reset status -- confirm.
                    self.is_data_migrated()

                # self.start_core_data_migration()
                self.start_fulldata_migration()

                self.metrics['full_data_migration_start'] = get_current_time()
                self.logger.info("Full Data Migration Started")
                is_migrated = False
                while not is_migrated:
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    is_migrated = self.is_data_migrated()
                    if is_migrated:
                        break

                self.metrics['full_data_migration_end'] = get_current_time()
                self.logger.info("Full Data Migration completed")

            self.log_metrics()
            self.logger.info("Finished...")

        except KeyboardInterrupt:
            self.log_metrics()
            self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')

    def get_database_setup_url(self):
        """Return the URL that triggers database (keyspace) setup."""
        url = self.endpoint + '/system/database/setup'
        return url

    def get_migration_url(self):
        """Return the base URL used to start migration plugins."""
        url = self.endpoint + '/system/migrate/run'
        return url

    def get_reset_migration_url(self):
        """Return the URL used to set (reset) plugin migration versions."""
        url = self.endpoint + '/system/migrate/set'
        return url

    def get_migration_status_url(self):
        """Return the URL reporting per-plugin migration versions."""
        url = self.endpoint + '/system/migrate/status'
        return url

    def get_reindex_url(self):
        """Return the URL that starts (or, with a job id, polls) a re-index."""
        url = self.endpoint + '/system/index/rebuild'
        return url

    def get_management_reindex_url(self):
        """Return the re-index URL scoped to the management app (unused here)."""
        url = self.get_reindex_url() + "/management"
        return url


    def start_core_data_migration(self):
        """Kick off a full migration run (all plugins) and return the JSON response.

        NOTE(review): identical to start_fulldata_migration() and only
        referenced from commented-out code in run().
        """
        try:
            r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
            response = r.json()
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))


    def start_fulldata_migration(self):
        """Kick off the full data migration and return the JSON response."""
        try:
            r = requests.put(url=self.get_migration_url(), auth=(self.admin_user, self.admin_pass))
            response = r.json()
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def start_migration_system_update(self):
        """Start the migration-system plugin and return the JSON response."""
        try:
            #TODO fix this URL
            migrateUrl = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM
            r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
            response = r.json()
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def run_database_setup(self):
        """Run database setup; exit the script if it does not return HTTP 200."""
        try:
            setupUrl = self.get_database_setup_url()
            r = requests.put(url=setupUrl, auth=(self.admin_user, self.admin_pass))
            if r.status_code != 200:
                exit_on_error('Database Setup Failed')

        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to run database setup, %s', e)
            exit_on_error(str(e))

    def start_index_mapping_migration(self):
        """Start the index-mapping migration plugin and return the JSON response."""
        try:
            migrateUrl = self.get_migration_url() + '/' + PLUGIN_INDEX_MAPPING
            r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
            response = r.json()
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def start_appinfo_migration(self):
        """Start the appinfo migration plugin and return the JSON response."""
        try:
            migrateUrl = self.get_migration_url() + '/' + PLUGIN_APPINFO
            r = requests.put(url=migrateUrl, auth=(self.admin_user, self.admin_pass))
            response = r.json()
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def reset_data_migration(self):
        """Roll entity-data and appinfo plugin versions back by one.

        This forces the server to re-run those migrations on the next
        /system/migrate/run call.  Returns the server's JSON response.
        """
        version = TARGET_ENTITY_DATA_VERSION - 1
        body = json.dumps({PLUGIN_ENTITYDATA: version, PLUGIN_APPINFO: version})
        try:
            r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user, self.admin_pass))
            response = r.json()
            self.logger.info('Resetting data migration versions to %s=[%s] '
                             'and %s=[%s]', PLUGIN_ENTITYDATA, version, PLUGIN_APPINFO, version)
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to reset full data migration versions, %s', e)
            exit_on_error(str(e))

    def reset_appinfo_migration(self):
        """Roll the appinfo plugin version back by one so it re-runs."""
        version = TARGET_APPINFO_VERSION - 1
        body = json.dumps({PLUGIN_APPINFO: version})
        try:
            r = requests.put(url=self.get_reset_migration_url(), data=body, auth=(self.admin_user, self.admin_pass))
            response = r.json()
            self.logger.info('Resetting appinfo migration versions to %s=[%s]', PLUGIN_APPINFO, version)
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to reset appinfo migration version, %s', e)
            exit_on_error(str(e))

    def is_data_migrated(self):
        """Return True when entity, appinfo and core-data plugins all report
        their target versions; False otherwise.

        Returns None (falsy) when the status endpoint could not be reached.
        """
        status = self.check_data_migration_status()
        if status is not None:
            entity_version = status['data'][PLUGIN_ENTITYDATA]
            appinfo_version = status['data'][PLUGIN_APPINFO]
            core_data_version = status['data'][PLUGIN_CORE_DATA]

            if entity_version == TARGET_ENTITY_DATA_VERSION and appinfo_version == TARGET_APPINFO_VERSION and core_data_version == TARGET_CORE_DATA_VERSION:
                self.logger.info('Full Data Migration status=[COMPLETE], %s=[%s], '
                                 '%s=[%s], %s=%s',
                                 PLUGIN_ENTITYDATA,
                                 entity_version,
                                 PLUGIN_APPINFO,
                                 appinfo_version,
                                 PLUGIN_CORE_DATA,
                                 core_data_version)
                return True
            else:
                self.logger.info('Full Data Migration status=[NOTSTARTED/INPROGRESS]')
        return False

    def is_appinfo_migrated(self):
        """Return True when the appinfo plugin reports its target version."""
        status = self.check_data_migration_status()
        if status is not None:
            appinfo_version = status['data'][PLUGIN_APPINFO]

            if appinfo_version == TARGET_APPINFO_VERSION:
                self.logger.info('AppInfo Migration status=[COMPLETE],'
                                 '%s=[%s]',
                                 PLUGIN_APPINFO,
                                 appinfo_version)
                return True
            else:
                self.logger.info('AppInfo Migration status=[NOTSTARTED/INPROGRESS]')
        return False

    def is_migration_system_updated(self):
        """Return True when the migration-system plugin is at the target version."""
        status = self.check_data_migration_status()
        if status is not None:
            migration_system_version = status['data'][PLUGIN_MIGRATION_SYSTEM]

            if migration_system_version == TARGET_MIGRATION_SYSTEM_VERSION:
                self.logger.info('Migration System CURRENT, %s=[%s]',
                                 PLUGIN_MIGRATION_SYSTEM,
                                 migration_system_version)
                return True
            else:
                self.logger.info('Migration System OLD, %s=[%s]',
                                 PLUGIN_MIGRATION_SYSTEM,
                                 migration_system_version)
        return False

    def is_index_mapping_updated(self):
        """Return True when the index-mapping plugin is at the target version."""
        status = self.check_data_migration_status()
        if status is not None:
            index_mapping_version = status['data'][PLUGIN_INDEX_MAPPING]

            if index_mapping_version == TARGET_INDEX_MAPPING_VERSION:
                self.logger.info('Index Mapping CURRENT, %s=[%s]',
                                 PLUGIN_INDEX_MAPPING,
                                 index_mapping_version)
                return True
            else:
                self.logger.info('Index Mapping OLD, %s=[%s]',
                                 PLUGIN_INDEX_MAPPING,
                                 index_mapping_version)
        return False

    def check_data_migration_status(self):
        """Fetch /system/migrate/status and return the parsed JSON.

        Returns None on a non-200 response or a request failure (callers
        treat None as "status unknown" and keep polling).
        """
        try:
            r = requests.get(url=self.get_migration_status_url(), auth=(self.admin_user, self.admin_pass))
            if r.status_code == 200:
                response = r.json()
                return response
            else:
                self.logger.error('Failed to check migration status, %s', r)
                return
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to check migration status, %s', e)
            # exit_on_error()

    def get_reindex_status(self, job):
        """Return the 'status' field for the given re-index job id, or None
        if the request fails."""
        status_url = self.get_reindex_url()+'/' + job

        try:
            r = requests.get(url=status_url, auth=(self.admin_user, self.admin_pass))
            response = r.json()
            return response['status']
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to get reindex status, %s', e)
            # exit_on_error()

    def start_reindex(self):
        """Start a system re-index and return its job id.

        When --date was supplied its value is posted as {'updated': date}
        so only documents modified since then are re-indexed.  Exits the
        script if the request fails or returns a non-200 status.
        """
        body = ""
        if self.start_date is not None:
            body = json.dumps({'updated': self.start_date})

        try:
            r = requests.post(url=self.get_reindex_url(), data=body, auth=(self.admin_user, self.admin_pass))

            if r.status_code == 200:
                response = r.json()
                return response['jobId']
            else:
                self.logger.error('Failed to start reindex, %s', r)
                exit_on_error(str(r))
        except requests.exceptions.RequestException as e:
            self.logger.error('Unable to make API request for reindex, %s', e)
            exit_on_error(str(e))

    def is_reindex_running(self, job):
        """Return True until the re-index job reports status COMPLETE."""
        status = self.get_reindex_status(job)
        self.logger.info('Re-index status=[%s]', status)
        if status != "COMPLETE":
            return True
        else:
            return False

    def is_endpoint_available(self):
        """Return True if GET <endpoint>/status answers with HTTP 200.

        NOTE(review): a reachable endpoint answering non-200 falls through
        and returns None (falsy), which run() treats as unavailable.
        """
        try:
            r = requests.get(url=self.endpoint+'/status')
            if r.status_code == 200:
                return True
        except requests.exceptions.RequestException as e:
            self.logger.error('Endpoint is unavailable, %s', str(e))
            return False

    def log_metrics(self):
        """Log the start/end timestamps collected for each migration phase."""
        self.logger.info(
            'Re-index start=[%s], ' +
            'Re-index end =[%s], ' +
            'Full Data Migration start=[%s], ' +
            'Full Data Migration end=[%s] ' +
            'AppInfo Migration start=[%s], ' +
            'AppInfo Migration end=[%s] ',
            self.metrics['reindex_start'],
            self.metrics['reindex_end'],
            self.metrics['full_data_migration_start'],
            self.metrics['full_data_migration_end'],
            self.metrics['appinfo_migration_start'],
            self.metrics['appinfo_migration_end']

        )
| |
| |
def get_current_time():
    """Return the current wall-clock time as whole milliseconds, in a string."""
    millis_since_epoch = int(time.time() * 1000)
    return str(millis_since_epoch)
| |
| |
def exit_on_error(e=""):
    """Print the failure reason and terminate the script with exit status 1."""
    message = 'Exiting migration script due to error: ' + str(e)
    print(message)
    sys.exit(1)
| |
| |
def init_logging(name):
    """Create and return a logger that writes to ./migration.log and stdout.

    The file handler rotates at 100 MB, keeping up to 10 backups; both
    handlers (and the logger itself) are set to INFO level.
    """
    logger = logging.getLogger(name)

    # getLogger() returns a shared instance per name; bail out early so a
    # second call does not stack duplicate handlers and double every line.
    if logger.handlers:
        return logger

    log_file_name = './migration.log'
    log_formatter = logging.Formatter(fmt='%(asctime)s [%(name)s] %(levelname)s %(message)s',
                                      datefmt='%Y-%m-%d %H:%M:%S')

    # Use the directly-imported RotatingFileHandler name; the original
    # reached it via logging.handlers.RotatingFileHandler, which only works
    # as a side effect of the from-import at the top of the file.
    rotating_file = RotatingFileHandler(filename=log_file_name,
                                        mode='a',
                                        maxBytes=104857600,  # 100 MB per log file
                                        backupCount=10)
    rotating_file.setFormatter(log_formatter)
    rotating_file.setLevel(logging.INFO)
    logger.addHandler(rotating_file)
    logger.setLevel(logging.INFO)

    stdout_logger = logging.StreamHandler(sys.stdout)
    stdout_logger.setFormatter(log_formatter)
    stdout_logger.setLevel(logging.INFO)
    logger.addHandler(stdout_logger)

    return logger
| |
if __name__ == '__main__':

    # Script entry point: CLI parsing happens inside Migrate.__init__(),
    # then run() drives the whole migration workflow.
    migration = Migrate()
    migration.run()