blob: 1cd32919c940f6e8d0b67a6fd752147d88a06c3d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import codecs
import csv
import logging
import time
import zipfile
import requests
from requests.exceptions import HTTPError
from six import string_types, BytesIO, PY2, PY3
from .models import WrappedJson
from .utils import conf_to_xml
logger = logging.getLogger(__name__)

# Integer type used for BIGINT/TIMESTAMP conversion: ``long`` on Python 2,
# plain ``int`` everywhere else.
long_type = int
if PY2:
    from collections import Iterable
    long_type = long  # noqa: F821 - builtin that only exists on Python 2
elif PY3:
    # On Python 3 the abstract base classes live in collections.abc.
    from collections.abc import Iterable
class LensQuery(WrappedJson):
    """A query description returned by the server, bound to the client that fetched it.

    JSON fields such as ``status`` are read as attributes (presumably provided
    by ``WrappedJson``).
    """

    def __init__(self, client, *args, **kwargs):
        """
        :param client: the query client used to fetch this query's result.
        All remaining arguments are forwarded unchanged to ``WrappedJson``.
        """
        super(LensQuery, self).__init__(*args, **kwargs)
        self.client = client

    @property
    def finished(self):
        """True once the status is SUCCESSFUL, FAILED, CANCELED or CLOSED."""
        state = self.status.status
        return state in ('SUCCESSFUL', 'FAILED', 'CANCELED', 'CLOSED')

    def get_result(self, *args, **kwargs):
        """Delegate to ``self.client.get_result`` for this query, forwarding all arguments."""
        client = self.client
        return client.get_result(self, *args, **kwargs)

    # ``query.result`` is shorthand for ``query.get_result()`` with no extra arguments.
    result = property(get_result)
def default_mapping(x):
    """Return ``x`` unchanged: the converter for column types with no registered mapping."""
    return x


def _parse_boolean(value):
    """Convert a BOOLEAN cell value to ``bool``.

    ``bool(value)`` alone is wrong for text cells (e.g. fields read from CSV
    result files): every non-empty string, including ``"false"`` and ``"0"``,
    is truthy. Real booleans pass through unchanged; anything else is compared
    case-insensitively against common "true" spellings.
    """
    if isinstance(value, bool):
        return value
    return str(value).strip().lower() in ('true', '1', 't', 'y', 'yes')


# Column type name (as reported in the result-set metadata) -> converter applied
# to each raw cell value. Types not listed here (e.g. STRUCT, UNIONTYPE, DATE,
# VARCHAR, CHAR, NULL, user-defined types) fall back to ``default_mapping``.
type_mappings = {'BOOLEAN': _parse_boolean,
                 'TINYINT': int,
                 'SMALLINT': int,
                 'INT': int,
                 'BIGINT': long_type,
                 'FLOAT': float,
                 'DOUBLE': float,
                 'TIMESTAMP': long_type,
                 # Raw value kept as-is: ``bin`` only accepts integers and returns
                 # '0b...' text, so it cannot decode binary column data.
                 'BINARY': default_mapping,
                 # Comma-separated text -> list of stripped strings.
                 'ARRAY': lambda x: [y.strip() for y in x.split(",")],
                 'MAP': dict,
                 # Key must be exactly 'DECIMAL' (no trailing comma) to match the type name.
                 'DECIMAL': float,
                 }
class LensQueryResult(Iterable):
    """Base class for iterable query results that convert raw cells by column type.

    A converter is looked up first in the per-instance ``custom_mappings``,
    then in the module-level ``type_mappings``; otherwise ``default_mapping``
    is used.
    """

    def __init__(self, custom_mappings=None):
        # A fresh empty dict per instance when no overrides are supplied.
        self.custom_mappings = {} if custom_mappings is None else custom_mappings

    def _mapping(self, type_name):
        """Return the converter callable for the column type ``type_name``."""
        overrides = self.custom_mappings
        if type_name in overrides:
            return overrides[type_name]
        return type_mappings[type_name] if type_name in type_mappings else default_mapping
