blob: ab10023dd93e41ef8682312830bbce93aa5b8924 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Talk to an impalad through beeswax.
# Usage:
# * impalad is a string with the host and port of the impalad
# with which the connection should be established.
# The format is "<hostname>:<port>"
# * query_string is the query to be executed, as a string.
# client = ImpalaBeeswaxClient(impalad)
# client.connect()
# result = client.execute(query_string)
# where result is an object of the class ImpalaBeeswaxResult.
import logging
import time
import shlex
import getpass
import re
from beeswaxd import BeeswaxService
from beeswaxd.BeeswaxService import QueryState
from datetime import datetime
# If Exec Summary is not implemented in Impala, this cannot be imported
from ExecStats.ttypes import TExecStats
except ImportError:
from ImpalaService import ImpalaService
from tests.util.thrift_util import create_transport
from thrift.transport.TTransport import TTransportException
from thrift.protocol import TBinaryProtocol
from thrift.Thrift import TApplicationException
LOG = logging.getLogger('impala_beeswax')
# Custom exception wrapper.
# All exceptions coming from thrift/beeswax etc. go through this wrapper.
# __str__ preserves the exception type.
# TODO: Add the ability to print some of the stack.
class ImpalaBeeswaxException(Exception):
__name__ = "ImpalaBeeswaxException"
def __init__(self, message, inner_exception):
self.__message = message
self.inner_exception = inner_exception
def __str__(self):
return "%s:\n %s" % (self.__name__, self.__message)
class ImpalaBeeswaxResult(object):
def __init__(self, **kwargs):
self.query = kwargs.get('query', None)
self.query_id = kwargs['query_id']
self.success = kwargs.get('success', False)
# Insert returns an int, convert into list to have a uniform data type.
# TODO: We should revisit this if we have more datatypes to deal with. = kwargs.get('data', None)
if not isinstance(, list): = str( = []
self.log = None
self.time_taken = kwargs.get('time_taken', 0)
self.summary = kwargs.get('summary', str())
self.schema = kwargs.get('schema', None)
self.column_types = None
self.column_labels = None
if self.schema is not None:
# Extract labels and types so there is a shared interface with HS2ResultSet.
self.column_types = [fs.type.upper() for fs in self.schema.fieldSchemas]
self.column_labels = [ for fs in self.schema.fieldSchemas]
self.runtime_profile = kwargs.get('runtime_profile', str())
self.exec_summary = kwargs.get('exec_summary', None)
def get_data(self):
return self.__format_data()
def __format_data(self):
return '\n'.join(
return ''
def __str__(self):
message = ('Summary: %s\n'
'Success: %s\n'
'Took: %s(s)\n'
% (self.summary, self.success, self.time_taken,
return message
# Interface to beeswax. Responsible for executing queries, fetching results.
class ImpalaBeeswaxClient(object):
# Regex applied to all tokens of a query to detect the query type.
INSERT_REGEX = re.compile("^insert$", re.I)
def __init__(self, impalad, use_kerberos=False, user=None, password=None,
self.connected = False
split_impalad = impalad.split(":")
assert len(split_impalad) in [1, 2]
self.impalad_host = split_impalad[0]
self.impalad_port = 21000 # Default beeswax port
if len(split_impalad) == 2:
self.impalad_port = int(split_impalad[1])
self.imp_service = None
self.transport = None
self.use_kerberos = use_kerberos
self.use_ssl = use_ssl
self.user, self.password = user, password
self.use_ldap = (self.user is not None)
self.__query_options = {}
self.query_states = QueryState._NAMES_TO_VALUES
def __options_to_string_list(self):
return ["%s=%s" % (k,v) for (k,v) in self.__query_options.iteritems()]
def get_query_options(self):
return self.__query_options
def set_query_option(self, name, value):
self.__query_options[name.upper()] = value
def set_query_options(self, query_option_dict):
if query_option_dict is None:
raise ValueError, 'Cannot pass None value for query options'
for name, value in query_option_dict.iteritems():
self.set_query_option(name, value)
def get_query_option(self, name):
return self.__query_options.get(name.upper())
def clear_query_options(self):
def connect(self):
"""Connect to impalad specified in intializing this object
Raises an exception if the connection is unsuccesful.
self.transport = self.__get_transport()
protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
self.imp_service = ImpalaService.Client(protocol)
self.connected = True
except Exception, e:
raise ImpalaBeeswaxException(self.__build_error_message(e), e)
def close_connection(self):
"""Close the transport if it's still open"""
if self.transport:
self.connected = False
def __get_transport(self):
"""Creates the proper transport type based environment (secure vs unsecure)"""
trans_type = 'buffered'
if self.use_kerberos:
trans_type = 'kerberos'
elif self.use_ldap:
trans_type = 'plain_sasl'
return create_transport(host=self.impalad_host, port=self.impalad_port,
service='impala', transport_type=trans_type, user=self.user,
password=self.password, use_ssl=self.use_ssl)
def execute(self, query_string, user=None):
"""Re-directs the query to its appropriate handler, returns ImpalaBeeswaxResult"""
# Take care of leading/trailing whitespaces.
query_string = query_string.strip()
start = time.time()
start_time =
handle = self.__execute_query(query_string.strip(), user=user)
if self.__get_query_type(query_string) == 'insert':
# DML queries are finished by this point.
time_taken = time.time() - start
# fetch_results() will close the query after which there is no guarantee that
# profile and log will be available so fetch them first.
runtime_profile = self.get_runtime_profile(handle)
exec_summary = self.get_exec_summary_and_parse(handle)
log = self.get_log(handle.log_context)
result = self.fetch_results(query_string, handle)
result.time_taken, result.start_time, result.runtime_profile, result.log = \
time_taken, start_time, runtime_profile, log
result.exec_summary = exec_summary
# For SELECT queries, execution might still be ongoing. fetch_results() will block
# until the query is completed.
result = self.fetch_results(query_string, handle)
result.time_taken = time.time() - start
result.start_time = start_time
result.exec_summary = self.get_exec_summary_and_parse(handle)
result.log = self.get_log(handle.log_context)
result.runtime_profile = self.get_runtime_profile(handle)
return result
def get_exec_summary(self, handle):
return self.__do_rpc(lambda: self.imp_service.GetExecSummary(handle))
def get_exec_summary_and_parse(self, handle):
"""Calls GetExecSummary() for the last query handle, parses it and returns a summary
table. Returns None in case of an error or an empty result"""
summary = self.get_exec_summary(handle)
except ImpalaBeeswaxException:
summary = None
if summary is None or summary.nodes is None:
return None
# If exec summary is not implemented in Impala, this function returns, so we do not
# get the function __build_summary_table which requires TExecStats to be imported.
output = []
self.__build_summary_table(summary, 0, False, 0, False, output)
return output
def __build_summary_table(self, summary, idx, is_fragment_root, indent_level,
new_indent_level, output):
"""NOTE: This was taken from Changes made here must be made there as
well. TODO: This method will be a placed in a library that is shared between
impala_shell and this file. (IMPALA-5792)
Direct translation of Coordinator::PrintExecSummary() to recursively build a list
of rows of summary statistics, one per exec node
summary: the TExecSummary object that contains all the summary data
idx: the index of the node to print
is_fragment_root: true if the node to print is the root of a fragment (and therefore
feeds into an exchange)
indent_level: the number of spaces to print before writing the node's label, to give
the appearance of a tree. The 0th child of a node has the same indent_level as its
parent. All other children have an indent_level of one greater than their parent.
new_indent_level: If true, this indent level is different from the previous row's.
output: the list of rows into which to append the rows produced for this node and its
Returns the index of the next exec node in summary.exec_nodes that should be
processed, used internally to this method only.
attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]
# Initialise aggregate and maximum stats
agg_stats, max_stats = TExecStats(), TExecStats()
for attr in attrs:
setattr(agg_stats, attr, 0)
setattr(max_stats, attr, 0)
row = {}
node = summary.nodes[idx]
# exec_stats may not be set even if the query is FINISHED if there are fragments that
# are still executing or that were cancelled before sending a status report.
if node.exec_stats is not None:
for stats in node.exec_stats:
for attr in attrs:
val = getattr(stats, attr)
if val is not None:
setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
setattr(max_stats, attr, max(getattr(max_stats, attr), val))
if len(node.exec_stats) > 0:
avg_time = agg_stats.latency_ns / len(node.exec_stats)
avg_time = 0
row["num_instances"] = len(node.exec_stats)
row["num_hosts"] = node.num_hosts
row["avg_time"] = avg_time
is_sink = node.node_id == -1
# If the node is a broadcast-receiving exchange node, the cardinality of rows produced
# is the max over all instances (which should all have received the same number of
# rows). Otherwise, the cardinality is the sum over all instances which process
# disjoint partitions.
if is_sink:
cardinality = -1
elif node.is_broadcast:
cardinality = max_stats.cardinality
cardinality = agg_stats.cardinality
est_stats = node.estimated_stats
label_prefix = ""
if indent_level > 0:
label_prefix = "|"
label_prefix += " |" * (indent_level - 1)
if new_indent_level:
label_prefix += "--"
label_prefix += " "
row["prefix"] = label_prefix
row["operator"] = node.label
row["max_time"] = max_stats.latency_ns
row["num_rows"] = cardinality
row["est_num_rows"] = est_stats.cardinality
row["peak_mem"] = max_stats.memory_used
row["est_peak_mem"] = est_stats.memory_used
row["detail"] = node.label_detail
if summary.exch_to_sender_map is not None and idx in summary.exch_to_sender_map:
sender_idx = summary.exch_to_sender_map[idx]
# This is an exchange node, so the sender is a fragment root, and should be printed
# next.
self.__build_summary_table(summary, sender_idx, True, indent_level, False, output)
idx += 1
if node.num_children > 0:
first_child_output = []
idx = \
summary, idx, False, indent_level, False, first_child_output)
for child_idx in xrange(1, node.num_children):
# All other children are indented (we only have 0, 1 or 2 children for every exec
# node at the moment)
idx = self.__build_summary_table(
summary, idx, False, indent_level + 1, True, output)
output += first_child_output
return idx
def get_runtime_profile(self, handle):
return self.__do_rpc(lambda: self.imp_service.GetRuntimeProfile(handle))
def execute_query_async(self, query_string, user=None):
Executes a query asynchronously
Issues a query and returns the query handle to the caller for processing.
query = BeeswaxService.Query()
query.query = query_string
query.hadoop_user = user if user is not None else getpass.getuser()
query.configuration = self.__options_to_string_list()
handle = self.__do_rpc(lambda: self.imp_service.query(query,))"Started query {0}".format(
return handle
def __execute_query(self, query_string, user=None):
"""Executes a query and waits for completion"""
handle = self.execute_query_async(query_string, user=user)
# Wait for the query to finish execution.
return handle
def cancel_query(self, query_id):
return self.__do_rpc(lambda: self.imp_service.Cancel(query_id))
def close_query(self, handle):
self.__do_rpc(lambda: self.imp_service.close(handle))
def wait_for_finished(self, query_handle):
"""Given a query handle, polls the coordinator waiting for the query to transition to
'FINISHED' state"""
while True:
query_state = self.get_state(query_handle)
# if the rpc succeeded, the output is the query state
if query_state == self.query_states["FINISHED"]:
elif query_state == self.query_states["EXCEPTION"]:
error_log = self.__do_rpc(
lambda: self.imp_service.get_log(query_handle.log_context))
raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
def wait_for_finished_timeout(self, query_handle, timeout=10):
"""Given a query handle and a timeout, polls the coordinator waiting for the query to
transition to 'FINISHED' state till 'timeout' seconds"""
start_time = time.time()
while (time.time() - start_time < timeout):
query_state = self.get_state(query_handle)
# if the rpc succeeded, the output is the query state
if query_state == self.query_states["FINISHED"]:
return True
elif query_state == self.query_states["EXCEPTION"]:
error_log = self.__do_rpc(
lambda: self.imp_service.get_log(query_handle.log_context))
raise ImpalaBeeswaxException("Query aborted:" + error_log, None)
return False
def wait_for_admission_control(self, query_handle):
"""Given a query handle, polls the coordinator waiting for it to complete
admission control processing of the query"""
while True:
query_state = self.get_state(query_handle)
if query_state > self.query_states["COMPILED"]:
def get_admission_result(self, query_handle):
"""Given a query handle, returns the admission result from the query profile"""
query_state = self.get_state(query_handle)
if query_state > self.query_states["COMPILED"]:
query_profile = self.get_runtime_profile(query_handle)
admit_result ="Admission result: (.*)", query_profile)
if admit_result:
return ""
def get_default_configuration(self):
return self.__do_rpc(lambda: self.imp_service.get_default_configuration(False))
def get_state(self, query_handle):
return self.__do_rpc(lambda: self.imp_service.get_state(query_handle))
def get_log(self, query_handle):
return self.__do_rpc(lambda: self.imp_service.get_log(query_handle))
def fetch_results(self, query_string, query_handle, max_rows = -1):
"""Fetches query results given a handle and query type (insert, use, other)"""
query_type = self.__get_query_type(query_string)
if query_type == 'use':
# TODO: "use <database>" does not currently throw an error. Need to update this
# to handle the error case once that behavior has been changed.
return ImpalaBeeswaxResult(query=query_string,,
success=True, data=[])
# Result fetching for insert is different from other queries.
exec_result = None
if query_type == 'insert':
exec_result = self.__fetch_insert_results(query_handle)
exec_result = self.__fetch_results(query_handle, max_rows)
exec_result.query = query_string
return exec_result
def __fetch_results(self, handle, max_rows = -1):
"""Handles query results, returns a ImpalaBeeswaxResult object"""
schema = self.__do_rpc(lambda: self.imp_service.get_results_metadata(handle)).schema
# The query has finished, we can fetch the results
result_rows = []
while len(result_rows) < max_rows or max_rows < 0:
fetch_rows = -1 if max_rows < 0 else max_rows - len(result_rows)
results = self.__do_rpc(lambda: self.imp_service.fetch(handle, False, fetch_rows))
if not results.has_more:
# The query executed successfully and all the data was fetched.
exec_result = ImpalaBeeswaxResult(, success=True, data=result_rows,
exec_result.summary = 'Returned %d rows' % (len(result_rows))
return exec_result
def close_dml(self, handle):
return self.__do_rpc(lambda: self.imp_service.CloseInsert(handle))
def __fetch_insert_results(self, handle):
"""Executes an insert query"""
result = self.close_dml(handle)
# The insert was successful
num_rows = sum(map(int, result.rows_modified.values()))
data = ["%s: %s" % row for row in result.rows_modified.iteritems()]
exec_result = ImpalaBeeswaxResult(, success=True, data=data)
exec_result.summary = "Inserted %d rows" % (num_rows,)
return exec_result
def __get_query_type(self, query_string):
# Set posix=True and add "'" to escaped quotes
# to deal with escaped quotes in string literals
lexer = shlex.shlex(query_string.lstrip(), posix=True)
lexer.escapedquotes += "'"
tokens = list(lexer)
# Do not classify explain queries as 'insert'
if (tokens[0].lower() == "explain"):
return tokens[0].lower()
# Because the WITH clause may precede INSERT or SELECT queries,
# just checking the first token is insufficient.
if filter(self.INSERT_REGEX.match, tokens):
return "insert"
return tokens[0].lower()
def __build_error_message(self, exception):
"""Construct a meaningful exception string"""
message = str(exception)
if isinstance(exception, BeeswaxService.BeeswaxException):
message = exception.message
return 'INNER EXCEPTION: %s\n MESSAGE: %s' % (exception.__class__, message)
def __do_rpc(self, rpc):
"""Executes the RPC lambda provided with some error checking.
Catches all the relevant exceptions and re throws them wrapped
in a custom exception [ImpalaBeeswaxException].
if not self.connected:
raise ImpalaBeeswaxException("Not connected", None)
return rpc()
except BeeswaxService.BeeswaxException, b:
raise ImpalaBeeswaxException(self.__build_error_message(b), b)
except TTransportException, e:
self.connected = False
raise ImpalaBeeswaxException(self.__build_error_message(e), e)
except TApplicationException, t:
raise ImpalaBeeswaxException(self.__build_error_message(t), t)
except Exception, u:
raise ImpalaBeeswaxException(self.__build_error_message(u), u)