blob: fa406252df59a254c86cdac773e5f7505693f0cb [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import division
from kudu.compat import unittest
from kudu.tests.util import TestScanBase
from kudu.tests.common import KuduTestBase, TimeoutError
import kudu
import datetime
import time
import pytest
class TestScanner(TestScanBase):
@classmethod
def setUpClass(self):
super(TestScanner, self).setUpClass()
def setUp(self):
pass
def test_scan_rows_basic(self):
# Let's scan with no predicates
scanner = self.table.scanner().open()
tuples = scanner.read_all_tuples()
self.assertEqual(sorted(tuples), self.tuples)
def test_scan_rows_simple_predicate(self):
key = self.table['key']
preds = [key > 19, key < 50]
def _read_predicates(preds):
scanner = self.table.scanner()
scanner.add_predicates(preds)
scanner.open()
return scanner.read_all_tuples()
tuples = _read_predicates(preds)
self.assertEqual(sorted(tuples), self.tuples[20:50])
# verify predicates reusable
tuples = _read_predicates(preds)
self.assertEqual(sorted(tuples), self.tuples[20:50])
def test_scan_limit(self):
# Set limits both below and above the max number of rows.
limits = [self.nrows - 1, self.nrows, self.nrows + 1]
for limit in limits:
scanner = self.table.scanner()
scanner.set_limit(limit)
tuples = scanner.read_all_tuples()
self.assertEqual(len(tuples), min(limit, self.nrows))
def test_scan_rows_string_predicate_and_projection(self):
scanner = self.table.scanner()
scanner.set_projected_column_names(['key', 'string_val'])
sv = self.table['string_val']
scanner.add_predicates([sv >= 'hello_20',
sv <= 'hello_22'])
scanner.set_fault_tolerant()
scanner.open()
tuples = scanner.read_all_tuples()
self.assertEqual(sorted(tuples), [(20, 'hello_20'), (22, 'hello_22')])
def test_scan_rows_in_list_predicate(self):
"""
Test scanner with an InList predicate and
a string comparison predicate
"""
key_list = [2, 98]
scanner = self.table.scanner()
scanner.set_fault_tolerant()\
.add_predicates([
self.table[0].in_list(key_list),
self.table['string_val'] >= 'hello_9'
])
scanner.open()
tuples = scanner.read_all_tuples()
self.assertEqual(tuples, [self.tuples[98]])
def test_scan_rows_is_not_null_predicate(self):
"""
Test scanner with an IsNotNull predicate on string_val column
"""
pred = self.table['string_val'].is_not_null()
scanner = self.table.scanner()
scanner.add_predicate(pred)
scanner.open()
tuples = scanner.read_all_tuples()
rows = [i for i in range(100) if i % 2 == 0]
self.assertEqual(sorted(tuples), [self.tuples[i] for i in rows])
def test_scan_rows_is_null_predicate(self):
"""
Test scanner with an IsNull predicate on string_val column
"""
pred = self.table['string_val'].is_null()
scanner = self.table.scanner()
scanner.add_predicate(pred)
scanner.open()
tuples = scanner.read_all_tuples()
rows = [i for i in range(100) if i % 2 != 0]
self.assertEqual(sorted(tuples), [self.tuples[i] for i in rows])
def test_index_projection_with_schema(self):
scanner = self.table.scanner()
scanner.set_projected_column_indexes([0, 1])
scanner.set_fault_tolerant()
scanner.open()
tuples = scanner.read_all_tuples()
# Build schema to check against
builder = kudu.schema_builder()
builder.add_column('key', kudu.int32, nullable=False)
builder.add_column('int_val', kudu.int32)
builder.set_primary_keys(['key'])
expected_schema = builder.build()
# Build new schema from projection schema
builder = kudu.schema_builder()
for col in scanner.get_projection_schema():
builder.copy_column(col)
builder.set_primary_keys(['key'])
new_schema = builder.build()
self.assertEqual(tuples, [t[0:2] for t in self.tuples])
self.assertTrue(expected_schema.equals(new_schema))
def test_scan_with_bounds(self):
scanner = self.table.scanner()
scanner.set_fault_tolerant()\
.add_lower_bound({'key': 50})\
.add_exclusive_upper_bound({'key': 55})
scanner.open()
tuples = scanner.read_all_tuples()
self.assertEqual(sorted(tuples), self.tuples[50:55])
def test_scan_invalid_predicates(self):
scanner = self.table.scanner()
sv = self.table['string_val']
with self.assertRaises(TypeError):
scanner.add_predicates([sv >= None])
with self.assertRaises(TypeError):
scanner.add_predicates([sv >= 1])
with self.assertRaises(TypeError):
scanner.add_predicates([sv.in_list(['testing',
datetime.datetime.utcnow()])])
with self.assertRaises(TypeError):
scanner.add_predicates([sv.in_list([
'hello_20',
120
])])
def test_scan_batch_by_batch(self):
scanner = self.table.scanner()
scanner.set_fault_tolerant()
lower_bound = scanner.new_bound()
lower_bound['key'] = 10
scanner.add_lower_bound(lower_bound)
upper_bound = scanner.new_bound()
upper_bound['key'] = 90
scanner.add_exclusive_upper_bound(upper_bound)
scanner.open()
tuples = []
while scanner.has_more_rows():
batch = scanner.next_batch()
tuples.extend(batch.as_tuples())
self.assertEqual(sorted(tuples), self.tuples[10:90])
def test_unixtime_micros(self):
"""
Test setting and getting unixtime_micros fields
"""
# Insert new rows
self.insert_new_unixtime_micros_rows()
# Validate results
scanner = self.table.scanner()
scanner.set_fault_tolerant().open()
self.assertEqual(sorted(self.tuples), scanner.read_all_tuples())
def test_read_mode(self):
"""
Test scanning in latest, snapshot and read_your_writes read modes.
"""
# Delete row
self.delete_insert_row_for_read_test()
# Check scanner results prior to delete
scanner = self.table.scanner()
scanner.set_read_mode('snapshot')\
.set_snapshot(self.snapshot_timestamp)\
.open()
self.assertEqual(sorted(self.tuples[1:]), sorted(scanner.read_all_tuples()))
# Check scanner results after delete with latest mode
timeout = time.time() + 10
check_tuples = []
while check_tuples != sorted(self.tuples):
if time.time() > timeout:
raise TimeoutError("Could not validate results in allocated" +
"time.")
scanner = self.table.scanner()
scanner.set_read_mode(kudu.READ_LATEST)\
.open()
check_tuples = sorted(scanner.read_all_tuples())
# Avoid tight looping
time.sleep(0.05)
# Check scanner results after delete with read_your_writes mode
scanner = self.table.scanner()
scanner.set_read_mode('read_your_writes')\
.open()
self.assertEqual(sorted(self.tuples), sorted(scanner.read_all_tuples()))
def test_resource_metrics_and_cache_blocks(self):
"""
Test getting the resource metrics after scanning and
setting the scanner to not cache blocks.
"""
# Build scanner and read through all batches and retrieve metrics.
scanner = self.table.scanner()
scanner.set_fault_tolerant().set_cache_blocks(False).open()
scanner.read_all_tuples()
metrics = scanner.get_resource_metrics()
# Confirm that the scanner returned cache hit and miss values.
self.assertTrue('cfile_cache_hit_bytes' in metrics)
self.assertTrue('cfile_cache_miss_bytes' in metrics)
def verify_pred_type_scans(self, preds, row_indexes, count_only=False):
# Using the incoming list of predicates, verify that the row returned
# matches the inserted tuple at the row indexes specified in a
# slice object
scanner = self.type_table.scanner()
scanner.set_fault_tolerant()
scanner.add_predicates(preds)
scanner.set_projected_column_names(self.projected_names_w_o_float)
tuples = scanner.open().read_all_tuples()
# verify rows
if count_only:
self.assertEqual(len(self.type_test_rows[row_indexes]), len(tuples))
else:
self.assertEqual(sorted(self.type_test_rows[row_indexes]), tuples)
def test_unixtime_micros_pred(self):
# Test unixtime_micros value predicate
self._test_unixtime_micros_pred()
def test_bool_pred(self):
# Test a boolean value predicate
self._test_bool_pred()
def test_double_pred(self):
# Test a double precision float predicate
self._test_double_pred()
def test_float_pred(self):
# Test a single precision float predicate
# Does a row check count only
self._test_float_pred()
def test_decimal_pred(self):
if kudu.CLIENT_SUPPORTS_DECIMAL:
# Test a decimal predicate
self._test_decimal_pred()
def test_binary_pred(self):
# Test a binary predicate
self._test_binary_pred()
def test_varchar_pred(self):
self._test_varchar_pred()
def test_date_pred(self):
self._test_date_pred()
def test_scan_selection(self):
"""
This test confirms that setting the scan selection policy on the
scanner does not cause any errors. There is no way to confirm
that the policy was actually set. This functionality is
tested in the C++ test:
ClientTest.TestReplicatedMultiTabletTableFailover.
"""
for policy in ['leader', kudu.CLOSEST_REPLICA, 2]:
scanner = self.table.scanner()
scanner.set_selection(policy)
scanner.open()
self.assertEqual(sorted(scanner.read_all_tuples()),
sorted(self.tuples))
@pytest.mark.skipif(not (kudu.CLIENT_SUPPORTS_PANDAS),
reason="Pandas required to run this test.")
def test_scanner_to_pandas_types(self):
"""
This test confirms that data types are converted as expected to Pandas.
"""
import numpy as np
scanner = self.type_table.scanner()
df = scanner.to_pandas()
types = df.dtypes
if kudu.CLIENT_SUPPORTS_DECIMAL:
self.assertEqual(types[0], np.int64)
self.assertEqual(types[1], 'datetime64[ns, UTC]')
self.assertEqual(types[2], np.object)
self.assertEqual(types[3], np.object)
self.assertEqual(types[4], np.bool)
self.assertEqual(types[5], np.float64)
self.assertEqual(types[6], np.int8)
self.assertEqual(types[7], np.object)
self.assertEqual(types[8], np.object)
self.assertEqual(types[9], np.object)
self.assertEqual(types[10], np.float32)
else:
self.assertEqual(types[0], np.int64)
self.assertEqual(types[1], 'datetime64[ns, UTC]')
self.assertEqual(types[2], np.object)
self.assertEqual(types[3], np.bool)
self.assertEqual(types[4], np.float64)
self.assertEqual(types[5], np.int8)
self.assertEqual(types[6], np.object)
self.assertEqual(types[7], np.object)
self.assertEqual(types[8], np.object)
self.assertEqual(types[9], np.float32)
@pytest.mark.skipif(not (kudu.CLIENT_SUPPORTS_PANDAS),
reason="Pandas required to run this test.")
def test_scanner_to_pandas_row_count(self):
"""
This test confirms that the record counts match between Pandas and the scanner.
"""
scanner = self.type_table.scanner()
scanner_count = len(scanner.read_all_tuples())
scanner = self.type_table.scanner()
df = scanner.to_pandas()
self.assertEqual(scanner_count, df.shape[0])
@pytest.mark.skipif(not (kudu.CLIENT_SUPPORTS_PANDAS),
reason="Pandas required to run this test.")
def test_scanner_to_pandas_index(self):
"""
This test confirms that an index is correctly applied.
"""
scanner = self.type_table.scanner()
df = scanner.to_pandas(index='key')
self.assertEqual(df.index.name, 'key')
self.assertEqual(list(df.index), [1, 2])
@pytest.mark.skipif((not(kudu.CLIENT_SUPPORTS_PANDAS) or
(not(kudu.CLIENT_SUPPORTS_DECIMAL))),
reason="Pandas and Decimal support required to run this test.")
def test_scanner_to_pandas_index(self):
"""
This test confirms that a decimal column is coerced to a double when specified.
"""
import numpy as np
scanner = self.type_table.scanner()
df = scanner.to_pandas(coerce_float=True)
types = df.dtypes
self.assertEqual(types[2], np.float64)