#!/usr/bin/python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging
import os
import sys
import urllib2, ssl
import json
import base64
import optparse
import socket
import time
import traceback
import ConfigParser
from random import randrange
from subprocess import Popen, PIPE
HTTP_PROTOCOL = 'http'
HTTPS_PROTOCOL = 'https'
AMBARI_SUDO = "/var/lib/ambari-agent/ambari-sudo.sh"
SOLR_SERVICE_NAME = 'AMBARI_INFRA_SOLR'
SOLR_COMPONENT_NAME = 'INFRA_SOLR'
LOGSEARCH_SERVICE_NAME = 'LOGSEARCH'
LOGSEARCH_SERVER_COMPONENT_NAME = 'LOGSEARCH_SERVER'
LOGSEARCH_LOGFEEDER_COMPONENT_NAME = 'LOGSEARCH_LOGFEEDER'
CLUSTERS_URL = '/api/v1/clusters/{0}'
GET_HOSTS_COMPONENTS_URL = '/services/{0}/components/{1}?fields=host_components'
REQUESTS_API_URL = '/requests'
BATCH_REQUEST_API_URL = "/api/v1/clusters/{0}/request_schedules"
LIST_SOLR_COLLECTION_URL = '{0}/admin/collections?action=LIST&wt=json'
CREATE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=CREATE&name={1}&collection.configName={2}&numShards={3}&replicationFactor={4}&maxShardsPerNode={5}&wt=json'
DELETE_SOLR_COLLECTION_URL = '{0}/admin/collections?action=DELETE&name={1}&wt=json'
RELOAD_SOLR_COLLECTION_URL = '{0}/admin/collections?action=RELOAD&name={1}&wt=json'
INFRA_SOLR_CLIENT_BASE_PATH = '/usr/lib/ambari-infra-solr-client/'
RANGER_NEW_SCHEMA = 'migrate/managed-schema'
SOLR_CLOUD_CLI_SCRIPT = 'solrCloudCli.sh'
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
class colors:
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
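# Returns a closure that performs authenticated HTTP(S) requests against the Ambari REST API.
# Certificate validation is intentionally skipped for HTTPS connections.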
def api_accessor(host, username, password, protocol, port):
def do_request(api_url, request_type, request_body=''):
try:
url = '{0}://{1}:{2}{3}'.format(protocol, host, port, api_url)
logger.debug('Execute {0} {1}'.format(request_type, url))
if request_body:
logger.debug('Request body: {0}'.format(request_body))
admin_auth = base64.encodestring('%s:%s' % (username, password)).replace('\n', '')
request = urllib2.Request(url)
request.add_header('Authorization', 'Basic %s' % admin_auth)
request.add_header('X-Requested-By', 'ambari')
request.add_data(request_body)
request.get_method = lambda: request_type
response = None
if protocol == 'https':
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
response = urllib2.urlopen(request, context=ctx)
else:
response = urllib2.urlopen(request)
response_body = response.read()
except Exception as exc:
raise Exception('Problem with accessing api. Reason: {0}'.format(exc))
return response_body
return do_request
def set_log_level(verbose):
if verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
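# Generic retry helper: calls func up to 'count' times (default 10), sleeping 'delay' seconds
# (default 5) between attempts, and returns the first non-None result; exits the script if all attempts fail.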
def retry(func, *args, **kwargs):
retry_count = kwargs.pop("count", 10)
delay = kwargs.pop("delay", 5)
context = kwargs.pop("context", "")
for r in range(retry_count):
try:
result = func(*args, **kwargs)
if result is not None: return result
except Exception as e:
logger.error("Error occurred during {0} operation: {1}".format(context, str(traceback.format_exc())))
logger.info("\n{0}: waiting for {1} seconds before retyring again (retry count: {2})".format(context, delay, r+1))
time.sleep(delay)
print '{0} operation {1}FAILED{2}'.format(context, colors.FAIL, colors.ENDC)
exit(1)
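# Builds the shell command used to call the Solr API as the infra-solr user (curl run via sudo);
# when Kerberos is enabled a kinit with the service keytab is prepended and curl uses --negotiate.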
def create_solr_api_request_command(request_url, config, output=None):
user='infra-solr'
kerberos_enabled='false'
keytab=None
principal=None
if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
kerberos_enabled=config.get('cluster', 'kerberos_enabled')
if config.has_section('infra_solr'):
if config.has_option('infra_solr', 'user'):
user=config.get('infra_solr', 'user')
if kerberos_enabled == 'true':
if config.has_option('infra_solr', 'keytab'):
keytab=config.get('infra_solr', 'keytab')
if config.has_option('infra_solr', 'principal'):
principal=config.get('infra_solr', 'principal')
use_infra_solr_user="sudo -u {0}".format(user)
curl_prefix = "curl -k"
if output is not None:
curl_prefix+=" -o {0}".format(output)
api_cmd = '{0} kinit -kt {1} {2} && {3} {4} --negotiate -u : "{5}"'.format(use_infra_solr_user, keytab, principal, use_infra_solr_user, curl_prefix, request_url) \
if kerberos_enabled == 'true' else '{0} {1} "{2}"'.format(use_infra_solr_user, curl_prefix, request_url)
logger.debug("Solr API command: {0}".format(api_cmd))
return api_cmd
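# Builds the solrCloudCli.sh command line, run through ambari-sudo.sh with JAVA_HOME and the
# ZooKeeper connect string; SASL client options and the JAAS file are appended when Kerberos is enabled.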
def create_infra_solr_client_command(options, config, command):
user='infra-solr'
kerberos_enabled='false'
infra_solr_cli_opts = ''
java_home=None
jaasOption=None
zkConnectString=None
if config.has_section('cluster') and config.has_option('cluster', 'kerberos_enabled'):
kerberos_enabled=config.get('cluster', 'kerberos_enabled')
if config.has_section('infra_solr'):
if config.has_option('infra_solr', 'user'):
user=config.get('infra_solr', 'user')
if config.has_option('infra_solr', 'external_zk_connect_string'):
zkConnectString=config.get('infra_solr', 'external_zk_connect_string')
elif config.has_option('infra_solr', 'zk_connect_string'):
zkConnectString=config.get('infra_solr', 'zk_connect_string')
if kerberos_enabled == 'true':
zk_principal_user = config.get('infra_solr', 'zk_principal_user') if config.has_option('infra_solr', 'zk_principal_user') else 'zookeeper'
infra_solr_cli_opts= '-Dzookeeper.sasl.client=true -Dzookeeper.sasl.client.username={0} -Dzookeeper.sasl.clientconfig=Client'.format(zk_principal_user)
jaasOption=" --jaas-file /etc/ambari-infra-solr/conf/infra_solr_jaas.conf"
command+=jaasOption
if config.has_section('local') and config.has_option('local', 'java_home'):
java_home=config.get('local', 'java_home')
if not java_home:
raise Exception("'local' section or 'java_home' is missing (or empty) from the configuration")
if not zkConnectString:
raise Exception("'zk_connect_string' section or 'external_zk_connect_string' is missing (or empty) from the configuration")
set_java_home_= 'JAVA_HOME={0}'.format(java_home)
set_infra_solr_cli_opts = ' INFRA_SOLR_CLI_OPTS="{0}"'.format(infra_solr_cli_opts) if infra_solr_cli_opts != '' else ''
solr_cli_cmd = '{0} {1}{2} /usr/lib/ambari-infra-solr-client/solrCloudCli.sh --zookeeper-connect-string {3} {4}'\
.format(AMBARI_SUDO, set_java_home_, set_infra_solr_cli_opts, zkConnectString, command)
return solr_cli_cmd
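# Picks a random URL from the comma separated Solr URL list, honoring the
# --include-solr-hosts / --exclude-solr-hosts filters.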
def get_random_solr_url(solr_urls, options):
splitted_solr_urls = solr_urls.split(',')
if options.include_solr_hosts:
# keep only included ones, do not override any
include_solr_hosts_list = options.include_solr_hosts.split(',')
new_splitted_urls = []
for url in splitted_solr_urls:
if any(inc_solr_host in url for inc_solr_host in include_solr_hosts_list):
new_splitted_urls.append(url)
splitted_solr_urls = new_splitted_urls
if options.exclude_solr_hosts:
exclude_solr_hosts_list = options.exclude_solr_hosts.split(',')
urls_to_exclude = []
for url in splitted_solr_urls:
if any(exc_solr_host in url for exc_solr_host in exclude_solr_hosts_list):
urls_to_exclude.append(url)
for excluded_url in urls_to_exclude:
splitted_solr_urls.remove(excluded_url)
random_index = randrange(0, len(splitted_solr_urls))
result = splitted_solr_urls[random_index]
logger.debug("Use {0} solr address for next request.".format(result))
return result
def format_json(dictionary, tab_level=0):
output = ''
tab = ' ' * 2 * tab_level
for key, value in dictionary.iteritems():
output += ',\n{0}"{1}": '.format(tab, key)
if isinstance(value, dict):
output += '{\n' + format_json(value, tab_level + 1) + tab + '}'
else:
output += '"{0}"'.format(value)
output += '\n'
return output[2:]
def read_json(json_file):
with open(json_file) as data_file:
data = json.load(data_file)
return data
def get_json(accessor, url):
response = accessor(url, 'GET')
logger.debug('GET ' + url + ' response: ')
logger.debug('----------------------------')
logger.debug(str(response))
json_resp = json.loads(response)
return json_resp
def post_json(accessor, url, request_body):
response = accessor(url, 'POST', json.dumps(request_body))
logger.debug('POST ' + url + ' response: ')
logger.debug( '----------------------------')
logger.debug(str(response))
json_resp = json.loads(response)
return json_resp
def get_component_hosts(host_components_json):
hosts = []
if "host_components" in host_components_json and len(host_components_json['host_components']) > 0:
for host_component in host_components_json['host_components']:
if 'HostRoles' in host_component:
hosts.append(host_component['HostRoles']['host_name'])
return hosts
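# Builds the 'request_schedules' payload for a rolling (batched) custom command: one POST request
# per host plus a batch_settings entry carrying the batch interval and task failure tolerance.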
def create_batch_command(command, hosts, cluster, service_name, component_name, interval_seconds, fault_tolerance, context):
request_schedules = []
request_schedule = {}
batch = []
requests = []
order_id = 1
all = len(hosts)
for host in hosts:
request = {}
request['order_id'] = order_id
request['type'] = 'POST'
request['uri'] = "/clusters/{0}/requests".format(cluster)
request_body_info = {}
request_info = {}
request_info["context"] = context + " ({0} of {1})".format(order_id, all)
request_info["command"] = command
order_id = order_id + 1
resource_filter = {}
resource_filter["service_name"] = service_name
resource_filter["component_name"] = component_name
resource_filter["hosts"] = host
resource_filters = []
resource_filters.append(resource_filter)
request_body_info["Requests/resource_filters"] = resource_filters
request_body_info['RequestInfo'] = request_info
request['RequestBodyInfo'] = request_body_info
requests.append(request)
batch_requests_item = {}
batch_requests_item['requests'] = requests
batch.append(batch_requests_item)
batch_settings_item = {}
batch_settings = {}
batch_settings['batch_separation_in_seconds'] = interval_seconds
batch_settings['task_failure_tolerance'] = fault_tolerance
batch_settings_item['batch_settings'] = batch_settings
batch.append(batch_settings_item)
request_schedule['batch'] = batch
request_schedule_item = {}
request_schedule_item['RequestSchedule'] = request_schedule
request_schedules.append(request_schedule_item)
return request_schedules
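# Builds a single Ambari custom command request body: RequestInfo (context, command, parameters,
# HOST_COMPONENT operation level) plus Requests/resource_filters targeting the given hosts.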
def create_command_request(command, parameters, hosts, cluster, context, service=SOLR_SERVICE_NAME, component=SOLR_COMPONENT_NAME):
request = {}
request_info = {}
request_info["context"] = context
request_info["command"] = command
request_info["parameters"] = parameters
operation_level = {}
operation_level["level"] = "HOST_COMPONENT"
operation_level["cluster_name"] = cluster
request_info["operation_level"] = operation_level
request["RequestInfo"] = request_info
resource_filter = {}
resource_filter["service_name"] = service
resource_filter["component_name"] = component
resource_filter["hosts"] = ','.join(hosts)
resource_filters = []
resource_filters.append(resource_filter)
request["Requests/resource_filters"] = resource_filters
return request
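# Collects the 'solr_*' command parameters (collection, backup location, shards, HDFS path, etc.)
# passed to the BACKUP/RESTORE/MIGRATE custom commands, based on the CLI options and the ini config.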
def fill_parameters(options, config, collection, index_location, hdfs_path=None, shards=None):
params = {}
if collection:
params['solr_collection'] = collection
params['solr_backup_name'] = collection
if index_location:
params['solr_index_location'] = index_location
if options.index_version:
params['solr_index_version'] = options.index_version
if options.force:
params['solr_index_upgrade_force'] = options.force
if options.request_async:
params['solr_request_async'] = options.request_async
if options.request_tries:
params['solr_request_tries'] = options.request_tries
if options.request_time_interval:
params['solr_request_time_interval'] = options.request_time_interval
if options.disable_solr_host_check:
params['solr_check_hosts'] = False
if options.core_filter:
params['solr_core_filter'] = options.core_filter
if options.skip_cores:
params['solr_skip_cores'] = options.skip_cores
if shards:
params['solr_shards'] = shards
if options.shared_drive:
params['solr_shared_fs'] = True
elif config.has_section('local') and config.has_option('local', 'shared_drive') and config.get('local', 'shared_drive') == 'true':
params['solr_shared_fs'] = True
if hdfs_path:
params['solr_hdfs_path'] = hdfs_path
if options.keep_backup:
params['solr_keep_backup'] = True
if options.skip_generate_restore_host_cores:
params['solr_skip_generate_restore_host_cores'] = True
return params
def validate_common_options(options, parser, config):
if not options.index_location:
parser.print_help()
print 'index-location option is required'
sys.exit(1)
if not options.collection:
parser.print_help()
print 'collection option is required'
sys.exit(1)
def get_service_components(options, accessor, cluster, service, component):
host_components_json = get_json(accessor, CLUSTERS_URL.format(cluster) + GET_HOSTS_COMPONENTS_URL.format(service, component))
component_hosts = get_component_hosts(host_components_json)
return component_hosts
def get_solr_hosts(options, accessor, cluster):
component_hosts = get_service_components(options, accessor, cluster, SOLR_SERVICE_NAME, SOLR_COMPONENT_NAME)
if options.include_solr_hosts:
new_component_hosts = []
include_solr_hosts_list = options.include_solr_hosts.split(',')
for include_host in include_solr_hosts_list:
if include_host in component_hosts:
new_component_hosts.append(include_host)
component_hosts = new_component_hosts
if options.exclude_solr_hosts:
exclude_solr_hosts_list = options.exclude_solr_hosts.split(',')
for exclude_host in exclude_solr_hosts_list:
if exclude_host in component_hosts:
component_hosts.remove(exclude_host)
return component_hosts
def restore(options, accessor, parser, config, collection, index_location, hdfs_path, shards):
"""
Send restore solr collection custom command request to ambari-server
"""
cluster = config.get('ambari_server', 'cluster')
component_hosts = get_solr_hosts(options, accessor, cluster)
parameters = fill_parameters(options, config, collection, index_location, hdfs_path, shards)
cmd_request = create_command_request("RESTORE", parameters, component_hosts, cluster, 'Restore Solr Collection: ' + collection)
return post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
def migrate(options, accessor, parser, config, collection, index_location):
"""
Send migrate lucene index custom command request to ambari-server
"""
cluster = config.get('ambari_server', 'cluster')
component_hosts = get_solr_hosts(options, accessor, cluster)
parameters = fill_parameters(options, config, collection, index_location)
cmd_request = create_command_request("MIGRATE", parameters, component_hosts, cluster, 'Migrating Solr Collection: ' + collection)
return post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
def backup(options, accessor, parser, config, collection, index_location):
"""
Send backup solr collection custom command request to ambari-server
"""
cluster = config.get('ambari_server', 'cluster')
component_hosts = get_solr_hosts(options, accessor, cluster)
parameters = fill_parameters(options, config, collection, index_location)
cmd_request = create_command_request("BACKUP", parameters, component_hosts, cluster, 'Backup Solr Collection: ' + collection)
return post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
def upgrade_solr_instances(options, accessor, parser, config):
"""
Upgrade (remove & re-install) infra solr instances
"""
cluster = config.get('ambari_server', 'cluster')
solr_instance_hosts = get_service_components(options, accessor, cluster, "AMBARI_INFRA_SOLR", "INFRA_SOLR")
context = "Upgrade Solr Instances"
sys.stdout.write("Sending upgrade request: [{0}] ".format(context))
sys.stdout.flush()
cmd_request = create_command_request("UPGRADE_SOLR_INSTANCE", {}, solr_instance_hosts, cluster, context)
response = post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
request_id = get_request_id(response)
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
print 'Upgrade command request id: {0}'.format(request_id)
if options.async:
print "Upgrade request sent to Ambari server. Check Ambari UI about the results."
sys.exit(0)
else:
sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
sys.stdout.flush()
cluster = config.get('ambari_server', 'cluster')
monitor_request(options, accessor, cluster, request_id, context)
print "{0}... {1} DONE{2}".format(context, colors.OKGREEN, colors.ENDC)
def upgrade_solr_clients(options, accessor, parser, config):
"""
Upgrade (remove & re-install) infra solr clients
"""
cluster = config.get('ambari_server', 'cluster')
solr_client_hosts = get_service_components(options, accessor, cluster, "AMBARI_INFRA_SOLR", "INFRA_SOLR_CLIENT")
fqdn = socket.getfqdn()
if fqdn in solr_client_hosts:
solr_client_hosts.remove(fqdn)
host = socket.gethostname()
if host in solr_client_hosts:
solr_client_hosts.remove(host)
context = "Upgrade Solr Clients"
sys.stdout.write("Sending upgrade request: [{0}] ".format(context))
sys.stdout.flush()
cmd_request = create_command_request("UPGRADE_SOLR_CLIENT", {}, solr_client_hosts, cluster, context, component="INFRA_SOLR_CLIENT")
response = post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
request_id = get_request_id(response)
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
print 'Upgrade command request id: {0}'.format(request_id)
if options.async:
print "Upgrade request sent to Ambari server. Check Ambari UI about the results."
sys.exit(0)
else:
sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
sys.stdout.flush()
cluster = config.get('ambari_server', 'cluster')
monitor_request(options, accessor, cluster, request_id, context)
print "{0}... {1}DONE{2}".format(context, colors.OKGREEN, colors.ENDC)
def upgrade_logfeeders(options, accessor, parser, config):
"""
Upgrade (remove & re-install) logfeeders
"""
cluster = config.get('ambari_server', 'cluster')
logfeeder_hosts = get_service_components(options, accessor, cluster, "LOGSEARCH", "LOGSEARCH_LOGFEEDER")
context = "Upgrade Log Feeders"
sys.stdout.write("Sending upgrade request: [{0}] ".format(context))
sys.stdout.flush()
cmd_request = create_command_request("UPGRADE_LOGFEEDER", {}, logfeeder_hosts, cluster, context, service="LOGSEARCH", component="LOGSEARCH_LOGFEEDER")
response = post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
request_id = get_request_id(response)
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
print 'Upgrade command request id: {0}'.format(request_id)
if options.async:
print "Upgrade request sent to Ambari server. Check Ambari UI about the results."
sys.exit(0)
else:
sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
sys.stdout.flush()
cluster = config.get('ambari_server', 'cluster')
monitor_request(options, accessor, cluster, request_id, context)
print "{0}... {1} DONE{2}".format(context, colors.OKGREEN, colors.ENDC)
def upgrade_logsearch_portal(options, accessor, parser, config):
"""
Upgrade (remove & re-install) logsearch server instances
"""
cluster = config.get('ambari_server', 'cluster')
logsearch_portal_hosts = get_service_components(options, accessor, cluster, "LOGSEARCH", "LOGSEARCH_SERVER")
context = "Upgrade Log Search Portal"
sys.stdout.write("Sending upgrade request: [{0}] ".format(context))
sys.stdout.flush()
cmd_request = create_command_request("UPGRADE_LOGSEARCH_PORTAL", {}, logsearch_portal_hosts, cluster, context, service="LOGSEARCH", component="LOGSEARCH_SERVER")
response = post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
request_id = get_request_id(response)
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
print 'Upgrade command request id: {0}'.format(request_id)
if options.async:
print "Upgrade request sent to Ambari server. Check Ambari UI about the results."
sys.exit(0)
else:
sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
sys.stdout.flush()
cluster = config.get('ambari_server', 'cluster')
monitor_request(options, accessor, cluster, request_id, context)
print "{0}... {1} DONE{2}".format(context, colors.OKGREEN, colors.ENDC)
def service_components_command(options, accessor, parser, config, service, component, command, command_str):
"""
Run command on service components
"""
cluster = config.get('ambari_server', 'cluster')
service_components = get_service_components(options, accessor, cluster, service, component)
context = "{0} {1}".format(command_str, component)
sys.stdout.write("Sending '{0}' request: [{1}] ".format(command, context))
sys.stdout.flush()
cmd_request = create_command_request(command, {}, service_components, cluster, context, service=service, component=component)
response = post_json(accessor, CLUSTERS_URL.format(cluster) + REQUESTS_API_URL, cmd_request)
request_id = get_request_id(response)
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
print '{0} command request id: {1}'.format(command_str, request_id)
if options.async:
print "{0} request sent to Ambari server. Check Ambari UI about the results.".format(command_str)
sys.exit(0)
else:
sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
sys.stdout.flush()
cluster = config.get('ambari_server', 'cluster')
monitor_request(options, accessor, cluster, request_id, context)
print "{0}... {1} DONE{2}".format(context, colors.OKGREEN, colors.ENDC)
def monitor_request(options, accessor, cluster, request_id, context):
while True:
request_response=get_json(accessor, "/api/v1/clusters/{0}{1}/{2}".format(cluster, REQUESTS_API_URL, request_id))
if 'Requests' in request_response and 'request_status' in request_response['Requests']:
request_status = request_response['Requests']['request_status']
logger.debug("\nMonitoring '{0}' request (id: '{1}') status is {2}".format(context, request_id, request_status))
if request_status in ['FAILED', 'TIMEDOUT', 'ABORTED', 'COMPLETED', 'SKIPPED_FAILED']:
if request_status == 'COMPLETED':
print "\nRequest (id: {0}) {1}COMPLETED{2}".format(request_id, colors.OKGREEN, colors.ENDC)
time.sleep(4)
else:
print "\nRequest (id: {0}) {1}FAILED{2} (checkout Ambari UI about the failed tasks)\n".format(request_id, colors.FAIL, colors.ENDC)
sys.exit(1)
break
else:
if not options.verbose:
sys.stdout.write(".")
sys.stdout.flush()
logger.debug("Sleep 5 seconds ...")
time.sleep(5)
else:
print "'Requests' or 'request_status' cannot be found in JSON response: {0}".format(request_response)
sys.exit(1)
def get_request_id(json_response):
if "Requests" in json_response:
if "id" in json_response['Requests']:
return json_response['Requests']['id']
raise Exception("Cannot access request id from Ambari response: {0}".format(json_response))
def filter_collections(options, collections):
if options.collection is not None:
filtered_collections = []
if options.collection in collections:
filtered_collections.append(options.collection)
return filtered_collections
else:
return collections
def get_solr_urls(config):
solr_urls = None
if config.has_section('infra_solr') and config.has_option('infra_solr', 'urls'):
return config.get('infra_solr', 'urls')
return solr_urls
def is_atlas_available(config, service_filter):
return 'ATLAS' in service_filter and config.has_section('atlas_collections') \
and config.has_option('atlas_collections', 'enabled') and config.get('atlas_collections', 'enabled') == 'true'
def is_ranger_available(config, service_filter):
return 'RANGER' in service_filter and config.has_section('ranger_collection') \
and config.has_option('ranger_collection', 'enabled') and config.get('ranger_collection', 'enabled') == 'true'
def is_logsearch_available(config, service_filter):
return 'LOGSEARCH' in service_filter and config.has_section('logsearch_collections') \
and config.has_option('logsearch_collections', 'enabled') and config.get('logsearch_collections', 'enabled') == 'true'
def delete_collection(options, config, collection, solr_urls):
request = DELETE_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls, options), collection)
logger.debug("Solr request: {0}".format(request))
delete_collection_json_cmd=create_solr_api_request_command(request, config)
process = Popen(delete_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
out, err = process.communicate()
if process.returncode != 0:
raise Exception("{0} command failed: {1}".format(delete_collection_json_cmd, str(err)))
response=json.loads(str(out))
if 'success' in response:
print 'Deleting collection {0} was {1}SUCCESSFUL{2}'.format(collection, colors.OKGREEN, colors.ENDC)
return collection
else:
raise Exception("DELETE collection ('{0}') failed. Response: {1}".format(collection, str(out)))
def list_collections(options, config, solr_urls):
request = LIST_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls, options))
logger.debug("Solr request: {0}".format(request))
list_collection_json_cmd=create_solr_api_request_command(request, config)
process = Popen(list_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
out, err = process.communicate()
if process.returncode != 0:
raise Exception("{0} command failed: {1}".format(list_collection_json_cmd, str(err)))
response=json.loads(str(out))
if 'collections' in response:
return response['collections']
else:
raise Exception("LIST collections failed ({0}). Response: {1}".format(request, str(out)))
def create_collection(options, config, solr_urls, collection, config_set, shards, replica, max_shards_per_node):
request = CREATE_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls, options), collection, config_set, shards, replica, max_shards_per_node)
logger.debug("Solr request: {0}".format(request))
create_collection_json_cmd=create_solr_api_request_command(request, config)
process = Popen(create_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
out, err = process.communicate()
if process.returncode != 0:
raise Exception("{0} command failed: {1}".format(create_collection_json_cmd, str(err)))
response=json.loads(str(out))
if 'success' in response:
print 'Creating collection {0} was {1}SUCCESSFUL{2}'.format(collection, colors.OKGREEN, colors.ENDC)
return collection
else:
raise Exception("CREATE collection ('{0}') failed. ({1}) Response: {1}".format(collection, str(out)))
def reload_collection(options, config, solr_urls, collection):
request = RELOAD_SOLR_COLLECTION_URL.format(get_random_solr_url(solr_urls, options), collection)
logger.debug("Solr request: {0}".format(request))
reload_collection_json_cmd=create_solr_api_request_command(request, config)
process = Popen(reload_collection_json_cmd, stdout=PIPE, stderr=PIPE, shell=True)
out, err = process.communicate()
if process.returncode != 0:
raise Exception("{0} command failed: {1}".format(reload_collection_json_cmd, str(err)))
response=json.loads(str(out))
if 'success' in response:
print 'Reloading collection {0} was {1}SUCCESSFUL{2}'.format(collection, colors.OKGREEN, colors.ENDC)
return collection
else:
raise Exception("RELOAD collection ('{0}') failed. ({1}) Response: {1}".format(collection, str(out)))
def delete_znode(options, config, znode):
solr_cli_command=create_infra_solr_client_command(options, config, '--delete-znode --znode {0}'.format(znode))
logger.debug("Solr cli command: {0}".format(solr_cli_command))
sys.stdout.write('Deleting znode {0} ... '.format(znode))
sys.stdout.flush()
process = Popen(solr_cli_command, stdout=PIPE, stderr=PIPE, shell=True)
out, err = process.communicate()
if process.returncode != 0:
sys.stdout.write(colors.FAIL + 'FAILED\n' + colors.ENDC)
sys.stdout.flush()
raise Exception("{0} command failed: {1}".format(solr_cli_command, str(err)))
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
logger.debug(str(out))
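# Transfers data between two znodes, or between the local filesystem and a znode when
# copy_from_local / copy_to_local is set, via solrCloudCli.sh (--transfer-znode).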
def copy_znode(options, config, copy_src, copy_dest, copy_from_local=False, copy_to_local=False):
solr_cli_command=create_infra_solr_client_command(options, config, '--transfer-znode --copy-src {0} --copy-dest {1}'.format(copy_src, copy_dest))
if copy_from_local:
solr_cli_command+=" --transfer-mode copyFromLocal"
elif copy_to_local:
solr_cli_command+=" --transfer-mode copyToLocal"
logger.debug("Solr cli command: {0}".format(solr_cli_command))
sys.stdout.write('Transferring data from {0} to {1} ... '.format(copy_src, copy_dest))
sys.stdout.flush()
process = Popen(solr_cli_command, stdout=PIPE, stderr=PIPE, shell=True)
out, err = process.communicate()
if process.returncode != 0:
sys.stdout.write(colors.FAIL + 'FAILED\n' + colors.ENDC)
sys.stdout.flush()
raise Exception("{0} command failed: {1}".format(solr_cli_command, str(err)))
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
logger.debug(str(out))
def delete_logsearch_collections(options, config, solr_urls, collections):
service_logs_collection = config.get('logsearch_collections', 'hadoop_logs_collection_name')
audit_logs_collection = config.get('logsearch_collections', 'audit_logs_collection_name')
history_collection = config.get('logsearch_collections', 'history_collection_name')
if service_logs_collection in collections:
retry(delete_collection, options, config, service_logs_collection, solr_urls, context='[Delete {0} collection]'.format(service_logs_collection))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(service_logs_collection)
if audit_logs_collection in collections:
retry(delete_collection, options, config, audit_logs_collection, solr_urls, context='[Delete {0} collection]'.format(audit_logs_collection))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(audit_logs_collection)
if history_collection in collections:
retry(delete_collection, options, config, history_collection, solr_urls, context='[Delete {0} collection]'.format(history_collection))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(history_collection)
def delete_atlas_collections(options, config, solr_urls, collections):
fulltext_collection = config.get('atlas_collections', 'fulltext_index_name')
edge_index_collection = config.get('atlas_collections', 'edge_index_name')
vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
if fulltext_collection in collections:
retry(delete_collection, options, config, fulltext_collection, solr_urls, context='[Delete {0} collection]'.format(fulltext_collection))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(fulltext_collection)
if edge_index_collection in collections:
retry(delete_collection, options, config, edge_index_collection, solr_urls, context='[Delete {0} collection]'.format(edge_index_collection))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(edge_index_collection)
if vertex_index_collection in collections:
retry(delete_collection, options, config, vertex_index_collection, solr_urls, context='[Delete {0} collection]'.format(vertex_index_collection))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation.'.format(vertex_index_collection)
def delete_ranger_collection(options, config, solr_urls, collections):
ranger_collection_name = config.get('ranger_collection', 'ranger_collection_name')
if ranger_collection_name in collections:
retry(delete_collection, options, config, ranger_collection_name, solr_urls, context='[Delete {0} collection]'.format(ranger_collection_name))
else:
print 'Collection {0} does not exist or filtered out. Skipping delete operation'.format(ranger_collection_name)
def delete_collections(options, config, service_filter):
solr_urls = get_solr_urls(config)
collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
collections=filter_collections(options, collections)
if is_ranger_available(config, service_filter):
delete_ranger_collection(options, config, solr_urls, collections)
if is_atlas_available(config, service_filter):
delete_atlas_collections(options, config, solr_urls, collections)
if is_logsearch_available(config, service_filter):
delete_logsearch_collections(options, config, solr_urls, collections)
def upgrade_ranger_schema(options, config, service_filter):
solr_znode='/infra-solr'
if is_ranger_available(config, service_filter):
if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
solr_znode=config.get('infra_solr', 'znode')
ranger_config_set_name = config.get('ranger_collection', 'ranger_config_set_name')
copy_znode(options, config, "{0}{1}".format(INFRA_SOLR_CLIENT_BASE_PATH, RANGER_NEW_SCHEMA),
"{0}/configs/{1}/managed-schema".format(solr_znode, ranger_config_set_name), copy_from_local=True)
def backup_ranger_configs(options, config, service_filter):
solr_znode='/infra-solr'
if is_ranger_available(config, service_filter):
if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
solr_znode=config.get('infra_solr', 'znode')
ranger_config_set_name = config.get('ranger_collection', 'ranger_config_set_name')
backup_ranger_config_set_name = config.get('ranger_collection', 'backup_ranger_config_set_name')
copy_znode(options, config, "{0}/configs/{1}".format(solr_znode, ranger_config_set_name),
"{0}/configs/{1}".format(solr_znode, backup_ranger_config_set_name))
def upgrade_ranger_solrconfig_xml(options, config, service_filter):
solr_znode='/infra-solr'
if is_ranger_available(config, service_filter):
if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
solr_znode=config.get('infra_solr', 'znode')
ranger_config_set_name = config.get('ranger_collection', 'ranger_config_set_name')
backup_ranger_config_set_name = config.get('ranger_collection', 'backup_ranger_config_set_name')
copy_znode(options, config, "{0}/configs/solrconfig.xml".format(solr_znode, ranger_config_set_name),
"{0}/configs/solrconfig.xml".format(solr_znode, backup_ranger_config_set_name))
def delete_znodes(options, config, service_filter):
solr_znode='/infra-solr'
if is_logsearch_available(config, service_filter):
if config.has_section('infra_solr') and config.has_option('infra_solr', 'znode'):
solr_znode=config.get('infra_solr', 'znode')
delete_znode(options, config, "{0}/configs/hadoop_logs".format(solr_znode))
delete_znode(options, config, "{0}/configs/audit_logs".format(solr_znode))
delete_znode(options, config, "{0}/configs/history".format(solr_znode))
def do_backup_request(options, accessor, parser, config, collection, index_location):
sys.stdout.write("Sending backup collection request ('{0}') to Ambari to process (backup destination: '{1}')..."
.format(collection, index_location))
sys.stdout.flush()
response = backup(options, accessor, parser, config, collection, index_location)
request_id = get_request_id(response)
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
print 'Backup command request id: {0}'.format(request_id)
if options.async:
print "Backup {0} collection request sent to Ambari server. Check Ambari UI about the results.".format(collection)
sys.exit(0)
else:
sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
sys.stdout.flush()
cluster = config.get('ambari_server', 'cluster')
monitor_request(options, accessor, cluster, request_id, 'Backup Solr collection: ' + collection)
print "Backup collection '{0}'... {1}DONE{2}".format(collection, colors.OKGREEN, colors.ENDC)
def do_migrate_request(options, accessor, parser, config, collection, index_location):
sys.stdout.write("Sending migrate collection request ('{0}') to Ambari to process (migrate folder: '{1}')..."
.format(collection, index_location))
sys.stdout.flush()
response = migrate(options, accessor, parser, config, collection, index_location)
request_id = get_request_id(response)
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
print 'Migrate command request id: {0}'.format(request_id)
if options.async:
print "Migrate {0} collection index request sent to Ambari server. Check Ambari UI about the results.".format(collection)
sys.exit(0)
else:
sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
sys.stdout.flush()
cluster = config.get('ambari_server', 'cluster')
monitor_request(options, accessor, cluster, request_id, 'Migrate Solr collection index: ' + collection)
print "Migrate index '{0}'... {1}DONE{2}".format(collection, colors.OKGREEN, colors.ENDC)
def do_restore_request(options, accessor, parser, config, collection, index_location, shards, hdfs_path):
sys.stdout.write("Sending restore collection request ('{0}') to Ambari to process (backup location: '{1}')..."
.format(collection, index_location))
sys.stdout.flush()
response = restore(options, accessor, parser, config, collection, index_location, hdfs_path, shards)
request_id = get_request_id(response)
sys.stdout.write(colors.OKGREEN + 'DONE\n' + colors.ENDC)
sys.stdout.flush()
print 'Restore command request id: {0}'.format(request_id)
if options.async:
print "Restore {0} collection request sent to Ambari server. Check Ambari UI about the results.".format(collection)
sys.exit(0)
else:
sys.stdout.write("Start monitoring Ambari request with id {0} ...".format(request_id))
sys.stdout.flush()
cluster = config.get('ambari_server', 'cluster')
monitor_request(options, accessor, cluster, request_id, 'Restore Solr collection: ' + collection)
print "Restoring collection '{0}'... {1}DONE{2}".format(collection, colors.OKGREEN, colors.ENDC)
def get_ranger_index_location(collection, config, options):
ranger_index_location = None
if options.index_location:
ranger_index_location = os.path.join(options.index_location, "ranger")
elif options.ranger_index_location:
ranger_index_location = options.ranger_index_location
elif config.has_option('ranger_collection', 'backup_path'):
ranger_index_location = config.get('ranger_collection', 'backup_path')
else:
print "'backup_path'is missing from config file and --index-location or --ranger-index-location options are missing as well. Backup collection {0} {1}FAILED{2}." \
.format(collection, colors.FAIL, colors.ENDC)
sys.exit(1)
return ranger_index_location
def get_atlas_index_location(collection, config, options):
atlas_index_location = None
if options.index_location:
atlas_index_location = os.path.join(options.index_location, "atlas", collection)
elif options.atlas_index_location:
atlas_index_location = os.path.join(options.atlas_index_location, collection)
elif config.has_option('atlas_collections', 'backup_path'):
atlas_index_location = os.path.join(config.get('atlas_collections', 'backup_path'), collection)
else:
print "'backup_path'is missing from config file and --index-location or --atlas-index-location options are missing as well. Backup collection {0} {1}FAILED{2}." \
.format(collection, colors.FAIL, colors.ENDC)
sys.exit(1)
return atlas_index_location
def backup_collections(options, accessor, parser, config, service_filter):
solr_urls = get_solr_urls(config)
collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
collections=filter_collections(options, collections)
if is_ranger_available(config, service_filter):
collection_name = config.get('ranger_collection', 'ranger_collection_name')
if collection_name in collections:
ranger_index_location=get_ranger_index_location(collection_name, config, options)
do_backup_request(options, accessor, parser, config, collection_name, ranger_index_location)
else:
print 'Collection {0} does not exist or filtered out. Skipping backup operation.'.format(collection_name)
if is_atlas_available(config, service_filter):
fulltext_index_collection = config.get('atlas_collections', 'fulltext_index_name')
if fulltext_index_collection in collections:
fulltext_index_location = get_atlas_index_location(fulltext_index_collection, config, options)
do_backup_request(options, accessor, parser, config, fulltext_index_collection, fulltext_index_location)
else:
print 'Collection {0} does not exist or filtered out. Skipping backup operation.'.format(fulltext_index_collection)
vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
if vertex_index_collection in collections:
vertex_index_location = get_atlas_index_location(vertex_index_collection, config, options)
do_backup_request(options, accessor, parser, config, vertex_index_collection, vertex_index_location)
else:
print 'Collection {0} does not exist or filtered out. Skipping backup operation.'.format(vertex_index_collection)
edge_index_collection = config.get('atlas_collections', 'edge_index_name')
if edge_index_collection in collections:
edge_index_location = get_atlas_index_location(edge_index_collection, config, options)
do_backup_request(options, accessor, parser, config, edge_index_collection, edge_index_location)
else:
print 'Collection {0} does not exist or filtered out. Skipping backup operation.'.format(edge_index_collection)
def migrate_snapshots(options, accessor, parser, config, service_filter):
if is_ranger_available(config, service_filter):
collection_name = config.get('ranger_collection', 'ranger_collection_name')
if options.collection is None or options.collection == collection_name:
ranger_index_location=get_ranger_index_location(collection_name, config, options)
do_migrate_request(options, accessor, parser, config, collection_name, ranger_index_location)
else:
print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(collection_name)
if is_atlas_available(config, service_filter):
fulltext_index_collection = config.get('atlas_collections', 'fulltext_index_name')
if options.collection is None or options.collection == fulltext_index_collection:
fulltext_index_location=get_atlas_index_location(fulltext_index_collection, config, options)
do_migrate_request(options, accessor, parser, config, fulltext_index_collection, fulltext_index_location)
else:
print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(fulltext_index_collection)
vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
if options.collection is None or options.collection == vertex_index_collection:
vertex_index_location=get_atlas_index_location(vertex_index_collection, config, options)
do_migrate_request(options, accessor, parser, config, vertex_index_collection, vertex_index_location)
else:
print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(vertex_index_collection)
edge_index_collection = config.get('atlas_collections', 'edge_index_name')
if options.collection is None or options.collection == edge_index_collection:
edge_index_location=get_atlas_index_location(edge_index_collection, config, options)
do_migrate_request(options, accessor, parser, config, edge_index_collection, edge_index_location)
else:
print "Collection ('{0}') backup index has filtered out. Skipping migrate operation.".format(edge_index_collection)
def create_backup_collections(options, accessor, parser, config, service_filter):
solr_urls = get_solr_urls(config)
collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
replica_number = "1" # hard coded
if is_ranger_available(config, service_filter):
backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
if backup_ranger_collection not in collections:
if options.collection is not None and options.collection != backup_ranger_collection:
print "Collection {0} has filtered out. Skipping create operation.".format(backup_ranger_collection)
else:
backup_ranger_config_set = config.get('ranger_collection', 'backup_ranger_config_set_name')
backup_ranger_shards = config.get('ranger_collection', 'ranger_collection_shards')
backup_ranger_max_shards = config.get('ranger_collection', 'ranger_collection_max_shards_per_node')
retry(create_collection, options, config, solr_urls, backup_ranger_collection, backup_ranger_config_set,
backup_ranger_shards, replica_number, backup_ranger_max_shards, context="[Create Solr Collections]")
else:
print "Collection {0} has already exist. Skipping create operation.".format(backup_ranger_collection)
if is_atlas_available(config, service_filter):
backup_atlas_config_set = config.get('atlas_collections', 'config_set')
backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
if backup_fulltext_index_name not in collections:
if options.collection is not None and options.collection != backup_fulltext_index_name:
print "Collection {0} has filtered out. Skipping create operation.".format(backup_fulltext_index_name)
else:
backup_fulltext_index_shards = config.get('atlas_collections', 'fulltext_index_shards')
backup_fulltext_index_max_shards = config.get('atlas_collections', 'fulltext_index_max_shards_per_node')
retry(create_collection, options, config, solr_urls, backup_fulltext_index_name, backup_atlas_config_set,
backup_fulltext_index_shards, replica_number, backup_fulltext_index_max_shards, context="[Create Solr Collections]")
else:
print "Collection {0} has already exist. Skipping create operation.".format(backup_fulltext_index_name)
backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
if backup_edge_index_name not in collections:
if options.collection is not None and options.collection != backup_edge_index_name:
print "Collection {0} has filtered out. Skipping create operation.".format(backup_edge_index_name)
else:
backup_edge_index_shards = config.get('atlas_collections', 'edge_index_shards')
backup_edge_index_max_shards = config.get('atlas_collections', 'edge_index_max_shards_per_node')
retry(create_collection, options, config, solr_urls, backup_edge_index_name, backup_atlas_config_set,
backup_edge_index_shards, replica_number, backup_edge_index_max_shards, context="[Create Solr Collections]")
else:
print "Collection {0} has already exist. Skipping create operation.".format(backup_edge_index_name)
backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
if backup_vertex_index_name not in collections:
if options.collection is not None and options.collection != backup_vertex_index_name:
print "Collection {0} has filtered out. Skipping create operation.".format(backup_vertex_index_name)
else:
backup_vertex_index_shards = config.get('atlas_collections', 'vertex_index_shards')
backup_vertex_index_max_shards = config.get('atlas_collections', 'vertex_index_max_shards_per_node')
retry(create_collection, options, config, solr_urls, backup_vertex_index_name, backup_atlas_config_set,
backup_vertex_index_shards, replica_number, backup_vertex_index_max_shards, context="[Create Solr Collections]")
else:
print "Collection {0} has already exist. Skipping create operation.".format(backup_fulltext_index_name)
def restore_collections(options, accessor, parser, config, service_filter):
solr_urls = get_solr_urls(config)
collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
collections=filter_collections(options, collections)
if 'RANGER' in service_filter and config.has_section('ranger_collection') and config.has_option('ranger_collection', 'enabled') \
and config.get('ranger_collection', 'enabled') == 'true':
collection_name = config.get('ranger_collection', 'ranger_collection_name')
backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
hdfs_base_path = None
if options.ranger_hdfs_base_path:
hdfs_base_path = options.ranger_hdfs_base_path
elif options.hdfs_base_path:
hdfs_base_path = options.hdfs_base_path
elif config.has_option('ranger_collection', 'hdfs_base_path'):
hdfs_base_path = config.get('ranger_collection', 'hdfs_base_path')
if backup_ranger_collection in collections:
backup_ranger_shards = config.get('ranger_collection', 'ranger_collection_shards')
ranger_index_location=get_ranger_index_location(collection_name, config, options)
do_restore_request(options, accessor, parser, config, backup_ranger_collection, ranger_index_location, backup_ranger_shards, hdfs_base_path)
else:
print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(backup_ranger_collection)
if is_atlas_available(config, service_filter):
hdfs_base_path = None
if options.atlas_hdfs_base_path:
hdfs_base_path = options.atlas_hdfs_base_path
elif options.hdfs_base_path:
hdfs_base_path = options.hdfs_base_path
elif config.has_option('atlas_collections', 'hdfs_base_path'):
hdfs_base_path = config.get('atlas_collections', 'hdfs_base_path')
fulltext_index_collection = config.get('atlas_collections', 'fulltext_index_name')
backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
if backup_fulltext_index_name in collections:
backup_fulltext_index_shards = config.get('atlas_collections', 'fulltext_index_shards')
fulltext_index_location=get_atlas_index_location(fulltext_index_collection, config, options)
do_restore_request(options, accessor, parser, config, backup_fulltext_index_name, fulltext_index_location, backup_fulltext_index_shards, hdfs_base_path)
else:
print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(fulltext_index_collection)
edge_index_collection = config.get('atlas_collections', 'edge_index_name')
backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
if backup_edge_index_name in collections:
backup_edge_index_shards = config.get('atlas_collections', 'edge_index_shards')
edge_index_location=get_atlas_index_location(edge_index_collection, config, options)
do_restore_request(options, accessor, parser, config, backup_edge_index_name, edge_index_location, backup_edge_index_shards, hdfs_base_path)
else:
print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(edge_index_collection)
vertex_index_collection = config.get('atlas_collections', 'vertex_index_name')
backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
if backup_vertex_index_name in collections:
backup_vertex_index_shards = config.get('atlas_collections', 'vertex_index_shards')
vertex_index_location=get_atlas_index_location(vertex_index_collection, config, options)
do_restore_request(options, accessor, parser, config, backup_vertex_index_name, vertex_index_location, backup_vertex_index_shards, hdfs_base_path)
else:
print "Collection ('{0}') does not exist or filtered out. Skipping restore operation.".format(vertex_index_collection)
def reload_collections(options, accessor, parser, config, service_filter):
solr_urls = get_solr_urls(config)
collections=retry(list_collections, options, config, solr_urls, context="[List Solr Collections]")
collections=filter_collections(options, collections)
if is_ranger_available(config, service_filter):
backup_ranger_collection = config.get('ranger_collection', 'backup_ranger_collection_name')
if backup_ranger_collection in collections:
retry(reload_collection, options, config, solr_urls, backup_ranger_collection, context="[Reload Solr Collections]")
else:
print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_ranger_collection)
if is_atlas_available(config, service_filter):
backup_fulltext_index_name = config.get('atlas_collections', 'backup_fulltext_index_name')
if backup_fulltext_index_name in collections:
retry(reload_collection, options, config, solr_urls, backup_fulltext_index_name, context="[Reload Solr Collections]")
else:
print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_fulltext_index_name)
backup_edge_index_name = config.get('atlas_collections', 'backup_edge_index_name')
if backup_edge_index_name in collections:
retry(reload_collection, options, config, solr_urls, backup_edge_index_name, context="[Reload Solr Collections]")
else:
print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_edge_index_name)
backup_vertex_index_name = config.get('atlas_collections', 'backup_vertex_index_name')
if backup_vertex_index_name in collections:
retry(reload_collection, options, config, solr_urls, backup_vertex_index_name, context="[Reload Solr Collections]")
else:
print "Collection ('{0}') does not exist or filtered out. Skipping reload operation.".format(backup_fulltext_index_name)
def validate_ini_file(options, parser):
if options.ini_file is None:
parser.print_help()
print 'ini-file option is missing'
sys.exit(1)
elif not os.path.isfile(options.ini_file):
parser.print_help()
print 'ini file ({0}) does not exist'.format(options.ini_file)
sys.exit(1)
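# Sends a batched (rolling) RESTART request for the Infra Solr instances, honoring the
# --batch-interval and --batch-fault-tolerance options.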
def rolling_restart_solr(options, accessor, parser, config):
cluster = config.get('ambari_server', 'cluster')
component_hosts = get_solr_hosts(options, accessor, cluster)
interval_secs = options.batch_interval
fault_tolerance = options.batch_fault_tolerance
request_body = create_batch_command("RESTART", component_hosts, cluster, "AMBARI_INFRA_SOLR", "INFRA_SOLR", interval_secs, fault_tolerance, "Rolling restart Infra Solr Instances")
post_json(accessor, BATCH_REQUEST_API_URL.format(cluster), request_body)
print "Rolling Restart Infra Solr Instances request sent. (check Ambari UI about the requests)"
if __name__=="__main__":
parser = optparse.OptionParser("usage: %prog [options]")
parser.add_option("-a", "--action", dest="action", type="string", help="delete-collections | backup | cleanup-znodes | backup-and-cleanup | migrate | restore | rolling-restart-solr")
parser.add_option("-i", "--ini-file", dest="ini_file", type="string", help="Config ini file to parse (required)")
parser.add_option("-f", "--force", dest="force", default=False, action="store_true", help="force index upgrade even if it's the right version")
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="use for verbose logging")
parser.add_option("-s", "--service-filter", dest="service_filter", default=None, type="string", help="run commands only selected services (comma separated: LOGSEARCH,ATLAS,RANGER)")
parser.add_option("-c", "--collection", dest="collection", default=None, type="string", help="selected collection to run an operation")
parser.add_option("--async", dest="async", action="store_true", default=False, help="async Ambari operations (backup | restore | migrate)")
parser.add_option("--index-location", dest="index_location", type="string", help="location of the index backups. add ranger/atlas prefix after the path. required only if no backup path in the ini file")
parser.add_option("--atlas-index-location", dest="atlas_index_location", type="string", help="location of the index backups (for atlas). required only if no backup path in the ini file")
parser.add_option("--ranger-index-location", dest="ranger_index_location", type="string", help="location of the index backups (for ranger). required only if no backup path in the ini file")
parser.add_option("--version", dest="index_version", type="string", default="6.6.2", help="lucene index version for migration (6.6.2 or 7.3.1)")
parser.add_option("--request-tries", dest="request_tries", type="int", help="number of tries for BACKUP/RESTORE status api calls in the request")
parser.add_option("--request-time-interval", dest="request_time_interval", type="int", help="time interval between BACKUP/RESTORE status api calls in the request")
parser.add_option("--request-async", dest="request_async", action="store_true", default=False, help="skip BACKUP/RESTORE status api calls from the command")
parser.add_option("--include-solr-hosts", dest="include_solr_hosts", type="string", help="comma separated list of included solr hosts")
parser.add_option("--exclude-solr-hosts", dest="exclude_solr_hosts", type="string", help="comma separated list of excluded solr hosts")
parser.add_option("--disable-solr-host-check", dest="disable_solr_host_check", action="store_true", default=False, help="Disable to check solr hosts are good for the collection backups")
parser.add_option("--core-filter", dest="core_filter", default=None, type="string", help="core filter for replica folders")
parser.add_option("--skip-cores", dest="skip_cores", default=None, type="string", help="specific cores to skip (comma separated)")
parser.add_option("--skip-generate-restore-host-cores", dest="skip_generate_restore_host_cores", default=False, action="store_true", help="Skip the generation of restore_host_cores.json, just read the file itself, can be useful if command failed at some point.")
parser.add_option("--hdfs-base-path", dest="hdfs_base_path", default=None, type="string", help="hdfs base path where the collections are located (e.g.: /user/infrasolr). Use if both atlas and ranger collections are on hdfs.")
parser.add_option("--ranger-hdfs-base-path", dest="ranger_hdfs_base_path", default=None, type="string", help="hdfs base path where the ranger collection is located (e.g.: /user/infra-solr). Use if only ranger collection is on hdfs.")
parser.add_option("--atlas-hdfs-base-path", dest="atlas_hdfs_base_path", default=None, type="string", help="hdfs base path where the atlas collections are located (e.g.: /user/infra-solr). Use if only atlas collections are on hdfs.")
parser.add_option("--keep-backup", dest="keep_backup", default=False, action="store_true", help="If it is turned on, Snapshot Solr data will not be deleted from the filesystem during restore.")
parser.add_option("--batch-interval", dest="batch_interval", type="int", default=60 ,help="batch time interval (seconds) between requests (for restarting INFRA SOLR, default: 60)")
parser.add_option("--batch-fault-tolerance", dest="batch_fault_tolerance", type="int", default=0 ,help="fault tolerance of tasks for batch request (for restarting INFRA SOLR, default: 0)")
parser.add_option("--shared-drive", dest="shared_drive", default=False, action="store_true", help="Use if the backup location is shared between hosts. (override config from config ini file)")
(options, args) = parser.parse_args()
set_log_level(options.verbose)
validate_ini_file(options, parser)
config = ConfigParser.RawConfigParser()
config.read(options.ini_file)
service_filter=options.service_filter.upper().split(',') if options.service_filter is not None else ['LOGSEARCH', 'ATLAS', 'RANGER']
if options.action is None:
parser.print_help()
print 'action option is missing'
sys.exit(1)
else:
if config.has_section('ambari_server'):
host = config.get('ambari_server', 'host')
port = config.get('ambari_server', 'port')
protocol = config.get('ambari_server', 'protocol')
username = config.get('ambari_server', 'username')
password = config.get('ambari_server', 'password')
accessor = api_accessor(host, username, password, protocol, port)
if options.action.lower() == 'backup':
backup_ranger_configs(options, config, service_filter)
backup_collections(options, accessor, parser, config, service_filter)
elif options.action.lower() == 'delete-collections':
delete_collections(options, config, service_filter)
delete_znodes(options, config, service_filter)
upgrade_ranger_schema(options, config, service_filter)
elif options.action.lower() == 'cleanup-znodes':
delete_znodes(options, config, service_filter)
upgrade_ranger_schema(options, config, service_filter)
elif options.action.lower() == 'backup-and-cleanup':
backup_ranger_configs(options, config, service_filter)
backup_collections(options, accessor, parser, config, service_filter)
delete_collections(options, config, service_filter)
delete_znodes(options, config, service_filter)
upgrade_ranger_schema(options, config, service_filter)
elif options.action.lower() == 'restore':
upgrade_ranger_solrconfig_xml(options, config, service_filter)
create_backup_collections(options, accessor, parser, config, service_filter)
restore_collections(options, accessor, parser, config, service_filter)
reload_collections(options, accessor, parser, config, service_filter)
elif options.action.lower() == 'migrate':
migrate_snapshots(options, accessor, parser, config, service_filter)
elif options.action.lower() == 'upgrade-solr-clients':
upgrade_solr_clients(options, accessor, parser, config)
elif options.action.lower() == 'upgrade-solr-instances':
upgrade_solr_instances(options, accessor, parser, config)
elif options.action.lower() == 'upgrade-logsearch-portal':
upgrade_logsearch_portal(options, accessor, parser, config)
elif options.action.lower() == 'upgrade-logfeeders':
upgrade_logfeeders(options, accessor, parser, config)
elif options.action.lower() == 'stop-logsearch':
service_components_command(options, accessor, parser, config, LOGSEARCH_SERVICE_NAME, LOGSEARCH_SERVER_COMPONENT_NAME, "STOP", "Stop")
service_components_command(options, accessor, parser, config, LOGSEARCH_SERVICE_NAME, LOGSEARCH_LOGFEEDER_COMPONENT_NAME, "STOP", "Stop")
elif options.action.lower() == 'restart-logsearch':
service_components_command(options, accessor, parser, config, LOGSEARCH_SERVICE_NAME, LOGSEARCH_SERVER_COMPONENT_NAME, "RESTART", "Restart")
service_components_command(options, accessor, parser, config, LOGSEARCH_SERVICE_NAME, LOGSEARCH_LOGFEEDER_COMPONENT_NAME, "RESTART", "Restart")
elif options.action.lower() == 'rolling-restart-solr':
rolling_restart_solr(options, accessor, parser, config)
else:
parser.print_help()
print 'action option is invalid (available actions: delete-collections | backup | cleanup-znodes | backup-and-cleanup | migrate | restore | rolling-restart-solr | upgrade-solr-clients | upgrade-solr-instances | upgrade-logsearch-portal | upgrade-logfeeders | stop-logsearch | restart-logsearch)'
sys.exit(1)
print "Migration helper command {0}FINISHED{1}".format(colors.OKGREEN, colors.ENDC)