#!/usr/bin/python3
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import gzip
import hashlib
import json
import logging
import optparse
import os
import shutil
import signal
import sys
import tarfile
import time
from datetime import datetime, timedelta
from subprocess import call, Popen, PIPE
from urllib.parse import quote, unquote
from zipfile import ZipFile, ZIP_DEFLATED
VERSION = "1.0"
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
verbose = False
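# Solr Data Manager: archives, saves or deletes documents of a Solr collection.
# Documents are read in blocks (sorted by a date filter field and/or the id
# field), written to optionally compressed JSON files and uploaded to another
# Solr collection, HDFS, S3 or a local directory; in archive mode the exported
# documents are then deleted from the source collection.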
def parse_arguments():
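  '''
  Parse and validate the command line options.

  Checks that the mandatory arguments (mode, solr-url, collection, and
  filter-field unless --skip-date-usage is set) are present, that exactly one
  upload target is given in archive/save mode, and that the kerberos, HDFS and
  S3 arguments are either fully specified or not at all. Prints the effective
  configuration and returns the parsed options.
  '''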
parser = optparse.OptionParser("usage: %prog [options]", version="Solr Data Manager {0}".format(VERSION))
parser.add_option("-m", "--mode", dest="mode", type="string", help="archive | delete | save")
parser.add_option("-s", "--solr-url", dest="solr_url", type="string", help="the url of the solr server including the port and protocol")
parser.add_option("-c", "--collection", dest="collection", type="string", help="the name of the solr collection")
parser.add_option("-f", "--filter-field", dest="filter_field", type="string", help="the name of the field to filter on")
parser.add_option("-r", "--read-block-size", dest="read_block_size", type="int", help="block size to use for reading from solr",
default=1000)
parser.add_option("-w", "--write-block-size", dest="write_block_size", type="int", help="number of records in the output files",
default=100000)
parser.add_option("-i", "--id-field", dest="id_field", type="string", help="the name of the id field", default="id")
end_group = optparse.OptionGroup(parser, "specifying the end of the range")
end_group.add_option("-e", "--end", dest="end", type="string", help="end of the range")
end_group.add_option("-d", "--days", dest="days", type="int", help="number of days to keep")
parser.add_option_group(end_group)
parser.add_option("-o", "--date-format", dest="date_format", type="string", help="the date format to use for --days",
default="%Y-%m-%dT%H:%M:%S.%fZ")
parser.add_option("-q", "--additional-filter", dest="additional_filter", type="string", help="additional solr filter")
parser.add_option("-j", "--name", dest="name", type="string", help="name included in result files")
parser.add_option("-g", "--ignore-unfinished-uploading", dest="ignore_unfinished_uploading", action="store_true", default=False)
parser.add_option("--json-file", dest="json_file", help="create a json file instead of line delimited json", action="store_true", default=False)
parser.add_option("-z", "--compression", dest="compression", help="none | tar.gz | tar.bz2 | zip | gz", default="gz")
parser.add_option("-k", "--solr-keytab", dest="solr_keytab", type="string", help="the keytab for a kerberized solr")
parser.add_option("-n", "--solr-principal", dest="solr_principal", type="string", help="the principal for a kerberized solr")
parser.add_option("-a", "--hdfs-keytab", dest="hdfs_keytab", type="string", help="the keytab for a kerberized hdfs")
parser.add_option("-l", "--hdfs-principal", dest="hdfs_principal", type="string", help="the principal for a kerberized hdfs")
parser.add_option("-u", "--hdfs-user", dest="hdfs_user", type="string", help="the user for accessing hdfs")
parser.add_option("-p", "--hdfs-path", dest="hdfs_path", type="string", help="the hdfs path to upload to")
parser.add_option("-t", "--key-file-path", dest="key_file_path", type="string", help="the file that contains S3 <accessKey>,<secretKey>")
parser.add_option("-b", "--bucket", dest="bucket", type="string", help="the bucket name for S3 upload")
parser.add_option("-y", "--key-prefix", dest="key_prefix", type="string", help="the key prefix for S3 upload")
parser.add_option("-x", "--local-path", dest="local_path", type="string", help="the local path to save the files to")
parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False)
parser.add_option("--solr-output-collection", dest="solr_output_collection", help="target output solr collection for archive", type="string", default=None)
parser.add_option("--solr-output-url", dest="solr_output_url", default=None, type="string", help="the url of the output solr server including the port and protocol")
parser.add_option("--exclude-fields", dest="exclude_fields", help="Comma separated list of excluded fields from json response", type="string", default=None)
parser.add_option("--skip-date-usage", dest="skip_date_usage", action="store_true", default=False, help="datestamp field won't be used for queries (sort based on id field)")
(options, args) = parser.parse_args()
for r in ["mode", "solr_url", "collection"]:
if options.__dict__[r] is None:
print("argument '{0}' is mandatory".format(r))
parser.print_help()
sys.exit()
if not options.skip_date_usage:
if options.filter_field is None:
print("argument 'filter_field' is mandatory")
parser.print_help()
sys.exit()
mode_values = ["archive", "delete", "save"]
if options.mode not in mode_values:
print("mode must be one of {0}".format(" | ".join(mode_values)))
parser.print_help()
sys.exit()
if options.mode == "delete":
for r in ["name", "hdfs_keytab", "hdfs_principal", "hdfs_user", "hdfs_path", "key_file_path", "bucket", "key_prefix", "local_path"]:
if options.__dict__[r] is not None:
print("argument '{0}' may not be specified in delete mode".format(r))
parser.print_help()
sys.exit()
  if (not options.skip_date_usage and options.__dict__["end"] is None and options.__dict__["days"] is None) or \
     (options.__dict__["end"] is not None and options.__dict__["days"] is not None):
    print("exactly one of 'end' or 'days' must be specified")
parser.print_help()
sys.exit()
is_any_solr_kerberos_property = options.__dict__["solr_keytab"] is not None or options.__dict__["solr_principal"] is not None
is_all_solr_kerberos_property = options.__dict__["solr_keytab"] is not None and options.__dict__["solr_principal"] is not None
if is_any_solr_kerberos_property and not is_all_solr_kerberos_property:
print("either both 'solr-keytab' and 'solr-principal' must be specfied, or neither of them")
parser.print_help()
sys.exit()
compression_values = ["none", "tar.gz", "tar.bz2", "zip", "gz"]
if options.compression not in compression_values:
print("compression must be one of {0}".format(" | ".join(compression_values)))
parser.print_help()
sys.exit()
is_any_solr_output_property = options.__dict__["solr_output_collection"] is not None
is_any_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None or options.__dict__["hdfs_principal"] is not None
is_all_hdfs_kerberos_property = options.__dict__["hdfs_keytab"] is not None and options.__dict__["hdfs_principal"] is not None
if is_any_hdfs_kerberos_property and not is_all_hdfs_kerberos_property:
print("either both 'hdfs_keytab' and 'hdfs_principal' must be specfied, or neither of them")
parser.print_help()
sys.exit()
is_any_hdfs_property = options.__dict__["hdfs_user"] is not None or options.__dict__["hdfs_path"] is not None
is_all_hdfs_property = options.__dict__["hdfs_user"] is not None and options.__dict__["hdfs_path"] is not None
if is_any_hdfs_property and not is_all_hdfs_property:
print("either both 'hdfs_user' and 'hdfs_path' must be specfied, or neither of them")
parser.print_help()
sys.exit()
is_any_s3_property = options.__dict__["key_file_path"] is not None or options.__dict__["bucket"] is not None or \
options.__dict__["key_prefix"] is not None
is_all_s3_property = options.__dict__["key_file_path"] is not None and options.__dict__["bucket"] is not None and \
options.__dict__["key_prefix"] is not None
if is_any_s3_property and not is_all_s3_property:
print("either all the S3 arguments ('key_file_path', 'bucket', 'key_prefix') must be specfied, or none of them")
parser.print_help()
sys.exit()
if options.mode in ["archive", "save"]:
count = (1 if is_any_solr_output_property else 0) + (1 if is_any_hdfs_property else 0) + \
(1 if is_any_s3_property else 0) + (1 if options.__dict__["local_path"] is not None else 0)
if count != 1:
print("exactly one of the HDFS arguments ('hdfs_user', 'hdfs_path') or the S3 arguments ('key_file_path', 'bucket', 'key_prefix') or the solr arguments ('solr_output_collection') or the 'local_path' argument must be specified")
parser.print_help()
sys.exit()
if options.__dict__["hdfs_keytab"] is not None and options.__dict__["hdfs_user"] is None:
print("HDFS kerberos keytab and principal may only be specified if the upload target is HDFS")
parser.print_help()
sys.exit()
print(("You are running Solr Data Manager {0} with arguments:".format(VERSION)))
print((" mode: " + options.mode))
print((" solr-url: " + options.solr_url))
print((" collection: " + options.collection))
if options.__dict__["filter_field"] is not None:
print((" filter-field: " + options.filter_field))
if options.mode in ["archive", "save"]:
print((" id-field: " + options.id_field))
if options.__dict__["exclude_fields"] is not None:
print((" exclude fields: " + options.exclude_fields))
if options.__dict__["end"] is not None:
print((" end: " + options.end))
else:
print((" days: " + str(options.days)))
print((" date-format: " + options.date_format))
if options.__dict__["additional_filter"] is not None:
print((" additional-filter: " + str(options.additional_filter)))
if options.__dict__["name"] is not None:
print((" name: " + str(options.name)))
if options.mode in ["archive", "save"]:
print((" read-block-size: " + str(options.read_block_size)))
print((" write-block-size: " + str(options.write_block_size)))
print((" ignore-unfinished-uploading: " + str(options.ignore_unfinished_uploading)))
if (options.__dict__["solr_keytab"] is not None):
print((" solr-keytab: " + options.solr_keytab))
print((" solr-principal: " + options.solr_principal))
if options.mode in ["archive", "save"]:
print((" output: " + ("json" if options.json_file else "line-delimited-json")))
print((" compression: " + options.compression))
if options.__dict__["solr_output_collection"] is not None:
print((" solr output collection: " + options.solr_output_collection))
if options.__dict__["solr_output_url"] is not None:
print((" solr output url: " + options.solr_output_collection))
if (options.__dict__["hdfs_keytab"] is not None):
print((" hdfs-keytab: " + options.hdfs_keytab))
print((" hdfs-principal: " + options.hdfs_principal))
if (options.__dict__["hdfs_user"] is not None):
print((" hdfs-user: " + options.hdfs_user))
print((" hdfs-path: " + options.hdfs_path))
if (options.__dict__["key_file_path"] is not None):
print((" key-file-path: " + options.key_file_path))
print((" bucket: " + options.bucket))
print((" key-prefix: " + options.key_prefix))
if (options.__dict__["local_path"] is not None):
print((" local-path: " + options.local_path))
print((" skip-date-usage: " + str(options.skip_date_usage)))
print((" verbose: " + str(options.verbose)))
print()
if options.__dict__["additional_filter"] is not None and options.__dict__["name"] is None:
go = False
while not go:
sys.stdout.write("It is recommended to set --name in case of any additional filter is set.\n")
sys.stdout.write("Are you sure that you want to proceed without a name (yes/no)? ")
choice = input().lower()
if choice in ['yes', 'ye', 'y']:
go = True
elif choice in ['no', 'n']:
sys.exit()
return options
def set_log_level(disable=False):
if verbose:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
if disable:
logger.removeHandler(handler)
def get_end(options):
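  '''Return the end of the range: either --end as given, or now minus --days formatted with --date-format.'''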
if options.end:
return options.end
else:
d = datetime.now() - timedelta(days=options.days)
end = d.strftime(options.date_format)
logger.info("The end date will be: %s", end)
return end
def delete(solr_url, collection, filter_field, end, solr_keytab, solr_principal, skip_date_usage):
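  '''Delete documents from the collection where filter_field <= end, or all documents if skip_date_usage is set.'''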
logger.info("Deleting data where %s <= %s", filter_field, end)
solr_kinit_command = None
if solr_keytab:
solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
curl_prefix = "curl -k --negotiate -u : "
else:
curl_prefix = "curl -k"
if skip_date_usage:
delete_query = "*:*"
else:
delete_query = "{0}:[* TO \"{1}\"]".format(filter_field, end)
delete_command = "{0}/{1}/update?commit=true&wt=json".format(solr_url, collection)
delete_data = "<delete><query>{0}</query></delete>".format(delete_query)
query_solr(solr_kinit_command, delete_command, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_command), "Deleting", delete_data)
def save(mode, solr_url, collection, filter_field, id_field, range_end, read_block_size, write_block_size,
ignore_unfinished_uploading, additional_filter, name, solr_keytab, solr_principal, json_file,
compression, hdfs_keytab, hdfs_principal, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path,
solr_output_collection, solr_output_url, exclude_fields, skip_date_usage):
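  '''
  Save (and in archive mode subsequently delete) documents from the collection.

  Prepares kinit/curl commands for kerberized Solr and HDFS, ensures the HDFS
  target path exists when needed, in archive mode finishes any upload left over
  from a previous run, then exports the data block by block via save_data().
  '''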
solr_kinit_command = None
if solr_keytab:
solr_kinit_command = "kinit -kt {0} {1}".format(solr_keytab, solr_principal)
curl_prefix = "curl -k --negotiate -u : "
else:
curl_prefix = "curl -k"
hdfs_kinit_command = None
if hdfs_keytab:
hdfs_kinit_command = "sudo -u {0} kinit -kt {1} {2}".format(hdfs_user, hdfs_keytab, hdfs_principal)
if hdfs_path:
ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path)
working_dir = get_working_dir(solr_url, collection)
if mode == "archive":
handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir,
ignore_unfinished_uploading, skip_date_usage)
save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file, compression,
hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection, solr_output_url,
exclude_fields, skip_date_usage)
def ensure_hdfs_path(hdfs_kinit_command, hdfs_user, hdfs_path):
if hdfs_kinit_command:
run_kinit(hdfs_kinit_command, "HDFS")
try:
hdfs_create_dir_command = "sudo -u {0} hadoop fs -mkdir -p {1}".format(hdfs_user, hdfs_path)
logger.debug("Ensuring that the HDFS path %s exists:\n%s", hdfs_path, hdfs_create_dir_command)
result = call(hdfs_create_dir_command.split())
except Exception as e:
print()
logger.warn("Could not execute hdfs ensure dir command:\n%s", hdfs_create_dir_command)
logger.warn(str(e))
sys.exit()
if result != 0:
print()
logger.warn("Could not ensure HDFS dir command:\n%s", hdfs_create_dir_command)
logger.warn(str(err))
sys.exit()
def get_working_dir(solr_url, collection):
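  '''Return (creating it if needed) a working directory under /tmp/solrDataManager keyed by the md5 of the solr url and collection.'''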
md5 = hashlib.md5()
  md5.update(solr_url.encode('utf-8'))
  md5.update(collection.encode('utf-8'))
hash = md5.hexdigest()
working_dir = "/tmp/solrDataManager/{0}".format(hash)
if not(os.path.isdir(working_dir)):
os.makedirs(working_dir)
logger.debug("Working directory is %s", working_dir)
return working_dir
def handle_unfinished_uploading(solr_kinit_command, hdfs_kinit_command, curl_prefix, working_dir, ignore_unfinished_uploading, skip_date_usage):
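  '''
  Complete the upload and delete steps recorded in command.json by an
  interrupted previous run, or discard a pending upload when
  --ignore-unfinished-uploading is set.
  '''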
command_json_path = "{0}/command.json".format(working_dir)
if os.path.isfile(command_json_path):
with open(command_json_path) as command_file:
command = json.load(command_file)
if "upload" in list(command.keys()) and ignore_unfinished_uploading:
logger.info("Ignoring unfinished uploading left by previous run")
os.remove(command_json_path)
return
if "upload" in list(command.keys()):
logger.info("Previous run has left unfinished uploading")
logger.info("You may try to run the program with '-g' or '--ignore-unfinished-uploading' to ignore it if it keeps on failing")
if command["upload"]["type"] == "solr":
upload_file_to_solr(solr_kinit_command, curl_prefix, command["upload"]["command"], command["upload"]["upload_file_path"],
command["upload"]["solr_output_collection"])
elif command["upload"]["type"] == "hdfs":
upload_file_hdfs(hdfs_kinit_command, command["upload"]["command"], command["upload"]["upload_file_path"],
command["upload"]["hdfs_path"], command["upload"]["hdfs_user"])
elif command["upload"]["type"] == "s3":
upload_file_s3(command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["bucket"],
command["upload"]["key_prefix"])
elif command["upload"]["type"] == "local":
upload_file_local(command["upload"]["command"], command["upload"]["upload_file_path"], command["upload"]["local_path"])
else:
logger.warn("Unknown upload type: %s", command["upload"]["type"])
sys.exit()
if "delete" in list(command.keys()):
delete_data(solr_kinit_command, curl_prefix, command["delete"]["command"], command["delete"]["collection"],
command["delete"]["filter_field"], command["delete"]["id_field"], command["delete"]["prev_lot_end_value"],
command["delete"]["prev_lot_end_id"], skip_date_usage)
os.remove(command_json_path)
def save_data(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field, id_field,
range_end, read_block_size, write_block_size, working_dir, additional_filter, name, json_file,
compression, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix, local_path, solr_output_collection,
solr_output_url, exclude_fields, skip_date_usage):
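  '''
  Export the matching documents in blocks of write_block_size records.

  Each block is written to a temporary JSON file, then compressed and uploaded
  by upload_block(); the query keeps track of the last seen filter_field/id
  values so that the next block continues where the previous one ended.
  '''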
logger.info("Starting to save data")
tmp_file_path = "{0}/tmp.json".format(working_dir)
prev_lot_end_value = None
prev_lot_end_id = None
if skip_date_usage:
if additional_filter:
q = quote("*:*+AND+{0}".format(additional_filter), safe="/+\"*")
else:
q = quote("*:*", safe="/+\"*")
sort = quote("{0}+asc".format(id_field), safe="/+\"*")
else:
if additional_filter:
q = quote("{0}+AND+{1}:[*+TO+\"{2}\"]".format(additional_filter, filter_field, range_end), safe="/+\"*")
else:
q = quote("{0}:[*+TO+\"{1}\"]".format(filter_field, range_end), safe="/+\"*")
sort = quote("{0}+asc,{1}+asc".format(filter_field, id_field), safe="/+\"*")
solr_query_url_prefix = "{0}/{1}/select?q={2}&sort={3}&rows={4}&wt=json".format(solr_url, collection, q, sort, read_block_size)
exclude_field_list = exclude_fields.split(',') if exclude_fields else None
if solr_output_collection and not exclude_field_list:
exclude_field_list = ['_version_']
done = False
total_records = 0
while not done:
results = create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field,
id_field, range_end, write_block_size, prev_lot_end_value, prev_lot_end_id, json_file,
exclude_field_list, skip_date_usage)
done = results[0]
records = results[1]
prev_lot_end_value = results[2]
prev_lot_end_id = results[3]
if records > 0:
upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user,
hdfs_path, key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection,
solr_output_url, skip_date_usage)
total_records += records
logger.info("A total of %d records are saved", total_records)
def create_block(tmp_file_path, solr_kinit_command, curl_prefix, solr_query_url_prefix, filter_field, id_field, range_end,
write_block_size, prev_lot_end_value, prev_lot_end_id, json_file, exclude_field_list, skip_date_usage):
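  '''
  Read up to write_block_size documents from solr into tmp_file_path.

  Documents are fetched in pages of read_block_size, sorted by filter_field
  and/or id_field, and filtered to start after the last document of the
  previous block. Returns [done, records, prev_lot_end_value, prev_lot_end_id].
  '''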
if os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
tmp_file = open(tmp_file_path, 'w')
logger.debug("Created tmp file %s", tmp_file_path)
init_file(tmp_file, json_file)
records = 0
done = False
while records < write_block_size:
if skip_date_usage:
if prev_lot_end_id:
fq = "({0}:{{\"{1}\"+TO+*])".format(id_field, prev_lot_end_id)
url = "{0}&fq={1}".format(solr_query_url_prefix, quote(fq, safe="/+\"*"))
else:
url = "{0}".format(solr_query_url_prefix)
else:
if prev_lot_end_value:
fq_prev_end_rest = "({0}:\"{1}\"+AND+{2}:{{\"{3}\"+TO+*])".format(filter_field, prev_lot_end_value, id_field,
prev_lot_end_id)
fq_new = "{0}:{{\"{1}\"+TO+\"{2}\"]".format(filter_field, prev_lot_end_value, range_end)
fq = "{0}+OR+{1}".format(fq_prev_end_rest, fq_new)
else:
fq = "{0}:[*+TO+\"{1}\"]".format(filter_field, range_end)
url = "{0}&fq={1}".format(solr_query_url_prefix, quote(fq, safe="/+\"*"))
curl_command = "{0} {1}".format(curl_prefix, url)
rsp = query_solr(solr_kinit_command, url, curl_command, "Obtaining")
if rsp['response']['numFound'] == 0:
done = True
break
for doc in rsp['response']['docs']:
last_doc = doc
add_line(tmp_file, doc, json_file, records, exclude_field_list)
records += 1
if records == write_block_size:
break
prev_lot_end_value = last_doc[filter_field] if not skip_date_usage else prev_lot_end_value
prev_lot_end_id = last_doc[id_field]
sys.stdout.write("\r{0} records are written".format(records))
sys.stdout.flush()
if verbose and records < write_block_size:
print()
logger.debug("Collecting next lot of data")
finish_file(tmp_file, json_file)
sys.stdout.write("\n")
logger.debug("Finished data collection")
return [done, records, prev_lot_end_value, prev_lot_end_id]
def init_file(tmp_file, json_file):
if json_file:
tmp_file.write("{\n")
def add_line(tmp_file, doc, json_file, records, exclude_fields):
if records > 0:
if json_file:
tmp_file.write(",\n")
else:
tmp_file.write("\n")
if exclude_fields:
for exclude_field in exclude_fields:
if doc and exclude_field in doc:
del doc[exclude_field]
tmp_file.write(json.dumps(doc))
def finish_file(tmp_file, json_file):
if json_file:
tmp_file.write("\n}")
def upload_block(mode, solr_kinit_command, hdfs_kinit_command, curl_prefix, solr_url, collection, filter_field,
id_field, working_dir, tmp_file_path, name, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
key_file_path, bucket, key_prefix, local_path, compression, solr_output_collection, solr_output_url,
skip_date_usage):
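  '''
  Compress the collected block, record the pending upload/delete in
  command.json, upload the file to the configured target (solr collection,
  HDFS, S3 or local path) and, in archive mode, delete the uploaded documents
  from the source collection.
  '''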
if name:
file_name = "{0}_-_{1}_-_{2}_-_{3}".format(collection, name, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
else:
file_name = "{0}_-_{1}_-_{2}".format(collection, prev_lot_end_value, prev_lot_end_id).replace(':', '_')
upload_file_path = compress_file(working_dir, tmp_file_path, file_name, compression)
upload_command = create_command_file(mode, True, working_dir, upload_file_path, solr_url, collection, filter_field,
id_field, prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path,
key_file_path, bucket, key_prefix, local_path, solr_output_collection, solr_output_url,
skip_date_usage)
if solr_output_collection:
upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, solr_output_collection)
elif hdfs_user:
upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user)
elif key_file_path:
upload_file_s3(upload_command, upload_file_path, bucket, key_prefix)
elif local_path:
upload_file_local(upload_command, upload_file_path, local_path)
else:
logger.warn("Unknown upload destination")
sys.exit()
delete_command = create_command_file(mode, False, working_dir, upload_file_path, solr_url, collection, filter_field,
id_field, prev_lot_end_value, prev_lot_end_id, None, None, None, None, None, None, None,
None, skip_date_usage)
if mode == "archive":
delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id, skip_date_usage)
os.remove("{0}/command.json".format(working_dir))
def compress_file(working_dir, tmp_file_path, file_name, compression):
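  '''Package tmp_file_path as <file_name>.json with the requested compression (none, tar.gz, tar.bz2, zip or gz) and return the resulting path.'''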
data_file_name = "{0}.json".format(file_name)
if compression == "none":
upload_file_path = "{0}/{1}.json".format(working_dir, file_name)
os.rename(tmp_file_path, upload_file_path)
elif compression == "tar.gz":
upload_file_path = "{0}/{1}.json.tar.gz".format(working_dir, file_name)
tar = tarfile.open(upload_file_path, mode="w:gz")
try:
tar.add(tmp_file_path, arcname=data_file_name)
finally:
tar.close()
elif compression == "tar.bz2":
upload_file_path = "{0}/{1}.json.tar.bz2".format(working_dir, file_name)
tar = tarfile.open(upload_file_path, mode="w:bz2")
try:
tar.add(tmp_file_path, arcname=data_file_name)
finally:
tar.close()
elif compression == "zip":
upload_file_path = "{0}/{1}.json.zip".format(working_dir, file_name)
    zip_file = ZipFile(upload_file_path, 'w')
    try:
      zip_file.write(tmp_file_path, data_file_name, ZIP_DEFLATED)
    finally:
      zip_file.close()
elif compression == "gz":
upload_file_path = "{0}/{1}.json.gz".format(working_dir, file_name)
gz = gzip.open(upload_file_path, mode="wb")
    f = open(tmp_file_path, 'rb')
try:
shutil.copyfileobj(f, gz)
finally:
gz.close()
f.close()
else:
logger.warn("Unknown compression type")
sys.exit()
logger.info("Created data file %s", data_file_name)
return upload_file_path
def create_command_file(mode, upload, working_dir, upload_file_path, solr_url, collection, filter_field, id_field,
prev_lot_end_value, prev_lot_end_id, hdfs_user, hdfs_path, key_file_path, bucket, key_prefix,
local_path, solr_output_collection, solr_output_url, skip_date_usage):
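  '''
  Build the upload and delete commands for the current block and (except in
  save mode) persist them to <working_dir>/command.json so an interrupted run
  can be resumed. Returns the upload command if upload is True, otherwise the
  delete command.
  '''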
commands = {}
if upload:
logger.debug("Creating command file with upload and delete instructions in case of an interruption")
else:
logger.debug("Creating command file with delete instructions in case of an interruption")
if upload:
if solr_output_collection:
command_url = solr_output_url if solr_output_url else solr_url
upload_command = "{0}/{1}/update/json/docs?commit=true&wt=json --data-binary @{2}"\
.format(command_url, solr_output_collection, upload_file_path)
upload_command_data = {}
upload_command_data["type"] = "solr"
upload_command_data["command"] = upload_command
upload_command_data["upload_file_path"] = upload_file_path
upload_command_data["solr_output_collection"] = solr_output_collection
commands["upload"] = upload_command_data
elif hdfs_path:
upload_command = "sudo -u {0} hadoop fs -put {1} {2}".format(hdfs_user, upload_file_path, hdfs_path)
upload_command_data = {}
upload_command_data["type"] = "hdfs"
upload_command_data["command"] = upload_command
upload_command_data["upload_file_path"] = upload_file_path
upload_command_data["hdfs_path"] = hdfs_path
upload_command_data["hdfs_user"] = hdfs_user
commands["upload"] = upload_command_data
elif key_file_path:
upload_command = "java -cp {0}/libs/* org.apache.ambari.infra.solr.S3Uploader {1} {2} {3} {4}".format( \
os.path.dirname(os.path.realpath(__file__)), key_file_path, bucket, key_prefix, upload_file_path)
upload_command_data = {}
upload_command_data["type"] = "s3"
upload_command_data["command"] = upload_command
upload_command_data["upload_file_path"] = upload_file_path
upload_command_data["bucket"] = bucket
upload_command_data["key_prefix"] = key_prefix
commands["upload"] = upload_command_data
elif local_path:
upload_command = "mv {0} {1}".format(upload_file_path, local_path)
upload_command_data = {}
upload_command_data["type"] = "local"
upload_command_data["command"] = upload_command
upload_command_data["upload_file_path"] = upload_file_path
upload_command_data["local_path"] = local_path
commands["upload"] = upload_command_data
else:
logger.warn("Unknown upload destination")
sys.exit()
if mode == "save":
return upload_command
if skip_date_usage:
delete_query = "({0}:[*+TO+\"{1}\"])".format(id_field, prev_lot_end_id)
else:
delete_prev = "{0}:[*+TO+\"{1}\"]".format(filter_field, prev_lot_end_value)
delete_last = "({0}:\"{1}\"+AND+{2}:[*+TO+\"{3}\"])".format(filter_field, prev_lot_end_value, id_field, prev_lot_end_id)
delete_query = "{0}+OR+{1}".format(delete_prev, delete_last)
delete_command = "{0}/{1}/update?commit=true&wt=json --data-binary <delete><query>{2}</query></delete>" \
.format(solr_url, collection, delete_query)
if mode == "save":
return delete_command
delete_command_data = {}
delete_command_data["command"] = delete_command
delete_command_data["collection"] = collection
delete_command_data["filter_field"] = filter_field
delete_command_data["id_field"] = id_field
delete_command_data["prev_lot_end_value"] = prev_lot_end_value
delete_command_data["prev_lot_end_id"] = prev_lot_end_id
commands["delete"] = delete_command_data
command_file_path = "{0}/command.json".format(working_dir)
command_file_path_tmp = "{0}.tmp".format(command_file_path)
cft = open(command_file_path_tmp, 'w')
cft.write(json.dumps(commands, indent=4))
os.rename(command_file_path_tmp, command_file_path)
logger.debug("Command file %s was created", command_file_path)
if upload:
return upload_command
else:
return delete_command
def upload_file_hdfs(hdfs_kinit_command, upload_command, upload_file_path, hdfs_path, hdfs_user):
if hdfs_kinit_command:
run_kinit(hdfs_kinit_command, "HDFS")
try:
hdfs_file_exists_command = "sudo -u {0} hadoop fs -test -e {1}".format(hdfs_user, hdfs_path + os.path.basename(upload_file_path))
logger.debug("Checking if file already exists on hdfs:\n%s", hdfs_file_exists_command)
hdfs_file_exists = (0 == call(hdfs_file_exists_command.split()))
except Exception as e:
print()
logger.warn("Could not execute command to check if file already exists on HDFS:\n%s", hdfs_file_exists_command)
logger.warn(str(e))
sys.exit()
if os.path.isfile(upload_file_path) and not hdfs_file_exists:
try:
logger.debug("Uploading file to hdfs:\n%s", upload_command)
result = call(upload_command.split())
except Exception as e:
print()
logger.warn("Could not execute command to upload file to HDFS:\n%s", upload_command)
logger.warn(str(e))
sys.exit()
if result != 0:
logger.warn("Could not upload file to HDFS with command:\n%s", upload_command)
sys.exit()
logger.info("File %s was uploaded to hdfs %s", os.path.basename(upload_file_path), hdfs_path)
os.remove(upload_file_path)
def upload_file_s3(upload_command, upload_file_path, bucket, key_prefix):
if os.path.isfile(upload_file_path):
try:
logger.debug("Uploading file to s3:\n%s", upload_command)
result = call(upload_command.split())
except Exception as e:
print()
logger.warn("Could not execute command to upload file to S3:\n%s", upload_command)
logger.warn(str(e))
sys.exit()
if result != 0:
logger.warn("Could not upload file to S3 with command:\n%s", upload_command)
sys.exit()
logger.info("File %s was uploaded to s3 bucket '%s', key '%s'", os.path.basename(upload_file_path), bucket,
key_prefix + os.path.basename(upload_file_path))
os.remove(upload_file_path)
def upload_file_local(upload_command, upload_file_path, local_path):
if os.path.exists(local_path) and not os.path.isdir(local_path):
logger.warn("Local path %s exists, but not a directory, can not save there", local_path)
if not os.path.isdir(local_path):
os.mkdir(local_path)
logger.debug("Directory %s was created", local_path)
try:
logger.debug("Moving file to local directory %s with command\n%s", local_path, upload_command)
call(upload_command.split())
logger.info("File %s was moved to local directory %s", os.path.basename(upload_file_path), local_path)
except Exception as e:
print()
logger.warn("Could not execute move command command:\n%s", upload_command)
logger.warn(str(e))
sys.exit()
def upload_file_to_solr(solr_kinit_command, curl_prefix, upload_command, upload_file_path, collection):
if os.path.isfile(upload_file_path):
query_solr(solr_kinit_command, upload_command, "{0} -H Content-type:application/json {1}".format(curl_prefix, upload_command), "Saving")
logger.info("Save data to collection: %s", collection)
def delete_data(solr_kinit_command, curl_prefix, delete_command, collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id, skip_date_usage):
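  '''Delete the documents covered by the block that was just uploaded, i.e. everything up to prev_lot_end_value/prev_lot_end_id.'''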
delete_cmd = delete_command.split(" --data-binary")[0]
delete_query_data = delete_command.split("--data-binary ")[1].replace("+", " ")
query_solr(solr_kinit_command, delete_cmd, "{0} -H Content-Type:text/xml {1}".format(curl_prefix, delete_cmd), "Deleting", delete_query_data)
if skip_date_usage:
logger.info("Deleted data from collection %s where %s < %s", collection, id_field, prev_lot_end_id)
else:
logger.info("Deleted data from collection %s where %s,%s < %s,%s", collection, filter_field, id_field, prev_lot_end_value,
prev_lot_end_id)
def query_solr(solr_kinit_command, url, curl_command, action, data=None):
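  '''Run a curl command against solr (running kinit first if needed), parse the JSON response and exit if curl fails or solr reports an error.'''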
if solr_kinit_command:
run_kinit(solr_kinit_command, "Solr")
try:
cmd = curl_command.split()
if data:
cmd.append("--data-binary")
cmd.append(data)
logger.debug("%s data from solr:\n%s", action, ' '.join(cmd))
process = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
except Exception as e:
print()
logger.warn("Could not execute curl command:\n%s", ' '.join(cmd))
logger.warn(str(e))
sys.exit()
out, err = process.communicate()
if process.returncode != 0:
print()
logger.warn("Could not execute curl command:\n%s", ' '.join(cmd))
logger.warn(str(err))
sys.exit()
  rsp = json.loads(out)
if rsp["responseHeader"]["status"] != 0:
print()
logger.warn("Could not execute solr query:\n%s", unquote(url))
logger.warn(rsp["error"]["msg"])
sys.exit()
return rsp
def run_kinit(kinit_command, program):
try:
logger.debug("Running kinit for %s:\n%s", program, kinit_command)
result = call(kinit_command.split())
except Exception as e:
print()
logger.warn("Could not execute %s kinit command:\n%s", program, kinit_command)
logger.warn(str(e))
sys.exit()
if result != 0:
print()
logger.warn("%s kinit command was not successful:\n%s", program, kinit_command)
sys.exit()
if __name__ == '__main__':
try:
start_time = time.time()
options = parse_arguments()
verbose = options.verbose
set_log_level()
end = get_end(options) if not options.skip_date_usage else None
if options.mode == "delete":
delete(options.solr_url, options.collection, options.filter_field, end, options.solr_keytab, options.solr_principal, options.skip_date_usage)
elif options.mode in ["archive", "save"]:
save(options.mode, options.solr_url, options.collection, options.filter_field, options.id_field, end,
options.read_block_size, options.write_block_size, options.ignore_unfinished_uploading,
options.additional_filter, options.name, options.solr_keytab, options.solr_principal, options.json_file,
options.compression, options.hdfs_keytab, options.hdfs_principal, options.hdfs_user, options.hdfs_path,
options.key_file_path, options.bucket, options.key_prefix, options.local_path, options.solr_output_collection,
options.solr_output_url, options.exclude_fields, options.skip_date_usage)
else:
logger.warn("Unknown mode: %s", options.mode)
print(("--- %s seconds ---" % (time.time() - start_time)))
except KeyboardInterrupt:
print()
sys.exit(128 + signal.SIGINT)