blob: c3c91a5aea85cb63accee44a9995199f488293ba [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from decimal import Decimal
from kudu.compat import unittest
from kudu.client import Partitioning
from kudu.tests.common import KuduTestBase
import kudu
import datetime
import pytz
class TestScanBase(KuduTestBase, unittest.TestCase):
    """
    Parent class for both the Scan tests and the Scan Token tests.

    ``setUpClass`` fills the example table (``cls.table``) with ``cls.nrows``
    rows, mirrored in ``cls.tuples``, and creates a second table named
    'type-test' (``cls.type_table``) holding one column per tested type,
    mirrored (minus the float column) in ``cls.type_test_rows``.

    The ``_test_*_pred`` helpers delegate to ``self.verify_pred_type_scans``,
    which is not defined in this class.
    NOTE(review): presumably the concrete subclasses provide it - confirm.
    """

    # Largest finite double; stored in 'double_val' of the first type-test
    # row. Written as a float literal because `10 ^ 308` is a bitwise XOR in
    # Python (== 318), not a power of ten.
    _MAX_DOUBLE_VAL = 1.7976931348623157e308
    # Large value that still fits in a 32-bit float; stored in 'float_val' of
    # the first type-test row and reused by _test_float_pred so that the
    # inserted value and the predicate can never drift apart.
    _LARGE_FLOAT_VAL = 3.402823e38

    @classmethod
    def setUpClass(cls):
        """
        Populate the example table and create/populate the 'type-test' table.

        Sets the class attributes ``nrows``, ``table``, ``tuples``,
        ``projected_names_w_o_float``, ``type_table`` and ``type_test_rows``.
        """
        super(TestScanBase, cls).setUpClass()

        cls.nrows = 100
        table = cls.client.table(cls.ex_table)
        session = cls.client.new_session()

        tuples = []
        for i in range(cls.nrows):
            # Expected row: only even keys carry a string value.
            tup = (i,
                   i * 2,
                   'hello_%d' % i if i % 2 == 0 else None,
                   datetime.datetime.utcnow().replace(tzinfo=pytz.utc))
            op = table.new_insert()
            op['key'] = tup[0]
            op['int_val'] = tup[1]
            if i % 2 == 0:
                # Odd keys leave 'string_val' unset on the insert.
                op['string_val'] = tup[2]
            op['unixtime_micros_val'] = tup[3]
            session.apply(op)
            tuples.append(tup)
        session.flush()

        cls.table = table
        cls.tuples = tuples

        # Create table to test all types
        # for various predicate tests
        table_name = 'type-test'

        # Create schema, partitioning and then table
        builder = kudu.schema_builder()
        builder.add_column('key').type(kudu.int64).nullable(False)
        builder.add_column('unixtime_micros_val', type_=kudu.unixtime_micros, nullable=False)
        if kudu.CLIENT_SUPPORTS_DECIMAL:
            builder.add_column('decimal_val', type_=kudu.decimal, precision=5, scale=2)
        builder.add_column('string_val', type_=kudu.string, compression=kudu.COMPRESSION_LZ4, encoding='prefix')
        builder.add_column('bool_val', type_=kudu.bool)
        builder.add_column('double_val', type_=kudu.double)
        builder.add_column('int8_val', type_=kudu.int8)
        builder.add_column('binary_val', type_='binary', compression=kudu.COMPRESSION_SNAPPY, encoding='prefix')
        builder.add_column('varchar_val', type_=kudu.varchar, length=10)
        builder.add_column('date_val', type_=kudu.date)
        builder.add_column('float_val', type_=kudu.float)
        builder.set_primary_keys(['key', 'unixtime_micros_val'])
        schema = builder.build()

        # Every column except the float one; float_val is the last column.
        cls.projected_names_w_o_float = [
            col for col in schema.names if col != 'float_val'
        ]

        partitioning = Partitioning() \
            .add_hash_partitions(column_names=['key'], num_buckets=3) \
            .set_range_partition_columns(['unixtime_micros_val']) \
            .add_range_partition(
                upper_bound={'unixtime_micros_val': ("2016-01-01", "%Y-%m-%d")},
                upper_bound_type=kudu.EXCLUSIVE_BOUND
            ) \
            .add_range_partition(
                lower_bound={'unixtime_micros_val': datetime.datetime(2016, 1, 1)},
                lower_bound_type='INCLUSIVE',
                upper_bound={'unixtime_micros_val': datetime.datetime(9999, 12, 31)}
            )

        cls.client.create_table(table_name, schema, partitioning)
        cls.type_table = cls.client.table(table_name)

        # Rows to insert, in schema column order (including decimal_val).
        type_test_rows = [
            (1, datetime.datetime(2016, 1, 1).replace(tzinfo=pytz.utc), Decimal('111.11'),
             "Test One", True, cls._MAX_DOUBLE_VAL, 127,
             b'\xce\x99\xce\xbf\xcf\x81\xce\xb4\xce\xb1\xce\xbd\xce\xaf\xce\xb1',
             "Test One", datetime.date(1970, 1, 1), cls._LARGE_FLOAT_VAL),
            (2, datetime.datetime.utcnow().replace(tzinfo=pytz.utc), Decimal('0.99'),
             "测试二", False, 200.1, -1,
             b'\xd0\x98\xd0\xbe\xd1\x80\xd0\xb4\xd0\xb0\xd0\xbd\xd0\xb8\xd1\x8f',
             "测试二", datetime.date(2020, 1, 1), -150.2),
        ]
        if not kudu.CLIENT_SUPPORTS_DECIMAL:
            # Without decimal support the schema has no 'decimal_val' column,
            # so drop the third element of every row.
            type_test_rows = [row[:2] + row[3:] for row in type_test_rows]

        # Insert new rows
        session = cls.client.new_session()
        for row in type_test_rows:
            op = cls.type_table.new_insert(row)
            session.apply(op)
        session.flush()

        # Remove the float values (last element) from the expected rows so we
        # can compare the other vals
        cls.type_test_rows = [row[:-1] for row in type_test_rows]

    def setUp(self):
        pass

    @staticmethod
    def _as_expected_utc(value):
        """
        Return the UTC-aware datetime expected back for an inserted timestamp.

        :param value: one of the three forms used when inserting: an aware
            datetime, a ``(string, format)`` tuple, or an ISO-like string in
            the "%Y-%m-%dT%H:%M:%S.%f" format.
        :return: a datetime whose tzinfo is ``pytz.utc``.
        """
        if isinstance(value, datetime.datetime):
            # Aware datetime in another zone: shift it to UTC.
            return value.astimezone(pytz.utc)
        if isinstance(value, tuple):
            parsed = datetime.datetime.strptime(value[0], value[1])
        else:
            parsed = datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f")
        # String inputs carry no zone; the expected value labels them as UTC.
        return parsed.replace(tzinfo=pytz.utc)

    def insert_new_unixtime_micros_rows(self):
        """
        Insert three rows (keys 100-102) whose timestamps are given as a
        string, a (string, format) tuple and a non-UTC datetime, and append
        the expected tuples to ``self.tuples``.

        Also sets ``self.snapshot_timestamp`` to the (naive) UTC time taken
        just before the inserts.
        """
        # Get current UTC datetime to be used for read at snapshot test
        # Not using the eastern time value below as that may not be the
        # actual local timezone of the host executing this test and as
        # such would not accurately be offset to UTC
        self.snapshot_timestamp = datetime.datetime.utcnow()

        # Insert new rows
        # Also test a timezone other than UTC to confirm that
        # conversion to UTC is properly applied.
        # pytz zones must be attached with localize(); passing them through
        # replace(tzinfo=...) yields the zone's LMT offset rather than the
        # real Eastern offset.
        eastern = pytz.timezone("America/New_York") \
            .localize(datetime.datetime.now())
        rows = [[100, "2016-09-14T23:11:32.432019"],
                [101, ("2016-09-15", "%Y-%m-%d")],
                [102, eastern]]

        session = self.client.new_session()
        for key, timestamp in rows:
            values = [key,
                      key * 2,
                      'hello_%d' % key if key % 2 == 0 else None,
                      timestamp]
            op = self.table.new_insert()
            for idx, val in enumerate(values):
                op[idx] = val
            session.apply(op)

            # Convert the timestamp to the UTC datetime used to validate rows
            values[3] = self._as_expected_utc(timestamp)
            self.tuples.append(tuple(values))
        session.flush()

    def delete_insert_row_for_read_test(self):
        """
        Delete the first scanned row, record the client's latest observed
        timestamp in ``self.snapshot_timestamp``, then re-insert the row.
        """
        # Retrieve row to delete so it can be reinserted into the table so
        # that other tests do not fail
        row = self.table.scanner() \
            .set_fault_tolerant() \
            .open() \
            .read_all_tuples()[0]

        # Delete row from table
        session = self.client.new_session()
        op = self.table.new_delete()
        op['key'] = row[0]
        session.apply(op)
        session.flush()

        # Get latest observed timestamp for snapshot
        self.snapshot_timestamp = self.client.latest_observed_timestamp()

        # Insert row back into table so that other tests don't fail.
        session = self.client.new_session()
        op = self.table.new_insert()
        for idx, val in enumerate(row):
            op[idx] = val
        session.apply(op)
        session.flush()

    def _test_unixtime_micros_pred(self):
        # Test unixtime_micros value predicate
        self.verify_pred_type_scans(
            preds=[
                self.type_table['unixtime_micros_val'] == ("2016-01-01", "%Y-%m-%d")
            ],
            row_indexes=slice(0, 1)
        )

    def _test_bool_pred(self):
        # Test a boolean value predicate.
        # `== False` builds a predicate on the column, so it must not be
        # rewritten as `is False` / `not ...`.
        self.verify_pred_type_scans(
            preds=[
                self.type_table['bool_val'] == False
            ],
            row_indexes=slice(1, 2)
        )

    def _test_double_pred(self):
        # Test a double precision float predicate
        self.verify_pred_type_scans(
            preds=[
                self.type_table['double_val'] < 200.11
            ],
            row_indexes=slice(1, 2)
        )

    def _test_float_pred(self):
        # Test a single precision float predicate against the value inserted
        # for the first row. Float values are stripped from type_test_rows,
        # hence the call passes count_only=True.
        self.verify_pred_type_scans(
            preds=[
                self.type_table['float_val'] == self._LARGE_FLOAT_VAL
            ],
            row_indexes=slice(0, 1),
            count_only=True
        )

    def _test_decimal_pred(self):
        # Test a decimal predicate; skipped when the client has no decimal
        # support because the table then has no 'decimal_val' column.
        if not kudu.CLIENT_SUPPORTS_DECIMAL:
            return
        self.verify_pred_type_scans(
            preds=[
                self.type_table['decimal_val'] == Decimal('111.11')
            ],
            row_indexes=slice(0, 1),
        )

    def _test_binary_pred(self):
        # Test a binary predicate; the text is the same value as the UTF-8
        # bytes inserted for the second row.
        self.verify_pred_type_scans(
            preds=[
                self.type_table['binary_val'] == 'Иордания'
            ],
            row_indexes=slice(1, 2)
        )

    def _test_varchar_pred(self):
        # Test a varchar predicate
        self.verify_pred_type_scans(
            preds=[
                self.type_table['varchar_val'] == 'Test One'
            ],
            row_indexes=slice(0, 1)
        )

    def _test_date_pred(self):
        # Test a date predicate
        self.verify_pred_type_scans(
            preds=[
                self.type_table['date_val'] == datetime.date(1970, 1, 1)
            ],
            row_indexes=slice(0, 1)
        )