# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import datetime
from calendar import timegm
from collections import namedtuple
from contextlib import contextmanager
from decimal import Decimal
import os
import numpy as np
import pytest
import tensorflow as tf
from petastorm.ngram import NGram
from petastorm.unischema import Unischema, UnischemaField
from pycarbon.core.carbon_reader import make_carbon_reader, make_batch_carbon_reader
from pycarbon.tests.core.test_carbon_common import TestSchema
from pycarbon.core.carbon_tf_utils import _sanitize_field_tf_types, _numpy_to_tf_dtypes, \
  _schema_to_tf_dtypes, tf_tensors
import jnius_config
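
# Point jnius at the CarbonData SDK classpath before any JVM-backed calls are made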
jnius_config.set_classpath(pytest.config.getoption("--carbon-sdk-path"))
if pytest.config.getoption("--pyspark-python") is not None and \
    pytest.config.getoption("--pyspark-driver-python") is not None:
  os.environ['PYSPARK_PYTHON'] = pytest.config.getoption("--pyspark-python")
  os.environ['PYSPARK_DRIVER_PYTHON'] = pytest.config.getoption("--pyspark-driver-python")
elif 'PYSPARK_PYTHON' in os.environ.keys() and 'PYSPARK_DRIVER_PYTHON' in os.environ.keys():
  pass
else:
  raise ValueError("Please set PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON, either on the command line via "
                   "--pyspark-python=PYSPARK_PYTHON_PATH --pyspark-driver-python=PYSPARK_DRIVER_PYTHON_PATH "
                   "or as environment variables.")
NON_NULLABLE_FIELDS = set(TestSchema.fields.values())


@contextmanager
def _tf_session():
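  """Starts a TF1 session with variable initializers and queue runners; used to evaluate tf_tensors().

  tf_tensors() is fed by TF1 queue runners, so a tf.train.Coordinator and running queue-runner threads
  must exist before the returned tensors can be evaluated. The threads are stopped and joined on exit.
  """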
  with tf.Session() as sess:
    sess.run([tf.global_variables_initializer(), tf.local_variables_initializer()])
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord, start=True)

    yield sess

    coord.request_stop()
    coord.join(threads)


def test_sanitize_field_tf_types():
  expected_datetime_array = [datetime.date(1970, 1, 1), datetime.date(2015, 9, 29)]
  expected_datetime_ns_from_epoch = [timegm(dt.timetuple()) * 1000000000 for dt in expected_datetime_array]
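  # datetime.date(1970, 1, 1) is the Unix epoch, so its nanosecond timestamp is exactly 0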
  assert expected_datetime_ns_from_epoch[0] == 0

  sample_input_dict = {
    'int32': np.asarray([-2 ** 31, 0, 100, 2 ** 31 - 1], dtype=np.int32),
    'uint16': np.asarray([0, 2, 2 ** 16 - 1], dtype=np.uint16),
    'Decimal': Decimal(1234) / Decimal(10),
    'array_of_datetime_date': np.asarray(expected_datetime_array),
    'array_of_np_datetime_64': np.asarray(expected_datetime_array).astype(np.datetime64),
  }

  TestNamedTuple = namedtuple('TestNamedTuple', sample_input_dict.keys())
  sample_input_tuple = TestNamedTuple(**sample_input_dict)
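
  # _sanitize_field_tf_types is expected to promote uint16 to int32, render Decimal as a string,
  # and convert dates to int64 nanoseconds since the epoch, as asserted below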
  sanitized_tuple = _sanitize_field_tf_types(sample_input_tuple)

  np.testing.assert_equal(sanitized_tuple.int32.dtype, np.int32)
  np.testing.assert_equal(sanitized_tuple.uint16.dtype, np.int32)
  assert isinstance(sanitized_tuple.Decimal, str)

  np.testing.assert_equal(sanitized_tuple.int32, sample_input_dict['int32'])
  np.testing.assert_equal(sanitized_tuple.uint16, sample_input_dict['uint16'])
  np.testing.assert_equal(str(sanitized_tuple.Decimal), str(sample_input_dict['Decimal'].normalize()))

  np.testing.assert_equal(sanitized_tuple.array_of_datetime_date, expected_datetime_ns_from_epoch)
  np.testing.assert_equal(sanitized_tuple.array_of_np_datetime_64, expected_datetime_ns_from_epoch)


def test_invalid_sanitize_field_tf_types():
  sample_input_dict = {
    'int32': None,
  }

  TestNamedTuple = namedtuple('TestNamedTuple', sample_input_dict.keys())
  sample_input_tuple = TestNamedTuple(**sample_input_dict)

  with pytest.raises(RuntimeError):
    _sanitize_field_tf_types(sample_input_tuple)


def test_decimal_conversion():
  assert _numpy_to_tf_dtypes(Decimal) == tf.string


def test_uint16_promotion_to_int32():
  assert _numpy_to_tf_dtypes(np.uint16) == tf.int32


def test_unknown_type():
  with pytest.raises(ValueError):
    _numpy_to_tf_dtypes(np.uint64)


def test_schema_to_dtype_list():
  TestSchema = Unischema('TestSchema', [
    UnischemaField('int32', np.int32, (), None, False),
    UnischemaField('uint8', np.uint8, (), None, False),
    UnischemaField('uint16', np.uint16, (), None, False),
    UnischemaField('Decimal', Decimal, (), None, False),
  ])

  actual_tf_dtype_list = _schema_to_tf_dtypes(TestSchema)
  # Note: Unischema always sorts fields alphabetically by name to avoid ambiguity, so
  # [Decimal, int32, uint16, uint8] maps to [tf.string, tf.int32, tf.int32, tf.uint8]
  np.testing.assert_equal(actual_tf_dtype_list, [tf.string, tf.int32, tf.int32, tf.uint8])


def _read_from_tf_tensors(synthetic_dataset, count, shuffling_queue_capacity, min_after_dequeue, ngram,
                          workers_count=10):
  """Reads 'count' rows using a reader. Used by several test cases.

  The reader is configured without blocklet shuffling and guarantees a deterministic order of rows up
  to the TF shuffling of the results queue, which is controlled by the 'shuffling_queue_capacity' and
  'min_after_dequeue' arguments.

  Returns a tuple: (actual data read from the dataset, the TF tensors returned by the reader).
  """
  schema_fields = (NON_NULLABLE_FIELDS if ngram is None else ngram)

  with make_carbon_reader(schema_fields=schema_fields, dataset_url=synthetic_dataset.url, reader_pool_type='thread',
                          shuffle_blocklets=False, workers_count=workers_count) as reader:
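    # With an NGram request, tf_tensors returns a dict of named tuples keyed by timestep instead of a
    # single named tuple of tensors; sess.run() evaluates either form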
    row_tensors = tf_tensors(reader, shuffling_queue_capacity=shuffling_queue_capacity,
                             min_after_dequeue=min_after_dequeue)

    with _tf_session() as sess:
      rows_data = [sess.run(row_tensors) for _ in range(count)]

  return rows_data, row_tensors


def _assert_all_tensors_have_shape(row_tensors):
  """Asserts that all elements in the row_tensors list/tuple have a static shape."""
  for column in row_tensors:
    assert column.get_shape().dims is not None


def _assert_fields_eq(actual, desired):
  if isinstance(desired, Decimal) or isinstance(actual, bytes):
    # TensorFlow returns all strings as bytes in Python 3, so we need to decode them
    actual = actual.decode()
  elif isinstance(desired, np.ndarray) and desired.dtype.type == np.unicode_:
    actual = np.array([item.decode() for item in actual])

  if isinstance(desired, Decimal):
    np.testing.assert_equal(Decimal(actual), desired)
  elif issubclass(desired.dtype.type, np.datetime64):
    # tf_utils converts timestamps into int64 nanoseconds since the epoch
    assert desired.astype('<M8[ns]').astype(np.int64) == actual
  else:
    np.testing.assert_equal(actual, desired)


def _assert_expected_rows_data(expected_data, rows_data):
  """Asserts that all elements of the rows_data list of rows match the reference data used to create the dataset"""
  for row_tuple in rows_data:
    # It is easier to work with a dict as we will be indexing columns by name
    row = row_tuple._asdict()

    # Find the corresponding row in the reference data
    expected = next(d for d in expected_data if d['id'] == row['id'])

    # Check equivalence of all values between the checked row and the reference row
    for column_name, actual in row.items():
      expected_val = expected[column_name]
      _assert_fields_eq(actual, expected_val)


@pytest.mark.forked
def test_simple_read_tensorflow(carbon_synthetic_dataset):
  """Read a couple of rows. Make sure all tensors have static shape sizes assigned and the data matches the
  reference data."""
  with make_carbon_reader(schema_fields=NON_NULLABLE_FIELDS, dataset_url=carbon_synthetic_dataset.url) as reader:
    row_tensors = tf_tensors(reader)

    with _tf_session() as sess:
      rows_data = [sess.run(row_tensors) for _ in range(30)]

  # Make sure we have static shape info for all fields
  _assert_all_tensors_have_shape(row_tensors)
  _assert_expected_rows_data(carbon_synthetic_dataset.data, rows_data)


@pytest.mark.forked
def test_shuffling_queue(carbon_synthetic_dataset):
  """Read the data with and without the TensorFlow shuffling queue. Check that the order is deterministic
  for the unshuffled reads and random for the shuffled reads."""
  unshuffled_1, _ = _read_from_tf_tensors(carbon_synthetic_dataset, 30, shuffling_queue_capacity=0,
                                          min_after_dequeue=0, ngram=None, workers_count=1)
  unshuffled_2, _ = _read_from_tf_tensors(carbon_synthetic_dataset, 30, shuffling_queue_capacity=0,
                                          min_after_dequeue=0, ngram=None, workers_count=1)
  shuffled_1, shuffled_1_row_tensors = \
    _read_from_tf_tensors(carbon_synthetic_dataset, 30, shuffling_queue_capacity=10, min_after_dequeue=9, ngram=None)
  shuffled_2, _ = \
    _read_from_tf_tensors(carbon_synthetic_dataset, 30, shuffling_queue_capacity=10, min_after_dequeue=9, ngram=None)

  # Make sure we have static shapes and the data matches the reference data (important since a different
  # code path is executed within tf_tensors when shuffling is specified)
  _assert_all_tensors_have_shape(shuffled_1_row_tensors)
  _assert_expected_rows_data(carbon_synthetic_dataset.data, shuffled_1)

  assert [f.id for f in unshuffled_1] == [f.id for f in unshuffled_2]
  assert [f.id for f in unshuffled_1] != [f.id for f in shuffled_2]
  assert [f.id for f in shuffled_1] != [f.id for f in shuffled_2]


@pytest.mark.forked
def test_simple_ngram_read_tensorflow(carbon_synthetic_dataset):
  """Read a single ngram. Make sure all shapes are set and the data read matches the reference data"""
  fields = {
    0: [TestSchema.id],
    1: [TestSchema.id],
    2: [TestSchema.id],
  }

  # Expecting the delta between ids to be 1. Setting 1.5 as an upper bound
  ngram = NGram(fields=fields, delta_threshold=1.5, timestamp_field=TestSchema.id)
  ngrams, row_tensors_seq = \
    _read_from_tf_tensors(carbon_synthetic_dataset, 30, shuffling_queue_capacity=0, min_after_dequeue=0, ngram=ngram)

  for row_tensors in row_tensors_seq.values():
    _assert_all_tensors_have_shape(row_tensors)

  for one_ngram_dict in ngrams:
    _assert_expected_rows_data(carbon_synthetic_dataset.data, one_ngram_dict.values())


@pytest.mark.forked
def test_shuffling_queue_with_ngrams(carbon_synthetic_dataset):
  """Read ngrams with and without the TensorFlow shuffling queue (no blocklet shuffling). Check that the
  order is deterministic for the unshuffled reads and random for the shuffled reads."""
  fields = {
    0: [TestSchema.id],
    1: [TestSchema.id],
    2: [TestSchema.id],
  }

  # Expecting the delta between ids to be 1. Setting 1.5 as an upper bound
  ngram = NGram(fields=fields, delta_threshold=1.5, timestamp_field=TestSchema.id)
  unshuffled_1, _ = _read_from_tf_tensors(carbon_synthetic_dataset, 30, shuffling_queue_capacity=0,
                                          min_after_dequeue=0, ngram=ngram, workers_count=1)
  unshuffled_2, _ = _read_from_tf_tensors(carbon_synthetic_dataset, 30, shuffling_queue_capacity=0,
                                          min_after_dequeue=0, ngram=ngram, workers_count=1)
  shuffled_1, shuffled_1_ngram = \
    _read_from_tf_tensors(carbon_synthetic_dataset, 20, shuffling_queue_capacity=30, min_after_dequeue=29, ngram=ngram)
  shuffled_2, _ = \
    _read_from_tf_tensors(carbon_synthetic_dataset, 20, shuffling_queue_capacity=30, min_after_dequeue=29, ngram=ngram)

  # shuffled_1_ngram is a dictionary of named tuples indexed by time:
  # {0: (tensor, tensor, tensor, ...),
  #  1: (tensor, tensor, tensor, ...),
  #  ...}
  for row_tensor in shuffled_1_ngram.values():
    _assert_all_tensors_have_shape(row_tensor)
  # shuffled_1 is a list of dictionaries of named tuples indexed by time:
  # [{0: (tensor, tensor, tensor, ...),
  #   1: (tensor, tensor, tensor, ...),
  #   ...},
  #  {0: (tensor, tensor, tensor, ...),
  #   1: (tensor, tensor, tensor, ...),
  #   ...},
  #  ...]
  for one_ngram_dict in shuffled_1:
    _assert_expected_rows_data(carbon_synthetic_dataset.data, one_ngram_dict.values())

  def flatten(list_of_ngrams):
    return [row for seq in list_of_ngrams for row in seq.values()]

  assert [f.id for f in flatten(unshuffled_1)] == [f.id for f in flatten(unshuffled_2)]
  assert [f.id for f in flatten(unshuffled_1)] != [f.id for f in flatten(shuffled_2)]
  assert [f.id for f in flatten(shuffled_1)] != [f.id for f in flatten(shuffled_2)]


@pytest.mark.forked
def test_simple_read_tensorflow_with_carbon_dataset(carbon_scalar_dataset):
  """Read a couple of batches. Make sure the returned tensors are rank-1 with an undefined batch dimension
  and the data matches the reference data."""
  with make_batch_carbon_reader(dataset_url=carbon_scalar_dataset.url) as reader:
    row_tensors = tf_tensors(reader)

    # Batched reads return rank-1 tensors; the batch size is not known statically, hence shape [None]
    for column in row_tensors:
      assert column.get_shape().as_list() == [None]

    with _tf_session() as sess:
      for _ in range(2):
        batch = sess.run(row_tensors)._asdict()
        for i, id_value in enumerate(batch['id']):
          expected_row = next(d for d in carbon_scalar_dataset.data if d['id'] == id_value)
          for field_name in expected_row.keys():
            _assert_fields_eq(batch[field_name][i], expected_row[field_name])


@pytest.mark.forked
def test_simple_read_tensorflow_with_non_unischema_many_columns_dataset(carbon_many_columns_non_unischema_dataset):
  """Read a batch from a dataset with many columns that was written without a unischema. Make sure all of
  the dataset's columns are present in the result."""
  with make_batch_carbon_reader(dataset_url=carbon_many_columns_non_unischema_dataset.url) as reader:
    row_tensors = tf_tensors(reader)

    # Batched reads return rank-1 tensors; the batch size is not known statically, hence shape [None]
    for column in row_tensors:
      assert column.get_shape().as_list() == [None]

    with _tf_session() as sess:
      batch = sess.run(row_tensors)._asdict()
      assert set(batch.keys()) == set(carbon_many_columns_non_unischema_dataset.data[0].keys())


def test_shuffling_queue_with_make_batch_carbon_reader(carbon_scalar_dataset):
  with make_batch_carbon_reader(dataset_url=carbon_scalar_dataset.url) as reader:
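    # The positional arguments map to shuffling_queue_capacity=100 and min_after_dequeue=90; a TF
    # shuffling queue is not supported for batch readers, so tf_tensors is expected to raise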
    with pytest.raises(ValueError):
      tf_tensors(reader, 100, 90)