# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from concurrent.futures import ThreadPoolExecutor
from shutil import rmtree, copytree
import numpy as np
import pytest
from petastorm.codecs import ScalarCodec
from pycarbon import make_carbon_reader, make_batch_carbon_reader
from pycarbon.reader import make_reader
from pycarbon.tests.core.test_carbon_common import TestSchema
from pycarbon.tests.conftest import _ROWS_COUNT
import os
import jnius_config
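# The JVM classpath must be configured via jnius_config before anything imports jnius and starts the JVM.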
jnius_config.set_classpath(pytest.config.getoption("--carbon-sdk-path"))
if pytest.config.getoption("--pyspark-python") is not None and \
pytest.config.getoption("--pyspark-driver-python") is not None:
os.environ['PYSPARK_PYTHON'] = pytest.config.getoption("--pyspark-python")
os.environ['PYSPARK_DRIVER_PYTHON'] = pytest.config.getoption("--pyspark-driver-python")
elif 'PYSPARK_PYTHON' in os.environ.keys() and 'PYSPARK_DRIVER_PYTHON' in os.environ.keys():
pass
else:
raise ValueError("please set PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON, either via the command-line options "
"--pyspark-python=PYSPARK_PYTHON_PATH --pyspark-driver-python=PYSPARK_DRIVER_PYTHON_PATH "
"or as environment variables")
# pylint: disable=unnecessary-lambda
ALL_READER_FLAVOR_FACTORIES = [
lambda url, **kwargs: make_carbon_reader(url, reader_pool_type='thread', **kwargs),
]
SCALAR_FIELDS = [f for f in TestSchema.fields.values() if isinstance(f.codec, ScalarCodec)]
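# Fields in the test schema that are encoded with a plain ScalarCodec (scalar columns only).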
SCALAR_ONLY_READER_FACTORIES = [
lambda url, **kwargs: make_batch_carbon_reader(url, reader_pool_type='thread', **kwargs),
]
def _check_simple_reader(reader, expected_data, expected_rows_count=None, check_types=True, limit_checked_rows=None):
# Read a bunch of entries from the dataset and compare the data to the reference
def _type(v):
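# Collapse all fixed-length byte-string dtypes (e.g. '|S5', '|S10') into a single '|S' marker
# so that strings of different lengths still compare as the same type.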
if isinstance(v, np.ndarray):
if v.dtype.str.startswith('|S'):
return '|S'
else:
return v.dtype
else:
return type(v)
expected_rows_count = expected_rows_count or len(expected_data)
count = 0
for i, row in enumerate(reader):
if limit_checked_rows and i >= limit_checked_rows:
break
actual = row._asdict()
expected = next(d for d in expected_data if d['id'] == actual['id'])
np.testing.assert_equal(actual, expected)
actual_types = {k: _type(v) for k, v in actual.items()}
expected_types = {k: _type(v) for k, v in expected.items()}
assert not check_types or actual_types == expected_types
count += 1
if limit_checked_rows:
assert count == min(expected_rows_count, limit_checked_rows)
else:
assert count == expected_rows_count
def _readout_all_ids(reader, limit=None):
ids = []
for i, row in enumerate(reader):
if limit is not None and i >= limit:
break
ids.append(row.id)
# Flatten ids if the reader returns batches (make_batch_carbon_reader)
if isinstance(ids[0], np.ndarray):
ids = [i for arr in ids for i in arr]
return ids
@pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES)
def test_simple_read(carbon_synthetic_dataset, reader_factory):
"""Just a bunch of read and compares of all values to the expected values using the different reader pools"""
with reader_factory(carbon_synthetic_dataset.url) as reader:
_check_simple_reader(reader, carbon_synthetic_dataset.data)
@pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES + SCALAR_ONLY_READER_FACTORIES)
@pytest.mark.forked
def test_simple_read_with_disk_cache(carbon_synthetic_dataset, reader_factory, tmpdir):
"""Try using the Reader with LocalDiskCache using different flavors of pools"""
CACHE_SIZE = 10 * 2 ** 30 # 10GB
ROW_SIZE_BYTES = 100 # not really important for this test
with reader_factory(carbon_synthetic_dataset.url, num_epochs=2,
cache_type='local-disk', cache_location=tmpdir.strpath,
cache_size_limit=CACHE_SIZE, cache_row_size_estimate=ROW_SIZE_BYTES) as reader:
ids = _readout_all_ids(reader)
assert 200 == len(ids) # We read 2 epochs
assert set(ids) == set(range(100))
@pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES + SCALAR_ONLY_READER_FACTORIES)
def test_simple_read_with_added_slashes(carbon_synthetic_dataset, reader_factory):
"""Tests that using relative paths for the dataset metadata works as expected"""
with reader_factory(carbon_synthetic_dataset.url + '///') as reader:
next(reader)
@pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES + SCALAR_ONLY_READER_FACTORIES)
def test_simple_read_moved_dataset(carbon_synthetic_dataset, tmpdir, reader_factory):
"""Tests that a dataset may be opened after being moved to a new location"""
a_moved_path = tmpdir.join('moved').strpath
copytree(carbon_synthetic_dataset.path, a_moved_path)
with reader_factory('file://{}'.format(a_moved_path)) as reader:
next(reader)
rmtree(a_moved_path)
@pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES)
def test_reading_subset_of_columns(carbon_synthetic_dataset, reader_factory):
"""Just a bunch of read and compares of all values to the expected values"""
with reader_factory(carbon_synthetic_dataset.url, schema_fields=[TestSchema.id2, TestSchema.id]) as reader:
# Read a bunch of entries from the dataset and compare the data to the reference
for row in reader:
actual = dict(row._asdict())
expected = next(d for d in carbon_synthetic_dataset.data if d['id'] == actual['id'])
np.testing.assert_equal(expected['id2'], actual['id2'])
@pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES)
def test_reading_subset_of_columns_using_regex(carbon_synthetic_dataset, reader_factory):
"""Just a bunch of read and compares of all values to the expected values"""
with reader_factory(carbon_synthetic_dataset.url, schema_fields=['id$', 'id_.*$', 'partition_key$']) as reader:
# Read a bunch of entries from the dataset and compare the data to the reference
for row in reader:
actual = dict(row._asdict())
assert set(actual.keys()) == {'id_float', 'id_odd', 'id', 'partition_key'}
expected = next(d for d in carbon_synthetic_dataset.data if d['id'] == actual['id'])
np.testing.assert_equal(expected['id_float'], actual['id_float'])
# TODO: fix make_batch_carbon_reader error
@pytest.mark.parametrize('reader_factory', [
lambda url, **kwargs: make_carbon_reader(url, reader_pool_type='thread', **kwargs),
lambda url, **kwargs: make_batch_carbon_reader(url, reader_pool_type='thread', **kwargs)])
def test_shuffle(carbon_synthetic_dataset, reader_factory):
rows_count = len(carbon_synthetic_dataset.data)
# Read ids twice without shuffle: assert we have the same array and all expected ids are in the array
with reader_factory(carbon_synthetic_dataset.url, shuffle_blocklets=False, shuffle_row_drop_partitions=5,
workers_count=1) as reader_1:
first_readout = _readout_all_ids(reader_1)
with reader_factory(carbon_synthetic_dataset.url, shuffle_blocklets=False, shuffle_row_drop_partitions=5,
workers_count=1) as reader_2:
second_readout = _readout_all_ids(reader_2)
np.testing.assert_array_equal(range(rows_count), sorted(first_readout))
np.testing.assert_array_equal(first_readout, second_readout)
# Now read with shuffling
with reader_factory(carbon_synthetic_dataset.url, shuffle_blocklets=True, shuffle_row_drop_partitions=5,
workers_count=1) as shuffled_reader:
shuffled_readout = _readout_all_ids(shuffled_reader)
assert np.any(np.not_equal(first_readout, shuffled_readout))
# TODO: fix make_batch_carbon_reader error
@pytest.mark.parametrize('reader_factory', [
lambda url, **kwargs: make_carbon_reader(url, reader_pool_type='thread', **kwargs),
lambda url, **kwargs: make_batch_carbon_reader(url, reader_pool_type='thread', **kwargs)])
def test_shuffle_drop_ratio(carbon_synthetic_dataset, reader_factory):
# Read ids without shuffling: assert all expected ids are present in the readout
with reader_factory(carbon_synthetic_dataset.url, shuffle_blocklets=False, shuffle_row_drop_partitions=1) as reader:
first_readout = _readout_all_ids(reader)
np.testing.assert_array_equal([r['id'] for r in carbon_synthetic_dataset.data], sorted(first_readout))
# Test that the ids become increasingly non-consecutive as the shuffle drop ratio increases
prev_jumps_not_1 = 0
for shuffle_dropout in [2, 5, 8]:
with reader_factory(carbon_synthetic_dataset.url, shuffle_blocklets=True,
shuffle_row_drop_partitions=shuffle_dropout) as reader:
readout = _readout_all_ids(reader)
assert len(first_readout) == len(readout)
jumps_not_1 = np.sum(np.diff(readout) != 1)
assert jumps_not_1 > prev_jumps_not_1
prev_jumps_not_1 = jumps_not_1
# TODO: fix test_partition_multi_node
# @pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES)
# def test_partition_multi_node(carbon_synthetic_dataset, reader_factory):
# """Tests that the reader only returns half of the expected data consistently"""
# with reader_factory(carbon_synthetic_dataset.url, cur_shard=0, shard_count=5) as reader:
# with reader_factory(carbon_synthetic_dataset.url, cur_shard=0, shard_count=5) as reader_2:
# results_1 = set(_readout_all_ids(reader))
# results_2 = set(_readout_all_ids(reader_2))
#
# assert results_1, 'Non empty shard expected'
#
# np.testing.assert_equal(results_1, results_2)
#
# assert len(results_1) < len(carbon_synthetic_dataset.data)
#
# # Test that separate partitions also have no overlap by checking ids)
# for partition in range(1, 5):
# with reader_factory(carbon_synthetic_dataset.url, cur_shard=partition,
# shard_count=5) as reader_other:
# ids_in_other_partition = set(_readout_all_ids(reader_other))
#
# assert not ids_in_other_partition.intersection(results_1)
# TODO: fix test_partition_value_error
# @pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES)
# def test_partition_value_error(carbon_synthetic_dataset, reader_factory):
# """Tests that the reader raises value errors when appropriate"""
#
# # shard_count has to be greater than 0
# with pytest.raises(ValueError):
# reader_factory(carbon_synthetic_dataset.url, shard_count=0)
#
# # missing cur_shard value
# with pytest.raises(ValueError):
# reader_factory(carbon_synthetic_dataset.url, shard_count=5)
#
# # cur_shard is a string
# with pytest.raises(ValueError):
# reader_factory(carbon_synthetic_dataset.url, cur_shard='0',
# shard_count=5)
#
# # shard_count is a string
# with pytest.raises(ValueError):
# reader_factory(carbon_synthetic_dataset.url, cur_shard=0,
# shard_count='5')
@pytest.mark.parametrize('reader_factory', [
lambda url, **kwargs: make_carbon_reader(url, reader_pool_type='thread', **kwargs),
lambda url, **kwargs: make_batch_carbon_reader(url, reader_pool_type='thread', **kwargs),
])
def test_stable_pieces_order(carbon_synthetic_dataset, reader_factory):
"""Tests that the reader raises value errors when appropriate"""
RERUN_THE_TEST_COUNT = 4
baseline_run = None
for _ in range(RERUN_THE_TEST_COUNT):
# TODO(yevgeni): factor out. Reading all ids appears multiple times in this test.
with reader_factory(carbon_synthetic_dataset.url, shuffle_blocklets=False, workers_count=1) as reader:
this_run = _readout_all_ids(reader)
if baseline_run:
assert this_run == baseline_run
baseline_run = this_run
@pytest.mark.parametrize('reader_factory',
[lambda url, **kwargs: make_reader(url, reader_pool_type='thread', is_batch=False, **kwargs)])
def test_multiple_epochs(carbon_synthetic_dataset, reader_factory):
"""Tests that multiple epochs works as expected"""
num_epochs = 5
with reader_factory(carbon_synthetic_dataset.url, num_epochs=num_epochs) as reader:
# Read the ids across all epochs and compare them against num_epochs copies of the reference id set
single_epoch_id_set = [d['id'] for d in carbon_synthetic_dataset.data]
actual_ids_in_all_epochs = _readout_all_ids(reader)
np.testing.assert_equal(sorted(actual_ids_in_all_epochs), sorted(num_epochs * single_epoch_id_set))
# Reset reader should reset ventilator. Should produce another `num_epochs` results
reader.reset()
actual_ids_in_all_epochs = _readout_all_ids(reader)
np.testing.assert_equal(sorted(actual_ids_in_all_epochs), sorted(num_epochs * single_epoch_id_set))
@pytest.mark.parametrize('reader_factory',
[lambda url, **kwargs: make_reader(url, reader_pool_type='thread', is_batch=False, **kwargs)])
def test_fail_if_resetting_in_the_middle_of_epoch(carbon_synthetic_dataset, reader_factory):
"""Tests that multiple epochs works as expected"""
num_epochs = 5
with reader_factory(carbon_synthetic_dataset.url, num_epochs=num_epochs) as reader:
# Read only part of the first epoch, then attempt to reset mid-epoch
actual_ids = _readout_all_ids(reader, limit=20)
assert len(actual_ids) == 20
with pytest.raises(NotImplementedError):
reader.reset()
@pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES + SCALAR_ONLY_READER_FACTORIES)
def test_unlimited_epochs(carbon_synthetic_dataset, reader_factory):
"""Tests that unlimited epochs works as expected"""
with reader_factory(carbon_synthetic_dataset.url, num_epochs=None) as reader:
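# Read a few rows more than three full epochs' worth to verify the reader keeps producing data.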
read_limit = len(carbon_synthetic_dataset.data) * 3 + 2
actual_ids = _readout_all_ids(reader, read_limit)
expected_ids = [d['id'] for d in carbon_synthetic_dataset.data]
assert len(actual_ids) > len(expected_ids)
assert set(actual_ids) == set(expected_ids)
@pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES + SCALAR_ONLY_READER_FACTORIES)
def test_num_epochs_value_error(carbon_synthetic_dataset, reader_factory):
"""Tests that the reader raises value errors when appropriate"""
# Testing only Reader v1, as the v2 uses an epoch generator. The error would raise only when the generator is
# evaluated. Parameter validation for Reader v2 is covered by test_epoch_generator.py
with pytest.raises(ValueError):
reader_factory(carbon_synthetic_dataset.url, num_epochs=-10)
with pytest.raises(ValueError):
reader_factory(carbon_synthetic_dataset.url, num_epochs='abc')
@pytest.mark.parametrize('reader_factory', ALL_READER_FLAVOR_FACTORIES + SCALAR_ONLY_READER_FACTORIES)
def test_dataset_path_is_a_unicode(carbon_synthetic_dataset, reader_factory):
"""Just a bunch of read and compares of all values to the expected values using the different reader pools"""
# Making sure unicode_in_p23 is a unicode both in python 2 and 3
unicode_in_p23 = carbon_synthetic_dataset.url.encode().decode('utf-8')
with reader_factory(unicode_in_p23) as reader:
next(reader)
# TODO: fix test_make_carbon_reader_fails_loading_non_unischema_dataset
def test_make_carbon_reader_fails_loading_non_unischema_dataset(carbon_many_columns_non_unischema_dataset):
with pytest.raises(RuntimeError, match='use make_batch_carbon_reader'):
make_carbon_reader(carbon_many_columns_non_unischema_dataset.url)
def test_multithreaded_reads(carbon_synthetic_dataset):
with make_reader(carbon_synthetic_dataset.url, is_batch=False, workers_count=5, num_epochs=1) as reader:
with ThreadPoolExecutor(max_workers=10) as executor:
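# Issue many reads concurrently against a single reader instance to exercise its thread-safety.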
def read_one_row():
return next(reader)
futures = [executor.submit(read_one_row) for _ in range(100)]
results = [f.result() for f in futures]
assert len(results) == len(carbon_synthetic_dataset.data)
assert set(r.id for r in results) == set(d['id'] for d in carbon_synthetic_dataset.data)
@pytest.mark.forked
def test_make_reader(carbon_synthetic_dataset):
with make_reader(carbon_synthetic_dataset.url, is_batch=False, num_epochs=1) as reader:
i = 0
for sample in reader:
print(sample.id)
i += 1
assert i == _ROWS_COUNT
@pytest.mark.forked
def test_batch_carbon_reader(carbon_synthetic_dataset):
with make_reader(carbon_synthetic_dataset.url, num_epochs=1) as reader:
i = 0
for sample in reader:
for ele in sample.id:
print(ele)
i += 1
assert i == _ROWS_COUNT
@pytest.mark.forked
def test_make_reader_of_obs(carbon_obs_dataset):
with make_reader(carbon_obs_dataset.url,
key=pytest.config.getoption("--access_key"),
secret=pytest.config.getoption("--secret_key"),
endpoint=pytest.config.getoption("--end_point")) as reader:
i = 0
for sample in reader:
i += len(sample.id)
assert i == _ROWS_COUNT