class LensInMemoryResult(LensQueryResult):
    """Iterates over rows that the server returned inline as an in-memory result.

    Each yielded row is a list holding one converted value per cell; falsy
    cells become ``None``.
    """

    def __init__(self, resp, custom_mappings=None):
        """
        :param resp: sanitized response exposing ``in_memory_query_result.rows``.
        :param custom_mappings: optional ``{type_name: converter}`` overrides.
        """
        super(LensInMemoryResult, self).__init__(custom_mappings)
        self.rows = resp.in_memory_query_result.rows

    def __iter__(self):
        for row in self.rows:
            converted = []
            for cell in row['values']:
                # Each cell carries its own type name and raw value.
                converted.append(self._mapping(cell.type)(cell.value) if cell else None)
            yield converted
class LensPersistentResult(LensQueryResult):
    """Iterates over a persisted query result downloaded over HTTP.

    The response body is either a zip archive (detected from the
    Content-Disposition header) whose members are read one after another, or a
    single delimited text stream. Every row is yielded as a list of values
    converted according to the column types in ``header``.
    """

    def __init__(self, header, response, encoding=None, is_header_present=True, delimiter=",",
                 custom_mappings=None):
        """
        :param header: result-set metadata; ``header.columns[i].type`` is the type name of column ``i``.
        :param response: ``requests`` response whose body holds the result.
        :param encoding: text encoding of the result data. When None, zip members
            are decoded as UTF-8 and a plain body uses the response's declared
            (or detected) encoding.
        :param is_header_present: skip the first row of each file/stream.
        :param delimiter: field delimiter.
        :param custom_mappings: optional ``{type_name: converter}`` overrides.
        """
        super(LensPersistentResult, self).__init__(custom_mappings)
        self.response = response
        # Treat a missing Content-Disposition header as "not zipped" instead of raising KeyError.
        self.is_zipped = 'zip' in self.response.headers.get('content-disposition', '')
        self.delimiter = str(delimiter)
        self.is_header_present = is_header_present
        self.encoding = encoding
        self.header = header

    def _parse_line(self, line):
        """Convert every field of ``line`` with the converter for its column's type."""
        columns = self.header.columns
        return [self._mapping(columns[index].type)(value) for index, value in enumerate(line)]

    def get_csv_reader(self, file):
        """Return a ``csv.reader`` over ``file``, a binary member opened from the zip archive.

        On Python 3 the bytes are decoded first, using ``self.encoding`` if set, else UTF-8.
        """
        if PY3:
            file = codecs.iterdecode(file, self.encoding or 'utf-8')
        return csv.reader(file, delimiter=self.delimiter)

    def _iter_rows(self, reader):
        """Yield parsed rows from ``reader``, skipping the header row when one is expected."""
        rows = iter(reader)
        if self.is_header_present:
            # A default is required: a bare next() on an empty file raises
            # StopIteration inside this generator, which PEP 479 (Python 3.7+)
            # turns into RuntimeError.
            next(rows, None)
        for line in rows:
            yield self._parse_line(line)

    def __iter__(self):
        if self.is_zipped:
            byte_stream = BytesIO(self.response.content)
            try:
                with zipfile.ZipFile(byte_stream) as self.zipfile:
                    for name in self.zipfile.namelist():
                        with self.zipfile.open(name) as single_file:
                            if name.endswith('csv'):
                                reader = self.get_csv_reader(single_file)
                            else:
                                # NOTE(review): non-csv members are iterated as raw byte
                                # lines, and _parse_line then walks each line element by
                                # element instead of splitting on the delimiter -- confirm
                                # whether such members can actually occur.
                                reader = single_file
                            for row in self._iter_rows(reader):
                                yield row
            finally:
                # Closed even when the caller stops iterating early.
                byte_stream.close()
        else:
            encoding = self.encoding or self.response.encoding or self.response.apparent_encoding
            stream = codecs.iterdecode(self.response.iter_lines(), encoding)
            try:
                for row in self._iter_rows(csv.reader(stream, delimiter=self.delimiter)):
                    yield row
            finally:
                stream.close()
