# blob: 26620cdfd0f7e02cd601a5d7e473520eba451d95
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A set of TensorFlow specific helper functions"""
import datetime
import sys
import warnings
from calendar import timegm
from collections import OrderedDict, namedtuple
from decimal import Decimal
import numpy as np
import tensorflow as tf
from pycarbon.core.carbon_tf_utils import RANDOM_SHUFFLING_QUEUE_SIZE, make_namedtuple_tf_ngram
class TensorFlow(object):
  """Bridges pycarbon/petastorm ``Reader`` objects into the Tensorflow world.

  All methods build TF1 graph-mode constructs (``tf.py_func``, queue runners,
  ``tf.data.Dataset.from_generator``); they do not target eager execution.
  """

  # Mapping of identical datatypes in numpy-ish and tensorflow-ish.
  # NOTE(review): np.bool / np.string_ / np.unicode_ are aliases removed in
  # newer numpy releases (>=1.24 / 2.0); kept verbatim because this module
  # targets the older TF1/numpy stack and the keys are used for dict lookups.
  _NUMPY_TO_TF_DTYPES_MAPPING = {
    np.bool: tf.bool,
    np.int8: tf.int8,
    np.int16: tf.int16,
    np.int32: tf.int32,
    np.int64: tf.int64,
    np.uint8: tf.uint8,
    # uint16 is promoted to int32 (see _sanitize_field_tf_types) since TF had
    # limited uint16 support historically.
    np.uint16: tf.int32,
    np.float32: tf.float32,
    np.float64: tf.float64,
    np.string_: tf.string,
    np.unicode_: tf.string,
    np.str_: tf.string,
    np.bool_: tf.bool,
    # Decimal is not supported by TF; carried as its normalized string form.
    Decimal: tf.string,
    # datetime64 is carried as nanoseconds-since-epoch int64.
    np.datetime64: tf.int64,
  }

  # Name of an op in the TF graph used for the random shuffling queue. This name can be used by diagnostics code that
  # wishes to read-out shuffling queue size
  RANDOM_SHUFFLING_QUEUE_SIZE = 'random_shuffling_queue_size'

  def date_to_nsec_from_epoch(self, dt):
    """Returns ``dt`` (a ``datetime.date``/``datetime.datetime``) as int nanoseconds since the unix epoch (UTC)."""
    return timegm(dt.timetuple()) * 1000000000

  # BUG FIX: ``np.vectorize`` instances do not participate in the descriptor
  # protocol, so this class attribute is never bound to the instance. The
  # original code vectorized the two-argument method above and then invoked it
  # with a single array argument (``self._date_to_nsec_from_epoch_vectorized(v)``),
  # which raised a TypeError at call time. Vectorize a self-free callable that
  # performs the same conversion instead.
  _date_to_nsec_from_epoch_vectorized = np.vectorize(
    lambda dt: timegm(dt.timetuple()) * 1000000000)

  def _sanitize_field_tf_types(self, sample):
    """Takes a named tuple and casts/promotes types unknown to TF to the types that are known.

    Conversions currently implemented:

    - Decimal to string
    - uint16 arrays to int32
    - np.datetime64 arrays (and object arrays of ``datetime.date``) to int64,
      as nanoseconds since unix epoch
    - non-empty bytes/unicode arrays to python lists

    :param sample: named tuple or a dictionary
    :return: same type as the input with values casted to types supported by Tensorflow
    :raises RuntimeError: if any field value is ``None`` (TF has no None tensor)
    """
    next_sample_dict = sample._asdict()

    for k, v in next_sample_dict.items():
      if v is None:
        raise RuntimeError('Encountered "{}"=None. Tensorflow does not support None values as a tensor. '
                           'Consider filtering out these rows using a predicate.'.format(k))
      # Assuming conversion to the same numpy type is trivial and dirty cheap
      if isinstance(v, Decimal):
        # Normalizing decimals only to get rid of the trailing zeros (makes testing easier, assuming has
        # no other effect)
        next_sample_dict[k] = str(v.normalize())
      elif isinstance(v, np.ndarray) and np.issubdtype(v.dtype, np.datetime64):
        # Convert to nanoseconds from POSIX epoch
        next_sample_dict[k] = (v - np.datetime64('1970-01-01T00:00:00.0')) \
          .astype('timedelta64[ns]').astype(np.int64)
      elif isinstance(v, np.ndarray) and v.dtype == np.uint16:
        # TF had limited uint16 support historically; promote to int32.
        next_sample_dict[k] = v.astype(np.int32)
      elif isinstance(v, np.ndarray) and v.dtype.type in (np.bytes_, np.unicode_):
        if v.size != 0:
          next_sample_dict[k] = v.tolist()
      elif isinstance(v, np.ndarray) and v.dtype.kind == 'O' and isinstance(v[0], datetime.date):
        # Pyarrow 0.12.1 started returning python datetime.date when parquet column is a DateType() column.
        # Convert values in such column into nsec from epoch int64.
        next_sample_dict[k] = self._date_to_nsec_from_epoch_vectorized(v)

    # Construct object of the same type as the input
    return sample.__class__(**next_sample_dict)

  def _schema_to_tf_dtypes(self, schema):
    """Returns schema as a list of tensorflow dtypes.

    :param schema: The schema.
    :return: List of tensorflow dtypes, in the order of ``schema.fields``.
    """
    return [self._numpy_to_tf_dtypes(f.numpy_dtype) for f in schema.fields.values()]

  def _schema_to_tf_dtypes_ngram(self, schema, ngram):
    """Returns schema as a flat list of tensorflow dtypes for an ngram.

    :param schema: The schema.
    :param ngram: The ngram.
    :return: List of tensorflow dtypes, ordered by timestep and then by field.
    """
    result = []
    # Iterate over each timestep
    for key in sorted(ngram.fields.keys()):
      # Get schema at that timestep
      new_schema = ngram.get_schema_at_timestep(schema=schema, timestep=key)
      for field in new_schema.fields.values():
        result.append(self._numpy_to_tf_dtypes(field.numpy_dtype))
    return result

  def _numpy_to_tf_dtypes(self, numpy_dtype):
    """Returns a tensorflow dtype object corresponding to numpy's dtype.

    :param numpy_dtype: numpy dtype object
    :return: tensorflow dtype object
    :raises ValueError: if there is no known mapping between the types
    """
    if numpy_dtype in self._NUMPY_TO_TF_DTYPES_MAPPING:
      if numpy_dtype == np.unicode_ and sys.version_info >= (3, 0):
        # TF represents strings as bytes; warn py3 users they may need .decode()
        warnings.warn("Tensorflow will convert all unicode strings back to bytes type. "
                      "You may need to decode values.", UnicodeWarning)
      return self._NUMPY_TO_TF_DTYPES_MAPPING[numpy_dtype]
    else:
      raise ValueError('Unknown mapping of numpy {} to tensorflow dtype'.format(numpy_dtype))

  def _flatten(self, data):
    """Flattens the data, where it takes a dictionary of timesteps, each value is a dictionary and converts it to
    one flat dictionary having a key that is the key of the inner dictionary + '_' + the enumeration index of the
    (sorted) timestep.

    For example, ``data`` would be ``{1: {'a': 'avalue', 'b': 'bvalue'}, 2: {'c': 'cvalue', 'd': 'dvalue'}}`` and the
    output of :func:`._flatten` would be ``{'a_0': 'avalue', 'b_0': 'bvalue', 'c_1': 'cvalue', 'd_1': 'dvalue'}``.
    (Note the suffix is the 0-based position of the timestep, not the timestep value itself.)

    :param data: The data to flatten (a dict of timestep -> named tuple).
    :return: The flattened data as a named tuple.
    """
    flattened = OrderedDict()
    for index, key in enumerate(sorted(data.keys())):
      data_dict = data[key]._asdict()
      for subkey in data_dict:
        encoded_key = subkey + '_' + str(index)
        flattened[encoded_key] = data_dict[subkey]

    FlattenedTuple = namedtuple('flattened', list(flattened.keys()))
    return FlattenedTuple(**flattened)

  def make_namedtuple_tf_ngram(self, unischema, ngram, *args, **kargs):
    """Creates a dictionary of timestep keys and namedtuple values from args and kargs.

    :param unischema: The schema the ngram is based on.
    :param ngram: The ngram definition.
    :param args: Positional field values, ordered by timestep then field.
    :param kargs: Keyword field values; keys are str(timestep).
    :return: A dictionary of timestep keys and namedtuple values.
    """
    ngram_result = {}
    previous_args_end = 0
    for timestep in range(min(ngram.fields.keys()), max(ngram.fields.keys()) + 1):
      # For each timestep iteration, mark the args and kargs for that timestep and create
      # a namedtuple from them.
      current_field_names = ngram.get_field_names_at_timestep(timestep)
      new_schema = ngram.get_schema_at_timestep(schema=unischema, timestep=timestep)
      new_args_end = previous_args_end + len(current_field_names)
      args_timestep = args[previous_args_end:new_args_end]
      previous_args_end = new_args_end
      kargs_timestep = kargs.get(str(timestep), {})
      ngram_result[timestep] = new_schema._get_namedtuple()(*args_timestep, **kargs_timestep)
    return ngram_result

  def _set_shape(self, schema, fields_as_dict, batched_output=None):
    """Assigns static shapes (from the unischema) to every tensor in ``fields_as_dict``, in place.

    Workaround of an issue described here:
    https://stackoverflow.com/questions/49161316/trailing-x00-characters-in-tensor-when-numpy-string-array-is-returned-from-tf

    :param schema: Unischema providing per-field shapes.
    :param fields_as_dict: dict of field name -> tensor; mutated in place.
    :param batched_output: if truthy, a leading batch dimension (None) is prepended to each shape.
    """
    for k in fields_as_dict.keys():
      unischema_field = schema.fields[k]

      if batched_output:
        shape = (None,) + unischema_field.shape
      else:
        shape = unischema_field.shape

      # Set static shape
      fields_as_dict[k].set_shape(shape)

  def _shuffling_queue(self, shuffling_queue_capacity, min_after_dequeue, dtypes, fields_as_list):
    """Creates a shuffling queue with enqueue/dequeue pair. Always a single writing thread."""

    # Named tuples loose the 'named' part when going via queue
    shuffling_queue = tf.RandomShuffleQueue(shuffling_queue_capacity, min_after_dequeue, dtypes)

    # The following call to .size has a side effect of creating a new node in the TF graph. We are interested
    # in the side effect so we can read the queue size somewhere else, addressing the node by a 'well-known-name'
    shuffling_queue.size(name=RANDOM_SHUFFLING_QUEUE_SIZE)

    # We need the queue only for shuffling, so we use only a single enqueuing thread (actually would be happy
    # not to introduce any threads. Not sure if there is such a mechanism in TF)
    queue_runner = tf.train.QueueRunner(shuffling_queue, [shuffling_queue.enqueue(fields_as_list)])

    tf.train.add_queue_runner(queue_runner)

    # Passed through the queue. We got an ordered list. The order matches the order of fields in unischema
    fields_as_list = shuffling_queue.dequeue()
    return fields_as_list

  def _tf_tensors_nonngram(self, reader, shuffling_queue_capacity, min_after_dequeue):
    """A tensorflow data adapter for non ngrams. Return value is a named tuple with tensorflow tensors supplying
    the data directly into a Tensorflow graph. See `make_tensor` documentation for input/output arguments meaning."""

    # TODO: implement a mechanism for signaling that we have no more data
    def dequeue_sample_impl(x):
      next_sample = next(reader)
      # Decimal is not supported by TF. int8,16,32,64 scalars are all returned as python native int type
      # (casted to 64 bit by tensorflow). sanitize_field_tf_types will explicitly convert all values
      # to explicit numpy types making it compatible with return values expected by Tensorflow
      return self._sanitize_field_tf_types(next_sample)

    # fields_as_list is a list with tensors matching the order of the values in the schema. named-tuple semantics is
    # not preserved across tf.py_func call boundary.
    fields_as_list = tf.py_func(dequeue_sample_impl, [tf.constant(1)], self._schema_to_tf_dtypes(reader.schema))

    if shuffling_queue_capacity > 0:
      # Pass py_func output via shuffling queue if requested.
      fields_as_list = self._shuffling_queue(shuffling_queue_capacity, min_after_dequeue,
                                             self._schema_to_tf_dtypes(reader.schema), fields_as_list)

    # Going via `make_namedtuple_tf` is a little wasteful, since we are converting directly to dict. However, this
    # spares the need to implement a function similar to make_namedtuple_tf that returns dict instead of a named tuple
    fields_as_dict = reader.schema.make_namedtuple_tf(*fields_as_list)._asdict()

    # Force all static shapes to be set in the returned value based on the unischema
    self._set_shape(reader.schema, fields_as_dict, reader.batched_output)

    # Make a row tensor into a nice named tuple
    return reader.schema.make_namedtuple_tf(**fields_as_dict)

  def _tf_tensors_ngram(self, reader, shuffling_queue_capacity, min_after_dequeue):
    """A tensorflow data adapter for ngrams. Return value is a named tuple with tensorflow tensors supplying
    the data directly into a Tensorflow graph. See `make_tensor` documentation for input/output arguments meaning."""

    # TODO: implement a mechanism for signaling that we have no more data
    def dequeue_sample_impl(x):
      next_sample = next(reader)
      assert isinstance(next_sample, dict)

      # Create a dictionary, where each key is a timestep, and value is named tuple or dictionary.
      ngram = {}
      for timestep in next_sample:
        ngram[timestep] = self._sanitize_field_tf_types(next_sample[timestep])

      return self._flatten(ngram)

    fields_as_list = tf.py_func(dequeue_sample_impl, [tf.constant(1)],
                                self._schema_to_tf_dtypes_ngram(reader.schema, reader.ngram))

    if shuffling_queue_capacity > 0:
      # Pass py_func output via shuffling queue if requested.
      fields_as_list = self._shuffling_queue(shuffling_queue_capacity, min_after_dequeue,
                                             self._schema_to_tf_dtypes_ngram(reader.schema, reader.ngram),
                                             fields_as_list)

    fields_as_namedtuple = make_namedtuple_tf_ngram(reader.schema, reader.ngram, *fields_as_list)

    # We change the key to str format here in order to be able to use ** later to expand the dictionary as kargs.
    fields_as_dict = {
      str(timestep): fields_as_namedtuple[timestep]._asdict() for timestep in fields_as_namedtuple}
    for timestep in fields_as_dict:
      self._set_shape(reader.schema, fields_as_dict[timestep])

    return make_namedtuple_tf_ngram(reader.schema, reader.ngram, **fields_as_dict)

  def make_tensor(self, reader, shuffling_queue_capacity=0, min_after_dequeue=0):
    """Bridges between python-only interface of the Reader (next(Reader)) and tensorflow world.

    This function returns a named tuple of tensors from the dataset. If the reader was created with
    ``ngram=NGram(...)`` parameter, then a dictionary of named tuples is returned (indexed by time).

    An optional shuffling queue is created if shuffling_queue_capacity is greater than 0.

    Note that if reading a unischema field that is unicode (``np.unicode_`` or ``np.str_``) tensorflow will
    represent it as a tf.string which will be an array of bytes. If using python3 you may need to decode
    it to convert it back to a python str type.

    :param reader: An instance of Reader object used as the data source
    :param shuffling_queue_capacity: Queue capacity is passed to the underlying :class:`tf.RandomShuffleQueue`
      instance. If set to 0, no shuffling will be done.
    :param min_after_dequeue: If ``shuffling_queue_capacity > 0``, this value is passed to the underlying
      :class:`tf.RandomShuffleQueue`.
    :return: If no ngram reading is used, the function will return a named tuple with tensors that are populated
      from the underlying dataset. If ngram reading is enabled, a dictionary of named tuples of tensors is returned.
      The dictionary is indexed by time.
    :raises ValueError: if the reader produces batched output and a shuffling queue was requested.
    """
    # NGram enabled and disabled code is quite different. It appears to be cleaner to simply go in orthogonal
    # execution paths.
    if reader.batched_output:
      if shuffling_queue_capacity > 0:
        raise ValueError('shuffling_queue_capacity can not be used with a reader that produces '
                         'batched_output. Extra shuffling of the batches does not further '
                         'decrease correlation.')

    if reader.ngram:
      result = self._tf_tensors_ngram(reader, shuffling_queue_capacity, min_after_dequeue)
    else:
      result = self._tf_tensors_nonngram(reader, shuffling_queue_capacity, min_after_dequeue)

    return result

  def _set_shape_to_named_tuple(self, schema, fields, batched_output):
    """Assigns static shapes to all tensors in the named tuple ``fields`` and returns a new named tuple."""
    fields_as_dict = fields._asdict()
    self._set_shape(schema, fields_as_dict, batched_output)
    return schema.make_namedtuple_tf(**fields_as_dict)

  def make_dataset(self, reader):
    """Creates a `tensorflow.data.Dataset <https://www.tensorflow.org/api_docs/python/tf/data/Dataset>`_ object from
    the reader.

    NGrams are not yet supported by this function.

    :param reader: An instance of :class:`Reader` object that would serve as a data source.
    :return: A ``tf.data.Dataset`` instance.
    :raises NotImplementedError: if the reader was created with an ngram.
    :raises RuntimeError: (from the generator) if the dataset is iterated more than once.
    """
    if not reader.ngram:

      def dequeue_sample_impl():
        if reader.last_row_consumed:
          # This means that Dataset is trying to create a new instance of the generator. Can not do that
          # (nor want to do that) since this is an expensive operation. num_epochs is a more efficient way
          # to do this.
          raise RuntimeError('Multiple iterations over make_dataset are not supported. '
                             'Multiple iterations can be triggered by calling \'repeat\' method of Dataset class. '
                             'Use Reader\'s num_epochs constructor arguments to set number of iterations.')
        for row in reader:
          yield self._sanitize_field_tf_types(row)

      flat_dataset = tf.data.Dataset.from_generator(dequeue_sample_impl,
                                                    tuple(self._schema_to_tf_dtypes(reader.schema)))

      named_tuple_dataset = flat_dataset \
        .map(reader.schema.make_namedtuple_tf) \
        .map(lambda row: self._set_shape_to_named_tuple(reader.schema, row, reader.batched_output))
      return named_tuple_dataset
    else:
      raise NotImplementedError('make_dataset does not support NGram yet.')