blob: 0c1fda608bb7d7f7ba0219f170fc43ee9cf33c0f [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from kudu.compat import unittest
from kudu.tests.util import TestScanBase
from kudu.tests.common import KuduTestBase
import kudu
import datetime
import time
def _get_scan_token_results(input):
    """
    Connect to the cluster with a new client, rebuild a scanner from a
    serialized scan token, and return every row the scanner produces.

    :param input: indexable triple of
        (serialized scan token, master hosts, master ports); the last two
        items are handed to kudu.connect() unchanged
    :return: the result of scanner.read_all_tuples()
    """
    serialized_token = input[0]
    master_hosts = input[1]
    master_ports = input[2]

    client = kudu.connect(master_hosts, master_ports)
    scanner = client.deserialize_token_into_scanner(serialized_token)
    scanner.open()
    try:
        return scanner.read_all_tuples()
    finally:
        # Test explicit closing of scanner. Doing it in a finally block
        # also closes the scanner when reading the rows raises.
        scanner.close()
class TestScanToken(TestScanBase):
    """
    Tests for scan tokens created through a table's scan_token_builder().

    Tokens are executed either after a serialize/deserialize round trip
    (see _subtest_serialize_thread_and_verify) or directly through
    token.into_kudu_scanner(). The rows read back are compared with
    fixtures such as self.tuples and self.type_test_rows, which are not
    defined in this class.
    """

    @classmethod
    def setUpClass(cls):
        super(TestScanToken, cls).setUpClass()

    def setUp(self):
        # No per-test setup is performed.
        pass

    def _subtest_serialize_thread_and_verify(self, tokens, expected_tuples, count_only=False):
        """
        Given the input tokens, serialize them, hydrate a scanner from each
        serialized token using a separate client, execute the scans one
        after another, and validate the combined results.

        :param tokens: scan tokens to serialize and execute
        :param expected_tuples: rows the scans are expected to return
        :param count_only: when True only the number of rows is compared;
            otherwise both sides are sorted and compared row by row
        """
        serialized_inputs = [
            (token.serialize(), self.master_hosts, self.master_ports)
            for token in tokens
        ]

        # Execute every token and flatten the per-token results
        actual_tuples = []
        for serialized_input in serialized_inputs:
            actual_tuples += _get_scan_token_results(serialized_input)

        # Validate results
        if count_only:
            self.assertEqual(len(expected_tuples), len(actual_tuples))
        else:
            self.assertEqual(sorted(expected_tuples), sorted(actual_tuples))

    def _read_all_tuples_with_local_scanners(self, tokens):
        """
        Open a local scanner for every token (confirming it is served by
        the leader tablet server) and return all rows read, concatenated
        in token order.
        """
        tuples = []
        for token in tokens:
            scanner = self._subtest_open_and_confirm_leader_tserver(token)
            tuples.extend(scanner.read_all_tuples())
        return tuples

    def test_scan_token_serde_threaded_with_named_projection(self):
        """
        Creates scan tokens with a named projection, serializes them,
        then executes each of them with a separate client.
        """
        builder = self.table.scan_token_builder()
        builder.set_projected_column_names(['key', 'string_val']).set_fault_tolerant()

        # Serialize execute and verify
        self._subtest_serialize_thread_and_verify(builder.build(),
                                                  [(x[0], x[2]) for x in self.tuples])

    def test_scan_token_serde_threaded_simple_predicate_and_index_projection(self):
        """
        Creates scan tokens with predicates and an index projection,
        serializes them, then executes each of them with a separate
        client.
        """
        key = self.table['key']
        preds = [key > 19, key < 50]

        builder = self.table.scan_token_builder()
        builder.set_projected_column_indexes([0, 1])\
            .set_fault_tolerant()\
            .add_predicates(preds)

        # Serialize execute and verify
        self._subtest_serialize_thread_and_verify(builder.build(),
                                                  [x[0:2] for x in self.tuples[20:50]])

    def test_scan_token_serde_threaded_with_bounds(self):
        """
        Creates scan tokens with bounds, serializes them, then executes
        each of them with a separate client.
        """
        builder = self.table.scan_token_builder()
        builder.set_fault_tolerant()\
            .add_lower_bound([50])\
            .add_upper_bound([55])

        # Serialize execute and verify
        self._subtest_serialize_thread_and_verify(builder.build(),
                                                  self.tuples[50:55])

    def test_scan_token_invalid_predicates(self):
        """
        Adding predicates that compare the string column against None or
        against an integer must raise TypeError.
        """
        builder = self.table.scan_token_builder()
        sv = self.table['string_val']

        with self.assertRaises(TypeError):
            builder.add_predicates([sv >= None])

        with self.assertRaises(TypeError):
            builder.add_predicates([sv >= 1])

    def _subtest_open_and_confirm_leader_tserver(self, token):
        """
        Turn the token into a local scanner, open it, and assert that the
        server it is talking to is the leader replica's tablet server.

        :param token: scan token to hydrate
        :return: the opened scanner
        """
        leader_tserver = None
        for replica in token.tablet().replicas():
            if replica.is_leader():
                leader_tserver = replica.ts()
        # Fail with a readable message instead of an UnboundLocalError
        # when no replica reports itself as leader.
        self.assertIsNotNone(leader_tserver,
                             "no leader replica found for the token's tablet")

        scanner = token.into_kudu_scanner()
        scanner.open()
        self.assertEqual(scanner.get_current_server(), leader_tserver)
        return scanner

    def test_scan_token_batch_by_batch_with_local_scanner(self):
        """
        Scan a bounded key range through local scanners, consuming the
        results one batch at a time.
        """
        builder = self.table.scan_token_builder()
        lower_bound = builder.new_bound()
        lower_bound['key'] = 10
        upper_bound = builder.new_bound()
        upper_bound['key'] = 90
        builder.set_fault_tolerant() \
            .add_lower_bound(lower_bound) \
            .add_upper_bound(upper_bound)
        tokens = builder.build()

        tuples = []
        for token in tokens:
            scanner = self._subtest_open_and_confirm_leader_tserver(token)
            while scanner.has_more_rows():
                batch = scanner.next_batch()
                tuples.extend(batch.as_tuples())

        self.assertEqual(sorted(tuples), self.tuples[10:90])

    def test_unixtime_micros(self):
        """
        Test setting and getting unixtime_micros fields
        """
        # Insert new rows
        self.insert_new_unixtime_micros_rows()

        # Validate results
        builder = self.table.scan_token_builder()
        tokens = builder.set_fault_tolerant().build()

        tuples = []
        for token in tokens:
            scanner = self._subtest_open_and_confirm_leader_tserver(token)
            while scanner.has_more_rows():
                scanner.keep_alive()
                batch = scanner.next_batch()
                tuples.extend(batch.as_tuples())

        # Rows are concatenated per token, so sort both sides like the
        # other tests do rather than relying on the token order.
        self.assertEqual(sorted(self.tuples), sorted(tuples))

    def test_scan_rows_in_list_predicate(self):
        """
        Test scan token builder/scanner with an InList predicate and
        a string comparison predicate
        """
        key_list = [2, 98]
        builder = self.table.scan_token_builder()
        builder.set_fault_tolerant() \
            .add_predicates([
                self.table[0].in_list(key_list),
                self.table['string_val'] >= 'hello_9'
            ])

        # Serialize execute and verify
        self._subtest_serialize_thread_and_verify(builder.build(),
                                                  [self.tuples[98]])

    def test_read_mode(self):
        """
        Test scanning in latest, snapshot and read_your_writes read modes.
        """
        # Delete row
        self.delete_insert_row_for_read_test()

        # Check scanner results prior to delete
        builder = self.table.scan_token_builder()
        tokens = builder.set_read_mode('snapshot') \
            .set_snapshot(self.snapshot_timestamp) \
            .build()
        tuples = self._read_all_tuples_with_local_scanners(tokens)
        self.assertEqual(sorted(self.tuples[1:]), sorted(tuples))

        # Check scanner results after inserts with latest mode
        builder = self.table.scan_token_builder()
        tokens = builder.set_read_mode(kudu.READ_LATEST) \
            .build()
        tuples = self._read_all_tuples_with_local_scanners(tokens)
        self.assertEqual(sorted(self.tuples), sorted(tuples))

        # Check scanner results after inserts with read_your_writes mode
        builder = self.table.scan_token_builder()
        tokens = builder.set_read_mode(kudu.READ_YOUR_WRITES)\
            .build()
        tuples = self._read_all_tuples_with_local_scanners(tokens)
        self.assertEqual(sorted(self.tuples), sorted(tuples))

    def verify_pred_type_scans(self, preds, row_indexes, count_only=False):
        """
        Using the incoming list of predicates, verify that the rows
        returned match the inserted tuples at the row indexes specified
        in a slice object.

        :param preds: predicates to add to the scan token builder
        :param row_indexes: index (slice) applied to self.type_test_rows
            to select the expected rows
        :param count_only: forwarded to
            _subtest_serialize_thread_and_verify
        """
        builder = self.type_table.scan_token_builder()
        builder.set_fault_tolerant()
        builder.set_projected_column_names(self.projected_names_w_o_float)
        builder.add_predicates(preds)

        # Verify rows
        self._subtest_serialize_thread_and_verify(builder.build(),
                                                  self.type_test_rows[row_indexes],
                                                  count_only)

    def test_unixtime_micros_pred(self):
        # Test unixtime_micros value predicate
        self._test_unixtime_micros_pred()

    def test_decimal_pred(self):
        # Report the test as skipped, rather than silently passing, when
        # the client was built without decimal support.
        if not kudu.CLIENT_SUPPORTS_DECIMAL:
            self.skipTest("Kudu client does not support decimal")
        # Test decimal value predicate
        self._test_decimal_pred()

    def test_bool_pred(self):
        # Test a boolean value predicate
        self._test_bool_pred()

    def test_double_pred(self):
        # Test a double precision float predicate
        self._test_double_pred()

    def test_float_pred(self):
        # Test a single precision float predicate
        # Does a row check count only
        self._test_float_pred()

    def test_binary_pred(self):
        # Test a binary predicate
        self._test_binary_pred()

    def test_varchar_pred(self):
        # Test a varchar predicate
        self._test_varchar_pred()

    def test_date_pred(self):
        # Test a date predicate
        self._test_date_pred()

    def test_scan_selection(self):
        """
        This test confirms that setting the scan selection policy on the
        ScanTokenBuilder does not cause any errors. There is no way to
        confirm that the policy was actually set. This functionality is
        tested in the C++ test:
        ClientTest.TestReplicatedMultiTabletTableFailover.
        """
        for policy in ['leader', kudu.CLOSEST_REPLICA, 2]:
            builder = self.table.scan_token_builder()
            builder.set_selection(policy)
            tokens = builder.build()

            tuples = self._read_all_tuples_with_local_scanners(tokens)

            self.assertEqual(sorted(tuples),
                             sorted(self.tuples))