| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from __future__ import print_function |
| import os |
| import uuid |
| from Queue import Empty |
| import argparse |
| import json |
| import logging |
| import sys |
| from multiprocessing import Queue, Process |
| import time_uuid |
| import datetime |
| from cloghandler import ConcurrentRotatingFileHandler |
| import requests |
| import traceback |
| import redis |
| import time |
| from sys import platform as _platform |
| import signal |
| from requests.auth import HTTPBasicAuth |
| from usergrid import UsergridQueryIterator |
| import urllib3 |
| |
| __author__ = 'Jeff.West@yahoo.com' |
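| |
| # Example invocation (script file name assumed; parse_args() below documents |
| # every option): |
| # |
| #   python usergrid_data_migrator.py -o myorg -m graph -s source.json -d destination.json -w 16 |
| # |
| # ECID uniquely identifies this run; it is stamped on every log line and on |
| # the log/status file names so that concurrent runs can be told apart. |
| # key_version namespaces the visit-cache keys, so bumping it effectively |
| # invalidates the cache entries written by earlier runs. |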
| |
| ECID = str(uuid.uuid1()) |
| key_version = 'v4' |
| |
| logger = logging.getLogger('GraphMigrator') |
| worker_logger = logging.getLogger('Worker') |
| collection_worker_logger = logging.getLogger('CollectionWorker') |
| error_logger = logging.getLogger('ErrorLogger') |
| audit_logger = logging.getLogger('AuditLogger') |
| status_logger = logging.getLogger('StatusLogger') |
| |
| urllib3.disable_warnings() |
| |
| DEFAULT_CREATE_APPS = False |
| DEFAULT_RETRY_SLEEP = 10 |
| DEFAULT_PROCESSING_SLEEP = 1 |
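| |
| # multiprocessing.Queue.qsize() raises NotImplementedError on platforms that |
| # lack sem_getvalue() (notably macOS); probe it once so queue-depth logging |
| # can be skipped where it is unavailable. |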
| |
| queue = Queue() |
| QSIZE_OK = False |
| |
| try: |
| queue.qsize() |
| QSIZE_OK = True |
| except NotImplementedError: |
| pass |
| |
| session_source = requests.Session() |
| session_target = requests.Session() |
| |
| cache = None |
| |
| |
| def total_seconds(td): |
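| """Equivalent of timedelta.total_seconds(), which is absent before Python 2.7.""" |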
| return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6 |
| |
| |
| def init_logging(stdout_enabled=True): |
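| """Configure the root logger: optional stdout output, plus size-rotated base |
| and error-only log files named with the org, the migrate type and the ECID.""" |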
| root_logger = logging.getLogger() |
| root_logger.setLevel(logging.getLevelName(config.get('log_level', 'INFO'))) |
| |
| # root_logger.setLevel(logging.WARN) |
| |
| logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.ERROR) |
| logging.getLogger('boto').setLevel(logging.ERROR) |
| logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN) |
| |
| log_formatter = logging.Formatter( |
| fmt='%(asctime)s | ' + ECID + ' | %(name)s | %(processName)s | %(levelname)s | %(message)s', |
| datefmt='%m/%d/%Y %I:%M:%S %p') |
| |
| stdout_logger = logging.StreamHandler(sys.stdout) |
| stdout_logger.setFormatter(log_formatter) |
| root_logger.addHandler(stdout_logger) |
| |
| if stdout_enabled: |
| stdout_logger.setLevel(logging.getLevelName(config.get('log_level', 'INFO'))) |
| |
| # base log file |
| |
| log_file_name = os.path.join(config.get('log_dir'), |
| '%s-%s-%s-migrator.log' % (config.get('org'), config.get('migrate'), ECID)) |
| |
| # ConcurrentRotatingFileHandler |
| rotating_file = ConcurrentRotatingFileHandler(filename=log_file_name, |
| mode='a', |
| maxBytes=404857600, |
| backupCount=0) |
| rotating_file.setFormatter(log_formatter) |
| rotating_file.setLevel(logging.INFO) |
| |
| root_logger.addHandler(rotating_file) |
| error_log_file_name = os.path.join(config.get('log_dir'), '%s-%s-%s-migrator-errors.log' % ( |
| config.get('org'), config.get('migrate'), ECID)) |
| |
| error_rotating_file = ConcurrentRotatingFileHandler(filename=error_log_file_name, |
| mode='a', |
| maxBytes=404857600, |
| backupCount=0) |
| error_rotating_file.setFormatter(log_formatter) |
| error_rotating_file.setLevel(logging.ERROR) |
| |
| root_logger.addHandler(error_rotating_file) |
| |
| |
| entity_name_map = { |
| 'users': 'username' |
| } |
| |
| config = {} |
| |
| # URL Templates for Usergrid |
| org_management_app_url_template = "{api_url}/management/organizations/{org}/applications?client_id={client_id}&client_secret={client_secret}" |
| org_management_url_template = "{api_url}/management/organizations/{org}/applications?client_id={client_id}&client_secret={client_secret}" |
| org_url_template = "{api_url}/{org}?client_id={client_id}&client_secret={client_secret}" |
| app_url_template = "{api_url}/{org}/{app}?client_id={client_id}&client_secret={client_secret}" |
| collection_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}" |
| collection_query_url_template = "{api_url}/{org}/{app}/{collection}?ql={ql}&client_id={client_id}&client_secret={client_secret}&limit={limit}" |
| collection_graph_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}&limit={limit}" |
| connection_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}?client_id={client_id}&client_secret={client_secret}" |
| connecting_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/connecting/{verb}?client_id={client_id}&client_secret={client_secret}" |
| connection_create_by_uuid_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_uuid}?client_id={client_id}&client_secret={client_secret}" |
| connection_create_by_name_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_type}/{target_name}?client_id={client_id}&client_secret={client_secret}" |
| |
| connection_create_by_pairs_url_template = "{api_url}/{org}/{app}/{source_type_id}/{verb}/{target_type_id}?client_id={client_id}&client_secret={client_secret}" |
| |
| get_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}&connections=none" |
| get_entity_url_with_connections_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}" |
| put_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}" |
| permissions_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/permissions?client_id={client_id}&client_secret={client_secret}" |
| |
| user_credentials_url_template = "{api_url}/{org}/{app}/users/{uuid}/credentials" |
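| |
| # The templates above take {api_url}, {client_id} and {client_secret} from |
| # config['source_endpoint'] / config['target_endpoint'], so the endpoint |
| # config files (--source_config / --target_config) are assumed to look |
| # roughly like: |
| # |
| #   { |
| #     "api_url": "https://usergrid.example.com", |
| #     "client_id": "...", |
| #     "client_secret": "..." |
| #   } |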
| |
| ignore_collections = ['activities', 'queues', 'events', 'notifications'] |
| |
| |
| class StatusListener(Process): |
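| """Consumes (app, collection, status_map) updates from the status queue, |
| rolls them up into per-app and per-org summaries, and periodically writes |
| the result to a JSON status file.""" |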
| def __init__(self, status_queue, worker_queue): |
| super(StatusListener, self).__init__() |
| self.status_queue = status_queue |
| self.worker_queue = worker_queue |
| |
| def run(self): |
| keep_going = True |
| |
| org_results = { |
| 'name': config.get('org'), |
| 'apps': {}, |
| } |
| |
| empty_count = 0 |
| |
| status_file_name = os.path.join(config.get('log_dir'), |
| '%s-%s-%s-status.json' % (config.get('org'), config.get('migrate'), ECID)) |
| |
| while keep_going: |
| |
| try: |
| app, collection, status_map = self.status_queue.get(timeout=60) |
| status_logger.info('Received status update for app/collection: [%s / %s]' % (app, collection)) |
| empty_count = 0 |
| org_results['summary'] = { |
| 'max_created': -1, |
| 'max_modified': -1, |
| 'min_created': 1584946416000, |
| 'min_modified': 1584946416000, |
| 'count': 0, |
| 'bytes': 0 |
| } |
| |
| if app not in org_results['apps']: |
| org_results['apps'][app] = { |
| 'collections': {} |
| } |
| |
| org_results['apps'][app]['collections'].update(status_map) |
| |
| try: |
| for app, app_data in org_results['apps'].iteritems(): |
| app_data['summary'] = { |
| 'max_created': -1, |
| 'max_modified': -1, |
| 'min_created': 1584946416000, |
| 'min_modified': 1584946416000, |
| 'count': 0, |
| 'bytes': 0 |
| } |
| |
| if 'collections' in app_data: |
| for collection, collection_data in app_data['collections'].iteritems(): |
| |
| app_data['summary']['count'] += collection_data['count'] |
| app_data['summary']['bytes'] += collection_data['bytes'] |
| |
| org_results['summary']['count'] += collection_data['count'] |
| org_results['summary']['bytes'] += collection_data['bytes'] |
| |
| # APP |
| if collection_data.get('max_modified') > app_data['summary']['max_modified']: |
| app_data['summary']['max_modified'] = collection_data.get('max_modified') |
| |
| if collection_data.get('min_modified') < app_data['summary']['min_modified']: |
| app_data['summary']['min_modified'] = collection_data.get('min_modified') |
| |
| if collection_data.get('max_created') > app_data['summary']['max_created']: |
| app_data['summary']['max_created'] = collection_data.get('max_created') |
| |
| if collection_data.get('min_created') < app_data['summary']['min_created']: |
| app_data['summary']['min_created'] = collection_data.get('min_created') |
| |
| # ORG |
| if collection_data.get('max_modified') > org_results['summary']['max_modified']: |
| org_results['summary']['max_modified'] = collection_data.get('max_modified') |
| |
| if collection_data.get('min_modified') < org_results['summary']['min_modified']: |
| org_results['summary']['min_modified'] = collection_data.get('min_modified') |
| |
| if collection_data.get('max_created') > org_results['summary']['max_created']: |
| org_results['summary']['max_created'] = collection_data.get('max_created') |
| |
| if collection_data.get('min_created') < org_results['summary']['min_created']: |
| org_results['summary']['min_created'] = collection_data.get('min_created') |
| |
| if QSIZE_OK: |
| status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize()) |
| |
| status_logger.warn('UPDATED status of org processed: %s' % json.dumps(org_results)) |
| |
| try: |
| logger.info('Writing status to file: %s' % status_file_name) |
| |
| with open(status_file_name, 'w') as f: |
| json.dump(org_results, f, indent=2) |
| except: |
| print(traceback.format_exc()) |
| |
| except KeyboardInterrupt as e: |
| raise e |
| |
| except: |
| print(traceback.format_exc()) |
| |
| except KeyboardInterrupt as e: |
| status_logger.warn('FINAL status of org processed: %s' % json.dumps(org_results)) |
| raise e |
| |
| except Empty: |
| if QSIZE_OK: |
| status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize()) |
| |
| status_logger.warn('CURRENT status of org processed: %s' % json.dumps(org_results)) |
| |
| status_logger.warning('EMPTY! Count=%s' % empty_count) |
| |
| empty_count += 1 |
| |
| if empty_count >= 120: |
| keep_going = False |
| |
| except: |
| print(traceback.format_exc()) |
| |
| logger.warn('FINAL status of org processed: %s' % json.dumps(org_results)) |
| |
| try: |
| logger.info('Writing final status to file: %s' % status_file_name) |
| with open(status_file_name, 'w') as f: |
| json.dump(org_results, f, indent=2) |
| except: |
| print(traceback.format_exc()) |
| |
| |
| class EntityWorker(Process): |
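| """Pulls (app, collection_name, entity) tuples off the shared queue and |
| applies the configured handler function (such as migrate_graph) to each.""" |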
| def __init__(self, queue, handler_function): |
| super(EntityWorker, self).__init__() |
| |
| worker_logger.debug('Creating worker!') |
| self.queue = queue |
| self.handler_function = handler_function |
| |
| def run(self): |
| |
| worker_logger.info('starting run()...') |
| keep_going = True |
| |
| count_processed = 0 |
| empty_count = 0 |
| start_time = int(time.time()) |
| |
| while keep_going: |
| |
| try: |
| # get an entity with the app and collection name |
| app, collection_name, entity = self.queue.get(timeout=120) |
| empty_count = 0 |
| |
| # if entity.get('type') == 'user': |
| # entity = confirm_user_entity(app, entity) |
| |
| # the handler operation is the specified operation such as migrate_graph |
| if self.handler_function is not None: |
| try: |
| message_start_time = int(time.time()) |
| processed = self.handler_function(app, collection_name, entity) |
| message_end_time = int(time.time()) |
| |
| if processed: |
| count_processed += 1 |
| |
| total_time = message_end_time - start_time |
| avg_time_per_message = total_time / count_processed |
| message_time = message_end_time - message_start_time |
| |
| worker_logger.debug('Processed [%sth] entity = %s / %s / %s' % ( |
| count_processed, app, collection_name, entity.get('uuid'))) |
| |
| if count_processed % 1000 == 1: |
| worker_logger.info( |
| 'Processed [%sth] entity = [%s / %s / %s] in [%s]s - avg time/message [%s]' % ( |
| count_processed, app, collection_name, entity.get('uuid'), message_time, |
| avg_time_per_message)) |
| |
| except KeyboardInterrupt as e: |
| raise e |
| |
| except Exception as e: |
| logger.exception('Error in EntityWorker processing message') |
| print(traceback.format_exc()) |
| |
| except KeyboardInterrupt as e: |
| raise e |
| |
| except Empty: |
| worker_logger.warning('EMPTY! Count=%s' % empty_count) |
| |
| empty_count += 1 |
| |
| if empty_count >= 2: |
| keep_going = False |
| |
| except Exception as e: |
| logger.exception('Error in EntityWorker run()') |
| print(traceback.format_exc()) |
| |
| |
| class CollectionWorker(Process): |
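| """Takes (app, collection_name) pairs off the work queue, iterates the |
| collection from the source endpoint (by graph or by query), feeds each |
| entity to the entity queue, and reports stats on the response queue.""" |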
| def __init__(self, work_queue, entity_queue, response_queue): |
| super(CollectionWorker, self).__init__() |
| collection_worker_logger.debug('Creating worker!') |
| self.work_queue = work_queue |
| self.response_queue = response_queue |
| self.entity_queue = entity_queue |
| |
| def run(self): |
| |
| collection_worker_logger.info('starting run()...') |
| keep_going = True |
| |
| counter = 0 |
| # max_created = 0 |
| empty_count = 0 |
| app = 'ERROR' |
| collection_name = 'NOT SET' |
| status_map = {} |
| sleep_time = 10 |
| |
| try: |
| |
| while keep_going: |
| |
| try: |
| app, collection_name = self.work_queue.get(timeout=30) |
| |
| status_map = { |
| collection_name: { |
| 'iteration_started': str(datetime.datetime.now()), |
| 'max_created': -1, |
| 'max_modified': -1, |
| 'min_created': 1584946416000, |
| 'min_modified': 1584946416000, |
| 'count': 0, |
| 'bytes': 0 |
| } |
| } |
| |
| empty_count = 0 |
| |
| # added a flag for using graph vs query/index |
| if config.get('graph', False): |
| source_collection_url = collection_graph_url_template.format(org=config.get('org'), |
| app=app, |
| collection=collection_name, |
| limit=config.get('limit'), |
| **config.get('source_endpoint')) |
| else: |
| source_collection_url = collection_query_url_template.format(org=config.get('org'), |
| app=app, |
| collection=collection_name, |
| limit=config.get('limit'), |
| ql="select * %s" % config.get( |
| 'ql'), |
| **config.get('source_endpoint')) |
| |
| logger.info('Iterating URL: %s' % source_collection_url) |
| |
| # use the UsergridQuery from the Python SDK to iterate the collection |
| q = UsergridQueryIterator(source_collection_url, |
| page_delay=config.get('page_sleep_time'), |
| sleep_time=config.get('error_retry_sleep')) |
| |
| for entity in q: |
| |
| # begin entity loop |
| |
| self.entity_queue.put((app, collection_name, entity)) |
| counter += 1 |
| |
| if 'created' in entity: |
| |
| try: |
| entity_created = long(entity.get('created')) |
| |
| if entity_created > status_map[collection_name]['max_created']: |
| status_map[collection_name]['max_created'] = entity_created |
| status_map[collection_name]['max_created_str'] = str( |
| datetime.datetime.fromtimestamp(entity_created / 1000)) |
| |
| if entity_created < status_map[collection_name]['min_created']: |
| status_map[collection_name]['min_created'] = entity_created |
| status_map[collection_name]['min_created_str'] = str( |
| datetime.datetime.fromtimestamp(entity_created / 1000)) |
| |
| except ValueError: |
| pass |
| |
| if 'modified' in entity: |
| |
| try: |
| entity_modified = long(entity.get('modified')) |
| |
| if entity_modified > status_map[collection_name]['max_modified']: |
| status_map[collection_name]['max_modified'] = entity_modified |
| status_map[collection_name]['max_modified_str'] = str( |
| datetime.datetime.fromtimestamp(entity_modified / 1000)) |
| |
| if entity_modified < status_map[collection_name]['min_modified']: |
| status_map[collection_name]['min_modified'] = entity_modified |
| status_map[collection_name]['min_modified_str'] = str( |
| datetime.datetime.fromtimestamp(entity_modified / 1000)) |
| |
| except ValueError: |
| pass |
| |
| status_map[collection_name]['bytes'] += count_bytes(entity) |
| status_map[collection_name]['count'] += 1 |
| |
| if counter % 1000 == 1: |
| try: |
| collection_worker_logger.warning( |
| 'Sending stats for app/collection [%s / %s]: %s' % ( |
| app, collection_name, status_map)) |
| |
| self.response_queue.put((app, collection_name, status_map)) |
| |
| if QSIZE_OK: |
| collection_worker_logger.info( |
| 'Counter=%s, collection queue depth=%s' % ( |
| counter, self.work_queue.qsize())) |
| except: |
| pass |
| |
| collection_worker_logger.warn( |
| 'Current status of collections processed: %s' % json.dumps(status_map)) |
| |
| if config.get('entity_sleep_time') > 0: |
| collection_worker_logger.debug( |
| 'sleeping for [%s]s per entity...' % (config.get('entity_sleep_time'))) |
| time.sleep(config.get('entity_sleep_time')) |
| collection_worker_logger.debug( |
| 'STOPPED sleeping for [%s]s per entity...' % (config.get('entity_sleep_time'))) |
| |
| # end entity loop |
| |
| status_map[collection_name]['iteration_finished'] = str(datetime.datetime.now()) |
| |
| collection_worker_logger.warning( |
| 'Collection [%s / %s / %s] loop complete! Max Created entity %s' % ( |
| config.get('org'), app, collection_name, status_map[collection_name]['max_created'])) |
| |
| collection_worker_logger.warning( |
| 'Sending FINAL stats for app/collection [%s / %s]: %s' % (app, collection_name, status_map)) |
| |
| self.response_queue.put((app, collection_name, status_map)) |
| |
| collection_worker_logger.info('Done! Finished app/collection: %s / %s' % (app, collection_name)) |
| |
| except KeyboardInterrupt as e: |
| raise e |
| |
| except Empty: |
| collection_worker_logger.warning('EMPTY! Count=%s' % empty_count) |
| |
| empty_count += 1 |
| |
| if empty_count >= 2: |
| keep_going = False |
| |
| except Exception as e: |
| logger.exception('Error in CollectionWorker processing collection [%s]' % collection_name) |
| print(traceback.format_exc()) |
| |
| finally: |
| self.response_queue.put((app, collection_name, status_map)) |
| collection_worker_logger.info('FINISHED!') |
| |
| |
| def use_name_for_collection(collection_name): |
| return collection_name in config.get('use_name_for_collection', []) |
| |
| |
| def include_edge(collection_name, edge_name): |
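| """Decide whether an edge type should be traversed for this collection, |
| honoring the include_edge/exclude_edge lists plus a few hard-coded |
| exclusions (user feeds/followers/activities, receipt->device edges).""" |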
| include_edges = config.get('include_edge', []) |
| |
| if include_edges is None: |
| include_edges = [] |
| |
| exclude_edges = config.get('exclude_edge', []) |
| |
| if exclude_edges is None: |
| exclude_edges = [] |
| |
| if len(include_edges) > 0 and edge_name not in include_edges: |
| logger.debug( |
| 'Skipping edge [%s] since it is not in INCLUDED list: %s' % (edge_name, include_edges)) |
| return False |
| |
| if edge_name in exclude_edges: |
| logger.debug( |
| 'Skipping edge [%s] since it is in EXCLUDED list: %s' % (edge_name, exclude_edges)) |
| return False |
| |
| if (collection_name in ['users', 'user'] and edge_name in ['followers', 'feed', 'activities']) \ |
| or (collection_name in ['receipts', 'receipt'] and edge_name in ['device', 'devices']): |
| # feed and activities are not retrievable... |
| # roles and groups will be more efficiently handled from the role/group -> user |
| # followers will be handled by 'following' |
| # do only this from user -> device |
| return False |
| |
| return True |
| |
| |
| def exclude_edge(collection_name, edge_name): |
| exclude_edges = config.get('exclude_edge', []) |
| |
| if exclude_edges is None: |
| exclude_edges = [] |
| |
| if edge_name in exclude_edges: |
| logger.debug('Skipping edge [%s] since it is in EXCLUDED list: %s' % (edge_name, exclude_edges)) |
| return True |
| |
| if (collection_name in ['users', 'user'] and edge_name in ['followers', 'feed', 'activities']) \ |
| or (collection_name in ['receipts', 'receipt'] and edge_name in ['device', 'devices']): |
| # feed and activities are not retrievable... |
| # roles and groups will be more efficiently handled from the role/group -> user |
| # followers will be handled by 'following' |
| # do only this from user -> device |
| return True |
| |
| return False |
| |
| |
| def confirm_user_entity(app, source_entity, attempts=0): |
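| """Re-fetch a user from the source by username to guard against duplicate |
| UUIDs; falls back to the given entity after 5 attempts or when the user |
| cannot be found.""" |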
| attempts += 1 |
| |
| source_entity_url = get_entity_url_template.format(org=config.get('org'), |
| app=app, |
| collection='users', |
| uuid=source_entity.get('username'), |
| **config.get('source_endpoint')) |
| |
| if attempts >= 5: |
| logger.warning('Punting after [%s] attempts to confirm user at URL [%s], will use the source entity...' % ( |
| attempts, source_entity_url)) |
| |
| return source_entity |
| |
| r = requests.get(url=source_entity_url) |
| |
| if r.status_code == 200: |
| retrieved_entity = r.json().get('entities')[0] |
| |
| if retrieved_entity.get('uuid') != source_entity.get('uuid'): |
| logger.info( |
| 'UUID of Source Entity [%s] differs from uuid [%s] of retrieved entity at URL=[%s] and will be substituted' % ( |
| source_entity.get('uuid'), retrieved_entity.get('uuid'), source_entity_url)) |
| |
| return retrieved_entity |
| |
| elif 'service_resource_not_found' in r.text: |
| |
| logger.warn('Unable to retrieve user at URL [%s], and will use source entity. status=[%s] response: %s...' % ( |
| source_entity_url, r.status_code, r.text)) |
| |
| return source_entity |
| |
| else: |
| logger.error('After [%s] attempts to confirm user at URL [%s], received status [%s] message: %s...' % ( |
| attempts, source_entity_url, r.status_code, r.text)) |
| |
| time.sleep(DEFAULT_RETRY_SLEEP) |
| |
| return confirm_user_entity(app, source_entity, attempts) |
| |
| |
| def create_connection(app, collection_name, source_entity, edge_name, target_entity): |
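| """Create the edge (source)--[edge_name]-->(target) on the target endpoint, |
| using the cache to skip edges already created, retrying server errors up |
| to 5 times, and optionally repairing missing entities on 401/404.""" |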
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| source_identifier = get_source_identifier(source_entity) |
| target_identifier = get_source_identifier(target_entity) |
| |
| source_type_id = '%s/%s' % (source_entity.get('type'), source_identifier) |
| target_type_id = '%s/%s' % (target_entity.get('type'), target_identifier) |
| |
| if source_entity.get('type') == 'user': |
| source_type_id = '%s/%s' % ('users', source_entity.get('username')) |
| |
| if target_entity.get('type') == 'user': |
| if edge_name == 'users': |
| target_type_id = target_entity.get('uuid') |
| else: |
| target_type_id = '%s/%s' % ('users', target_entity.get('uuid')) |
| |
| if target_entity.get('type') == 'device': |
| if edge_name == 'devices': |
| target_type_id = target_entity.get('uuid') |
| else: |
| target_type_id = '%s/%s' % ('devices', target_entity.get('uuid')) |
| |
| if target_entity.get('type') == 'receipt': |
| if edge_name == 'receipts': |
| target_type_id = target_entity.get('uuid') |
| else: |
| target_type_id = '%s/%s' % ('receipts', target_entity.get('uuid')) |
| |
| create_connection_url = connection_create_by_pairs_url_template.format( |
| org=target_org, |
| app=target_app, |
| source_type_id=source_type_id, |
| verb=edge_name, |
| target_type_id=target_type_id, |
| **config.get('target_endpoint')) |
| |
| if not config.get('skip_cache_read', False): |
| processed = cache.get(create_connection_url) |
| |
| if processed not in [None, 'None']: |
| logger.debug('Skipping visited Edge: [%s / %s / %s] --[%s]--> [%s / %s / %s]: %s ' % ( |
| app, collection_name, source_identifier, edge_name, target_app, target_entity.get('type'), |
| target_entity.get('name'), create_connection_url)) |
| |
| return True |
| |
| logger.info('Connecting entity [%s / %s / %s] --[%s]--> [%s / %s / %s]: %s ' % ( |
| app, collection_name, source_identifier, edge_name, target_app, target_entity.get('type'), |
| target_entity.get('name', target_entity.get('uuid')), create_connection_url)) |
| |
| attempts = 0 |
| |
| while attempts < 5: |
| attempts += 1 |
| |
| r_create = session_target.post(create_connection_url) |
| |
| if r_create.status_code == 200: |
| |
| if not config.get('skip_cache_write', False): |
| cache.set(create_connection_url, 1) |
| |
| return True |
| else: |
| if r_create.status_code >= 500: |
| |
| if attempts < 5: |
| logger.warning('FAILED [%s] (will retry) to create connection at URL=[%s]: %s' % ( |
| r_create.status_code, create_connection_url, r_create.text)) |
| time.sleep(DEFAULT_RETRY_SLEEP) |
| else: |
| logger.critical( |
| 'FAILED [%s] (WILL NOT RETRY - max attempts) to create connection at URL=[%s]: %s' % ( |
| r_create.status_code, create_connection_url, r_create.text)) |
| return False |
| |
| elif r_create.status_code in [401, 404]: |
| |
| if config.get('repair_data', False): |
| logger.warning('FAILED [%s] (WILL attempt repair) to create connection at URL=[%s]: %s' % ( |
| r_create.status_code, create_connection_url, r_create.text)) |
| migrate_data(app, source_entity.get('type'), source_entity, force=True) |
| migrate_data(app, target_entity.get('type'), target_entity, force=True) |
| |
| else: |
| logger.critical('FAILED [%s] (WILL NOT attempt repair) to create connection at URL=[%s]: %s' % ( |
| r_create.status_code, create_connection_url, r_create.text)) |
| |
| else: |
| logger.warning('FAILED [%s] (will retry) to create connection at URL=[%s]: %s' % ( |
| r_create.status_code, create_connection_url, r_create.text)) |
| |
| return False |
| |
| |
| def process_edges(app, collection_name, source_entity, edge_name, connection_stack): |
| |
| source_identifier = get_source_identifier(source_entity) |
| |
| while len(connection_stack) > 0: |
| |
| target_entity = connection_stack.pop() |
| |
| if exclude_collection(collection_name) or exclude_collection(target_entity.get('type')): |
| logger.debug('EXCLUDING Edge (collection): [%s / %s / %s] --[%s]--> ?' % ( |
| app, collection_name, source_identifier, edge_name )) |
| continue |
| |
| create_connection(app, collection_name, source_entity, edge_name, target_entity) |
| |
| |
| def migrate_out_graph_edge_type(app, collection_name, source_entity, edge_name, depth=0): |
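| """Walk the outbound edges of the given type: migrate each target entity's |
| graph (bounded by graph_depth), then create the corresponding edges on the |
| target endpoint. A TTL'd visit cache prevents re-walking the same edges.""" |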
| if not include_edge(collection_name, edge_name): |
| return True |
| |
| source_uuid = source_entity.get('uuid') |
| |
| key = '%s:edge:out:%s:%s' % (key_version, source_uuid, edge_name) |
| |
| if not config.get('skip_cache_read', False): |
| date_visited = cache.get(key) |
| |
| if date_visited not in [None, 'None']: |
| logger.info('Skipping EDGE [%s / %s --%s-->] - visited at %s' % ( |
| collection_name, source_uuid, edge_name, date_visited)) |
| return True |
| else: |
| cache.delete(key) |
| |
| if not config.get('skip_cache_write', False): |
| cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2)) |
| |
| logger.debug('Visiting EDGE [%s / %s (%s) --%s-->] at %s' % ( |
| collection_name, source_uuid, get_uuid_time(source_uuid), edge_name, str(datetime.datetime.utcnow()))) |
| |
| response = True |
| |
| source_identifier = get_source_identifier(source_entity) |
| |
| count_edges = 0 |
| |
| logger.debug( |
| 'Processing edge type=[%s] of entity [%s / %s / %s]' % (edge_name, app, collection_name, source_identifier)) |
| |
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| connection_query_url = connection_query_url_template.format( |
| org=config.get('org'), |
| app=app, |
| verb=edge_name, |
| collection=collection_name, |
| uuid=source_identifier, |
| limit=config.get('limit'), |
| **config.get('source_endpoint')) |
| |
| connection_query = UsergridQueryIterator(connection_query_url, sleep_time=config.get('error_retry_sleep')) |
| |
| connection_stack = [] |
| |
| for target_entity in connection_query: |
| target_connection_collection = config.get('collection_mapping', {}).get(target_entity.get('type'), |
| target_entity.get('type')) |
| |
| target_ok = migrate_graph(app, target_entity.get('type'), source_entity=target_entity, depth=depth) |
| |
| if not target_ok: |
| logger.critical( |
| 'Error migrating TARGET entity data for connection [%s / %s / %s] --[%s]--> [%s / %s / %s]' % ( |
| app, collection_name, source_identifier, edge_name, app, target_connection_collection, |
| target_entity.get('name', target_entity.get('uuid')))) |
| |
| count_edges += 1 |
| connection_stack.append(target_entity) |
| |
| process_edges(app, collection_name, source_entity, edge_name, connection_stack) |
| |
| return response |
| |
| |
| def get_source_identifier(source_entity): |
| entity_type = source_entity.get('type') |
| |
| source_identifier = source_entity.get('uuid') |
| |
| if use_name_for_collection(entity_type): |
| |
| if entity_type in ['user']: |
| source_identifier = source_entity.get('username') |
| else: |
| source_identifier = source_entity.get('name') |
| |
| if source_identifier is None: |
| source_identifier = source_entity.get('uuid') |
| logger.warn('Using UUID for entity [%s / %s]' % (entity_type, source_identifier)) |
| |
| return source_identifier |
| |
| |
| def include_collection(collection_name): |
| if collection_name in ['events']: |
| return False |
| |
| include = config.get('collection', []) |
| |
| if include is not None and len(include) > 0 and collection_name not in include: |
| return False |
| |
| exclude = config.get('exclude_collection', []) |
| |
| if exclude is not None and collection_name in exclude: |
| return False |
| |
| return True |
| |
| |
| def exclude_collection(collection_name): |
| exclude = config.get('exclude_collection', []) |
| |
| if exclude is not None and collection_name in exclude: |
| return True |
| |
| return False |
| |
| |
| def migrate_in_graph_edge_type(app, collection_name, source_entity, edge_name, depth=0): |
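| """Walk the inbound ('connecting') edges of the given type and trigger a |
| graph migration on each connecting entity so that those entities recreate |
| their outbound edges on the target.""" |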
| source_uuid = source_entity.get('uuid') |
| key = '%s:edges:in:%s:%s' % (key_version, source_uuid, edge_name) |
| |
| if not config.get('skip_cache_read', False): |
| date_visited = cache.get(key) |
| |
| if date_visited not in [None, 'None']: |
| logger.info('Skipping EDGE [--%s--> %s / %s] - visited at %s' % ( |
| collection_name, source_uuid, edge_name, date_visited)) |
| return True |
| else: |
| cache.delete(key) |
| |
| if not config.get('skip_cache_write', False): |
| cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2)) |
| |
| logger.debug('Visiting EDGE [--%s--> %s / %s (%s)] at %s' % ( |
| edge_name, collection_name, source_uuid, get_uuid_time(source_uuid), str(datetime.datetime.utcnow()))) |
| |
| source_identifier = get_source_identifier(source_entity) |
| |
| if exclude_collection(collection_name): |
| logger.debug('Excluding (Collection) entity [%s / %s / %s]' % (app, collection_name, source_uuid)) |
| return True |
| |
| if not include_edge(collection_name, edge_name): |
| return True |
| |
| logger.debug( |
| 'Processing edge type=[%s] of entity [%s / %s / %s]' % (edge_name, app, collection_name, source_identifier)) |
| |
| logger.debug('Processing IN edges type=[%s] of entity [ %s / %s / %s]' % ( |
| edge_name, app, collection_name, source_uuid)) |
| |
| connecting_query_url = connecting_query_url_template.format( |
| org=config.get('org'), |
| app=app, |
| collection=collection_name, |
| uuid=source_uuid, |
| verb=edge_name, |
| limit=config.get('limit'), |
| **config.get('source_endpoint')) |
| |
| connection_query = UsergridQueryIterator(connecting_query_url, sleep_time=config.get('error_retry_sleep')) |
| |
| response = True |
| |
| for e_connection in connection_query: |
| logger.debug('Triggering IN->OUT edge migration on entity [%s / %s / %s] ' % ( |
| app, e_connection.get('type'), e_connection.get('uuid'))) |
| |
| response = migrate_graph(app, e_connection.get('type'), e_connection, depth) and response |
| |
| return response |
| |
| |
| def migrate_graph(app, collection_name, source_entity, depth=0): |
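| """Depth-first migration of an entity and its graph: migrate the entity's |
| data, then every outbound edge type (collections and connections) and |
| every inbound edge type, stopping at config['graph_depth'] and using the |
| visit cache to avoid revisiting nodes.""" |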
| depth += 1 |
| source_uuid = source_entity.get('uuid') |
| |
| # short circuit if the graph depth exceeds what was specified |
| if depth > config.get('graph_depth', 1): |
| logger.debug( |
| 'Reached Max Graph Depth, stopping after [%s] on [%s / %s]' % (depth, collection_name, source_uuid)) |
| return True |
| else: |
| logger.debug('Processing @ Graph Depth [%s]' % depth) |
| |
| if exclude_collection(collection_name): |
| logger.warn('Ignoring entity in filtered collection [%s]' % collection_name) |
| return True |
| |
| key = '%s:graph:%s' % (key_version, source_uuid) |
| entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid)) |
| |
| if not config.get('skip_cache_read', False): |
| date_visited = cache.get(key) |
| |
| if date_visited not in [None, 'None']: |
| logger.debug('Skipping GRAPH %s at %s' % (entity_tag, date_visited)) |
| return True |
| else: |
| cache.delete(key) |
| |
| logger.info('Visiting GRAPH %s at %s' % (entity_tag, str(datetime.datetime.utcnow()))) |
| |
| if not config.get('skip_cache_write', False): |
| cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2)) |
| |
| # first, migrate data for current node |
| response = migrate_data(app, collection_name, source_entity) |
| |
| # gather the outbound edge names |
| out_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('collections', [])] |
| out_edge_names += [edge_name for edge_name in source_entity.get('metadata', {}).get('connections', [])] |
| |
| logger.debug('Entity %s has [%s] OUT edges' % (entity_tag, len(out_edge_names))) |
| |
| # migrate each outbound edge type |
| for edge_name in out_edge_names: |
| |
| if not exclude_edge(collection_name, edge_name): |
| response = migrate_out_graph_edge_type(app, collection_name, source_entity, edge_name, depth) and response |
| |
| if config.get('prune', False): |
| prune_edge_by_name(edge_name, app, collection_name, source_entity) |
| |
| # gather the inbound edge names |
| in_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('connecting', [])] |
| |
| logger.debug('Entity %s has [%s] IN edges' % (entity_tag, len(in_edge_names))) |
| |
| # migrate each inbound edge type |
| for edge_name in in_edge_names: |
| |
| if not exclude_edge(collection_name, edge_name): |
| response = migrate_in_graph_edge_type(app, collection_name, source_entity, edge_name, |
| depth) and response |
| |
| return response |
| |
| |
| def collect_entities(q): |
| response = {} |
| |
| for e in q: |
| response[e.get('uuid')] = e |
| |
| return response |
| |
| |
| def prune_edge_by_name(edge_name, app, collection_name, source_entity): |
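| """Remove edges of the given type that exist on the target but no longer |
| exist on the source: collect both edge sets, diff the UUIDs, and DELETE |
| each stale connection on the target (retrying up to 5 times).""" |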
| if not include_edge(collection_name, edge_name): |
| return True |
| |
| source_identifier = get_source_identifier(source_entity) |
| source_uuid = source_entity.get('uuid') |
| |
| entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid)) |
| |
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| target_connection_query_url = connection_query_url_template.format( |
| org=target_org, |
| app=target_app, |
| verb=edge_name, |
| collection=target_collection, |
| uuid=source_identifier, |
| limit=config.get('limit'), |
| **config.get('target_endpoint')) |
| |
| source_connection_query_url = connection_query_url_template.format( |
| org=config.get('org'), |
| app=app, |
| verb=edge_name, |
| collection=collection_name, |
| uuid=source_identifier, |
| limit=config.get('limit'), |
| **config.get('source_endpoint')) |
| |
| source_connections = collect_entities( |
| UsergridQueryIterator(source_connection_query_url, sleep_time=config.get('error_retry_sleep'))) |
| |
| target_connections = collect_entities( |
| UsergridQueryIterator(target_connection_query_url, sleep_time=config.get('error_retry_sleep'))) |
| |
| delete_uuids = set(target_connections.keys()) - set(source_connections.keys()) |
| |
| if len(delete_uuids) > 0: |
| logger.info('Found [%s] edges to delete for entity %s' % (len(delete_uuids), entity_tag)) |
| |
| for delete_uuid in delete_uuids: |
| delete_connection_url = connection_create_by_uuid_url_template.format( |
| org=target_org, |
| app=target_app, |
| verb=edge_name, |
| collection=target_collection, |
| uuid=source_identifier, |
| target_uuid=delete_uuid, |
| **config.get('target_endpoint')) |
| |
| attempts = 0 |
| |
| while attempts < 5: |
| attempts += 1 |
| |
| r = session_target.delete(delete_connection_url) |
| |
| if not config.get('skip_cache_write'): |
| cache.delete(delete_connection_url) |
| |
| if r.status_code == 200: |
| logger.info('Pruned edge on attempt [%s] URL=[%s]' % (attempts, delete_connection_url)) |
| break |
| else: |
| logger.error('Error [%s] on attempt [%s] deleting connection at URL=[%s]: %s' % ( |
| r.status_code, attempts, delete_connection_url, r.text)) |
| time.sleep(DEFAULT_RETRY_SLEEP) |
| |
| return True |
| |
| |
| def prune_graph(app, collection_name, source_entity): |
| source_uuid = source_entity.get('uuid') |
| key = '%s:prune_graph:%s' % (key_version, source_uuid) |
| entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid)) |
| |
| if not config.get('skip_cache_read', False): |
| date_visited = cache.get(key) |
| |
| if date_visited not in [None, 'None']: |
| logger.debug('Skipping PRUNE %s at %s' % (entity_tag, date_visited)) |
| return True |
| else: |
| cache.delete(key) |
| |
| logger.debug('pruning GRAPH %s at %s' % (entity_tag, str(datetime.datetime.utcnow()))) |
| if not config.get('skip_cache_write', False): |
| cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2)) |
| |
| if collection_name in config.get('exclude_collection', []): |
| logger.debug('Excluding (Collection) entity %s' % entity_tag) |
| return True |
| |
| out_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('collections', [])] |
| out_edge_names += [edge_name for edge_name in source_entity.get('metadata', {}).get('connections', [])] |
| |
| for edge_name in out_edge_names: |
| prune_edge_by_name(edge_name, app, collection_name, source_entity) |
| |
| |
| def reput(app, collection_name, source_entity, attempts=0): |
| source_identifier = source_entity.get('uuid') |
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| try: |
| target_entity_url_by_name = put_entity_url_template.format(org=target_org, |
| app=target_app, |
| collection=target_collection, |
| uuid=source_identifier, |
| **config.get('target_endpoint')) |
| |
| r = session_target.put(target_entity_url_by_name, data=json.dumps({})) |
| if r.status_code != 200: |
| logger.info('HTTP [%s]: %s' % (target_entity_url_by_name, r.status_code)) |
| else: |
| logger.debug('HTTP [%s]: %s' % (target_entity_url_by_name, r.status_code)) |
| |
| except: |
| logger.error(traceback.format_exc()) |
| |
| |
| def get_uuid_time(the_uuid_string): |
| return time_uuid.TimeUUID(the_uuid_string).get_datetime() |
| |
| |
| def migrate_permissions(app, collection_name, source_entity, attempts=0): |
| if collection_name not in ['roles', 'role', 'group', 'groups']: |
| return True |
| |
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| source_identifier = get_source_identifier(source_entity) |
| |
| source_permissions_url = permissions_url_template.format(org=config.get('org'), |
| app=app, |
| collection=collection_name, |
| uuid=source_identifier, |
| **config.get('source_endpoint')) |
| |
| r = session_source.get(source_permissions_url) |
| |
| if r.status_code != 200: |
| logger.error('Unable to get permissions at URL [%s]: %s' % (source_permissions_url, r.text)) |
| return False |
| |
| perm_response = r.json() |
| |
| perms = perm_response.get('data', []) |
| |
| logger.info('Migrating [%s / %s] with permissions %s' % (collection_name, source_identifier, perms)) |
| |
| if len(perms) > 0: |
| target_permissions_url = permissions_url_template.format(org=target_org, |
| app=target_app, |
| collection=target_collection, |
| uuid=source_identifier, |
| **config.get('target_endpoint')) |
| |
| for permission in perms: |
| data = {'permission': permission} |
| |
| logger.info('Posting permission %s to %s' % (json.dumps(data), target_permissions_url)) |
| |
| r = session_target.post(target_permissions_url, json.dumps(data)) |
| |
| if r.status_code != 200: |
| logger.error( |
| 'ERROR posting permission %s to URL=[%s]: %s' % ( |
| json.dumps(data), target_permissions_url, r.text)) |
| |
| return True |
| |
| |
| def migrate_data(app, collection_name, source_entity, attempts=0, force=False): |
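| """PUT a single entity to the target endpoint, using the modified-timestamp |
| cache as a short-circuit; on success, also migrate permissions (roles and |
| groups) and credentials (users). Retries up to 5 times and attempts to |
| repair 400-level conflicts.""" |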
| if config.get('skip_data') and not force: |
| return True |
| |
| # check the cache to see if this entity has changed |
| if not config.get('skip_cache_read', False) and not force: |
| try: |
| str_modified = cache.get(source_entity.get('uuid')) |
| |
| if str_modified not in [None, 'None']: |
| |
| modified = long(str_modified) |
| |
| logger.debug('FOUND CACHE: %s = %s ' % (source_entity.get('uuid'), modified)) |
| |
| if modified >= source_entity.get('modified'): |
| |
| modified_date = datetime.datetime.utcfromtimestamp(modified / 1000) |
| e_uuid = source_entity.get('uuid') |
| |
| uuid_datetime = time_uuid.TimeUUID(e_uuid).get_datetime() |
| |
| logger.debug('Skipping ENTITY: %s / %s / %s / %s (%s) / %s (%s)' % ( |
| config.get('org'), app, collection_name, e_uuid, uuid_datetime, modified, modified_date)) |
| return True |
| else: |
| logger.debug('DELETING CACHE: %s ' % (source_entity.get('uuid'))) |
| cache.delete(source_entity.get('uuid')) |
| except: |
| logger.error('Error on checking cache for uuid=[%s]' % source_entity.get('uuid')) |
| logger.error(traceback.format_exc()) |
| |
| if exclude_collection(collection_name): |
| logger.warn('Excluding entity in filtered collection [%s]' % collection_name) |
| return True |
| |
| # handle duplicate user case |
| if collection_name in ['users', 'user']: |
| source_entity = confirm_user_entity(app, source_entity) |
| |
| source_identifier = get_source_identifier(source_entity) |
| |
| logger.info('Visiting ENTITY data [%s / %s (%s) ] at %s' % ( |
| collection_name, source_identifier, get_uuid_time(source_entity.get('uuid')), str(datetime.datetime.utcnow()))) |
| |
| entity_copy = source_entity.copy() |
| |
| if 'metadata' in entity_copy: |
| entity_copy.pop('metadata') |
| |
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| try: |
| target_entity_url_by_name = put_entity_url_template.format(org=target_org, |
| app=target_app, |
| collection=target_collection, |
| uuid=source_identifier, |
| **config.get('target_endpoint')) |
| |
| r = session_target.put(url=target_entity_url_by_name, data=json.dumps(entity_copy)) |
| |
| if attempts > 1: |
| logger.warn('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % ( |
| attempts, collection_name, source_identifier, target_entity_url_by_name)) |
| else: |
| logger.debug('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % ( |
| attempts, collection_name, source_identifier, target_entity_url_by_name)) |
| |
| if r.status_code == 200: |
| # Worked => WE ARE DONE |
| logger.info( |
| 'migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % ( |
| True, attempts, config.get('org'), app, source_identifier, source_entity.get('created'), |
| source_entity.get('modified'),)) |
| |
| if not config.get('skip_cache_write', False): |
| logger.debug('SETTING CACHE | uuid=[%s] | modified=[%s]' % ( |
| source_entity.get('uuid'), str(source_entity.get('modified')))) |
| |
| cache.set(source_entity.get('uuid'), str(source_entity.get('modified'))) |
| |
| if collection_name in ['role', 'group', 'roles', 'groups']: |
| migrate_permissions(app, collection_name, source_entity, attempts=0) |
| |
| if collection_name in ['users', 'user']: |
| migrate_user_credentials(app, collection_name, source_entity, attempts=0) |
| |
| return True |
| |
| else: |
| logger.error('Failure [%s] on attempt [%s] to PUT url=[%s], entity=[%s] response=[%s]' % ( |
| r.status_code, attempts, target_entity_url_by_name, json.dumps(source_entity), r.text)) |
| |
| if attempts >= 5: |
| logger.critical( |
| 'ABORT migrate_data | success=[%s] | attempts=[%s] | created=[%s] | modified=[%s] %s / %s / %s' % ( |
| False, attempts, source_entity.get('created'), source_entity.get('modified'), app, |
| collection_name, source_identifier)) |
| |
| return False |
| |
| if r.status_code == 400: |
| |
| if target_collection in ['roles', 'role']: |
| return repair_user_role(app, collection_name, source_entity) |
| |
| elif target_collection in ['users', 'user']: |
| return handle_user_migration_conflict(app, collection_name, source_entity) |
| |
| elif 'duplicate_unique_property_exists' in r.text: |
| logger.error( |
| 'WILL NOT RETRY (duplicate) [%s] attempts to PUT url=[%s], entity=[%s] response=[%s]' % ( |
| attempts, target_entity_url_by_name, json.dumps(source_entity), r.text)) |
| |
| return False |
| |
| elif r.status_code == 403: |
| logger.critical( |
| 'ABORT migrate_data | success=[%s] | attempts=[%s] | created=[%s] | modified=[%s] %s / %s / %s' % ( |
| False, attempts, source_entity.get('created'), source_entity.get('modified'), app, |
| collection_name, source_identifier)) |
| return False |
| |
| except: |
| logger.error(traceback.format_exc()) |
| logger.error('error in migrate_data on entity: %s' % json.dumps(source_entity)) |
| |
| logger.warn( |
| 'UNSUCCESSFUL migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % ( |
| False, attempts, config.get('org'), app, source_identifier, source_entity.get('created'), |
| source_entity.get('modified'),)) |
| |
| return migrate_data(app, collection_name, source_entity, attempts=attempts + 1) |
| |
| |
| def handle_user_migration_conflict(app, collection_name, source_entity, attempts=0, depth=0): |
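| """Resolve a 400 on a user PUT: fetch the target user by username and, if |
| the source copy is older, repair the target via repair_user_role().""" |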
| if collection_name not in ['users', 'user']: |
| return False |
| |
| username = source_entity.get('username') |
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| target_entity_url = get_entity_url_template.format(org=target_org, |
| app=target_app, |
| collection=target_collection, |
| uuid=username, |
| **config.get('target_endpoint')) |
| |
| # There is retry build in, here is the short circuit |
| if attempts >= 5: |
| logger.critical( |
| 'Aborting after [%s] attempts to audit user [%s] at URL [%s]' % (attempts, username, target_entity_url)) |
| |
| return False |
| |
| r = session_target.get(url=target_entity_url) |
| |
| if r.status_code == 200: |
| target_entity = r.json().get('entities')[0] |
| |
| if source_entity.get('created') < target_entity.get('created'): |
| return repair_user_role(app, collection_name, source_entity) |
| |
| elif r.status_code / 100 == 5: |
| audit_logger.warning( |
| 'CONFLICT: handle_user_migration_conflict failed attempt [%s] GET [%s] on TARGET URL=[%s] - : %s' % ( |
| attempts, r.status_code, target_entity_url, r.text)) |
| |
| time.sleep(DEFAULT_RETRY_SLEEP) |
| |
| return handle_user_migration_conflict(app, collection_name, source_entity, attempts + 1) |
| |
| else: |
| audit_logger.error( |
| 'CONFLICT: Failed handle_user_migration_conflict attempt [%s] GET [%s] on TARGET URL=[%s] - : %s' % ( |
| attempts, r.status_code, target_entity_url, r.text)) |
| |
| return False |
| |
| |
| def get_best_source_entity(app, collection_name, source_entity, depth=0): |
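| """Pick the authoritative source entity for a name collision: prefer the |
| entity fetched by its natural key, otherwise the match with the earliest |
| 'created' timestamp from a query, otherwise the entity given.""" |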
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| target_pk = 'uuid' |
| |
| if target_collection in ['users', 'user']: |
| target_pk = 'username' |
| elif target_collection in ['roles', 'role']: |
| target_pk = 'name' |
| |
| target_name = source_entity.get(target_pk) |
| |
| # there should be no target entity now, we just need to decide which one from the source to use |
| source_entity_url_by_name = get_entity_url_template.format(org=config.get('org'), |
| app=app, |
| collection=collection_name, |
| uuid=target_name, |
| **config.get('source_endpoint')) |
| |
| r_get_source_entity = session_source.get(source_entity_url_by_name) |
| |
| # if we are able to get at the source by PK... |
| if r_get_source_entity.status_code == 200: |
| |
| # extract the entity from the response |
| entity_from_get = r_get_source_entity.json().get('entities')[0] |
| |
| return entity_from_get |
| |
| elif r_get_source_entity.status_code / 100 == 4: |
| # wasn't found, get by QL and sort |
| source_entity_query_url = collection_query_url_template.format(org=config.get('org'), |
| app=app, |
| collection=collection_name, |
| ql='select * where %s=\'%s\' order by created asc' % ( |
| target_pk, target_name), |
| limit=config.get('limit'), |
| **config.get('source_endpoint')) |
| |
| logger.info('Attempting to determine best entity from query on URL %s' % source_entity_query_url) |
| |
| q = UsergridQueryIterator(source_entity_query_url, sleep_time=config.get('error_retry_sleep')) |
| |
| desired_entity = None |
| |
| entity_counter = 0 |
| |
| for e in q: |
| entity_counter += 1 |
| |
| if desired_entity is None: |
| desired_entity = e |
| |
| elif e.get('created') < desired_entity.get('created'): |
| desired_entity = e |
| |
| if desired_entity is None: |
| logger.warn('Unable to determine best of [%s] entities from query on URL %s' % ( |
| entity_counter, source_entity_query_url)) |
| |
| return source_entity |
| |
| else: |
| return desired_entity |
| |
| else: |
| return source_entity |
| |
| |
| def repair_user_role(app, collection_name, source_entity, attempts=0, depth=0): |
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| # For the users collection, there seemed to be cases where a USERNAME was created/existing with a |
| # different UUID which caused a 'collision' - so the point is to delete the entity with the differing |
| # UUID by UUID and then do a recursive call to migrate the data - now that the collision has been cleared |
| |
| target_pk = 'uuid' |
| |
| if target_collection in ['users', 'user']: |
| target_pk = 'username' |
| elif target_collection in ['roles', 'role']: |
| target_pk = 'name' |
| |
| target_name = source_entity.get(target_pk) |
| |
| target_entity_url_by_name = get_entity_url_template.format(org=target_org, |
| app=target_app, |
| collection=target_collection, |
| uuid=target_name, |
| **config.get('target_endpoint')) |
| |
| logger.warning('Repairing: Deleting name=[%s] entity at URL=[%s]' % (target_name, target_entity_url_by_name)) |
| |
| r = session_target.delete(target_entity_url_by_name) |
| |
| if r.status_code == 200 or (r.status_code in [404, 401] and 'service_resource_not_found' in r.text): |
| logger.info('Deletion of entity at URL=[%s] was [%s]' % (target_entity_url_by_name, r.status_code)) |
| |
| best_source_entity = get_best_source_entity(app, collection_name, source_entity) |
| |
| target_entity_url_by_uuid = get_entity_url_template.format(org=target_org, |
| app=target_app, |
| collection=target_collection, |
| uuid=best_source_entity.get('uuid'), |
| **config.get('target_endpoint')) |
| |
| r = session_target.put(target_entity_url_by_uuid, data=json.dumps(best_source_entity)) |
| |
| if r.status_code == 200: |
| logger.info('Successfully repaired user at URL=[%s]' % target_entity_url_by_uuid) |
| return True |
| |
| else: |
| logger.critical('Failed to PUT [%s] the desired entity at URL=[%s]: %s' % ( |
| r.status_code, target_entity_url_by_name, r.text)) |
| return False |
| |
| else: |
| # log an error and keep going if we cannot delete the entity at the specified URL. Unlikely, but if so |
| # then this entity is borked |
| logger.critical( |
| 'Deletion of entity at URL=[%s] FAILED [%s]: %s' % (target_entity_url_by_name, r.status_code, r.text)) |
| return False |
| |
| |
| def get_target_mapping(app, collection_name): |
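| """Apply the org/app/collection mappings built from --map_org, --map_app |
| and --map_collection, defaulting to the source names.""" |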
| target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org')) |
| target_app = config.get('app_mapping', {}).get(app, app) |
| target_collection = config.get('collection_mapping', {}).get(collection_name, collection_name) |
| return target_app, target_collection, target_org |
| |
| |
| def parse_args(): |
| parser = argparse.ArgumentParser(description='Usergrid Org/App Migrator') |
| |
| parser.add_argument('--log_dir', |
| help='path to the place where logs will be written', |
| default='./', |
| type=str, |
| required=False) |
| |
| parser.add_argument('--log_level', |
| help='log level - DEBUG, INFO, WARN, ERROR, CRITICAL', |
| default='INFO', |
| type=str, |
| required=False) |
| |
| parser.add_argument('-o', '--org', |
| help='Name of the org to migrate', |
| type=str, |
| required=True) |
| |
| parser.add_argument('-a', '--app', |
| help='Name of one or more apps to include, specify none to include all apps', |
| required=False, |
| action='append') |
| |
| parser.add_argument('-e', '--include_edge', |
| help='Name of one or more edges/connection types to INCLUDE, specify none to include all edges', |
| required=False, |
| action='append') |
| |
| parser.add_argument('--exclude_edge', |
| help='Name of one or more edges/connection types to EXCLUDE, specify none to include all edges', |
| required=False, |
| action='append') |
| |
| parser.add_argument('--exclude_collection', |
| help='Name of one or more collections to EXCLUDE, specify none to include all collections', |
| required=False, |
| action='append') |
| |
| parser.add_argument('-c', '--collection', |
| help='Name of one or more collections to include, specify none to include all collections', |
| default=[], |
| action='append') |
| |
| parser.add_argument('--force_app', |
| help='Necessary for using 2.0 as a source at times due to API issues. Forces the specified app(s) to be processed, even if they are not returned from the list of apps in the API call', |
| default=[], |
| action='append') |
| |
| parser.add_argument('--use_name_for_collection', |
| help='Name of one or more collections to use [name] instead of [uuid] for creating entities and edges', |
| default=[], |
| action='append') |
| |
| parser.add_argument('-m', '--migrate', |
| help='Specifies what to migrate: data, graph, prune, reput, credentials, permissions ' |
| 'or none (just iterate the apps/collections)', |
| type=str, |
| choices=[ |
| 'data', |
| 'prune', |
| 'none', |
| 'reput', |
| 'credentials', |
| 'graph', |
| 'permissions' |
| ], |
| default='data') |
| |
| parser.add_argument('-s', '--source_config', |
| help='The path to the source endpoint/org configuration file', |
| type=str, |
| default='source.json') |
| |
| parser.add_argument('-d', '--target_config', |
| help='The path to the target endpoint/org configuration file', |
| type=str, |
| default='destination.json') |
| |
| parser.add_argument('--redis_socket', |
| help='The path to the socket for redis to use', |
| type=str) |
| |
| parser.add_argument('--limit', |
| help='The number of entities to return per query request', |
| type=int, |
| default=100) |
| |
| parser.add_argument('-w', '--entity_workers', |
| help='The number of entity worker processes doing the migration', |
| type=int, |
| default=16) |
| |
| parser.add_argument('--visit_cache_ttl', |
| help='The TTL of the cache of visiting nodes in the graph for connections', |
| type=int, |
| default=3600 * 2) |
| |
| parser.add_argument('--error_retry_sleep', |
| help='The number of seconds to wait before retrying after an error', |
| type=float, |
| default=30) |
| |
| parser.add_argument('--page_sleep_time', |
| help='The number of seconds to wait between retrieving pages from the UsergridQueryIterator', |
| type=float, |
| default=0) |
| |
| parser.add_argument('--entity_sleep_time', |
| help='The number of seconds to wait after processing each entity', |
| type=float, |
| default=0) |
| |
| parser.add_argument('--collection_workers', |
| help='The number of worker processes iterating collections', |
| type=int, |
| default=2) |
| |
| parser.add_argument('--queue_size_max', |
| help='The max size of entities to allow in the queue', |
| type=int, |
| default=100000) |
| |
| parser.add_argument('--graph_depth', |
| help='The graph depth to traverse to copy', |
| type=int, |
| default=3) |
| |
| parser.add_argument('--queue_watermark_high', |
| help='The point at which publishing to the queue will PAUSE until it is at or below low watermark', |
| type=int, |
| default=25000) |
| |
| parser.add_argument('--min_modified', |
| help='Break when encountering a modified date before this, per collection', |
| type=int, |
| default=0) |
| |
| parser.add_argument('--max_modified', |
| help='Break when encountering a modified date after this, per collection', |
| type=long, |
| default=3793805526000) |
| |
| parser.add_argument('--queue_watermark_low', |
| help='The point at which publishing to the queue will RESUME after it has reached the high watermark', |
| type=int, |
| default=5000) |
| |
| parser.add_argument('--ql', |
| help='The QL to use in the filter for reading data from collections', |
| type=str, |
| default='select * order by created asc') |
| |
| parser.add_argument('--repair_data', |
| help='Repair data when iterating/migrating graph but skipping data', |
| action='store_true') |
| |
| parser.add_argument('--prune', |
| help='Prune the graph while processing (instead of the prune operation)', |
| action='store_true') |
| |
| parser.add_argument('--skip_data', |
| help='Skip migrating data (useful for connections only)', |
| action='store_true') |
| |
| parser.add_argument('--skip_credentials', |
| help='Skip migrating credentials', |
| action='store_true') |
| |
| parser.add_argument('--skip_cache_read', |
| help='Skip reading the cache (modified timestamps and graph edges)', |
| dest='skip_cache_read', |
| action='store_true') |
| |
| parser.add_argument('--skip_cache_write', |
| help='Skip updating the cache with modified timestamps of entities and graph edges', |
| dest='skip_cache_write', |
| action='store_true') |
| |
| parser.add_argument('--create_apps', |
| help='Create apps at the target if they do not exist', |
| dest='create_apps', |
| action='store_true') |
| |
| parser.add_argument('--nohup', |
| help='Do not use stdout for logging',
| action='store_true') |
| |
| parser.add_argument('--graph', |
| help='Use GRAPH instead of Query', |
| dest='graph', |
| action='store_true') |
| |
| parser.add_argument('--su_username', |
| help='Superuser username', |
| required=False, |
| type=str) |
| |
| parser.add_argument('--su_password', |
| help='Superuser Password', |
| required=False, |
| type=str) |
| |
| parser.add_argument('--inbound_connections', |
| help='Include inbound connections (edges) when processing the graph',
| action='store_true') |
| |
| parser.add_argument('--map_app', |
| help="Multiple allowed: A colon-separated string such as 'apples:oranges' which indicates to" |
| " put data from the app named 'apples' from the source endpoint into app named 'oranges' " |
| "in the target endpoint", |
| default=[], |
| action='append') |
| |
| parser.add_argument('--map_collection', |
| help="One or more colon-separated strings such as 'cats:dogs' which indicates to put data from "
| "collections named 'cats' from the source endpoint into a collection named 'dogs' in the " |
| "target endpoint, applicable globally to all apps", |
| default=[], |
| action='append') |
| |
| parser.add_argument('--map_org', |
| help="One or more colon-separated strings such as 'red:blue' which indicates to put data from "
| "the org named 'red' from the source endpoint into an org named 'blue' in the target "
| "endpoint",
| default=[], |
| action='append') |
| |
| my_args = parser.parse_args(sys.argv[1:]) |
| |
| return vars(my_args) |
| |
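| # A sketch of a typical invocation using only the flags defined above (the script
| # name is illustrative, and the org/app selection flags defined earlier are omitted):
| #   python migrator.py -m data -s source.json -d destination.json -c pets -w 16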
| |
| def init(): |
| global config |
| |
| if config.get('migrate') == 'credentials': |
| |
| if config.get('su_password') is None or config.get('su_username') is None: |
| message = 'ABORT: In order to migrate credentials, Superuser parameters (su_password, su_username) are required' |
| print(message)
| logger.critical(message) |
| exit() |
| |
| config['collection_mapping'] = {} |
| config['app_mapping'] = {} |
| config['org_mapping'] = {} |
| |
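| # mappings are supplied as colon-separated 'source:target' pairs; anything that
| # does not split into exactly two parts is logged and skipped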
| for mapping in config.get('map_collection', []): |
| parts = mapping.split(':') |
| |
| if len(parts) == 2: |
| config['collection_mapping'][parts[0]] = parts[1] |
| else: |
| logger.warning('Skipping Collection mapping: [%s]' % mapping) |
| |
| for mapping in config.get('map_app', []): |
| parts = mapping.split(':') |
| |
| if len(parts) == 2: |
| config['app_mapping'][parts[0]] = parts[1] |
| else: |
| logger.warning('Skipping App mapping: [%s]' % mapping) |
| |
| for mapping in config.get('map_org', []): |
| parts = mapping.split(':') |
| |
| if len(parts) == 2: |
| config['org_mapping'][parts[0]] = parts[1] |
| logger.info('Mapping Org [%s] to [%s] from mapping [%s]' % (parts[0], parts[1], mapping)) |
| else: |
| logger.warning('Skipping Org mapping: [%s]' % mapping) |
| |
| with open(config.get('source_config'), 'r') as f: |
| config['source_config'] = json.load(f) |
| |
| with open(config.get('target_config'), 'r') as f: |
| config['target_config'] = json.load(f) |
| |
| if config['exclude_collection'] is None: |
| config['exclude_collection'] = [] |
| |
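| # both config files are assumed to follow the shape consumed below: an 'endpoint'
| # dict of URL parameters plus per-org 'credentials', e.g. (keys illustrative):
| #   {"endpoint": {...}, "credentials": {"myorg": {...}}}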
| config['source_endpoint'] = config['source_config'].get('endpoint').copy() |
| config['source_endpoint'].update(config['source_config']['credentials'][config['org']]) |
| |
| target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org')) |
| |
| config['target_endpoint'] = config['target_config'].get('endpoint').copy() |
| config['target_endpoint'].update(config['target_config']['credentials'][target_org]) |
| |
| |
| def wait_for(threads, label, sleep_time=60): |
| wait = True |
| |
| logger.info('Starting to wait for [%s] threads with sleep time=[%s]' % (len(threads), sleep_time)) |
| |
| while wait: |
| wait = False |
| alive_count = 0 |
| |
| for t in threads: |
| |
| if t.is_alive(): |
| alive_count += 1 |
| logger.info('Thread [%s] is still alive' % t.name) |
| |
| if alive_count > 0: |
| wait = True |
| logger.info('Continuing to wait for [%s] threads with sleep time=[%s]' % (alive_count, sleep_time)) |
| time.sleep(sleep_time) |
| |
| logger.warn('All workers [%s] done!' % label) |
| |
| |
| def count_bytes(entity): |
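| # approximate the entity's wire size as the length of its JSON serialization;
| # the server-generated 'metadata' field is excluded from the count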
| entity_copy = entity.copy() |
| |
| if 'metadata' in entity_copy: |
| del entity_copy['metadata'] |
| |
| entity_str = json.dumps(entity_copy) |
| |
| return len(entity_str) |
| |
| |
| def migrate_user_credentials(app, collection_name, source_entity, attempts=0): |
| # this only applies to users |
| if collection_name not in ['users', 'user'] \ |
| or config.get('skip_credentials', False): |
| return False |
| |
| source_identifier = get_source_identifier(source_entity) |
| |
| target_app, target_collection, target_org = get_target_mapping(app, collection_name) |
| |
| # get the URLs for the source and target users |
| |
| source_url = user_credentials_url_template.format(org=config.get('org'), |
| app=app, |
| uuid=source_identifier, |
| **config.get('source_endpoint')) |
| |
| target_url = user_credentials_url_template.format(org=target_org, |
| app=target_app, |
| uuid=source_identifier, |
| **config.get('target_endpoint')) |
| |
| # this endpoint for some reason uses basic auth... |
| r = requests.get(source_url, auth=HTTPBasicAuth(config.get('su_username'), config.get('su_password'))) |
| |
| if r.status_code != 200: |
| logger.error('Unable to migrate credentials due to HTTP [%s] on GET URL [%s]: %s' % ( |
| r.status_code, source_url, r.text)) |
| |
| return False |
| |
| source_credentials = r.json() |
| |
| logger.info('Putting credentials to [%s]...' % target_url) |
| |
| r = requests.put(target_url, |
| data=json.dumps(source_credentials), |
| auth=HTTPBasicAuth(config.get('su_username'), config.get('su_password'))) |
| |
| if r.status_code != 200: |
| logger.error( |
| 'Unable to migrate credentials due to HTTP [%s] on PUT URL [%s]: %s' % ( |
| r.status_code, target_url, r.text)) |
| return False |
| |
| logger.info('migrate_user_credentials | success=[%s] | app/collection/name = %s/%s/%s' % ( |
| True, app, collection_name, source_entity.get('uuid'))) |
| |
| return True |
| |
| |
| def check_response_status(r, url, exit_on_error=True): |
| if r.status_code != 200: |
| logger.critical('HTTP [%s] on URL=[%s]' % (r.status_code, url)) |
| logger.critical('Response: %s' % r.text) |
| |
| if exit_on_error: |
| exit() |
| |
| |
| def do_operation(apps_and_collections, operation): |
| status_map = {} |
| |
| logger.info('Creating queues...') |
| |
| # bounded queues rely on qsize(), which is not implemented on some platforms (e.g. macOS), so a maxsize is only set on Linux
| if _platform == "linux" or _platform == "linux2": |
| entity_queue = Queue(maxsize=config.get('queue_size_max')) |
| collection_queue = Queue(maxsize=config.get('queue_size_max')) |
| collection_response_queue = Queue(maxsize=config.get('queue_size_max')) |
| else: |
| entity_queue = Queue() |
| collection_queue = Queue() |
| collection_response_queue = Queue() |
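| # on other platforms the queues are unbounded; the queue_watermark settings
| # presumably have no effect here, since they depend on qsize()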
| |
| logger.info('Starting entity_workers...') |
| |
| collection_count = 0 |
| # create the entity workers, but only start them (later) if there is work to do |
| entity_workers = [EntityWorker(entity_queue, operation) for x in xrange(config.get('entity_workers'))] |
| |
| # create the collection workers, but only start them (later) if there is work to do |
| collection_workers = [CollectionWorker(collection_queue, entity_queue, collection_response_queue) for x in |
| xrange(config.get('collection_workers'))] |
| |
| status_listener = StatusListener(collection_response_queue, entity_queue) |
| |
| try: |
| # for each app, publish the (app_name, collection_name) to the queue. |
| # this is received by a collection worker who iterates the collection and publishes |
| # entities into a queue. These are received by an individual entity worker which |
| # executes the specified operation on the entity |
| |
| for app, app_data in apps_and_collections.get('apps', {}).iteritems(): |
| logger.info('Processing app=[%s]' % app) |
| |
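| # seed the per-app counters; the min_* fields start from a high sentinel timestamp
| # (epoch millis), presumably so the first observed entity lowers them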
| status_map[app] = { |
| 'iteration_started': str(datetime.datetime.now()), |
| 'max_created': -1, |
| 'max_modified': -1, |
| 'min_created': 1584946416000, |
| 'min_modified': 1584946416000, |
| 'count': 0, |
| 'bytes': 0, |
| 'collections': {} |
| } |
| |
| # iterate the collections which are returned. |
| for collection_name in app_data.get('collections'): |
| logger.info('Publishing app / collection: %s / %s' % (app, collection_name)) |
| |
| collection_count += 1 |
| collection_queue.put((app, collection_name)) |
| |
| logger.info('Finished publishing [%s] collections for app [%s] !' % (collection_count, app)) |
| |
| # only start the threads if there is work to do |
| if collection_count > 0: |
| status_listener.start() |
| |
| # start the worker processes which will iterate the collections |
| [w.start() for w in collection_workers] |
| |
| # start the worker processes which will do the work of migrating |
| [w.start() for w in entity_workers] |
| |
| # allow collection workers to finish |
| wait_for(collection_workers, label='collection_workers', sleep_time=60) |
| |
| # allow entity workers to finish |
| wait_for(entity_workers, label='entity_workers', sleep_time=60) |
| |
| status_listener.terminate() |
| |
| except KeyboardInterrupt: |
| logger.warning('Keyboard Interrupt, aborting...') |
| entity_queue.close() |
| collection_queue.close() |
| collection_response_queue.close() |
| |
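| # send SIGINT to the child processes so they can shut down, then terminate them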
| [os.kill(p.pid, signal.SIGINT) for p in entity_workers]
| [os.kill(p.pid, signal.SIGINT) for p in collection_workers]
| os.kill(status_listener.pid, signal.SIGINT)
| |
| [w.terminate() for w in entity_workers] |
| [w.terminate() for w in collection_workers] |
| status_listener.terminate() |
| |
| logger.info('entity_workers DONE!') |
| |
| |
| def filter_apps_and_collections(org_apps): |
| app_collections = {
| 'apps': {}
| }
| |
| try: |
| selected_apps = config.get('app') |
| |
| # iterate the apps retrieved from the org |
| for org_app in sorted(org_apps.keys()): |
| logger.info('Found SOURCE App: %s' % org_app) |
| |
| time.sleep(3) |
| |
| for org_app in sorted(org_apps.keys()): |
| parts = org_app.split('/') |
| app = parts[1] |
| |
| # if apps are specified and the current app is not in the list, skip it |
| if selected_apps and len(selected_apps) > 0 and app not in selected_apps: |
| logger.warning('Skipping app [%s] not included in process list [%s]' % (app, selected_apps)) |
| continue |
| |
| app_collections['apps'][app] = {
| 'collections': [] |
| } |
| |
| # get the list of collections from the source org/app |
| source_app_url = app_url_template.format(org=config.get('org'), |
| app=app, |
| **config.get('source_endpoint')) |
| logger.info('GET %s' % source_app_url) |
| |
| r_collections = session_source.get(source_app_url) |
| |
| collection_attempts = 0 |
| |
| # this call intermittently fails, so retry up to 5 times before skipping the app
| while r_collections.status_code != 200 and collection_attempts < 5: |
| collection_attempts += 1 |
| logger.warning('FAILED: GET (%s) [%s] URL: %s' % (r_collections.elapsed, r_collections.status_code, |
| source_app_url)) |
| time.sleep(DEFAULT_RETRY_SLEEP) |
| r_collections = session_source.get(source_app_url) |
| |
| if collection_attempts >= 5: |
| logger.critical('Unable to get collections at URL %s, skipping app' % source_app_url) |
| continue |
| |
| app_response = r_collections.json() |
| |
| logger.info('App Response: ' + json.dumps(app_response)) |
| |
| app_entities = app_response.get('entities', []) |
| |
| if len(app_entities) > 0: |
| app_entity = app_entities[0] |
| collections = app_entity.get('metadata', {}).get('collections', {}) |
| logger.info('App=[%s] starting Collections=[%s]' % (app, collections)) |
| |
| app_collections['apps'][app]['collections'] = [c for c in collections if include_collection(c)]
| logger.info('App=[%s] filtered Collections=[%s]' % (app, app_collections['apps'][app]['collections']))
| |
| except: |
| print(traceback.format_exc()) |
| |
| return app_collections
| |
| |
| def confirm_target_org_apps(apps_and_collections): |
| for app in apps_and_collections.get('apps'): |
| |
| # it is possible to map source orgs and apps to differently named targets. This gets the |
| # target names for each |
| target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org')) |
| target_app = config.get('app_mapping', {}).get(app, app) |
| |
| # Check that the target Org/App exists. If not, move on to the next |
| target_app_url = app_url_template.format(org=target_org, |
| app=target_app, |
| **config.get('target_endpoint')) |
| logger.info('GET %s' % target_app_url) |
| r_target_apps = session_target.get(target_app_url) |
| |
| if r_target_apps.status_code != 200: |
| |
| if config.get('create_apps', DEFAULT_CREATE_APPS): |
| create_app_url = org_management_app_url_template.format(org=target_org, |
| app=target_app, |
| **config.get('target_endpoint')) |
| app_request = {'name': target_app} |
| r = session_target.post(create_app_url, data=json.dumps(app_request)) |
| |
| if r.status_code != 200: |
| logger.critical('--create_apps specified and unable to create app [%s] at URL=[%s]: %s' % ( |
| target_app, create_app_url, r.text)) |
| logger.critical('Process will now exit') |
| exit() |
| else: |
| logger.warning('Created app=[%s] at URL=[%s]: %s' % (target_app, create_app_url, r.text)) |
| else: |
| logger.critical('Target application DOES NOT EXIST at [%s] URL=%s' % ( |
| r_target_apps.status_code, target_app_url)) |
| continue |
| |
| |
| def main(): |
| global config, cache |
| |
| config = parse_args() |
| init() |
| init_logging() |
| |
| logger.warn('Script starting') |
| |
| try: |
| if config.get('redis_socket') is not None: |
| cache = redis.Redis(unix_socket_path=config.get('redis_socket')) |
| |
| else: |
| # this does not try to connect to redis |
| cache = redis.StrictRedis(host='localhost', port=6379, db=0) |
| |
| # this is necessary to test the connection to redis |
| cache.get('usergrid') |
| |
| except: |
| logger.error( |
| 'Error connecting to Redis cache, consider using Redis to be able to optimize the migration process...') |
| |
| time.sleep(3) |
| |
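| # without a reachable cache, disable all cache reads and writes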
| config['use_cache'] = False |
| config['skip_cache_read'] = True |
| config['skip_cache_write'] = True |
| |
| org_apps = {}
| |
| force_apps = config.get('force_app', []) |
| |
| if force_apps is not None and len(force_apps) > 0: |
| logger.warn('Forcing only the following apps to be processed: %s' % force_apps) |
| |
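| # use the same 'org/app' key shape the management API returns, so that
| # filter_apps_and_collections can split these keys the same way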
| for app in force_apps: |
| key = '%s/%s' % (app, app) |
| org_apps[key] = app |
| |
| if len(org_apps) == 0: |
| source_org_mgmt_url = org_management_url_template.format(org=config.get('org'), |
| limit=config.get('limit'), |
| **config.get('source_endpoint')) |
| |
| print('Retrieving apps from [%s]' % source_org_mgmt_url) |
| logger.info('Retrieving apps from [%s]' % source_org_mgmt_url) |
| |
| try: |
| # list the apps for the SOURCE org |
| logger.info('GET %s' % source_org_mgmt_url) |
| r = session_source.get(source_org_mgmt_url) |
| |
| if r.status_code != 200: |
| logger.critical( |
| 'Abort processing: Unable to retrieve apps from [%s]: %s' % (source_org_mgmt_url, r.text)) |
| exit() |
| |
| logger.info(r.text)
| |
| org_apps = r.json().get('data') |
| |
| except Exception: |
| logger.exception('ERROR Retrieving apps from [%s]' % source_org_mgmt_url) |
| print(traceback.format_exc()) |
| logger.critical('Unable to retrieve apps from [%s] and will exit' % source_org_mgmt_url) |
| exit() |
| |
| # Check the specified configuration for what to migrate/audit |
| if config.get('migrate') == 'graph': |
| operation = migrate_graph |
| |
| elif config.get('migrate') == 'data': |
| operation = migrate_data |
| |
| elif config.get('migrate') == 'prune': |
| operation = prune_graph |
| |
| elif config.get('migrate') == 'permissions': |
| operation = migrate_permissions |
| config['collection'] = ['roles', 'groups'] |
| logger.warn( |
| 'Since permissions migration was specified, overwriting included collections to be %s...' % config[ |
| 'collection']) |
| |
| elif config.get('migrate') == 'credentials': |
| operation = migrate_user_credentials |
| config['collection'] = ['users'] |
| logger.warn('Since credential migration was specified, overwriting included collections to be %s' % config[ |
| 'collection']) |
| |
| elif config.get('migrate') == 'reput': |
| operation = reput |
| |
| else: |
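| # migrate='none' (or any unmatched value) results in operation=None:
| # the apps/collections are iterated with no per-entity work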
| operation = None |
| |
| # filter out the apps and collections based on the -c and --exclude_collection directives |
| apps_and_collections = filter_apps_and_collections(org_apps) |
| |
| logger.warn('The following apps/collections will be processed: %s' % json.dumps(apps_and_collections)) |
| |
| # confirm the apps exist at the target/destination org |
| confirm_target_org_apps(apps_and_collections) |
| |
| # execute the operation over apps and collections |
| do_operation(apps_and_collections, operation) |
| |
| logger.warn('Script finished') |
| |
| |
| if __name__ == "__main__": |
| main() |