# */
# * Licensed to the Apache Software Foundation (ASF) under one
# * or more contributor license agreements. See the NOTICE file
# * distributed with this work for additional information
# * regarding copyright ownership. The ASF licenses this file
# * to you under the Apache License, Version 2.0 (the
# * "License"); you may not use this file except in compliance
# * with the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing,
# * software distributed under the License is distributed on an
# * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# * KIND, either express or implied. See the License for the
# * specific language governing permissions and limitations
# * under the License.
# */
from __future__ import print_function
import os
import uuid
from Queue import Empty
import argparse
import json
import logging
import sys
from multiprocessing import Queue, Process
from sets import Set
import time_uuid
import datetime
from cloghandler import ConcurrentRotatingFileHandler
import requests
import traceback
import redis
import time
from sys import platform as _platform
import signal
from requests.auth import HTTPBasicAuth
from usergrid import UsergridQueryIterator
import urllib3
__author__ = 'Jeff.West@yahoo.com'
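# unique id for this run of the tool; used to correlate log lines and to name the log/status output files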
ECID = str(uuid.uuid1())
key_version = 'v4'
logger = logging.getLogger('GraphMigrator')
worker_logger = logging.getLogger('Worker')
collection_worker_logger = logging.getLogger('CollectionWorker')
error_logger = logging.getLogger('ErrorLogger')
audit_logger = logging.getLogger('AuditLogger')
status_logger = logging.getLogger('StatusLogger')
urllib3.disable_warnings()
DEFAULT_CREATE_APPS = False
DEFAULT_RETRY_SLEEP = 10
DEFAULT_PROCESSING_SLEEP = 1
queue = Queue()
QSIZE_OK = False
try:
queue.qsize()
QSIZE_OK = True
except NotImplementedError:
    # multiprocessing.Queue.qsize() is not implemented on some platforms (e.g. macOS)
    pass
session_source = requests.Session()
session_target = requests.Session()
cache = None
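# equivalent of timedelta.total_seconds()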
def total_seconds(td):
return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6
def init_logging(stdout_enabled=True):
root_logger = logging.getLogger()
root_logger.setLevel(logging.getLevelName(config.get('log_level', 'INFO')))
# root_logger.setLevel(logging.WARN)
logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.ERROR)
logging.getLogger('boto').setLevel(logging.ERROR)
logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN)
log_formatter = logging.Formatter(
fmt='%(asctime)s | ' + ECID + ' | %(name)s | %(processName)s | %(levelname)s | %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p')
stdout_logger = logging.StreamHandler(sys.stdout)
stdout_logger.setFormatter(log_formatter)
root_logger.addHandler(stdout_logger)
if stdout_enabled:
stdout_logger.setLevel(logging.getLevelName(config.get('log_level', 'INFO')))
# base log file
log_file_name = os.path.join(config.get('log_dir'),
'%s-%s-%s-migrator.log' % (config.get('org'), config.get('migrate'), ECID))
# ConcurrentRotatingFileHandler
rotating_file = ConcurrentRotatingFileHandler(filename=log_file_name,
mode='a',
maxBytes=404857600,
backupCount=0)
rotating_file.setFormatter(log_formatter)
rotating_file.setLevel(logging.INFO)
root_logger.addHandler(rotating_file)
error_log_file_name = os.path.join(config.get('log_dir'), '%s-%s-%s-migrator-errors.log' % (
config.get('org'), config.get('migrate'), ECID))
error_rotating_file = ConcurrentRotatingFileHandler(filename=error_log_file_name,
mode='a',
maxBytes=404857600,
backupCount=0)
error_rotating_file.setFormatter(log_formatter)
error_rotating_file.setLevel(logging.ERROR)
root_logger.addHandler(error_rotating_file)
entity_name_map = {
'users': 'username'
}
config = {}
# URL Templates for Usergrid
org_management_app_url_template = "{api_url}/management/organizations/{org}/applications?client_id={client_id}&client_secret={client_secret}"
org_management_url_template = "{api_url}/management/organizations/{org}/applications?client_id={client_id}&client_secret={client_secret}"
org_url_template = "{api_url}/{org}?client_id={client_id}&client_secret={client_secret}"
app_url_template = "{api_url}/{org}/{app}?client_id={client_id}&client_secret={client_secret}"
collection_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}"
collection_query_url_template = "{api_url}/{org}/{app}/{collection}?ql={ql}&client_id={client_id}&client_secret={client_secret}&limit={limit}"
collection_graph_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}&limit={limit}"
connection_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}?client_id={client_id}&client_secret={client_secret}"
connecting_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/connecting/{verb}?client_id={client_id}&client_secret={client_secret}"
connection_create_by_uuid_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_uuid}?client_id={client_id}&client_secret={client_secret}"
connection_create_by_name_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_type}/{target_name}?client_id={client_id}&client_secret={client_secret}"
connection_create_by_pairs_url_template = "{api_url}/{org}/{app}/{source_type_id}/{verb}/{target_type_id}?client_id={client_id}&client_secret={client_secret}"
get_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}&connections=none"
get_entity_url_with_connections_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}"
put_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}"
permissions_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/permissions?client_id={client_id}&client_secret={client_secret}"
user_credentials_url_template = "{api_url}/{org}/{app}/users/{uuid}/credentials"
ignore_collections = ['activities', 'queues', 'events', 'notifications']
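# Aggregates per-collection status updates from the CollectionWorkers and periodically writes an
# org-level summary (counts, bytes, created/modified ranges) to a JSON status file.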
class StatusListener(Process):
def __init__(self, status_queue, worker_queue):
super(StatusListener, self).__init__()
self.status_queue = status_queue
self.worker_queue = worker_queue
def run(self):
keep_going = True
org_results = {
'name': config.get('org'),
'apps': {},
}
empty_count = 0
status_file_name = os.path.join(config.get('log_dir'),
'%s-%s-%s-status.json' % (config.get('org'), config.get('migrate'), ECID))
while keep_going:
try:
app, collection, status_map = self.status_queue.get(timeout=60)
status_logger.info('Received status update for app/collection: [%s / %s]' % (app, collection))
empty_count = 0
org_results['summary'] = {
'max_created': -1,
'max_modified': -1,
'min_created': 1584946416000,
'min_modified': 1584946416000,
'count': 0,
'bytes': 0
}
if app not in org_results['apps']:
org_results['apps'][app] = {
'collections': {}
}
org_results['apps'][app]['collections'].update(status_map)
try:
for app, app_data in org_results['apps'].iteritems():
app_data['summary'] = {
'max_created': -1,
'max_modified': -1,
'min_created': 1584946416000,
'min_modified': 1584946416000,
'count': 0,
'bytes': 0
}
if 'collections' in app_data:
for collection, collection_data in app_data['collections'].iteritems():
app_data['summary']['count'] += collection_data['count']
app_data['summary']['bytes'] += collection_data['bytes']
org_results['summary']['count'] += collection_data['count']
org_results['summary']['bytes'] += collection_data['bytes']
# APP
if collection_data.get('max_modified') > app_data['summary']['max_modified']:
app_data['summary']['max_modified'] = collection_data.get('max_modified')
if collection_data.get('min_modified') < app_data['summary']['min_modified']:
app_data['summary']['min_modified'] = collection_data.get('min_modified')
if collection_data.get('max_created') > app_data['summary']['max_created']:
app_data['summary']['max_created'] = collection_data.get('max_created')
if collection_data.get('min_created') < app_data['summary']['min_created']:
app_data['summary']['min_created'] = collection_data.get('min_created')
# ORG
if collection_data.get('max_modified') > org_results['summary']['max_modified']:
org_results['summary']['max_modified'] = collection_data.get('max_modified')
if collection_data.get('min_modified') < org_results['summary']['min_modified']:
org_results['summary']['min_modified'] = collection_data.get('min_modified')
if collection_data.get('max_created') > org_results['summary']['max_created']:
org_results['summary']['max_created'] = collection_data.get('max_created')
if collection_data.get('min_created') < org_results['summary']['min_created']:
org_results['summary']['min_created'] = collection_data.get('min_created')
if QSIZE_OK:
status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize())
status_logger.warn('UPDATED status of org processed: %s' % json.dumps(org_results))
try:
logger.info('Writing status to file: %s' % status_file_name)
with open(status_file_name, 'w') as f:
json.dump(org_results, f, indent=2)
                    except Exception:
                        print(traceback.format_exc())
                except KeyboardInterrupt as e:
                    raise e
                except Exception:
                    print(traceback.format_exc())
            except KeyboardInterrupt as e:
status_logger.warn('FINAL status of org processed: %s' % json.dumps(org_results))
raise e
except Empty:
if QSIZE_OK:
status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize())
status_logger.warn('CURRENT status of org processed: %s' % json.dumps(org_results))
status_logger.warning('EMPTY! Count=%s' % empty_count)
empty_count += 1
if empty_count >= 120:
keep_going = False
            except Exception:
                print(traceback.format_exc())
logger.warn('FINAL status of org processed: %s' % json.dumps(org_results))
try:
logger.info('Writing final status to file: %s' % status_file_name)
with open(status_file_name, 'w') as f:
json.dump(org_results, f, indent=2)
        except Exception:
            print(traceback.format_exc())
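# Pulls (app, collection, entity) tuples off the entity queue and applies the configured handler
# function (the selected operation, e.g. migrate_data or migrate_graph) to each entity.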
class EntityWorker(Process):
def __init__(self, queue, handler_function):
super(EntityWorker, self).__init__()
worker_logger.debug('Creating worker!')
self.queue = queue
self.handler_function = handler_function
def run(self):
worker_logger.info('starting run()...')
keep_going = True
count_processed = 0
empty_count = 0
start_time = int(time.time())
while keep_going:
try:
# get an entity with the app and collection name
app, collection_name, entity = self.queue.get(timeout=120)
empty_count = 0
# if entity.get('type') == 'user':
# entity = confirm_user_entity(app, entity)
# the handler operation is the specified operation such as migrate_graph
if self.handler_function is not None:
try:
message_start_time = int(time.time())
processed = self.handler_function(app, collection_name, entity)
message_end_time = int(time.time())
if processed:
count_processed += 1
total_time = message_end_time - start_time
avg_time_per_message = total_time / count_processed
message_time = message_end_time - message_start_time
worker_logger.debug('Processed [%sth] entity = %s / %s / %s' % (
count_processed, app, collection_name, entity.get('uuid')))
if count_processed % 1000 == 1:
worker_logger.info(
'Processed [%sth] entity = [%s / %s / %s] in [%s]s - avg time/message [%s]' % (
count_processed, app, collection_name, entity.get('uuid'), message_time,
avg_time_per_message))
                    except KeyboardInterrupt as e:
                        raise e
                    except Exception as e:
                        logger.exception('Error in EntityWorker processing message')
                        print(traceback.format_exc())
            except KeyboardInterrupt as e:
raise e
except Empty:
worker_logger.warning('EMPTY! Count=%s' % empty_count)
empty_count += 1
if empty_count >= 2:
keep_going = False
            except Exception as e:
                logger.exception('Error in EntityWorker run()')
                print(traceback.format_exc())
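# Iterates each (app, collection) pair received on the work queue, publishes every entity to the
# entity queue for the EntityWorkers, and reports per-collection stats to the response queue.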
class CollectionWorker(Process):
def __init__(self, work_queue, entity_queue, response_queue):
super(CollectionWorker, self).__init__()
collection_worker_logger.debug('Creating worker!')
self.work_queue = work_queue
self.response_queue = response_queue
self.entity_queue = entity_queue
def run(self):
collection_worker_logger.info('starting run()...')
keep_going = True
counter = 0
# max_created = 0
empty_count = 0
app = 'ERROR'
collection_name = 'NOT SET'
status_map = {}
sleep_time = 10
try:
while keep_going:
try:
app, collection_name = self.work_queue.get(timeout=30)
status_map = {
collection_name: {
'iteration_started': str(datetime.datetime.now()),
'max_created': -1,
'max_modified': -1,
'min_created': 1584946416000,
'min_modified': 1584946416000,
'count': 0,
'bytes': 0
}
}
empty_count = 0
# added a flag for using graph vs query/index
if config.get('graph', False):
source_collection_url = collection_graph_url_template.format(org=config.get('org'),
app=app,
collection=collection_name,
limit=config.get('limit'),
**config.get('source_endpoint'))
else:
source_collection_url = collection_query_url_template.format(org=config.get('org'),
app=app,
collection=collection_name,
limit=config.get('limit'),
ql="select * %s" % config.get(
'ql'),
**config.get('source_endpoint'))
logger.info('Iterating URL: %s' % source_collection_url)
# use the UsergridQuery from the Python SDK to iterate the collection
q = UsergridQueryIterator(source_collection_url,
page_delay=config.get('page_sleep_time'),
sleep_time=config.get('error_retry_sleep'))
for entity in q:
# begin entity loop
self.entity_queue.put((app, collection_name, entity))
counter += 1
if 'created' in entity:
try:
entity_created = long(entity.get('created'))
if entity_created > status_map[collection_name]['max_created']:
status_map[collection_name]['max_created'] = entity_created
status_map[collection_name]['max_created_str'] = str(
datetime.datetime.fromtimestamp(entity_created / 1000))
if entity_created < status_map[collection_name]['min_created']:
status_map[collection_name]['min_created'] = entity_created
status_map[collection_name]['min_created_str'] = str(
datetime.datetime.fromtimestamp(entity_created / 1000))
except ValueError:
pass
if 'modified' in entity:
try:
entity_modified = long(entity.get('modified'))
if entity_modified > status_map[collection_name]['max_modified']:
status_map[collection_name]['max_modified'] = entity_modified
status_map[collection_name]['max_modified_str'] = str(
datetime.datetime.fromtimestamp(entity_modified / 1000))
if entity_modified < status_map[collection_name]['min_modified']:
status_map[collection_name]['min_modified'] = entity_modified
status_map[collection_name]['min_modified_str'] = str(
datetime.datetime.fromtimestamp(entity_modified / 1000))
except ValueError:
pass
status_map[collection_name]['bytes'] += count_bytes(entity)
status_map[collection_name]['count'] += 1
if counter % 1000 == 1:
try:
collection_worker_logger.warning(
'Sending stats for app/collection [%s / %s]: %s' % (
app, collection_name, status_map))
self.response_queue.put((app, collection_name, status_map))
if QSIZE_OK:
collection_worker_logger.info(
'Counter=%s, collection queue depth=%s' % (
counter, self.work_queue.qsize()))
except:
pass
collection_worker_logger.warn(
'Current status of collections processed: %s' % json.dumps(status_map))
if config.get('entity_sleep_time') > 0:
collection_worker_logger.debug(
'sleeping for [%s]s per entity...' % (config.get('entity_sleep_time')))
time.sleep(config.get('entity_sleep_time'))
collection_worker_logger.debug(
'STOPPED sleeping for [%s]s per entity...' % (config.get('entity_sleep_time')))
# end entity loop
status_map[collection_name]['iteration_finished'] = str(datetime.datetime.now())
collection_worker_logger.warning(
'Collection [%s / %s / %s] loop complete! Max Created entity %s' % (
config.get('org'), app, collection_name, status_map[collection_name]['max_created']))
collection_worker_logger.warning(
'Sending FINAL stats for app/collection [%s / %s]: %s' % (app, collection_name, status_map))
self.response_queue.put((app, collection_name, status_map))
collection_worker_logger.info('Done! Finished app/collection: %s / %s' % (app, collection_name))
                except KeyboardInterrupt as e:
raise e
except Empty:
collection_worker_logger.warning('EMPTY! Count=%s' % empty_count)
empty_count += 1
if empty_count >= 2:
keep_going = False
except Exception as e:
logger.exception('Error in CollectionWorker processing collection [%s]' % collection_name)
                    print(traceback.format_exc())
finally:
self.response_queue.put((app, collection_name, status_map))
collection_worker_logger.info('FINISHED!')
def use_name_for_collection(collection_name):
return collection_name in config.get('use_name_for_collection', [])
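# Decides whether an edge/connection type should be traversed, honoring the include/exclude edge
# lists plus a few hard-coded special cases (user and receipt edges that are handled elsewhere).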
def include_edge(collection_name, edge_name):
include_edges = config.get('include_edge', [])
if include_edges is None:
include_edges = []
exclude_edges = config.get('exclude_edge', [])
if exclude_edges is None:
exclude_edges = []
if len(include_edges) > 0 and edge_name not in include_edges:
logger.debug(
'Skipping edge [%s] since it is not in INCLUDED list: %s' % (edge_name, include_edges))
return False
if edge_name in exclude_edges:
logger.debug(
'Skipping edge [%s] since it is in EXCLUDED list: %s' % (edge_name, exclude_edges))
return False
if (collection_name in ['users', 'user'] and edge_name in ['followers', 'feed', 'activities']) \
or (collection_name in ['receipts', 'receipt'] and edge_name in ['device', 'devices']):
# feed and activities are not retrievable...
# roles and groups will be more efficiently handled from the role/group -> user
# followers will be handled by 'following'
# do only this from user -> device
return False
return True
def exclude_edge(collection_name, edge_name):
exclude_edges = config.get('exclude_edge', [])
if exclude_edges is None:
exclude_edges = []
if edge_name in exclude_edges:
logger.debug('Skipping edge [%s] since it is in EXCLUDED list: %s' % (edge_name, exclude_edges))
return True
if (collection_name in ['users', 'user'] and edge_name in ['followers', 'feed', 'activities']) \
or (collection_name in ['receipts', 'receipt'] and edge_name in ['device', 'devices']):
# feed and activities are not retrievable...
# roles and groups will be more efficiently handled from the role/group -> user
# followers will be handled by 'following'
# do only this from user -> device
return True
return False
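# Re-fetches a user by username from the source to resolve duplicate-UUID cases; falls back to the
# original source entity after repeated failures or if the user cannot be retrieved.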
def confirm_user_entity(app, source_entity, attempts=0):
attempts += 1
source_entity_url = get_entity_url_template.format(org=config.get('org'),
app=app,
collection='users',
uuid=source_entity.get('username'),
**config.get('source_endpoint'))
if attempts >= 5:
logger.warning('Punting after [%s] attempts to confirm user at URL [%s], will use the source entity...' % (
attempts, source_entity_url))
return source_entity
r = requests.get(url=source_entity_url)
if r.status_code == 200:
retrieved_entity = r.json().get('entities')[0]
if retrieved_entity.get('uuid') != source_entity.get('uuid'):
logger.info(
'UUID of Source Entity [%s] differs from uuid [%s] of retrieved entity at URL=[%s] and will be substituted' % (
source_entity.get('uuid'), retrieved_entity.get('uuid'), source_entity_url))
return retrieved_entity
elif 'service_resource_not_found' in r.text:
logger.warn('Unable to retrieve user at URL [%s], and will use source entity. status=[%s] response: %s...' % (
source_entity_url, r.status_code, r.text))
return source_entity
else:
logger.error('After [%s] attempts to confirm user at URL [%s], received status [%s] message: %s...' % (
attempts, source_entity_url, r.status_code, r.text))
time.sleep(DEFAULT_RETRY_SLEEP)
return confirm_user_entity(app, source_entity, attempts)
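# Creates the edge (connection) from the source entity to the target entity on the target endpoint,
# retrying on 5xx responses and optionally repairing missing entities on 401/404.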
def create_connection(app, collection_name, source_entity, edge_name, target_entity):
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
source_identifier = get_source_identifier(source_entity)
target_identifier = get_source_identifier(target_entity)
source_type_id = '%s/%s' % (source_entity.get('type'), source_identifier)
target_type_id = '%s/%s' % (target_entity.get('type'), target_identifier)
if source_entity.get('type') == 'user':
source_type_id = '%s/%s' % ('users', source_entity.get('username'))
if target_entity.get('type') == 'user':
if edge_name == 'users':
target_type_id = target_entity.get('uuid')
else:
target_type_id = '%s/%s' % ('users', target_entity.get('uuid'))
if target_entity.get('type') == 'device':
if edge_name == 'devices':
target_type_id = target_entity.get('uuid')
else:
target_type_id = '%s/%s' % ('devices', target_entity.get('uuid'))
if target_entity.get('type') == 'receipt':
if edge_name == 'receipts':
target_type_id = target_entity.get('uuid')
else:
target_type_id = '%s/%s' % ('receipts', target_entity.get('uuid'))
create_connection_url = connection_create_by_pairs_url_template.format(
org=target_org,
app=target_app,
source_type_id=source_type_id,
verb=edge_name,
target_type_id=target_type_id,
**config.get('target_endpoint'))
if not config.get('skip_cache_read', False):
processed = cache.get(create_connection_url)
if processed not in [None, 'None']:
logger.debug('Skipping visited Edge: [%s / %s / %s] --[%s]--> [%s / %s / %s]: %s ' % (
app, collection_name, source_identifier, edge_name, target_app, target_entity.get('type'),
target_entity.get('name'), create_connection_url))
return True
logger.info('Connecting entity [%s / %s / %s] --[%s]--> [%s / %s / %s]: %s ' % (
app, collection_name, source_identifier, edge_name, target_app, target_entity.get('type'),
target_entity.get('name', target_entity.get('uuid')), create_connection_url))
attempts = 0
while attempts < 5:
attempts += 1
r_create = session_target.post(create_connection_url)
if r_create.status_code == 200:
if not config.get('skip_cache_write', False):
cache.set(create_connection_url, 1)
return True
else:
if r_create.status_code >= 500:
if attempts < 5:
logger.warning('FAILED [%s] (will retry) to create connection at URL=[%s]: %s' % (
r_create.status_code, create_connection_url, r_create.text))
time.sleep(DEFAULT_RETRY_SLEEP)
else:
logger.critical(
'FAILED [%s] (WILL NOT RETRY - max attempts) to create connection at URL=[%s]: %s' % (
r_create.status_code, create_connection_url, r_create.text))
return False
elif r_create.status_code in [401, 404]:
if config.get('repair_data', False):
logger.warning('FAILED [%s] (WILL attempt repair) to create connection at URL=[%s]: %s' % (
r_create.status_code, create_connection_url, r_create.text))
migrate_data(app, source_entity.get('type'), source_entity, force=True)
migrate_data(app, target_entity.get('type'), target_entity, force=True)
else:
logger.critical('FAILED [%s] (WILL NOT attempt repair) to create connection at URL=[%s]: %s' % (
r_create.status_code, create_connection_url, r_create.text))
else:
logger.warning('FAILED [%s] (will retry) to create connection at URL=[%s]: %s' % (
r_create.status_code, create_connection_url, r_create.text))
return False
def process_edges(app, collection_name, source_entity, edge_name, connection_stack):
source_identifier = get_source_identifier(source_entity)
while len(connection_stack) > 0:
target_entity = connection_stack.pop()
if exclude_collection(collection_name) or exclude_collection(target_entity.get('type')):
logger.debug('EXCLUDING Edge (collection): [%s / %s / %s] --[%s]--> ?' % (
app, collection_name, source_identifier, edge_name ))
continue
create_connection(app, collection_name, source_entity, edge_name, target_entity)
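# Migrates one outbound edge type of an entity: iterates the connected target entities, recursively
# migrates each target, then re-creates the connections on the target endpoint.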
def migrate_out_graph_edge_type(app, collection_name, source_entity, edge_name, depth=0):
if not include_edge(collection_name, edge_name):
return True
source_uuid = source_entity.get('uuid')
key = '%s:edge:out:%s:%s' % (key_version, source_uuid, edge_name)
if not config.get('skip_cache_read', False):
date_visited = cache.get(key)
if date_visited not in [None, 'None']:
logger.info('Skipping EDGE [%s / %s --%s-->] - visited at %s' % (
collection_name, source_uuid, edge_name, date_visited))
return True
else:
cache.delete(key)
if not config.get('skip_cache_write', False):
cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))
logger.debug('Visiting EDGE [%s / %s (%s) --%s-->] at %s' % (
collection_name, source_uuid, get_uuid_time(source_uuid), edge_name, str(datetime.datetime.utcnow())))
response = True
source_identifier = get_source_identifier(source_entity)
count_edges = 0
logger.debug(
'Processing edge type=[%s] of entity [%s / %s / %s]' % (edge_name, app, collection_name, source_identifier))
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
connection_query_url = connection_query_url_template.format(
org=config.get('org'),
app=app,
verb=edge_name,
collection=collection_name,
uuid=source_identifier,
limit=config.get('limit'),
**config.get('source_endpoint'))
connection_query = UsergridQueryIterator(connection_query_url, sleep_time=config.get('error_retry_sleep'))
connection_stack = []
for target_entity in connection_query:
target_connection_collection = config.get('collection_mapping', {}).get(target_entity.get('type'),
target_entity.get('type'))
target_ok = migrate_graph(app, target_entity.get('type'), source_entity=target_entity, depth=depth)
if not target_ok:
logger.critical(
'Error migrating TARGET entity data for connection [%s / %s / %s] --[%s]--> [%s / %s / %s]' % (
app, collection_name, source_identifier, edge_name, app, target_connection_collection,
target_entity.get('name', target_entity.get('uuid'))))
count_edges += 1
connection_stack.append(target_entity)
process_edges(app, collection_name, source_entity, edge_name, connection_stack)
return response
def get_source_identifier(source_entity):
entity_type = source_entity.get('type')
source_identifier = source_entity.get('uuid')
if use_name_for_collection(entity_type):
if entity_type in ['user']:
source_identifier = source_entity.get('username')
else:
source_identifier = source_entity.get('name')
if source_identifier is None:
source_identifier = source_entity.get('uuid')
logger.warn('Using UUID for entity [%s / %s]' % (entity_type, source_identifier))
return source_identifier
def include_collection(collection_name):
if collection_name in ['events']:
return False
include = config.get('collection', [])
if include is not None and len(include) > 0 and collection_name not in include:
return False
exclude = config.get('exclude_collection', [])
if exclude is not None and collection_name in exclude:
return False
return True
def exclude_collection(collection_name):
exclude = config.get('exclude_collection', [])
if exclude is not None and collection_name in exclude:
return True
return False
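# Migrates one inbound edge type of an entity by visiting each connecting (source-side) entity and
# triggering its outbound graph migration.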
def migrate_in_graph_edge_type(app, collection_name, source_entity, edge_name, depth=0):
source_uuid = source_entity.get('uuid')
key = '%s:edges:in:%s:%s' % (key_version, source_uuid, edge_name)
if not config.get('skip_cache_read', False):
date_visited = cache.get(key)
if date_visited not in [None, 'None']:
logger.info('Skipping EDGE [--%s--> %s / %s] - visited at %s' % (
collection_name, source_uuid, edge_name, date_visited))
return True
else:
cache.delete(key)
if not config.get('skip_cache_write', False):
cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))
logger.debug('Visiting EDGE [--%s--> %s / %s (%s)] at %s' % (
edge_name, collection_name, source_uuid, get_uuid_time(source_uuid), str(datetime.datetime.utcnow())))
source_identifier = get_source_identifier(source_entity)
if exclude_collection(collection_name):
logger.debug('Excluding (Collection) entity [%s / %s / %s]' % (app, collection_name, source_uuid))
return True
if not include_edge(collection_name, edge_name):
return True
logger.debug(
'Processing edge type=[%s] of entity [%s / %s / %s]' % (edge_name, app, collection_name, source_identifier))
logger.debug('Processing IN edges type=[%s] of entity [ %s / %s / %s]' % (
edge_name, app, collection_name, source_uuid))
connecting_query_url = connecting_query_url_template.format(
org=config.get('org'),
app=app,
collection=collection_name,
uuid=source_uuid,
verb=edge_name,
limit=config.get('limit'),
**config.get('source_endpoint'))
connection_query = UsergridQueryIterator(connecting_query_url, sleep_time=config.get('error_retry_sleep'))
response = True
for e_connection in connection_query:
logger.debug('Triggering IN->OUT edge migration on entity [%s / %s / %s] ' % (
app, e_connection.get('type'), e_connection.get('uuid')))
response = migrate_graph(app, e_connection.get('type'), e_connection, depth) and response
return response
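# Migrates a single entity and then follows its outbound and inbound edges recursively, up to the
# configured graph_depth, using the visit cache to avoid re-processing nodes.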
def migrate_graph(app, collection_name, source_entity, depth=0):
depth += 1
source_uuid = source_entity.get('uuid')
# short circuit if the graph depth exceeds what was specified
if depth > config.get('graph_depth', 1):
logger.debug(
'Reached Max Graph Depth, stopping after [%s] on [%s / %s]' % (depth, collection_name, source_uuid))
return True
else:
logger.debug('Processing @ Graph Depth [%s]' % depth)
if exclude_collection(collection_name):
logger.warn('Ignoring entity in filtered collection [%s]' % collection_name)
return True
key = '%s:graph:%s' % (key_version, source_uuid)
entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid))
if not config.get('skip_cache_read', False):
date_visited = cache.get(key)
if date_visited not in [None, 'None']:
logger.debug('Skipping GRAPH %s at %s' % (entity_tag, date_visited))
return True
else:
cache.delete(key)
logger.info('Visiting GRAPH %s at %s' % (entity_tag, str(datetime.datetime.utcnow())))
if not config.get('skip_cache_write', False):
cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))
# first, migrate data for current node
response = migrate_data(app, collection_name, source_entity)
# gather the outbound edge names
out_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('collections', [])]
out_edge_names += [edge_name for edge_name in source_entity.get('metadata', {}).get('connections', [])]
logger.debug('Entity %s has [%s] OUT edges' % (entity_tag, len(out_edge_names)))
# migrate each outbound edge type
for edge_name in out_edge_names:
if not exclude_edge(collection_name, edge_name):
response = migrate_out_graph_edge_type(app, collection_name, source_entity, edge_name, depth) and response
if config.get('prune', False):
prune_edge_by_name(edge_name, app, collection_name, source_entity)
# gather the inbound edge names
in_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('connecting', [])]
logger.debug('Entity %s has [%s] IN edges' % (entity_tag, len(in_edge_names)))
# migrate each inbound edge type
for edge_name in in_edge_names:
if not exclude_edge(collection_name, edge_name):
response = migrate_in_graph_edge_type(app, collection_name, source_entity, edge_name,
depth) and response
return response
def collect_entities(q):
response = {}
for e in q:
response[e.get('uuid')] = e
return response
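# Deletes edges of the given name that exist on the target endpoint but no longer exist on the
# source for this entity.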
def prune_edge_by_name(edge_name, app, collection_name, source_entity):
if not include_edge(collection_name, edge_name):
return True
source_identifier = get_source_identifier(source_entity)
source_uuid = source_entity.get('uuid')
entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid))
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
target_connection_query_url = connection_query_url_template.format(
org=target_org,
app=target_app,
verb=edge_name,
collection=target_collection,
uuid=source_identifier,
limit=config.get('limit'),
**config.get('target_endpoint'))
source_connection_query_url = connection_query_url_template.format(
org=config.get('org'),
app=app,
verb=edge_name,
collection=collection_name,
uuid=source_identifier,
limit=config.get('limit'),
**config.get('source_endpoint'))
source_connections = collect_entities(
UsergridQueryIterator(source_connection_query_url, sleep_time=config.get('error_retry_sleep')))
target_connections = collect_entities(
UsergridQueryIterator(target_connection_query_url, sleep_time=config.get('error_retry_sleep')))
    delete_uuids = Set(target_connections.keys()) - Set(source_connections.keys())  # set difference: target-only edges
if len(delete_uuids) > 0:
logger.info('Found [%s] edges to delete for entity %s' % (len(delete_uuids), entity_tag))
for delete_uuid in delete_uuids:
delete_connection_url = connection_create_by_uuid_url_template.format(
org=target_org,
app=target_app,
verb=edge_name,
collection=target_collection,
uuid=source_identifier,
target_uuid=delete_uuid,
**config.get('target_endpoint'))
attempts = 0
while attempts < 5:
attempts += 1
r = session_target.delete(delete_connection_url)
if not config.get('skip_cache_write'):
cache.delete(delete_connection_url)
if r.status_code == 200:
logger.info('Pruned edge on attempt [%s] URL=[%s]' % (attempts, delete_connection_url))
break
else:
logger.error('Error [%s] on attempt [%s] deleting connection at URL=[%s]: %s' % (
r.status_code, attempts, delete_connection_url, r.text))
time.sleep(DEFAULT_RETRY_SLEEP)
return True
def prune_graph(app, collection_name, source_entity):
source_uuid = source_entity.get('uuid')
key = '%s:prune_graph:%s' % (key_version, source_uuid)
entity_tag = '[%s / %s / %s (%s)]' % (app, collection_name, source_uuid, get_uuid_time(source_uuid))
if not config.get('skip_cache_read', False):
date_visited = cache.get(key)
if date_visited not in [None, 'None']:
logger.debug('Skipping PRUNE %s at %s' % (entity_tag, date_visited))
return True
else:
cache.delete(key)
logger.debug('pruning GRAPH %s at %s' % (entity_tag, str(datetime.datetime.utcnow())))
if not config.get('skip_cache_write', False):
cache.set(name=key, value=str(int(time.time())), ex=config.get('visit_cache_ttl', 3600 * 2))
if collection_name in config.get('exclude_collection', []):
logger.debug('Excluding (Collection) entity %s' % entity_tag)
return True
out_edge_names = [edge_name for edge_name in source_entity.get('metadata', {}).get('collections', [])]
out_edge_names += [edge_name for edge_name in source_entity.get('metadata', {}).get('connections', [])]
for edge_name in out_edge_names:
prune_edge_by_name(edge_name, app, collection_name, source_entity)
def reput(app, collection_name, source_entity, attempts=0):
source_identifier = source_entity.get('uuid')
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
try:
target_entity_url_by_name = put_entity_url_template.format(org=target_org,
app=target_app,
collection=target_collection,
uuid=source_identifier,
**config.get('target_endpoint'))
r = session_source.put(target_entity_url_by_name, data=json.dumps({}))
if r.status_code != 200:
logger.info('HTTP [%s]: %s' % (target_entity_url_by_name, r.status_code))
else:
logger.debug('HTTP [%s]: %s' % (target_entity_url_by_name, r.status_code))
except:
pass
def get_uuid_time(the_uuid_string):
return time_uuid.TimeUUID(the_uuid_string).get_datetime()
def migrate_permissions(app, collection_name, source_entity, attempts=0):
if collection_name not in ['roles', 'role', 'group', 'groups']:
return True
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
source_identifier = get_source_identifier(source_entity)
source_permissions_url = permissions_url_template.format(org=config.get('org'),
app=app,
collection=collection_name,
uuid=source_identifier,
**config.get('source_endpoint'))
r = session_source.get(source_permissions_url)
if r.status_code != 200:
logger.error('Unable to get permissions at URL [%s]: %s' % (source_permissions_url, r.text))
return False
perm_response = r.json()
perms = perm_response.get('data', [])
logger.info('Migrating [%s / %s] with permissions %s' % (collection_name, source_identifier, perms))
if len(perms) > 0:
target_permissions_url = permissions_url_template.format(org=target_org,
app=target_app,
collection=target_collection,
uuid=source_identifier,
**config.get('target_endpoint'))
for permission in perms:
data = {'permission': permission}
logger.info('Posting permission %s to %s' % (json.dumps(data), target_permissions_url))
r = session_target.post(target_permissions_url, json.dumps(data))
if r.status_code != 200:
logger.error(
'ERROR posting permission %s to URL=[%s]: %s' % (
json.dumps(data), target_permissions_url, r.text))
return True
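# PUTs the entity data to the target endpoint, skipping entities whose cached 'modified' value
# indicates no change, and handling duplicate/conflict responses plus permission and credential follow-ups.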
def migrate_data(app, collection_name, source_entity, attempts=0, force=False):
if config.get('skip_data') and not force:
return True
# check the cache to see if this entity has changed
if not config.get('skip_cache_read', False) and not force:
try:
str_modified = cache.get(source_entity.get('uuid'))
if str_modified not in [None, 'None']:
modified = long(str_modified)
logger.debug('FOUND CACHE: %s = %s ' % (source_entity.get('uuid'), modified))
if modified <= source_entity.get('modified'):
modified_date = datetime.datetime.utcfromtimestamp(modified / 1000)
e_uuid = source_entity.get('uuid')
uuid_datetime = time_uuid.TimeUUID(e_uuid).get_datetime()
logger.debug('Skipping ENTITY: %s / %s / %s / %s (%s) / %s (%s)' % (
config.get('org'), app, collection_name, e_uuid, uuid_datetime, modified, modified_date))
return True
else:
logger.debug('DELETING CACHE: %s ' % (source_entity.get('uuid')))
cache.delete(source_entity.get('uuid'))
except:
logger.error('Error on checking cache for uuid=[%s]' % source_entity.get('uuid'))
logger.error(traceback.format_exc())
if exclude_collection(collection_name):
logger.warn('Excluding entity in filtered collection [%s]' % collection_name)
return True
# handle duplicate user case
if collection_name in ['users', 'user']:
source_entity = confirm_user_entity(app, source_entity)
source_identifier = get_source_identifier(source_entity)
logger.info('Visiting ENTITY data [%s / %s (%s) ] at %s' % (
collection_name, source_identifier, get_uuid_time(source_entity.get('uuid')), str(datetime.datetime.utcnow())))
entity_copy = source_entity.copy()
if 'metadata' in entity_copy:
entity_copy.pop('metadata')
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
try:
target_entity_url_by_name = put_entity_url_template.format(org=target_org,
app=target_app,
collection=target_collection,
uuid=source_identifier,
**config.get('target_endpoint'))
r = session_target.put(url=target_entity_url_by_name, data=json.dumps(entity_copy))
if attempts > 1:
logger.warn('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % (
attempts, collection_name, source_identifier, target_entity_url_by_name))
else:
logger.debug('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % (
attempts, collection_name, source_identifier, target_entity_url_by_name))
if r.status_code == 200:
# Worked => WE ARE DONE
logger.info(
'migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % (
True, attempts, config.get('org'), app, source_identifier, source_entity.get('created'),
source_entity.get('modified'),))
if not config.get('skip_cache_write', False):
logger.debug('SETTING CACHE | uuid=[%s] | modified=[%s]' % (
source_entity.get('uuid'), str(source_entity.get('modified'))))
cache.set(source_entity.get('uuid'), str(source_entity.get('modified')))
if collection_name in ['role', 'group', 'roles', 'groups']:
migrate_permissions(app, collection_name, source_entity, attempts=0)
if collection_name in ['users', 'user']:
migrate_user_credentials(app, collection_name, source_entity, attempts=0)
return True
else:
logger.error('Failure [%s] on attempt [%s] to PUT url=[%s], entity=[%s] response=[%s]' % (
r.status_code, attempts, target_entity_url_by_name, json.dumps(source_entity), r.text))
if attempts >= 5:
logger.critical(
'ABORT migrate_data | success=[%s] | attempts=[%s] | created=[%s] | modified=[%s] %s / %s / %s' % (
                        False, attempts, source_entity.get('created'), source_entity.get('modified'), app,
collection_name, source_identifier))
return False
if r.status_code == 400:
if target_collection in ['roles', 'role']:
return repair_user_role(app, collection_name, source_entity)
elif target_collection in ['users', 'user']:
return handle_user_migration_conflict(app, collection_name, source_entity)
elif 'duplicate_unique_property_exists' in r.text:
logger.error(
'WILL NOT RETRY (duplicate) [%s] attempts to PUT url=[%s], entity=[%s] response=[%s]' % (
attempts, target_entity_url_by_name, json.dumps(source_entity), r.text))
return False
elif r.status_code == 403:
logger.critical(
'ABORT migrate_data | success=[%s] | attempts=[%s] | created=[%s] | modified=[%s] %s / %s / %s' % (
False, attempts, source_entity.get('created'), source_entity.get('modified'), app,
collection_name, source_identifier))
return False
except:
logger.error(traceback.format_exc())
logger.error('error in migrate_data on entity: %s' % json.dumps(source_entity))
logger.warn(
'UNSUCCESSFUL migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % (
            False, attempts, config.get('org'), app, source_identifier, source_entity.get('created'),
source_entity.get('modified'),))
return migrate_data(app, collection_name, source_entity, attempts=attempts + 1)
def handle_user_migration_conflict(app, collection_name, source_entity, attempts=0, depth=0):
    # this conflict handling only applies to user entities
    if collection_name not in ['users', 'user']:
return False
username = source_entity.get('username')
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
target_entity_url = get_entity_url_template.format(org=target_org,
app=target_app,
collection=target_collection,
uuid=username,
**config.get('target_endpoint'))
    # There is retry built in; here is the short circuit
if attempts >= 5:
logger.critical(
'Aborting after [%s] attempts to audit user [%s] at URL [%s]' % (attempts, username, target_entity_url))
return False
r = session_target.get(url=target_entity_url)
if r.status_code == 200:
target_entity = r.json().get('entities')[0]
if source_entity.get('created') < target_entity.get('created'):
return repair_user_role(app, collection_name, source_entity)
elif r.status_code / 100 == 5:
audit_logger.warning(
'CONFLICT: handle_user_migration_conflict failed attempt [%s] GET [%s] on TARGET URL=[%s] - : %s' % (
attempts, r.status_code, target_entity_url, r.text))
time.sleep(DEFAULT_RETRY_SLEEP)
return handle_user_migration_conflict(app, collection_name, source_entity, attempts)
else:
audit_logger.error(
'CONFLICT: Failed handle_user_migration_conflict attempt [%s] GET [%s] on TARGET URL=[%s] - : %s' % (
attempts, r.status_code, target_entity_url, r.text))
return False
def get_best_source_entity(app, collection_name, source_entity, depth=0):
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
target_pk = 'uuid'
if target_collection in ['users', 'user']:
target_pk = 'username'
elif target_collection in ['roles', 'role']:
target_pk = 'name'
target_name = source_entity.get(target_pk)
# there should be no target entity now, we just need to decide which one from the source to use
source_entity_url_by_name = get_entity_url_template.format(org=config.get('org'),
app=app,
collection=collection_name,
uuid=target_name,
**config.get('source_endpoint'))
r_get_source_entity = session_source.get(source_entity_url_by_name)
# if we are able to get at the source by PK...
if r_get_source_entity.status_code == 200:
# extract the entity from the response
entity_from_get = r_get_source_entity.json().get('entities')[0]
return entity_from_get
elif r_get_source_entity.status_code / 100 == 4:
# wasn't found, get by QL and sort
source_entity_query_url = collection_query_url_template.format(org=config.get('org'),
app=app,
collection=collection_name,
ql='select * where %s=\'%s\' order by created asc' % (
target_pk, target_name),
limit=config.get('limit'),
**config.get('source_endpoint'))
logger.info('Attempting to determine best entity from query on URL %s' % source_entity_query_url)
q = UsergridQueryIterator(source_entity_query_url, sleep_time=config.get('error_retry_sleep'))
desired_entity = None
entity_counter = 0
for e in q:
entity_counter += 1
if desired_entity is None:
desired_entity = e
elif e.get('created') < desired_entity.get('created'):
desired_entity = e
if desired_entity is None:
logger.warn('Unable to determine best of [%s] entities from query on URL %s' % (
entity_counter, source_entity_query_url))
return source_entity
else:
return desired_entity
else:
return source_entity
def repair_user_role(app, collection_name, source_entity, attempts=0, depth=0):
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
    # For the users collection, there seemed to be cases where a USERNAME was created/existing with a
# different UUID which caused a 'collision' - so the point is to delete the entity with the differing
# UUID by UUID and then do a recursive call to migrate the data - now that the collision has been cleared
target_pk = 'uuid'
if target_collection in ['users', 'user']:
target_pk = 'username'
elif target_collection in ['roles', 'role']:
target_pk = 'name'
target_name = source_entity.get(target_pk)
target_entity_url_by_name = get_entity_url_template.format(org=target_org,
app=target_app,
collection=target_collection,
uuid=target_name,
**config.get('target_endpoint'))
logger.warning('Repairing: Deleting name=[%s] entity at URL=[%s]' % (target_name, target_entity_url_by_name))
r = session_target.delete(target_entity_url_by_name)
if r.status_code == 200 or (r.status_code in [404, 401] and 'service_resource_not_found' in r.text):
logger.info('Deletion of entity at URL=[%s] was [%s]' % (target_entity_url_by_name, r.status_code))
best_source_entity = get_best_source_entity(app, collection_name, source_entity)
target_entity_url_by_uuid = get_entity_url_template.format(org=target_org,
app=target_app,
collection=target_collection,
uuid=best_source_entity.get('uuid'),
**config.get('target_endpoint'))
r = session_target.put(target_entity_url_by_uuid, data=json.dumps(best_source_entity))
if r.status_code == 200:
logger.info('Successfully repaired user at URL=[%s]' % target_entity_url_by_uuid)
return True
else:
logger.critical('Failed to PUT [%s] the desired entity at URL=[%s]: %s' % (
r.status_code, target_entity_url_by_name, r.text))
return False
else:
# log an error and keep going if we cannot delete the entity at the specified URL. Unlikely, but if so
# then this entity is borked
logger.critical(
'Deletion of entity at URL=[%s] FAILED [%s]: %s' % (target_entity_url_by_name, r.status_code, r.text))
return False
def get_target_mapping(app, collection_name):
target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org'))
target_app = config.get('app_mapping', {}).get(app, app)
target_collection = config.get('collection_mapping', {}).get(collection_name, collection_name)
return target_app, target_collection, target_org
def parse_args():
parser = argparse.ArgumentParser(description='Usergrid Org/App Migrator')
parser.add_argument('--log_dir',
help='path to the place where logs will be written',
default='./',
type=str,
required=False)
parser.add_argument('--log_level',
help='log level - DEBUG, INFO, WARN, ERROR, CRITICAL',
default='INFO',
type=str,
required=False)
parser.add_argument('-o', '--org',
help='Name of the org to migrate',
type=str,
required=True)
parser.add_argument('-a', '--app',
help='Name of one or more apps to include, specify none to include all apps',
required=False,
action='append')
parser.add_argument('-e', '--include_edge',
help='Name of one or more edges/connection types to INCLUDE, specify none to include all edges',
required=False,
action='append')
parser.add_argument('--exclude_edge',
help='Name of one or more edges/connection types to EXCLUDE, specify none to include all edges',
required=False,
action='append')
parser.add_argument('--exclude_collection',
help='Name of one or more collections to EXCLUDE, specify none to include all collections',
required=False,
action='append')
parser.add_argument('-c', '--collection',
help='Name of one or more collections to include, specify none to include all collections',
default=[],
action='append')
parser.add_argument('--force_app',
help='Necessary for using 2.0 as a source at times due to API issues. Forces the specified app(s) to be processed, even if they are not returned from the list of apps in the API call',
default=[],
action='append')
parser.add_argument('--use_name_for_collection',
help='Name of one or more collections to use [name] instead of [uuid] for creating entities and edges',
default=[],
action='append')
parser.add_argument('-m', '--migrate',
                        help='Specifies what to migrate: data, graph, permissions, credentials, prune, reput or none '
                             '(just iterate the apps/collections)',
type=str,
choices=[
'data',
'prune',
'none',
'reput',
'credentials',
'graph',
'permissions'
],
default='data')
parser.add_argument('-s', '--source_config',
help='The path to the source endpoint/org configuration file',
type=str,
default='source.json')
parser.add_argument('-d', '--target_config',
help='The path to the target endpoint/org configuration file',
type=str,
default='destination.json')
parser.add_argument('--redis_socket',
help='The path to the socket for redis to use',
type=str)
parser.add_argument('--limit',
help='The number of entities to return per query request',
type=int,
default=100)
parser.add_argument('-w', '--entity_workers',
help='The number of worker processes to do the migration',
type=int,
default=16)
parser.add_argument('--visit_cache_ttl',
help='The TTL of the cache of visiting nodes in the graph for connections',
type=int,
default=3600 * 2)
parser.add_argument('--error_retry_sleep',
                        help='The number of seconds to wait before retrying after an error',
type=float,
default=30)
parser.add_argument('--page_sleep_time',
help='The number of seconds to wait between retrieving pages from the UsergridQueryIterator',
type=float,
default=0)
parser.add_argument('--entity_sleep_time',
                        help='The number of seconds to wait between processing each entity',
type=float,
default=0)
parser.add_argument('--collection_workers',
help='The number of worker processes to do the migration',
type=int,
default=2)
parser.add_argument('--queue_size_max',
                        help='The maximum number of entities to allow in the queue',
type=int,
default=100000)
parser.add_argument('--graph_depth',
help='The graph depth to traverse to copy',
type=int,
default=3)
parser.add_argument('--queue_watermark_high',
help='The point at which publishing to the queue will PAUSE until it is at or below low watermark',
type=int,
default=25000)
parser.add_argument('--min_modified',
help='Break when encountering a modified date before this, per collection',
type=int,
default=0)
parser.add_argument('--max_modified',
help='Break when encountering a modified date after this, per collection',
type=long,
default=3793805526000)
parser.add_argument('--queue_watermark_low',
help='The point at which publishing to the queue will RESUME after it has reached the high watermark',
type=int,
default=5000)
parser.add_argument('--ql',
help='The QL to use in the filter for reading data from collections',
type=str,
default='select * order by created asc')
parser.add_argument('--repair_data',
help='Repair data when iterating/migrating graph but skipping data',
action='store_true')
parser.add_argument('--prune',
help='Prune the graph while processing (instead of the prune operation)',
action='store_true')
parser.add_argument('--skip_data',
help='Skip migrating data (useful for connections only)',
action='store_true')
parser.add_argument('--skip_credentials',
help='Skip migrating credentials',
action='store_true')
parser.add_argument('--skip_cache_read',
help='Skip reading the cache (modified timestamps and graph edges)',
dest='skip_cache_read',
action='store_true')
parser.add_argument('--skip_cache_write',
help='Skip updating the cache with modified timestamps of entities and graph edges',
dest='skip_cache_write',
action='store_true')
parser.add_argument('--create_apps',
help='Create apps at the target if they do not exist',
dest='create_apps',
action='store_true')
parser.add_argument('--nohup',
help='specifies not to use stdout for logging',
action='store_true')
parser.add_argument('--graph',
help='Use GRAPH instead of Query',
dest='graph',
action='store_true')
parser.add_argument('--su_username',
help='Superuser username',
required=False,
type=str)
parser.add_argument('--su_password',
help='Superuser Password',
required=False,
type=str)
parser.add_argument('--inbound_connections',
                        help='Also migrate inbound connections (connecting edges)',
action='store_true')
parser.add_argument('--map_app',
help="Multiple allowed: A colon-separated string such as 'apples:oranges' which indicates to"
" put data from the app named 'apples' from the source endpoint into app named 'oranges' "
"in the target endpoint",
default=[],
action='append')
parser.add_argument('--map_collection',
help="One or more colon-separated string such as 'cats:dogs' which indicates to put data from "
"collections named 'cats' from the source endpoint into a collection named 'dogs' in the "
"target endpoint, applicable globally to all apps",
default=[],
action='append')
parser.add_argument('--map_org',
help="One or more colon-separated strings such as 'red:blue' which indicates to put data from "
"org named 'red' from the source endpoint into a collection named 'blue' in the target "
"endpoint",
default=[],
action='append')
my_args = parser.parse_args(sys.argv[1:])
return vars(my_args)
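# Post-processes the parsed arguments: builds the org/app/collection mapping dicts and loads the
# source and target endpoint configuration files.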
def init():
global config
if config.get('migrate') == 'credentials':
if config.get('su_password') is None or config.get('su_username') is None:
message = 'ABORT: In order to migrate credentials, Superuser parameters (su_password, su_username) are required'
            print(message)
logger.critical(message)
exit()
config['collection_mapping'] = {}
config['app_mapping'] = {}
config['org_mapping'] = {}
for mapping in config.get('map_collection', []):
parts = mapping.split(':')
if len(parts) == 2:
config['collection_mapping'][parts[0]] = parts[1]
else:
logger.warning('Skipping Collection mapping: [%s]' % mapping)
for mapping in config.get('map_app', []):
parts = mapping.split(':')
if len(parts) == 2:
config['app_mapping'][parts[0]] = parts[1]
else:
logger.warning('Skipping App mapping: [%s]' % mapping)
for mapping in config.get('map_org', []):
parts = mapping.split(':')
if len(parts) == 2:
config['org_mapping'][parts[0]] = parts[1]
logger.info('Mapping Org [%s] to [%s] from mapping [%s]' % (parts[0], parts[1], mapping))
else:
logger.warning('Skipping Org mapping: [%s]' % mapping)
with open(config.get('source_config'), 'r') as f:
config['source_config'] = json.load(f)
with open(config.get('target_config'), 'r') as f:
config['target_config'] = json.load(f)
if config['exclude_collection'] is None:
config['exclude_collection'] = []
config['source_endpoint'] = config['source_config'].get('endpoint').copy()
config['source_endpoint'].update(config['source_config']['credentials'][config['org']])
target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org'))
config['target_endpoint'] = config['target_config'].get('endpoint').copy()
config['target_endpoint'].update(config['target_config']['credentials'][target_org])
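# Blocks until all of the given worker processes have exited, logging progress while waiting.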
def wait_for(threads, label, sleep_time=60):
wait = True
logger.info('Starting to wait for [%s] threads with sleep time=[%s]' % (len(threads), sleep_time))
while wait:
wait = False
alive_count = 0
for t in threads:
if t.is_alive():
alive_count += 1
logger.info('Thread [%s] is still alive' % t.name)
if alive_count > 0:
wait = True
logger.info('Continuing to wait for [%s] threads with sleep time=[%s]' % (alive_count, sleep_time))
time.sleep(sleep_time)
logger.warn('All workers [%s] done!' % label)
def count_bytes(entity):
entity_copy = entity.copy()
if 'metadata' in entity_copy:
del entity_copy['metadata']
entity_str = json.dumps(entity_copy)
return len(entity_str)
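# Copies a user's credentials from the source to the target using the superuser-authenticated
# credentials endpoint.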
def migrate_user_credentials(app, collection_name, source_entity, attempts=0):
# this only applies to users
if collection_name not in ['users', 'user'] \
or config.get('skip_credentials', False):
return False
source_identifier = get_source_identifier(source_entity)
target_app, target_collection, target_org = get_target_mapping(app, collection_name)
# get the URLs for the source and target users
source_url = user_credentials_url_template.format(org=config.get('org'),
app=app,
uuid=source_identifier,
**config.get('source_endpoint'))
target_url = user_credentials_url_template.format(org=target_org,
app=target_app,
uuid=source_identifier,
**config.get('target_endpoint'))
# this endpoint for some reason uses basic auth...
r = requests.get(source_url, auth=HTTPBasicAuth(config.get('su_username'), config.get('su_password')))
if r.status_code != 200:
logger.error('Unable to migrate credentials due to HTTP [%s] on GET URL [%s]: %s' % (
r.status_code, source_url, r.text))
return False
source_credentials = r.json()
logger.info('Putting credentials to [%s]...' % target_url)
r = requests.put(target_url,
data=json.dumps(source_credentials),
auth=HTTPBasicAuth(config.get('su_username'), config.get('su_password')))
if r.status_code != 200:
logger.error(
'Unable to migrate credentials due to HTTP [%s] on PUT URL [%s]: %s' % (
r.status_code, target_url, r.text))
return False
logger.info('migrate_user_credentials | success=[%s] | app/collection/name = %s/%s/%s' % (
True, app, collection_name, source_entity.get('uuid')))
return True
def check_response_status(r, url, exit_on_error=True):
if r.status_code != 200:
logger.critical('HTTP [%s] on URL=[%s]' % (r.status_code, url))
logger.critical('Response: %s' % r.text)
if exit_on_error:
exit()
def do_operation(apps_and_collections, operation):
status_map = {}
logger.info('Creating queues...')
# some platforms (macOS, for example) do not support a bounded multiprocessing Queue, so only apply maxsize on Linux
if _platform == "linux" or _platform == "linux2":
entity_queue = Queue(maxsize=config.get('queue_size_max'))
collection_queue = Queue(maxsize=config.get('queue_size_max'))
collection_response_queue = Queue(maxsize=config.get('queue_size_max'))
else:
entity_queue = Queue()
collection_queue = Queue()
collection_response_queue = Queue()
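# Bounding the queues (where the platform supports it) provides backpressure: producers block
# on put() once queue_size_max items are in flight instead of buffering entire collections in memory.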
logger.info('Starting entity_workers...')
collection_count = 0
# create the entity workers, but only start them (later) if there is work to do
entity_workers = [EntityWorker(entity_queue, operation) for x in xrange(config.get('entity_workers'))]
# create the collection workers, but only start them (later) if there is work to do
collection_workers = [CollectionWorker(collection_queue, entity_queue, collection_response_queue) for x in
xrange(config.get('collection_workers'))]
status_listener = StatusListener(collection_response_queue, entity_queue)
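# Topology: collection_queue feeds the CollectionWorkers, which fan entities out onto
# entity_queue for the EntityWorkers; per-collection results flow back on
# collection_response_queue, which the single StatusListener consumes (presumably to report progress).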
try:
# for each app, publish the (app_name, collection_name) to the queue.
# this is received by a collection worker who iterates the collection and publishes
# entities into a queue. These are received by an individual entity worker which
# executes the specified operation on the entity
for app, app_data in apps_and_collections.get('apps', {}).iteritems():
logger.info('Processing app=[%s]' % app)
status_map[app] = {
'iteration_started': str(datetime.datetime.now()),
'max_created': -1,
'max_modified': -1,
'min_created': 1584946416000,
'min_modified': 1584946416000,
'count': 0,
'bytes': 0,
'collections': {}
}
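# max_created/max_modified start at -1 and min_created/min_modified at a high fixed timestamp
# (milliseconds since the epoch) so that the first entity observed always replaces the sentinel values.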
# iterate the collections which are returned.
for collection_name in app_data.get('collections'):
logger.info('Publishing app / collection: %s / %s' % (app, collection_name))
collection_count += 1
collection_queue.put((app, collection_name))
logger.info('Published [%s] collections in total so far, through app [%s]!' % (collection_count, app))
# only start the threads if there is work to do
if collection_count > 0:
status_listener.start()
# start the worker processes which will iterate the collections
[w.start() for w in collection_workers]
# start the worker processes which will do the work of migrating
[w.start() for w in entity_workers]
# allow collection workers to finish
wait_for(collection_workers, label='collection_workers', sleep_time=60)
# allow entity workers to finish
wait_for(entity_workers, label='entity_workers', sleep_time=60)
status_listener.terminate()
except KeyboardInterrupt:
logger.warning('Keyboard Interrupt, aborting...')
entity_queue.close()
collection_queue.close()
collection_response_queue.close()
[os.kill(p.pid, signal.SIGINT) for p in entity_workers]
[os.kill(p.pid, signal.SIGINT) for p in collection_workers]
os.kill(status_listener.pid, signal.SIGINT)
[w.terminate() for w in entity_workers]
[w.terminate() for w in collection_workers]
status_listener.terminate()
logger.info('All workers DONE!')
def filter_apps_and_collections(org_apps):
app_collections = {
'apps': {
}
}
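# populated below as {'apps': {'<app_name>': {'collections': [...]}}} and consumed by
# confirm_target_org_apps() and do_operation()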
try:
selected_apps = config.get('app')
# iterate the apps retrieved from the org
for org_app in sorted(org_apps.keys()):
logger.info('Found SOURCE App: %s' % org_app)
time.sleep(3)
for org_app in sorted(org_apps.keys()):
parts = org_app.split('/')
app = parts[1]
# if apps are specified and the current app is not in the list, skip it
if selected_apps and len(selected_apps) > 0 and app not in selected_apps:
logger.warning('Skipping app [%s] not included in process list [%s]' % (app, selected_apps))
continue
app_collections['apps'][app] = {
'collections': []
}
# get the list of collections from the source org/app
source_app_url = app_url_template.format(org=config.get('org'),
app=app,
**config.get('source_endpoint'))
logger.info('GET %s' % source_app_url)
r_collections = session_source.get(source_app_url)
collection_attempts = 0
# this call is occasionally flaky, so retry up to 5 times before skipping the app
while r_collections.status_code != 200 and collection_attempts < 5:
collection_attempts += 1
logger.warning('FAILED: GET (%s) [%s] URL: %s' % (r_collections.elapsed, r_collections.status_code,
source_app_url))
time.sleep(DEFAULT_RETRY_SLEEP)
r_collections = session_source.get(source_app_url)
if collection_attempts >= 5:
logger.critical('Unable to get collections at URL %s, skipping app' % source_app_url)
continue
app_response = r_collections.json()
logger.info('App Response: ' + json.dumps(app_response))
app_entities = app_response.get('entities', [])
if len(app_entities) > 0:
app_entity = app_entities[0]
collections = app_entity.get('metadata', {}).get('collections', {})
logger.info('App=[%s] starting Collections=[%s]' % (app, collections))
filtered_collections = [c for c in collections if include_collection(c)]
app_collections['apps'][app]['collections'] = filtered_collections
logger.info('App=[%s] filtered Collections=[%s]' % (app, filtered_collections))
except:
print(traceback.format_exc())
return app_collections
def confirm_target_org_apps(apps_and_collections):
for app in apps_and_collections.get('apps'):
# it is possible to map source orgs and apps to differently named targets. This gets the
# target names for each
target_org = config.get('org_mapping', {}).get(config.get('org'), config.get('org'))
target_app = config.get('app_mapping', {}).get(app, app)
# Check that the target Org/App exists. If not, move on to the next
target_app_url = app_url_template.format(org=target_org,
app=target_app,
**config.get('target_endpoint'))
logger.info('GET %s' % target_app_url)
r_target_apps = session_target.get(target_app_url)
if r_target_apps.status_code != 200:
if config.get('create_apps', DEFAULT_CREATE_APPS):
create_app_url = org_management_app_url_template.format(org=target_org,
app=target_app,
**config.get('target_endpoint'))
app_request = {'name': target_app}
r = session_target.post(create_app_url, data=json.dumps(app_request))
if r.status_code != 200:
logger.critical('--create_apps specified and unable to create app [%s] at URL=[%s]: %s' % (
target_app, create_app_url, r.text))
logger.critical('Process will now exit')
exit()
else:
logger.warning('Created app=[%s] at URL=[%s]: %s' % (target_app, create_app_url, r.text))
else:
logger.critical('Target application DOES NOT EXIST at [%s] URL=%s' % (
r_target_apps.status_code, target_app_url))
continue
def main():
global config, cache
config = parse_args()
init()
init_logging()
logger.warn('Script starting')
try:
if config.get('redis_socket') is not None:
cache = redis.Redis(unix_socket_path=config.get('redis_socket'))
else:
# this does not try to connect to redis
cache = redis.StrictRedis(host='localhost', port=6379, db=0)
# this is necessary to test the connection to redis
cache.get('usergrid')
except:
logger.error(
'Error connecting to Redis cache, consider using Redis to be able to optimize the migration process...')
time.sleep(3)
config['use_cache'] = False
config['skip_cache_read'] = True
config['skip_cache_write'] = True
org_apps = {
}
force_apps = config.get('force_app', [])
if force_apps is not None and len(force_apps) > 0:
logger.warn('Forcing only the following apps to be processed: %s' % force_apps)
for app in force_apps:
key = '%s/%s' % (app, app)
org_apps[key] = app
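# keys are built as '<app>/<app>' to match the 'org/app' format of the management endpoint
# response, since filter_apps_and_collections() splits on '/' and keeps the second segment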
if len(org_apps) == 0:
source_org_mgmt_url = org_management_url_template.format(org=config.get('org'),
limit=config.get('limit'),
**config.get('source_endpoint'))
print('Retrieving apps from [%s]' % source_org_mgmt_url)
logger.info('Retrieving apps from [%s]' % source_org_mgmt_url)
try:
# list the apps for the SOURCE org
logger.info('GET %s' % source_org_mgmt_url)
r = session_source.get(source_org_mgmt_url)
if r.status_code != 200:
logger.critical(
'Abort processing: Unable to retrieve apps from [%s]: %s' % (source_org_mgmt_url, r.text))
exit()
logger.info(json.dumps(r.text))
org_apps = r.json().get('data')
except Exception:
logger.exception('ERROR Retrieving apps from [%s]' % source_org_mgmt_url)
print(traceback.format_exc())
logger.critical('Unable to retrieve apps from [%s] and will exit' % source_org_mgmt_url)
exit()
# Check the specified configuration for what to migrate/audit
if config.get('migrate') == 'graph':
operation = migrate_graph
elif config.get('migrate') == 'data':
operation = migrate_data
elif config.get('migrate') == 'prune':
operation = prune_graph
elif config.get('migrate') == 'permissions':
operation = migrate_permissions
config['collection'] = ['roles', 'groups']
logger.warn('Permissions migration specified; overriding the included collections to %s...' % config['collection'])
elif config.get('migrate') == 'credentials':
operation = migrate_user_credentials
config['collection'] = ['users']
logger.warn('Credentials migration specified; overriding the included collections to %s' % config['collection'])
elif config.get('migrate') == 'reput':
operation = reput
else:
operation = None
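# The operation callables (migrate_graph, migrate_data, prune_graph, migrate_permissions,
# migrate_user_credentials, reput) are defined earlier in this script and are assumed to share
# the (app, collection_name, source_entity, ...) signature that EntityWorker invokes.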
# filter out the apps and collections based on the -c and --exclude_collection directives
apps_and_collections = filter_apps_and_collections(org_apps)
logger.warn('The following apps/collections will be processed: %s' % json.dumps(apps_and_collections))
# confirm the apps exist at the target/destination org
confirm_target_org_apps(apps_and_collections)
# execute the operation over apps and collections
do_operation(apps_and_collections, operation)
logger.warn('Script finished')
if __name__ == "__main__":
main()