blob: 352c9a737c1c2649d530b5abf16acda5f7fb8167 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import logging
import re
from datetime import datetime
from impala.dbapi import connect
from tests.beeswax.impala_beeswax import ImpalaBeeswaxClient, ImpalaBeeswaxResult
from sys import maxint
from tests.performance.query import HiveQueryResult, ImpalaQueryResult
from tests.util.shell_util import exec_process
from time import time
import threading
# Port appended to an impalad address that has no explicit port before it is handed to
# the beeswax client.
DEFAULT_BEESWAX_PORT = 21000
# Port used for Impala HS2 connections when the impalad address has no explicit port.
DEFAULT_HS2_PORT = 21050
# HiveServer2 port used by get_hs2_hive_cursor.
DEFAULT_HIVE_HS2_PORT = 10000
# Module-level logger used by the query execution functions in this file.
LOG = logging.getLogger('query_exec_functions')
def get_hs2_hive_cursor(hiveserver, user=None, use_kerberos=False,
                        database=None, execOptions=None):
  """Get a cursor to a HiveServer2 instance.

  Args:
    hiveserver: A string in form 'hostname:port' or 'hostname'. If it cannot be split
      into a host and an integer port, the whole value is used as the host together
      with DEFAULT_HIVE_HS2_PORT.
    user: user name passed to the connection.
    use_kerberos: boolean indicating whether to authenticate with GSSAPI instead of
      PLAIN.
    database: default db to use in the connection.
    execOptions: configuration passed to the cursor when it is created.

  Returns:
    A cursor if the connection succeeds, None otherwise.
  """
  try:
    host, port = hiveserver.split(":")
    port = int(port)
  except (ValueError, AttributeError):
    # Not of the form 'host:<int>' (or not a string at all): keep the value untouched
    # as the host and fall back to the default port.
    host, port = hiveserver, DEFAULT_HIVE_HS2_PORT
  cursor = None
  try:
    conn = connect(host=host,
                   port=port,
                   user=user,
                   database=database,
                   auth_mechanism="GSSAPI" if use_kerberos else "PLAIN",
                   timeout=maxint)
    cursor = conn.cursor(configuration=execOptions)
    LOG.info("Connected to {0}:{1}".format(host, port))
  except Exception as e:
    # Connection problems are logged and reported to the caller as a None cursor.
    LOG.error("Error Connecting: {0}".format(str(e)))
  return cursor
def execute_using_hive_hs2(query, query_config):
  """Run a query against Hive through the hs2 interface.

  The cursor is stored as the 'cursor' attribute of the current thread object, so later
  calls made from the same thread reuse it instead of connecting again.

  Args:
    query: object providing 'query_str' and 'db'.
    query_config: object providing 'plugin_runner', 'hiveserver', 'user',
      'use_kerberos' and 'exec_options'.

  Returns:
    HiveQueryResult; 'success' is only set to True when the query ran and all rows
    were fetched.
  """
  outcome = HiveQueryResult(query, query_config=query_config)
  plugins = query_config.plugin_runner
  worker = threading.current_thread()
  hive_cursor = getattr(worker, 'cursor', None)
  if hive_cursor is None:
    hive_cursor = get_hs2_hive_cursor(query_config.hiveserver,
                                      user=query_config.user,
                                      database=query.db,
                                      use_kerberos=query_config.use_kerberos,
                                      execOptions=query_config.exec_options)
    worker.cursor = hive_cursor
    if hive_cursor is None:
      # Could not connect; hand back the result without marking it successful.
      return outcome
  if plugins:
    plugins.run_plugins_pre(scope="Query")
  try:
    began_at = datetime.now()
    timer_start = time()
    outcome.start_time = began_at
    hive_cursor.execute(query.query_str)
    outcome.data = hive_cursor.fetchall()
    outcome.time_taken = time() - timer_start
    outcome.success = True
  except Exception as e:
    LOG.error(str(e))
    outcome.query_error = str(e)
  finally:
    # Post-query plugins run whether or not the query succeeded.
    if plugins:
      plugins.run_plugins_post(scope="Query")
  return outcome
def get_hs2_impala_cursor(impalad, use_kerberos=False, database=None):
  """Get a cursor to an impalad

  Args:
    impalad: A string in form 'hostname:port' or 'hostname'. If it cannot be split into
      a host and an integer port, the whole value is used as the host together with
      DEFAULT_HS2_PORT.
    use_kerberos: boolean indicating whether to authenticate with GSSAPI instead of
      NOSASL.
    database: default db to use in the connection.

  Returns:
    A cursor if the connection succeeds, None otherwise.
  """
  try:
    host, port = impalad.split(":")
    # Hand connect() an integer port in both the parsed and the default case.
    port = int(port)
  except ValueError:
    host, port = impalad, DEFAULT_HS2_PORT
  cursor = None
  try:
    conn = connect(host=host,
                   port=port,
                   database=database,
                   auth_mechanism="GSSAPI" if use_kerberos else "NOSASL")
    cursor = conn.cursor()
    LOG.info("Connected to {0}:{1}".format(host, port))
  except Exception as e:
    # Connection problems are logged and reported to the caller as a None cursor.
    LOG.error("Error connecting: {0}".format(str(e)))
  return cursor
