"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import fnmatch
import json
import os
import params
import socket
import time
import traceback
from resource_management.core.shell import call
from resource_management.core.logger import Logger
from resource_management.core.resources.system import Execute
from resource_management.libraries.functions.default import default
from resource_management.libraries.functions.format import format
from resource_management.libraries.resources.hdfs_resource import HdfsResource
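# path of the helper script shipped with the ambari-infra-solr-client package; it runs the lucene index migration commands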
index_helper_script = '/usr/lib/ambari-infra-solr-client/solrIndexHelper.sh'
# folder location which contains the snapshot/core folder
index_location = default("/commandParams/solr_index_location", None)
# index version (available index versions: 6.6.2 and 7.3.1; 6.6.2 is used by default)
index_version = default("/commandParams/solr_index_version", '6.6.2')
# if this flag is false, the upgrade is skipped when the index version is already proper; set the flag to true to force re-running the tool
force = default("/commandParams/solr_index_upgrade_force", False)
# if this flag is true, a separate folder is generated for every backup with a hostname suffix
# where "." chars are replaced with "_" (e.g.: /my/path/backup_location_c7301_ambari_apache_org); that can be useful if different
# hosts share the same filesystem where the backup is stored.
shared_fs = default("/commandParams/solr_shared_fs", False)
# set verbose logging for the index migration (default: true)
debug = default("/commandParams/solr_migrate_debug", True)
# used for filtering folders in the backup location (e.g.: if the filter is ranger, the snapshot.ranger folder is included but snapshot.hadoop_logs is not)
core_filter = default("/commandParams/solr_core_filter", None)
# used to filter out comma separated cores - can be useful if backup/restore failed at some point
skip_cores = default("/commandParams/solr_skip_cores", "").split(",")
# delete the write.lock file at the start of the lucene index migration process
delete_lock_on_start = default("/commandParams/solr_delete_lock_on_start", True)
# if this is set, the core filter is applied with the snapshot.* folder pattern
backup_mode = default("/commandParams/solr_migrate_backup", True)
log_output = default("/commandParams/solr_migrate_logoutput", True)
# Solr collection name (used for DELETE/BACKUP/RESTORE)
collection = default("/commandParams/solr_collection", "ranger_audits")
# it will be used in the snapshot name; if it's ranger, the snapshot folder will be snapshot.ranger
backup_name = default("/commandParams/solr_backup_name", "ranger")
request_async = default("/commandParams/solr_request_async", False)
request_tries = int(default("/commandParams/solr_request_tries", 30))
request_time_interval = int(default("/commandParams/solr_request_time_interval", 5))
check_hosts_default = bool(params.security_enabled)
check_hosts = default("/commandParams/solr_check_hosts", check_hosts_default)
solr_protocol = "https" if params.infra_solr_ssl_enabled else "http"
solr_port = format("{params.infra_solr_port}")
solr_base_url = format("{solr_protocol}://{params.hostname}:{params.infra_solr_port}/solr")
solr_datadir = params.infra_solr_datadir
solr_keep_backup=default("/commandParams/solr_keep_backup", False)
solr_num_shards = int(default("/commandParams/solr_shards", "0"))
solr_hdfs_path=default("/commandParams/solr_hdfs_path", None)
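# host <-> core mappings for backup/restore, passed in as JSON command parameters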
solr_backup_host_cores_map = json.loads(default("/commandParams/solr_backup_host_cores_map", "{}"))
solr_backup_core_host_map = json.loads(default("/commandParams/solr_backup_core_host_map", "{}"))
solr_restore_host_cores_map = json.loads(default("/commandParams/solr_restore_host_cores_map", "{}"))
solr_restore_core_host_map = json.loads(default("/commandParams/solr_restore_core_host_map", "{}"))
solr_restore_core_data = json.loads(default("/commandParams/solr_restore_core_data", "{}"))
solr_restore_config_set = default("/commandParams/solr_restore_config_set", None)
keytab = None
principal = None
if params.security_enabled:
keytab = params.infra_solr_kerberos_keytab
principal = params.infra_solr_kerberos_principal
if solr_hdfs_path:
import functools
from resource_management.libraries.functions import conf_select
from resource_management.libraries.functions import stack_select
from resource_management.libraries.functions import get_klist_path
from resource_management.libraries.functions import get_kinit_path
from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources
klist_path_local = get_klist_path(default('/configurations/kerberos-env/executable_search_paths', None))
kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
# hadoop default parameters
hdfs_user = params.config['configurations']['hadoop-env']['hdfs_user']
hadoop_bin = stack_select.get_hadoop_dir("sbin")
hadoop_bin_dir = stack_select.get_hadoop_dir("bin")
hadoop_conf_dir = conf_select.get_hadoop_conf_dir()
hadoop_conf_secure_dir = os.path.join(hadoop_conf_dir, "secure")
hadoop_lib_home = stack_select.get_hadoop_dir("lib")
hdfs_principal_name = default('/configurations/hadoop-env/hdfs_principal_name', None)
hdfs_user_keytab = params.config['configurations']['hadoop-env']['hdfs_user_keytab']
dfs_type = default("/clusterLevelParams/dfs_type", "")
hdfs_site = params.config['configurations']['hdfs-site']
default_fs = params.config['configurations']['core-site']['fs.defaultFS']
  # create a partial function with common arguments for every HdfsResource call
  # to create/delete/copyfromlocal hdfs directories/files we call HdfsResource in the code
HdfsResource = functools.partial(
HdfsResource,
user=params.infra_solr_user,
hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore",
security_enabled = params.security_enabled,
keytab = keytab,
kinit_path_local = kinit_path_local,
hadoop_bin_dir = hadoop_bin_dir,
hadoop_conf_dir = hadoop_conf_dir,
principal_name = principal,
hdfs_site = hdfs_site,
default_fs = default_fs,
immutable_paths = get_not_managed_resources(),
dfs_type = dfs_type
)
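# hostname with "." characters replaced by "_"; appended to the backup location when hosts share the same filesystem (solr_shared_fs)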
hostname_suffix = params.hostname.replace(".", "_")
if shared_fs:
index_location = format("{index_location}_{hostname_suffix}")
def get_files_by_pattern(directory, pattern):
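  """
  Walk 'directory' recursively and yield the full path of every file whose basename matches 'pattern'.
  The pattern can be a compiled regular expression (its match() is used) or a plain glob string (matched with fnmatch).
  """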
for root, dirs, files in os.walk(directory):
for basename in files:
try:
matched = pattern.match(basename)
except AttributeError:
matched = fnmatch.fnmatch(basename, pattern)
if matched:
yield os.path.join(root, basename)
def create_solr_api_request_command(request_path, output=None, override_solr_base_url=None):
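  """
  Build a curl command for a Solr API call. On kerberized clusters a kinit is prepended and SPNEGO auth is used.
  The HTTP response code is appended to the output and piped to grep, so the command only succeeds on HTTP 200.
  :param request_path: api path appended to the Solr base url
  :param output: optional file to store the response body in
  :param override_solr_base_url: use this base url instead of the local Solr instance
  """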
solr_url = format("{solr_base_url}/{request_path}") if override_solr_base_url is None else format("{override_solr_base_url}/{request_path}")
grep_cmd = " | grep 'solr_rs_status: 200'"
api_cmd = format("kinit -kt {keytab} {principal} && curl -w'solr_rs_status: %{{http_code}}' -k --negotiate -u : '{solr_url}'") \
if params.security_enabled else format("curl -w'solr_rs_status: %{{http_code}}' -k '{solr_url}'")
if output is not None:
api_cmd+=format(" -o {output}")
api_cmd+=grep_cmd
return api_cmd
def snapshot_status_check(request_cmd, json_output, snapshot_name, backup=True, log_output=True, tries=30, time_interval=5):
"""
  Check BACKUP/RESTORE status until the response status is successful or failed.
  :param request_cmd: backup or restore api path
  :param json_output: json file which will store the response output
  :param snapshot_name: snapshot name, used to check the proper status in the status response (backup: <snapshot_name>, restore: snapshot.<snapshot_name>)
  :param backup: if this flag is true the check runs against a backup, otherwise against a restore
  :param log_output: print the output of the downloaded json file (backup/restore response)
  :param tries: maximum number of status requests - the loop stops as soon as the backup/restore status is successful
  :param time_interval: time to wait in seconds between retries
"""
failed = True
num_tries = 0
for i in range(tries):
try:
num_tries+=1
if (num_tries > 1):
Logger.info(format("Number of tries: {num_tries} ..."))
Execute(request_cmd, user=params.infra_solr_user)
with open(json_output) as json_file:
json_data = json.load(json_file)
if backup:
details = json_data['details']
if 'backup' in details:
backup_list = details['backup']
if log_output:
Logger.info(str(backup_list))
            if isinstance(backup_list, list): # support map and list format as well
backup_data = dict(backup_list[i:i+2] for i in range(0, len(backup_list), 2))
else:
backup_data = backup_list
            if 'snapshotName' not in backup_data or backup_data['snapshotName'] != snapshot_name:
              snapshot = backup_data.get('snapshotName')
              Logger.info(format("Snapshot name: {snapshot}, waiting until {snapshot_name} is available."))
time.sleep(time_interval)
continue
if backup_data['status'] == 'success':
Logger.info("Backup command status: success.")
failed = False
elif backup_data['status'] == 'failed':
Logger.info("Backup command status: failed.")
else:
Logger.info(format("Backup command is in progress... Sleep for {time_interval} seconds."))
time.sleep(time_interval)
continue
else:
Logger.info("Backup data is not found yet in details JSON response...")
time.sleep(time_interval)
continue
else:
if 'restorestatus' in json_data:
restorestatus_data = json_data['restorestatus']
if log_output:
Logger.info(str(restorestatus_data))
            if 'snapshotName' not in restorestatus_data or restorestatus_data['snapshotName'] != format("snapshot.{snapshot_name}"):
              snapshot = restorestatus_data.get('snapshotName')
              Logger.info(format("Snapshot name: {snapshot}, waiting until snapshot.{snapshot_name} is available."))
time.sleep(time_interval)
continue
if restorestatus_data['status'] == 'success':
Logger.info("Restore command successfully finished.")
failed = False
elif restorestatus_data['status'] == 'failed':
Logger.info("Restore command failed.")
else:
Logger.info(format("Restore command is in progress... Sleep for {time_interval} seconds."))
time.sleep(time_interval)
continue
else:
Logger.info("Restore status data is not found yet in details JSON response...")
time.sleep(time_interval)
continue
except Exception:
traceback.print_exc()
time.sleep(time_interval)
continue
break
if failed:
raise Exception("Status Command failed.")
else:
Logger.info("Status command finished successfully.")
def __get_domain_name(url):
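  """
  Return the host (domain) part of a URL: the scheme, port and path are stripped and the result is lowercased.
  """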
  # strip the scheme (if any), then drop the path and the port
  split_arr = url.split("://")
  host_part = split_arr[1] if len(split_arr) > 1 else split_arr[0]
  return host_part.split('/')[0].split(':')[0].lower()
def write_core_file(core, core_data):
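  """
  Dump the JSON data of a core into <index_location>/<core>.json.
  """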
core_json_location = format("{index_location}/{core}.json")
with open(core_json_location, 'w') as outfile:
json.dump(core_data, outfile)
def create_core_pairs(original_cores, new_cores):
"""
  Create core pairs from the original and new cores (backups -> restored ones), using alphabetical order
"""
core_pairs_data=[]
if len(new_cores) < len(original_cores):
raise Exception("Old collection core size is: " + str(len(new_cores)) +
". You will need at least: " + str(len(original_cores)))
else:
for index, core_data in enumerate(original_cores):
value={}
value['src_core']=core_data[0]
value['src_host']=core_data[1]
value['target_core']=new_cores[index][0]
value['target_host']=new_cores[index][1]
core_pairs_data.append(value)
with open(format("{index_location}/restore_core_pairs.json"), 'w') as outfile:
json.dump(core_pairs_data, outfile)
return core_pairs_data
def sort_core_host_pairs(host_core_map):
"""
Sort host core map by key
"""
core_host_pairs=[]
for key in sorted(host_core_map):
core_host_pairs.append((key, host_core_map[key]))
return core_host_pairs
def is_ip(addr):
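  """
  Return True if 'addr' is a valid IPv4 address (based on socket.inet_aton), False otherwise.
  """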
try:
socket.inet_aton(addr)
return True
except socket.error:
return False
def resolve_ip_to_hostname(ip):
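  """
  Resolve an IP address to its fully qualified hostname; the original value is returned if the reverse lookup fails.
  """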
try:
host_name = socket.gethostbyaddr(ip)[0].lower()
Logger.info(format("Resolved {ip} to {host_name}"))
fqdn_name = socket.getaddrinfo(host_name, 0, 0, 0, 0, socket.AI_CANONNAME)[0][3].lower()
return host_name if host_name == fqdn_name else fqdn_name
except socket.error:
pass
return ip
def create_command(command):
"""
Create hdfs command. Append kinit to the command if required.
"""
kinit_cmd = "{0} -kt {1} {2};".format(kinit_path_local, params.infra_solr_kerberos_keytab, params.infra_solr_kerberos_principal) if params.security_enabled else ""
return kinit_cmd + command
def execute_commad(command):
"""
  Run hdfs command as the infra-solr user
"""
return call(command, user=params.infra_solr_user, timeout=300)
def move_hdfs_folder(source_dir, target_dir):
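  """
  Move an HDFS directory with 'hdfs dfs -mv'; raises an exception if the command fails.
  """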
cmd=create_command(format('hdfs dfs -mv {source_dir} {target_dir}'))
returncode, stdout = execute_commad(cmd)
if returncode:
raise Exception("Unable to move HDFS dir '{0}' to '{1}' (return code: {2})".format(source_dir, target_dir, str(returncode)))
return stdout.strip()
def check_hdfs_folder_exists(hdfs_dir):
"""
  Check whether the hdfs folder exists
"""
cmd=create_command(format("hdfs dfs -ls {hdfs_dir}"))
returncode, stdout = execute_commad(cmd)
if returncode:
return False
return True
def check_folder_exists(dir):
"""
  Check whether the folder exists
"""
returncode, stdout = call(format("test -d {dir}"), user=params.infra_solr_user, timeout=300)
if returncode:
return False
return True