| #!/usr/bin/env python |
| |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
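"""Post a Druid indexing task and wait for it to finish.

Reads a task spec from a JSON file, submits it to the Overlord, polls the
Overlord until the task succeeds or fails, then polls the Coordinator until
the indexed data is fully loaded onto the cluster. Uses the Python 2
standard library (urllib2, urlparse).

Example (the file name and task path are illustrative; the defaults assume
a local Overlord on port 8090 and Coordinator on port 8081):

    python2 post_index_task.py --file /path/to/task.json
"""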
| |
| import argparse |
| import base64 |
| import json |
| import re |
| import sys |
| import time |
| import urllib2 |
| import urlparse |
| |
| def read_task_file(args): |
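    """Return the raw contents of args.file, exiting early if it is not valid JSON."""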
| with open(args.file, 'r') as f: |
| contents = f.read() |
| # We don't use the parsed data, but we want to throw early if it's invalid |
| try: |
| json.loads(contents) |
    except Exception as e:
| sys.stderr.write('Invalid JSON in task file "{0}": {1}\n'.format(args.file, repr(e))) |
| sys.exit(1) |
| return contents |
| |
| def add_basic_auth_header(args, req): |
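    """Attach an HTTP Basic Authorization header when --user is provided."""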
    if args.user is not None:
| basic_auth_encoded = base64.b64encode('%s:%s' % (args.user, args.password)) |
| req.add_header("Authorization", "Basic %s" % basic_auth_encoded) |
| |
# Keep retrying until timeout_at; raise a friendly error if we still haven't succeeded by then
| def post_task(args, task_json, timeout_at): |
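    """POST the task JSON to the Overlord, following redirects and retrying
    transient failures until timeout_at; returns the raw response body.
    """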
| try: |
| url = args.url.rstrip("/") + "/druid/indexer/v1/task" |
| req = urllib2.Request(url, task_json, {'Content-Type' : 'application/json'}) |
| add_basic_auth_header(args, req) |
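        # Clamp the per-request timeout to [5, 10] seconds so a single hung
        # request cannot consume the whole retry budget (the polling loops
        # below use the same clamp).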
| timeleft = timeout_at - time.time() |
| response_timeout = min(max(timeleft, 5), 10) |
| response = urllib2.urlopen(req, None, response_timeout) |
| return response.read().rstrip() |
| except urllib2.URLError as e: |
        if isinstance(e, urllib2.HTTPError) and 400 <= e.code <= 500:
| # 4xx (problem with the request) or 500 (something wrong on the server) |
| raise_friendly_error(e) |
| elif time.time() >= timeout_at: |
            # No further retries
| raise_friendly_error(e) |
| elif isinstance(e, urllib2.HTTPError) and e.code in [301, 302, 303, 305, 307] and \ |
| e.info().getheader("Location") is not None: |
            # Store the new location in args.url (so await_task_completion also uses it), then re-issue the request
| location = urlparse.urlparse(e.info().getheader("Location")) |
| args.url = "{0}://{1}".format(location.scheme, location.netloc) |
| sys.stderr.write("Redirect response received, setting url to [{0}]\n".format(args.url)) |
| return post_task(args, task_json, timeout_at) |
| else: |
| # If at first you don't succeed, try, try again! |
| sleep_time = 5 |
| if not args.quiet: |
| extra = '' |
| if hasattr(e, 'read'): |
| extra = e.read().rstrip() |
| sys.stderr.write("Waiting up to {0}s for indexing service [{1}] to become available. [Got: {2} {3}]".format(max(sleep_time, int(timeout_at - time.time())), args.url, str(e), extra).rstrip()) |
| sys.stderr.write("\n") |
| time.sleep(sleep_time) |
| return post_task(args, task_json, timeout_at) |
| |
# Keep retrying until timeout_at; raise if the task still hasn't finished by then
| def await_task_completion(args, task_id, timeout_at): |
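    """Poll the Overlord until the task reports SUCCESS or FAILED and return
    that status; raise if timeout_at passes first.
    """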
| while True: |
| url = args.url.rstrip("/") + "/druid/indexer/v1/task/{0}/status".format(task_id) |
| req = urllib2.Request(url) |
| add_basic_auth_header(args, req) |
| timeleft = timeout_at - time.time() |
| response_timeout = min(max(timeleft, 5), 10) |
| response = urllib2.urlopen(req, None, response_timeout) |
| response_obj = json.loads(response.read()) |
| response_status_code = response_obj["status"]["statusCode"] |
| if response_status_code in ['SUCCESS', 'FAILED']: |
| return response_status_code |
| else: |
| if time.time() < timeout_at: |
| if not args.quiet: |
| sys.stderr.write("Task {0} still running...\n".format(task_id)) |
| timeleft = timeout_at - time.time() |
| time.sleep(min(5, timeleft)) |
| else: |
| raise Exception("Task {0} did not finish in time!".format(task_id)) |
| |
| def raise_friendly_error(e): |
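    """Re-raise an HTTPError as a readable Exception, pulling out the <pre>
    block that Druid embeds in its HTML error pages when one is present.
    """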
| if isinstance(e, urllib2.HTTPError): |
| text = e.read().strip() |
        match = re.search(r'<pre>(.*?)</pre>', text, re.DOTALL)
        if match:
            text = match.group(1).strip()
        raise Exception("HTTP Error {0}: {1}, check the Overlord log for more details.\n{2}".format(e.code, e.reason, text))
| raise e |
| |
| def await_load_completion(args, datasource, timeout_at): |
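    """Poll the Coordinator's loadstatus endpoint until the datasource is
    100% loaded; raise if timeout_at passes first.
    """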
| while True: |
| url = args.coordinator_url.rstrip("/") + "/druid/coordinator/v1/loadstatus" |
| req = urllib2.Request(url) |
| add_basic_auth_header(args, req) |
| timeleft = timeout_at - time.time() |
| response_timeout = min(max(timeleft, 5), 10) |
| response = urllib2.urlopen(req, None, response_timeout) |
| response_obj = json.loads(response.read()) |
| load_status = response_obj.get(datasource, 0.0) |
| if load_status >= 100.0: |
| sys.stderr.write("{0} loading complete! You may now query your data\n".format(datasource)) |
| return |
| else: |
| if time.time() < timeout_at: |
| if not args.quiet: |
| sys.stderr.write("{0} is {1}% finished loading...\n".format(datasource, load_status)) |
| timeleft = timeout_at - time.time() |
| time.sleep(min(5, timeleft)) |
| else: |
| raise Exception("{0} was not loaded in time!".format(datasource)) |
| |
| def main(): |
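    """Parse arguments, submit the task, then wait for indexing and segment loading."""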
| parser = argparse.ArgumentParser(description='Post Druid indexing tasks.') |
    parser.add_argument('--url', '-u', metavar='url', type=str, default='http://localhost:8090/', help='Druid Overlord URL')
    parser.add_argument('--coordinator-url', type=str, default='http://localhost:8081/', help='Druid Coordinator URL')
    parser.add_argument('--file', '-f', type=str, required=True, help='Task JSON file')
| parser.add_argument('--submit-timeout', type=int, default=120, help='Timeout (in seconds) for submitting tasks') |
| parser.add_argument('--complete-timeout', type=int, default=14400, help='Timeout (in seconds) for completing tasks') |
| parser.add_argument('--load-timeout', type=int, default=14400, help='Timeout (in seconds) for waiting for tasks to load') |
| parser.add_argument('--quiet', '-q', action='store_true', help='Suppress retryable errors') |
| parser.add_argument('--user', type=str, default=None, help='Basic auth username') |
| parser.add_argument('--password', type=str, default=None, help='Basic auth password') |
| args = parser.parse_args() |
| |
| submit_timeout_at = time.time() + args.submit_timeout |
| complete_timeout_at = time.time() + args.complete_timeout |
| |
| task_contents = read_task_file(args) |
| task_json = json.loads(task_contents) |
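    # Compaction tasks carry the datasource at the top level; ingestion
    # tasks nest it inside the ingestion spec.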
| if task_json['type'] == "compact": |
| datasource = task_json['dataSource'] |
| else: |
| datasource = task_json["spec"]["dataSchema"]["dataSource"] |
| sys.stderr.write("Beginning indexing data for {0}\n".format(datasource)) |
| |
| task_id = json.loads(post_task(args, task_contents, submit_timeout_at))["task"] |
| |
| sys.stderr.write('\033[1m' + "Task started: " + '\033[0m' + "{0}\n".format(task_id)) |
| sys.stderr.write('\033[1m' + "Task log: " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/log\n".format(args.url.rstrip("/"),task_id)) |
| sys.stderr.write('\033[1m' + "Task status: " + '\033[0m' + "{0}/druid/indexer/v1/task/{1}/status\n".format(args.url.rstrip("/"),task_id)) |
| |
| task_status = await_task_completion(args, task_id, complete_timeout_at) |
| sys.stderr.write("Task finished with status: {0}\n".format(task_status)) |
| if task_status != 'SUCCESS': |
| sys.exit(1) |
| |
| sys.stderr.write("Completed indexing data for {0}. Now loading indexed data onto the cluster...\n".format(datasource)) |
| load_timeout_at = time.time() + args.load_timeout |
| await_load_completion(args, datasource, load_timeout_at) |
| |
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(1)