def execute_using_impala_hs2(query, query_config):
  """Run a sql query against Impala through the hs2 interface.

  A fresh cursor is obtained for every call and closed before returning.

  Args:
    query: Query
    query_config: ImpalaHS2Config

  Returns:
    ImpalaQueryResult; 'success' is only set to True when the query ran and its rows,
    profile and summary were all retrieved.
  """
  outcome = ImpalaQueryResult(query, query_config=query_config)
  plugins = query_config.plugin_runner
  hs2_cursor = get_hs2_impala_cursor(query_config.impalad,
                                     use_kerberos=query_config.use_kerberos,
                                     database=query.db)
  if hs2_cursor is None:
    # Could not connect; hand back the result without marking it successful.
    return outcome
  if plugins:
    plugins.run_plugins_pre(scope="Query")
  try:
    began_at = datetime.now()
    timer_start = time()
    outcome.start_time = began_at
    hs2_cursor.execute(query.query_str)
    outcome.data = hs2_cursor.fetchall()
    # The elapsed time covers execute + fetch only, not the profile/summary retrieval.
    outcome.time_taken = time() - timer_start
    outcome.runtime_profile = hs2_cursor.get_profile()
    outcome.exec_summary = str(hs2_cursor.get_summary())
    outcome.success = True
  except Exception as e:
    LOG.error(str(e))
    outcome.query_error = str(e)
  finally:
    hs2_cursor.close()
    if plugins:
      plugins.run_plugins_post(scope="Query")
  return outcome
def establish_beeswax_connection(query_config):
  """Establish a connection to the user specified impalad.

  Note that query_config.impalad is rewritten in place to 'host:port' when it has no
  port.

  Args:
    query_config (QueryExecConfig)

  Returns:
    ImpalaBeeswaxClient if the connection succeeds, None otherwise.
  """
  use_kerberos = query_config.use_kerberos
  user = query_config.user
  password = query_config.password
  use_ssl = query_config.use_ssl
  # If the impalad is of the form host, convert it to host:port that the Impala beeswax
  # client accepts.
  if len(query_config.impalad.split(":")) == 1:
    query_config.impalad = "{0}:{1}".format(query_config.impalad, DEFAULT_BEESWAX_PORT)
  try:
    client = ImpalaBeeswaxClient(query_config.impalad, use_kerberos=use_kerberos,
                                 user=user, password=password, use_ssl=use_ssl)
    # Try connect
    client.connect()
    # Set the exec options.
    client.set_query_options(query_config.exec_options)
    LOG.info("Connected to %s" % query_config.impalad)
  except Exception as e:
    LOG.error("Error connecting: {0}".format(str(e)))
    # Never hand back a client whose connect/setup failed: callers rely on None to
    # detect that no usable connection exists.
    return None
  return client
def execute_using_impala_beeswax(query, query_config):
  """Run a query through the beeswax interface.

  A new client is created for each query and its connection is closed afterwards.

  Args:
    query: object providing 'query_str' and 'db'.
    query_config (QueryExecConfig)

  Returns:
    ImpalaQueryResult
  """
  outcome = ImpalaQueryResult(query, query_config=query_config)
  plugins = query_config.plugin_runner
  # Create a client object to talk to impalad
  beeswax = establish_beeswax_connection(query_config)
  if beeswax is None:
    return outcome
  # The database has to be selected explicitly with a "use" statement.
  if query.db:
    use_stmt = "use {0}".format(query.db)
    beeswax.execute(use_stmt)
  # Map of query options plus the query itself, handed to the plugins.
  plugin_context = build_context(query, query_config)
  if plugins:
    plugins.run_plugins_pre(context=plugin_context, scope="Query")
  raw_result = None
  try:
    raw_result = beeswax.execute(query.query_str)
  except Exception as e:
    LOG.error(e)
    outcome.query_error = str(e)
  finally:
    beeswax.close_connection()
    if plugins:
      plugins.run_plugins_post(context=plugin_context, scope="Query")
  # raw_result is still None here if the query raised.
  return construct_exec_result(raw_result, outcome)