class LensQueryClient(object):
    """Client for the Lens ``queryapi`` endpoints: list, submit, poll and fetch query results."""

    # Lower-cased spellings accepted as "true" for boolean-valued configuration.
    _TRUTHY_VALUES = ('true', '1', 't', 'y', 'yes', 'yeah', 'yup')

    def __init__(self, base_url, session):
        """
        :param base_url: server base URL; ``"queryapi/"`` is appended verbatim.
        :param session: session object supporting ``session[conf_key]`` lookups
            and exposing ``_sessionid``.
        """
        self._session = session
        self.base_url = base_url + "queryapi/"
        self.launched_queries = []
        # handle string -> LensQuery, only for queries already in a terminal state.
        self.finished_queries = {}
        # string form of the submitted handle -> conf dict passed to submit().
        self.query_confs = {}
        self.is_header_present_in_result = self._is_truthy(
            self._session['lens.query.output.write.header'])

    @staticmethod
    def _is_truthy(value):
        """Interpret a configuration value such as ``"true"``, ``"False"`` or ``True`` as a bool.

        ``bool()`` cannot be used: every non-empty string, including ``"false"``, is truthy.
        """
        return str(value).strip().lower() in LensQueryClient._TRUTHY_VALUES

    def __call__(self, **filters):
        """List queries via ``GET queries/``; ``filters`` are sent as query parameters."""
        filters['sessionid'] = self._session._sessionid
        resp = requests.get(self.base_url + "queries/", params=filters, headers={'accept': 'application/json'})
        return self.sanitize_response(resp)

    def __getitem__(self, item):
        """Fetch a query's details given a handle string, a LensQuery or a WrappedJson handle.

        Queries in a terminal state are cached in ``finished_queries`` and not re-fetched.
        :raises Exception: if ``item`` cannot be interpreted as a query handle.
        """
        if isinstance(item, string_types):
            if item in self.finished_queries:
                return self.finished_queries[item]
            resp = requests.get(self.base_url + "queries/" + item, params={'sessionid': self._session._sessionid},
                                headers={'accept': 'application/json'})
            resp.raise_for_status()
            query = LensQuery(self, resp.json(object_hook=WrappedJson))
            if query.finished:
                query.client = self
                self.finished_queries[item] = query
            return query
        elif isinstance(item, LensQuery):
            # Checked before WrappedJson because LensQuery is a WrappedJson subclass.
            return self[item.query_handle]
        elif isinstance(item, WrappedJson):
            if item._is_wrapper:
                return self[item._wrapped_value]
            if item.query_handle:
                return self[item.query_handle]
        raise Exception("Can't get query: " + str(item))

    def submit(self, query, operation=None, query_name=None, timeout=None, conf=None, wait=False, fetch_result=False,
               *args, **kwargs):
        """Submit ``query`` for execution.

        :param query: query text.
        :param operation: server operation; defaults to ``"execute_with_timeout"``
            when ``timeout`` is given, else ``"execute"``.
        :param query_name: optional query name.
        :param timeout: timeout in seconds (fractions allowed), sent as ``timeoutmillis``.
        :param conf: optional query configuration dict; remembered for this handle
            so result parsing can honour ``lens.query.output.write.header``.
        :param wait: block until the query finishes and return its details.
        :param fetch_result: block until finished and return the result (checked before ``wait``).
        Extra ``*args``/``**kwargs`` are forwarded to ``get_result`` / ``wait_till_finish``.
        :return: the result, the finished query, or the query handle (async case).
        """
        payload = [('sessionid', self._session._sessionid), ('query', query)]
        if query_name:
            payload.append(('queryName', query_name))
        if timeout:
            # Convert through float so fractional seconds are not truncated to whole seconds.
            payload.append(('timeoutmillis', str(int(round(float(timeout) * 1000)))))
        if not operation:
            operation = "execute_with_timeout" if timeout else "execute"
        payload.append(('operation', operation))
        payload.append(('conf', conf_to_xml(conf)))
        resp = requests.post(self.base_url + "queries/", files=payload, headers={'accept': 'application/json'})
        query = self.sanitize_response(resp)
        logger.info("Submitted query %s", query)
        if conf:
            self.query_confs[str(query)] = conf
        if fetch_result:
            # query is the handle here
            return self.get_result(query, *args, **kwargs)
        elif wait:
            return self.wait_till_finish(query, *args, **kwargs)
        # Async case (or execute_with_timeout without wait): just return the handle.
        return query

    def wait_till_finish(self, handle_or_query, poll_interval=5, *args, **kwargs):
        """Poll every ``poll_interval`` seconds until the query reaches a terminal state.

        Extra ``*args``/``**kwargs`` are accepted and ignored so that result options
        can be forwarded through ``get_result`` and ``submit``.
        :return: the finished ``LensQuery``.
        """
        query = self[handle_or_query]
        while not query.finished:
            time.sleep(poll_interval)
            query = self[handle_or_query]
        return query

    def get_result(self, handle_or_query, *args, **kwargs):
        """Wait for the query to finish and return its result set.

        ``poll_interval`` (if given) controls polling; the remaining arguments are
        passed to the result object (e.g. ``encoding``, ``delimiter``, ``custom_mappings``).
        :return: a LensPersistentResult when the HTTP result set is available; a
            LensInMemoryResult for an inline in-memory result; otherwise the sanitized
            ``resultset`` response as-is.
        :raises Exception: if the query did not succeed or has no result set.
        """
        query = self.wait_till_finish(handle_or_query, *args, **kwargs)
        # poll_interval is consumed by wait_till_finish; the result classes do not accept it.
        result_kwargs = dict(kwargs)
        result_kwargs.pop('poll_interval', None)
        handle = str(query.query_handle)
        if not (query.status.status == 'SUCCESSFUL' and query.status.is_result_set_available):
            raise Exception("Result set not available")
        session_params = {'sessionid': self._session._sessionid}
        resp = requests.get(self.base_url + "queries/" + handle + "/resultsetmetadata",
                            params=session_params, headers={'accept': 'application/json'})
        metadata = self.sanitize_response(resp)
        # Try getting the result through http result
        resp = requests.get(self.base_url + "queries/" + handle + "/httpresultset",
                            params=session_params, stream=True)
        if resp.ok:
            is_header_present = self.is_header_present_in_result
            query_conf = self.query_confs.get(handle, {})
            if 'lens.query.output.write.header' in query_conf:
                is_header_present = self._is_truthy(query_conf['lens.query.output.write.header'])
            # An explicit is_header_present from the caller takes precedence.
            result_kwargs.setdefault('is_header_present', is_header_present)
            return LensPersistentResult(metadata, resp, *args, **result_kwargs)
        # Release the unused streamed connection before falling back.
        resp.close()
        response = requests.get(self.base_url + "queries/" + handle + "/resultset",
                                params=session_params,
                                headers={'accept': 'application/json'})
        resp = self.sanitize_response(response)
        # If it has in memory result, return inmemory result iterator
        if resp._is_wrapper and resp._wrapped_key == u'inMemoryQueryResult':
            return LensInMemoryResult(resp, result_kwargs.get('custom_mappings'))
        # Else, return whatever you got
        return resp

    def sanitize_response(self, resp):
        """Decode a JSON response and unwrap the ``lensAPIResult`` envelope when present.

        :return: the envelope's ``data`` (re-keyed as ``{type: value}`` when ``data``
            is a two-key object with a ``type`` field), or the decoded JSON itself.
        :raises HTTPError: if the envelope carries an ``error``, or if the body is
            not JSON and the HTTP status is an error.
        :raises Exception: if the body decodes to JSON ``null`` with a non-error status.
        """
        try:
            resp_json = resp.json(object_hook=WrappedJson)
        except Exception:
            # Fall back to plain decoding if decoding with the WrappedJson hook fails.
            try:
                resp_json = resp.json()
            except ValueError:
                # Not JSON at all (e.g. an HTML error page): surface the HTTP error first.
                resp.raise_for_status()
                raise
        if resp_json is not None and 'lensAPIResult' in resp_json:
            resp_json = resp_json.lens_a_p_i_result
            if 'error' in resp_json:
                error = resp_json['error']
                if "stackTrace" in error:
                    logger.error(error['stackTrace'])
                raise HTTPError(error, request=resp.request, response=resp)
            if 'data' in resp_json:
                data = resp_json.data
                if len(data) == 2 and 'type' in data:
                    # {'type': T, <key>: value} -> WrappedJson({T: value})
                    keys = list(data.keys())
                    keys.remove('type')
                    return WrappedJson({data['type']: data[keys[0]]})
                return data
        if resp_json is not None:
            return resp_json
        resp.raise_for_status()
        logger.error(resp.text)
        raise Exception("Unknown error with response", resp)