blob: 2b3217f7b2708e3e0c5b7e0ae3fa3cbe332576bf [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from collections import defaultdict
from copy import deepcopy
# The model related modules (types.py, query.py, etc) are interconnected by circular
# imports which causes problems for the python import system. This module is intended to
# be the first of the circular modules imported. To be importable, no direct references
# are made to the other modules from this modules namespace. Instead, other modules are
# lazyily imported using the following function. Keep in mind that python "globals" are
# module local, there is no such thing as a cross-module global.
__ALREADY_IMPORTED = False
def get_import(name):
# noqa below tells flake8 to not warn when it thinks imports are not used
global __ALREADY_IMPORTED
if not __ALREADY_IMPORTED:
from tests.comparison.db_types import ( # noqa
BigInt,
Boolean,
Char,
DataType,
Decimal,
Float,
Int,
JOINABLE_TYPES,
Number,
Timestamp)
from tests.comparison.funcs import AggFunc, AnalyticFunc, Func # noqa
from tests.comparison.query import InlineView, Subquery, WithClauseInlineView # noqa
for key, value in locals().items():
globals()[key] = value
__ALREADY_IMPORTED = True
return globals()[name]
class ValExpr(object):
'''This is class that represents a generic expr that results in a scalar.'''
@property
def type(self):
'''Returns the type that this expr evaluates to. The type may be Int or Char but
never BigInt or String. Valid return value are the set defined in types.TYPES.
'''
return self.exact_type.get_generic_type()
@property
def exact_type(self):
'''Return the actual type of the val expr. For example "type" could return Int and
"exact_type" could return TinyInt.
'''
pass
@property
def base_type(self):
'''Returns the lowest type in the type heirarchy that is not DataType. For non-
numeric types, the return value will be the same is self.type. Numeric types
will return Number whereas self.type may be Decimal, Int, or Float.
'''
return self.type.get_base_type()
@property
def is_func(self):
'''Evaluates to True if this expr is an instance of a function.'''
return isinstance(self, get_import('Func'))
@property
def is_agg(self):
'''Evaluates to True if this expr is an instance of an aggregate function.'''
return isinstance(self, get_import('AggFunc'))
@property
def is_analytic(self):
'''Evaluates to True if this expr is an instance of an analytic function.'''
return isinstance(self, get_import('AnalyticFunc'))
@property
def contains_agg(self):
'''Evaluates to True if this expression is an aggregate function or contains an
aggregate function.
'''
return self.is_agg or self.is_func and any(
isinstance(arg, ValExpr) and arg.contains_agg for arg in self.args)
@property
def contains_analytic(self):
'''Evaluates to True if this expression is an analytic function or contains an
analytic function.
'''
return self.is_analytic or self.is_func and any(
isinstance(arg, ValExpr) and arg.contains_analytic for arg in self.args)
@property
def contains_subquery(self):
'''Evaluates to True if this expression is a subquery or contains a subquery.'''
return self.is_subquery or self.is_func and any(
isinstance(arg, ValExpr) and arg.contains_subquery for arg in self.args)
@property
def is_col(self):
return isinstance(self, Column)
@property
def is_constant(self):
return isinstance(self, get_import('DataType'))
@property
def is_subquery(self):
return isinstance(self, get_import('Subquery'))
@property
def returns_boolean(self):
return issubclass(self.type, get_import('Boolean'))
@property
def returns_int(self):
return issubclass(self.type, get_import('Int'))
@property
def returns_float(self):
return issubclass(self.type, get_import('Float'))
@property
def returns_char(self):
return issubclass(self.type, get_import('Char'))
@property
def returns_timestamp(self):
return issubclass(self.type, get_import('Timestamp'))
def iter_exprs(self, filter=None):
'''Return an iterator over all exprs that this expr contains as a function argument
including this expr itself.
'''
if not filter or filter(self):
yield self
def count_col_refs(self):
'''Return a dict with Columns as keys and the number of times the column was used
in this expr as values.
'''
col_ref_counts = defaultdict(int)
if self.is_func:
for arg in self.args:
if isinstance(arg, ValExpr):
for col, count in arg.count_col_refs().iteritems():
col_ref_counts[col] += count
elif self.is_col:
col_ref_counts[self] += 1
return col_ref_counts
class StructColumn(object):
'''The methods in this class are similar to TableExpr.
In Impala it's not possible to select from a struct column. To mirror this behavior
here, StructColumn does not inherit from TableExpr.
TODO: Maybe make a parent class that both StructColumn and TableExpr will inhert
from.
'''
def __init__(self, owner, name):
self.owner = owner
self.name = name
self._cols = []
self.alias = None
@property
def identifier(self):
return self.alias or self.name
@property
def cols(self):
'''Returns a ValExprList containing all scalar cols in this StructColumn. '''
# TODO: Since Impala now supports nested types, this method can be renamed to
# scalar_cols
result = ValExprList()
for col in self._cols:
if not isinstance(col, CollectionColumn):
result.extend(col.cols)
return result
@property
def collections(self):
result = []
for col in self._cols:
if isinstance(col, CollectionColumn):
result.append(col)
result.extend(col.collections)
elif isinstance(col, StructColumn):
result.extend(col.collections)
return result
def get_col_by_name(self, col_name):
for col in self._cols:
if col.name == col_name:
return col
return None
def add_col(self, col):
col.owner = self
self._cols.append(col)
def __eq__(self, other):
if not isinstance(other, StructColumn):
return False
if self is other:
return True
return self.name == other.name and self.owner.identifier == other.owner.identifier
def __hash__(self):
return hash(self.name)
def __repr__(self):
cols_str = ', '.join(str(f) for f in self._cols)
return '%s<name: %s, cols: [%s]>' % (type(self).__name__, self.name, cols_str)
def __deepcopy__(self, memo):
other = StructColumn(self.owner, self.name)
other.alias = self.alias
for col in self._cols:
other.add_col(deepcopy(col, memo))
return other
class Column(ValExpr):
'''A representation of a column. All TableExprs will have Columns. So a Column
may belong to an InlineView as well as a standard Table.
This class is used in two ways:
1) As a piece of metadata in a table definiton. In this usage the col isn't
intended to represent an val.
2) As an expr in a query, for example an item being selected or as part of
a JOIN condition. In this usage the col is more like a val, which is why
it implements/extends ValExpr.
This class can also be used to represent Map keys, Map values, Array pos,
scalar struct field, and scalar array item.
'''
def __init__(self, owner, name, exact_type):
self.owner = owner
self.name = name
self._exact_type = exact_type
self.is_primary_key = False
@property
def exact_type(self):
return self._exact_type
@exact_type.setter
def exact_type(self, exact_type):
self._exact_type = exact_type
def __hash__(self):
return hash(self.name)
def __eq__(self, other):
if not isinstance(other, Column):
return False
if self is other:
return True
return self.name == other.name and self.owner.identifier == other.owner.identifier
@property
def cols(self):
return ValExprList([self])
def __repr__(self):
return '%s<name: %s, type: %s>' % (
type(self).__name__, self.name, self.exact_type.__name__)
def __deepcopy__(self, memo):
# Don't return a deep copy of owner, since that is a circular reference
return Column(self.owner, self.name, self.exact_type)
class ValExprList(list):
'''A list of ValExprs.'''
@property
def by_type(self):
return get_import('DataType').group_by_type(self)
def __repr__(self):
return 'ValExprList: ' + ', '.join(str(x) for x in self)
class TableExpr(object):
'''This class represents something that a query may use to SELECT from or JOIN on.'''
@property
def identifier(self):
'''Returns either a table name or alias if one has been declared.'''
pass
@property
def unique_cols(self):
'''Returns a list of lists of Cols that in combination define a unique set of values
within the table. The returned list could be thought of as a list of uniqueness
constraints (though there may be no actual constraints or any other type of
enforcement).
'''
return ValExprList()
@property
def joinable_cols(self):
'''Returns a list of Cols that are of a type that is allowed in a JOIN. This is
mostly an Impala specific thing since Impala requires at least one equality based
join and not all types are allowed in equality comparisons. Also Boolean is
excluded because of low cardinality.
'''
joinable_types = tuple(get_import('JOINABLE_TYPES'))
return ValExprList(col for col in self.cols if issubclass(col.type, joinable_types))
@property
def col_types(self):
'''Returns a Set containing the various column types that this TableExpr contains.'''
return set(self.cols_by_type)
@property
def collections(self):
'''Returns all nested collections that can be accessed from this TableExpr.'''
result = []
for col in self._cols:
if isinstance(col, CollectionColumn):
result.append(col)
result.extend(col.collections)
elif isinstance(col, StructColumn):
result.extend(col.collections)
return result
def add_col(self, col):
col.owner = self
self._cols.append(col)
def is_visible(self):
'''If False is returned, columns from this TableExpr may only be used in JOIN
conditions. This is intended to be used to identify ANTI and SEMI joined table
exprs.
'''
pass
@property
def cols_by_type(self):
'''Group cols of the same type into lists and return a dict of the results.'''
return get_import('DataType').group_by_type(self.cols)
@property
def joinable_cols_by_type(self):
return get_import('DataType').group_by_type(self.joinable_cols)
@property
def is_table(self):
return isinstance(self, Table)
@property
def is_inline_view(self):
return isinstance(self, get_import('InlineView'))
@property
def is_with_clause_inline_view(self):
return isinstance(self, get_import('WithClauseInlineView'))
def __hash__(self):
return hash(self.identifier)
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return self.identifier == other.identifier
class CollectionColumn(TableExpr):
'''Used for representing Map or Array columns.'''
def __init__(self, owner, name):
self.name = name
# Owner can be one of: Table, ArrayColumn or StructColumn.
self.owner = owner
self.is_visible = True
self.alias = None
self._cols = []
@property
def identifier(self):
return self.alias or self.name
@property
def cols(self):
result = ValExprList()
for col in self._cols:
if not isinstance(col, CollectionColumn):
result.extend(col.cols)
return result
def get_col_by_name(self, col_name):
for col in self._cols:
if col.name == col_name:
return col
return None
def __hash__(self):
return hash(self.name)
def __repr__(self):
cols_str = ', '.join(str(f) for f in self._cols)
return '%s<name: %s, cols: [%s]>' % (type(self).__name__, self.name, cols_str)
class ArrayColumn(CollectionColumn):
def __init__(self, owner, name, item):
'''Item represents the type of array. For example if array type is Int, item should be
Column of type Int.
'''
super(ArrayColumn, self).__init__(owner, name)
item.owner = self
item.name = 'item'
self._cols.append(item)
# Arrays have 2 fields: pos and item. Pos is automatically set to BigInt.
self._cols.append(Column(
owner=self, name='pos', exact_type=get_import('BigInt')))
def __eq__(self, other):
if not isinstance(other, ArrayColumn):
return False
if self is other:
return True
return self.name == other.name and self.owner.identifier == other.owner.identifier
def __deepcopy__(self, memo):
other = ArrayColumn(
owner=self.owner,
name=self.name,
item=deepcopy(self.get_col_by_name('item')))
other.alias = self.alias
return other
class MapColumn(CollectionColumn):
def __init__(self, owner, name, key, value):
super(MapColumn, self).__init__(owner, name)
# Set key
key.owner = self
key.name = 'key'
self._cols.append(key)
# Set value
value.owner = self
value.name = 'value'
self._cols.append(value)
def __eq__(self, other):
if not isinstance(other, MapColumn):
return False
if self is other:
return True
return self.name == other.name and self.owner.identifier == other.owner.identifier
def __deepcopy__(self, memo):
other = MapColumn(
owner=self.owner,
name=self.name,
key=deepcopy(self.get_col_by_name('key')),
value=deepcopy(self.get_col_by_name('value')))
other.alias = self.alias
return other
class Table(TableExpr):
'''Represents a standard database table.'''
def __init__(self, name):
self.name = name
self._cols = [] # can include CollectionColumns and StructColumns
self._unique_cols = []
self.alias = None
self.is_visible = True # Tables used in SEMI or ANTI JOINs are invisible
# Only used for data loading. Always stored in upper-case. If set, values will be
# something like 'PARQUET' or 'TEXT'. See cli_options.py for a full list of
# possible values.
self._storage_format = None
# Only used for data loading. For Impala and Hive, this is the path to the directory
# in the storage system, such as an HDFS URL.
self.storage_location = None
# Only used for data loading. Avro tables may require a separate schema definition,
# this is the path to the schema file in the storage system, such as an HDFS URL.
self.schema_location = None
@property
def identifier(self):
return self.alias or self.name
@property
def primary_keys(self):
"""
Return immutable sequence of primary keys.
"""
return tuple(col for col in self._cols if col.is_primary_key)
@property
def primary_key_names(self):
"""
Return immutable sequence for primary key names.
"""
return tuple(col.name for col in self.primary_keys)
@property
def updatable_columns(self):
"""
Return immutable sequence of columns that may be updated (i.e., not primary keys).
If the table doesn't have primary keys, no columns are updatable.
"""
if self.primary_keys:
return tuple(col for col in self._cols if not col.is_primary_key)
else:
return ()
@property
def updatable_column_names(self):
"""
Return immutable sequence of column names that may be updated
"""
return tuple(col.name for col in self.updatable_columns)
@property
def cols(self):
result = ValExprList()
for col in self._cols:
if not isinstance(col, CollectionColumn):
result.extend(col.cols)
return result
@cols.setter
def cols(self, cols):
self._cols = cols
@property
def unique_cols(self):
return self._unique_cols
@unique_cols.setter
def unique_cols(self, unique_cols):
self._unique_cols = unique_cols
@property
def storage_format(self):
return self._storage_format
@storage_format.setter
def storage_format(self, storage_format):
self._storage_format = storage_format and storage_format.upper()
def get_avro_schema(self):
avro_schema = {'name': 'my_record', 'type': 'record', 'fields': []}
for col in self.cols:
if issubclass(col.type, get_import('Int')):
avro_type = 'int'
elif issubclass(col.type, get_import('Char')):
avro_type = 'string'
elif issubclass(col.type, get_import('Decimal')):
avro_type = {
"type": "bytes",
"logicalType": "decimal",
"precision": col.exact_type.MAX_DIGITS,
"scale": col.exact_type.MAX_FRACTIONAL_DIGITS}
else:
avro_type = col.type.__name__.lower()
avro_schema['fields'].append({'name': col.name, 'type': ['null', avro_type]})
return json.dumps(avro_schema)
def __repr__(self):
return 'Table<name: %s, cols: %s>' \
% (self.name, ', '.join([str(col) for col in self._cols]))
def __deepcopy__(self, memo):
other = Table(self.name)
other.alias = self.alias
other.is_visible = self.is_visible
cols_by_name = dict()
for col in self._cols:
result_col = deepcopy(col, memo)
other.add_col(result_col)
cols_by_name[result_col.name] = result_col
other._unique_cols = []
for col_combo in self._unique_cols:
other_col_combo = set()
for col in col_combo:
other_col_combo.add(cols_by_name[col.name])
other.unique_cols.append(other_col_combo)
return other
class TableExprList(list):
'''A list of TableExprs.'''
@property
def cols(self):
'''Return a list of all the Columns containd in all the TableExprs.'''
return ValExprList(col for table_expr in self for col in table_expr.cols)
@property
def joinable_cols_by_type(self):
cols_by_type = defaultdict(ValExprList)
for table_expr in self:
for type_, cols in table_expr.joinable_cols_by_type.iteritems():
cols_by_type[type_].extend(cols)
return cols_by_type
@property
def cols_by_type(self):
cols_by_type = defaultdict(ValExprList)
for table_expr in self:
for type_, cols in table_expr.cols_by_type.iteritems():
cols_by_type[type_].extend(cols)
return cols_by_type
@property
def col_types(self):
return tuple(self.cols_by_type)
@property
def collections(self):
result = []
for table_expr in self:
result.extend(table_expr.collections)
return result
@property
def by_col_type(self):
'''Return a dict with keys being column types and values being lists of TableExprs
that have at least one Column of that type.
'''
table_exprs_by_type = defaultdict(TableExprList)
for table_expr in self:
for col_type in table_expr.col_types:
table_exprs_by_type[col_type].append(table_expr)
return table_exprs_by_type