def build_context(query, query_config):
  """Build context based on query config for plugin_runner.

  Why not pass QueryExecConfig to plugins directly?

  Args:
    query: the query being executed; stored under the 'query' key.
    query_config (QueryExecConfig)

  Returns:
    A new dict holding the instance attributes of query_config plus the 'query' key.
  """
  # vars() returns the live __dict__ of query_config. Copy it so that adding the 'query'
  # key does not set a 'query' attribute on the config object itself.
  context = dict(vars(query_config))
  context['query'] = query
  return context
def construct_exec_result(result, exec_result):
  """Copy the outcome of an ImpalaBeeswaxResult onto an ImpalaQueryResult.

  Args:
    result (ImpalaBeeswaxResult): Source of the data; may be None.
    exec_result (ImpalaQueryResult): Destination of the data.

  Returns:
    ImpalaQueryResult: the exec_result object that was passed in. It is left untouched
    when result is None or result.success is false.
  """
  if result is not None and result.success:
    exec_result.success = True
    copied_fields = ('data', 'runtime_profile', 'start_time',
                     'time_taken', 'summary', 'exec_summary')
    for field in copied_fields:
      setattr(exec_result, field, getattr(result, field))
  return exec_result
def execute_using_jdbc(query, query_config):
  """Run a query through the JDBC client command line.

  The statement is terminated with ';', prefixed with a 'use <db>;' statement when the
  query names a database, and passed to query_config.jdbc_client_cmd via -q.
  """
  statement = query.query_str + ';'
  if query.db:
    statement = 'use %s; %s' % (query.db, statement)
  client_cmd = query_config.jdbc_client_cmd
  query_arg = " -q \"%s\"" % statement
  return run_query_capture_results(client_cmd + query_arg, query, exit_on_error=False)
def parse_jdbc_query_results(stdout, stderr, query):
  """
  Parse query execution results for the Impala JDBC client

  Extracts the elapsed time from the first 'row(s) in <secs>.<frac>s' occurrence in
  stdout (0.0 when there is none) and the result data from the first
  '[START]----' ... '----[END]' block.

  Args:
    stdout: standard output of the JDBC test client.
    stderr: standard error of the JDBC test client (not inspected).
    query: the query that was run; passed on to the result object.

  Returns:
    The result object built by create_exec_result. When stdout has no result block it
    is built without data and carries an explanatory 'query_error'.
  """
  time_taken = 0.0
  # The pattern cannot span lines, so searching the whole output finds the same first
  # occurrence as scanning it line by line.
  timing = re.search(r'row\(s\) in (\d*)\.(\d*)s', stdout)
  if timing:
    time_taken = float('%s.%s' % (timing.group(1), timing.group(2)))
  block = re.search(r'\[START\]----\n(.*?)\n----\[END\]', stdout, re.DOTALL)
  if block is None:
    # Without the markers there is nothing to report as data; return a failed result
    # instead of raising IndexError.
    msg = 'JDBC client output did not contain a [START]/[END] result block'
    LOG.error(msg)
    exec_result = create_exec_result(time_taken, None, query)
    exec_result.query_error = msg
    return exec_result
  return create_exec_result(time_taken, block.group(1), query)
def create_exec_result(time_taken, result_data, query):
  """Build a HiveQueryResult for query.

  The data, the time taken and the success flag are only filled in when result_data is
  non-empty; otherwise the freshly constructed result is returned as is.
  """
  outcome = HiveQueryResult(query)
  if not result_data:
    return outcome
  LOG.debug('Data:\n%s\n' % result_data)
  outcome.data = result_data
  outcome.time_taken = time_taken
  outcome.success = True
  return outcome
def run_query_capture_results(cmd, query, exit_on_error):
  """
  Runs the given query command and returns the execution result.

  When the command cannot be run or exits with a non-zero return code, a
  HiveQueryResult carrying the error text in 'query_error' is returned. Otherwise the
  command's stdout/stderr are parsed with parse_jdbc_query_results and the parsed
  result is returned with its 'query' and 'start_time' filled in.

  Note: exit_on_error is accepted but not used by this function.
  """
  failure = HiveQueryResult(query)
  launched_at = datetime.now()
  try:
    rc, stdout, stderr = exec_process(cmd)
  except Exception as e:
    LOG.error('Error while executing query command: %s' % e)
    failure.query_error = str(e)
    # TODO: Should probably save the start time and query string for failed queries.
    return failure

  if rc != 0:
    error_template = ('Command returned with an error:\n'
                      'rc: %d\n'
                      'STDERR:\n%s'
                      'STDOUT:\n%s')
    msg = error_template % (rc, stderr, stdout)
    LOG.error(msg)
    failure.query_error = msg
    return failure

  # The command completed
  parsed = parse_jdbc_query_results(stdout, stderr, query)
  parsed.query = query
  parsed.start_time = launched_at
  return parsed