# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from usergrid import UsergridQuery
from Queue import Empty
import argparse
import json
import time
import logging
import logging.handlers
import sys
from multiprocessing import Process, JoinableQueue
import datetime
import requests
import traceback
import urllib3
import urllib3.contrib.pyopenssl

__author__ = 'Jeff.West@yahoo.com'

urllib3.disable_warnings()
urllib3.contrib.pyopenssl.inject_into_urllib3()
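# The two calls above silence urllib3's certificate warnings and route urllib3's HTTPS
# handling through pyOpenSSL, which is commonly done on Python 2 builds for SNI support.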
# This script was used to force a sync of Cassandra (C*) across regions. The idea is to
# query entities via QL from a region where they exist, then iterate over the results
# and do a GET by UUID in the region where the entities are 'missing'.
#
# For this to be successful, the read consistency level (readcl) in the target region
# (the one serving the GET by UUID) must be set to 'ALL'; this forces a repair across
# the cluster.
#
# It is recommended to take the target Tomcat out of the ELB for the customer. Ideally,
# you should spin up another Tomcat, leaving 2+ in the ELB for that customer.
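#
# For illustration, with the URL templates defined below, the repair amounts to two
# reads (placeholders shown in braces):
#
#   1) Query region:         GET {api_url}/{org}/{app}/{collection}?ql=select * order by created asc
#   2) Each target region:   GET {api_url}/{org}/{app}/{collection}/{uuid}?connections=none
#
# where the target region's readcl=ALL turns the second read into a repair.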
logger = logging.getLogger('UsergridCrossRegionRepair')
token_url_template = "{api_url}/management/token"
org_management_url_template = "{api_url}/management/organizations/{org}/applications?access_token={access_token}"
org_url_template = "{api_url}/{org}?access_token={access_token}"
app_url_template = "{api_url}/{org}/{app}?access_token={access_token}"
collection_url_template = "{api_url}/{org}/{app}/{collection}?access_token={access_token}"
collection_query_url_template = "{api_url}/{org}/{app}/{collection}?ql={ql}&access_token={access_token}&limit={limit}"
get_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?access_token={access_token}&connections=none"
put_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?access_token={access_token}"
# config can be loaded from a file
config = {}
# config = {
# "regions": {
# "us_west": {
# "api_url": "http://rut040wo:8080"
# },
# "us_east": {
# "api_url": "http://rut154ea:8080"
# },
# "eu_west": {
# "api_url": "http://localhost:8080"
# }
# },
# "management_region_id": "us_west",
# "query_region_id": "us_west",
# "get_region_ids": [
# "us_east"
# ]
# }
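# Example invocation (hypothetical script name, org, and credentials; the flags are
# defined in parse_args() below):
#
#   python cross_region_repair.py -o myorg -u admin@example.com -p 's3cr3t' \
#       -f config.json -w 16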
session_map = {}
def init_logging(stdout_enabled=True):
root_logger = logging.getLogger()
log_file_name = './cross-region-repair.log'
log_formatter = logging.Formatter(fmt='%(asctime)s | %(name)s | %(processName)s | %(levelname)s | %(message)s',
datefmt='%m/%d/%Y %I:%M:%S %p')
rotating_file = logging.handlers.RotatingFileHandler(filename=log_file_name,
mode='a',
maxBytes=204857600,
backupCount=10)
rotating_file.setFormatter(log_formatter)
rotating_file.setLevel(logging.INFO)
root_logger.addHandler(rotating_file)
root_logger.setLevel(logging.INFO)
logging.getLogger('boto').setLevel(logging.ERROR)
logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN)
logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.WARN)
if stdout_enabled:
stdout_logger = logging.StreamHandler(sys.stdout)
stdout_logger.setFormatter(log_formatter)
stdout_logger.setLevel(logging.INFO)
root_logger.addHandler(stdout_logger)
class Worker(Process):
def __init__(self, queue, handler_function):
super(Worker, self).__init__()
logger.warning('Creating worker!')
self.queue = queue
self.handler_function = handler_function
def run(self):
logger.info('starting run()...')
        keep_going = True
        count_processed = 0
        count_error = 0
        empty_count = 0

        while keep_going:

            try:
                org, app, collection, entity = self.queue.get(timeout=600)
                empty_count = 0

                logger.debug('Task: org=[%s] app=[%s] collection=[%s] entity=[%s]' % (org, app, collection, entity))

                if self.handler_function is not None:
                    processed = self.handler_function(org=org,
                                                      app=app,
                                                      collection=collection,
                                                      entity=entity,
                                                      counter=count_processed)

                    if processed:
                        count_processed += 1
                        logger.info('Processed count=[%s] SUCCESS uuid/name = %s / %s' % (
                            count_processed, entity.get('uuid'), entity.get('name')))
                    else:
                        count_error += 1
                        logger.error('Processed count=[%s] ERROR uuid/name = %s / %s' % (
                            count_error, entity.get('uuid'), entity.get('name')))

                self.queue.task_done()

            except KeyboardInterrupt:
                raise

            except Empty:
                logger.warning('EMPTY!')
                empty_count += 1

                # Stop after 30 consecutive empty reads (at a 600s timeout each).
                if empty_count > 30:
                    keep_going = False
logger.warning('WORKER DONE!')
def wait_for(threads, sleep_time=3):
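    """Block until all of the given processes have exited, polling every sleep_time seconds."""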
count_alive = 1
while count_alive > 0:
count_alive = 0
for t in threads:
if t.is_alive():
count_alive += 1
if count_alive > 0:
logger.warning('Waiting for [%s] processes to finish' % count_alive)
time.sleep(sleep_time)
def parse_args():
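    """Parse the command-line arguments and return them as a dict."""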
DEFAULT_WORKERS = 16
DEFAULT_TOKEN_TTL = 25200000
parser = argparse.ArgumentParser(description='Usergrid Cross-Region Repair Script')
parser.add_argument('-o', '--org',
help='The org to iterate',
type=str,
required=True)
    parser.add_argument('-a', '--app',
                        help='An app to process (may be repeated; default is all apps)',
                        action='append',
                        default=[])

    parser.add_argument('-c', '--collection',
                        help='A collection to process (may be repeated; default is all collections)',
                        action='append',
                        default=[])
parser.add_argument('-p', '--password',
help='The Password for the token request',
type=str,
required=True)
parser.add_argument('-u', '--username',
help='The Username for the token request',
type=str,
required=True)
    parser.add_argument('-w', '--workers',
                        help='The number of worker processes to run',
                        type=int,
                        default=DEFAULT_WORKERS)
parser.add_argument('--ttl',
help='The TTL for the token request',
type=int,
default=DEFAULT_TOKEN_TTL)
parser.add_argument('-l', '--limit',
help='The global limit for QL requests',
type=int,
default=DEFAULT_WORKERS * 3)
parser.add_argument('-f', '--config',
help='The file from which to load the configuration',
type=str)
my_args = parser.parse_args(sys.argv[1:])
return vars(my_args)
def get_by_UUID(org, app, collection, entity, counter, attempts=0):
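    """GET the entity by UUID in each configured target region; with readcl=ALL on the
    target, the read forces a repair. Returns True on success, False after 10 attempts."""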
    if attempts >= 10:
        return False

    response = False

    for region_id in config.get('get_region_ids', []):
        url_data = config.get('regions', {}).get(region_id)

        url = get_entity_url_template.format(collection=collection,
                                             app=app,
                                             uuid=entity.get('uuid'),
                                             org=org,
                                             access_token=config['access_token'],
                                             **url_data)

        logger.info('GET: %s' % url)
        session = session_map[region_id]

        # Every region must be read successfully, so reset the flag per region.
        response = False

        while not response:

            try:
                r = session.get(url)

                if r.status_code != 200:
                    logger.error('GET [%s] (%s): %s' % (r.status_code, r.elapsed, url))
                    logger.warning('Sleeping for 5 on connection retry...')
                    time.sleep(5)
                    return get_by_UUID(org, app, collection, entity, counter, attempts=attempts + 1)

                else:
                    logger.info('GET [%s] (%s): %s' % (r.status_code, r.elapsed, url))
                    response = True

                    if counter % 10 == 0:
                        logger.info('COUNTER=[%s] time=[%s] GET [%s]: %s' % (counter,
                                                                             r.elapsed,
                                                                             r.status_code,
                                                                             url))

            except:
                logger.error(traceback.format_exc())
                logger.error('EXCEPTION on GET: %s' % url)
                response = False
                logger.warning('Sleeping for 5 on connection retry...')
                time.sleep(5)

    return response
def init(args):
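    """Load the optional JSON config file into the global config and create one
    requests.Session per configured region."""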
global config
if args.get('config') is not None:
config_filename = args.get('config')
logger.warning('Using config file: %s' % config_filename)
try:
with open(config_filename, 'r') as f:
parsed_config = json.load(f)
logger.warning('Updating config with: %s' % parsed_config)
config.update(parsed_config)
        except:
            logger.error(traceback.format_exc())
for region_id, region_data in config.get('regions', {}).iteritems():
session_map[region_id] = requests.Session()
def main():
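    """Acquire a management token, enumerate the org's apps and collections, and queue
    every entity returned by the QL query for a repair GET in the target regions."""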
global config
    init_logging()

    args = parse_args()
    init(args)

    management_region_id = config.get('management_region_id', '')
    management_region = config.get('regions', {}).get(management_region_id)

    query_region_id = config.get('query_region_id', '')
    query_region = config.get('regions', {}).get(query_region_id)

    start = datetime.datetime.now()

    queue = JoinableQueue()
token_request = {
'grant_type': 'password',
'username': args.get('username'),
'ttl': args.get('ttl')
}
url = token_url_template.format(**management_region)
logger.info('getting token with url=[%s] data=[%s]' % (url, token_request))
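    # The password is added after logging the request body so it never reaches the log.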
token_request['password'] = args.get('password')
r = requests.post(url, data=json.dumps(token_request))
if r.status_code != 200:
logger.critical('did not get access token! response: %s' % r.json())
exit(-1)
logger.info(r.json())
config['access_token'] = r.json().get('access_token')
org_mgmt_url = org_management_url_template.format(org=args.get('org'),
access_token=config['access_token'],
**management_region)
logger.info(org_mgmt_url)
session = session_map[management_region_id]
r = session.get(org_mgmt_url)
logger.info(r.json())
logger.info('starting [%s] workers...' % args.get('workers'))
workers = [Worker(queue, get_by_UUID) for x in xrange(args.get('workers'))]
[w.start() for w in workers]
try:
org_app_data = r.json().get('data')
logger.info(org_app_data)
        apps_to_process = args.get('app', [])
        collections_to_process = args.get('collection', [])
for org_app, app_uuid in org_app_data.iteritems():
parts = org_app.split('/')
app = parts[1]
if len(apps_to_process) > 0 and app not in apps_to_process:
logger.info('Skipping app/uuid: %s/%s' % (org_app, app_uuid))
continue
logger.info('app UUID: %s' % app_uuid)
url = app_url_template.format(app=app,
org=args.get('org'),
access_token=config['access_token'],
**management_region)
logger.info('GET [...]: %s' % url)
session = session_map[management_region_id]
r = session.get(url)
for collection_name in r.json().get('entities', [{}])[0].get('metadata', {}).get('collections', {}):
if collection_name in ['events']:
continue
elif len(collections_to_process) > 0 and collection_name not in collections_to_process:
logger.info('skipping collection=%s' % collection_name)
continue
logger.info('processing collection=%s' % collection_name)
url = collection_query_url_template.format(ql='select * order by created asc',
collection=collection_name,
org=args['org'],
app=app,
limit=args['limit'],
access_token=config['access_token'],
**query_region)
q = UsergridQuery(url)
                counter = 0

                for entity in q:
                    counter += 1
                    queue.put((args['org'], app, collection_name, entity))
logger.info('collection=%s, count=%s' % (collection_name, counter))
except KeyboardInterrupt:
[w.terminate() for w in workers]
logger.warning('Waiting for workers to finish...')
wait_for(workers)
finish = datetime.datetime.now()
logger.warning('Done! Took: %s ' % (finish - start))
if __name__ == '__main__':
    main()