blob: 439b6fe16d9279153b9cc2e588893f0637d2df2b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import ast
import json
import operator
import re
import warnings
import numpy as np
import six
import pyarrow as pa
from pyarrow.lib import _pandas_api
from pyarrow.compat import (builtin_pickle, PY2, zip_longest, Sequence) # noqa
_logical_type_map = {}
def get_logical_type_map():
global _logical_type_map
if not _logical_type_map:
_logical_type_map.update({
pa.lib.Type_NA: 'empty',
pa.lib.Type_BOOL: 'bool',
pa.lib.Type_INT8: 'int8',
pa.lib.Type_INT16: 'int16',
pa.lib.Type_INT32: 'int32',
pa.lib.Type_INT64: 'int64',
pa.lib.Type_UINT8: 'uint8',
pa.lib.Type_UINT16: 'uint16',
pa.lib.Type_UINT32: 'uint32',
pa.lib.Type_UINT64: 'uint64',
pa.lib.Type_HALF_FLOAT: 'float16',
pa.lib.Type_FLOAT: 'float32',
pa.lib.Type_DOUBLE: 'float64',
pa.lib.Type_DATE32: 'date',
pa.lib.Type_DATE64: 'date',
pa.lib.Type_TIME32: 'time',
pa.lib.Type_TIME64: 'time',
pa.lib.Type_BINARY: 'bytes',
pa.lib.Type_FIXED_SIZE_BINARY: 'bytes',
pa.lib.Type_STRING: 'unicode',
})
return _logical_type_map
def get_logical_type(arrow_type):
logical_type_map = get_logical_type_map()
try:
return logical_type_map[arrow_type.id]
except KeyError:
if isinstance(arrow_type, pa.lib.DictionaryType):
return 'categorical'
elif isinstance(arrow_type, pa.lib.ListType):
return 'list[{}]'.format(get_logical_type(arrow_type.value_type))
elif isinstance(arrow_type, pa.lib.TimestampType):
return 'datetimetz' if arrow_type.tz is not None else 'datetime'
elif isinstance(arrow_type, pa.lib.Decimal128Type):
return 'decimal'
return 'object'
_numpy_logical_type_map = {
np.bool_: 'bool',
np.int8: 'int8',
np.int16: 'int16',
np.int32: 'int32',
np.int64: 'int64',
np.uint8: 'uint8',
np.uint16: 'uint16',
np.uint32: 'uint32',
np.uint64: 'uint64',
np.float32: 'float32',
np.float64: 'float64',
'datetime64[D]': 'date',
np.unicode_: 'string' if not PY2 else 'unicode',
np.bytes_: 'bytes' if not PY2 else 'string',
}
def get_logical_type_from_numpy(pandas_collection):
try:
return _numpy_logical_type_map[pandas_collection.dtype.type]
except KeyError:
if hasattr(pandas_collection.dtype, 'tz'):
return 'datetimetz'
# See https://github.com/pandas-dev/pandas/issues/24739
if str(pandas_collection.dtype) == 'datetime64[ns]':
return 'datetime64[ns]'
result = _pandas_api.infer_dtype(pandas_collection)
if result == 'string':
return 'bytes' if PY2 else 'unicode'
return result
def get_extension_dtype_info(column):
dtype = column.dtype
if str(dtype) == 'category':
cats = getattr(column, 'cat', column)
assert cats is not None
metadata = {
'num_categories': len(cats.categories),
'ordered': cats.ordered,
}
physical_dtype = str(cats.codes.dtype)
elif hasattr(dtype, 'tz'):
metadata = {'timezone': pa.lib.tzinfo_to_string(dtype.tz)}
physical_dtype = 'datetime64[ns]'
else:
metadata = None
physical_dtype = str(dtype)
return physical_dtype, metadata
def get_column_metadata(column, name, arrow_type, field_name):
"""Construct the metadata for a given column
Parameters
----------
column : pandas.Series or pandas.Index
name : str
arrow_type : pyarrow.DataType
field_name : str
Equivalent to `name` when `column` is a `Series`, otherwise if `column`
is a pandas Index then `field_name` will not be the same as `name`.
This is the name of the field in the arrow Table's schema.
Returns
-------
dict
"""
logical_type = get_logical_type(arrow_type)
string_dtype, extra_metadata = get_extension_dtype_info(column)
if logical_type == 'decimal':
extra_metadata = {
'precision': arrow_type.precision,
'scale': arrow_type.scale,
}
string_dtype = 'object'
if name is not None and not isinstance(name, six.string_types):
raise TypeError(
'Column name must be a string. Got column {} of type {}'.format(
name, type(name).__name__
)
)
assert field_name is None or isinstance(field_name, six.string_types), \
str(type(field_name))
return {
'name': name,
'field_name': 'None' if field_name is None else field_name,
'pandas_type': logical_type,
'numpy_type': string_dtype,
'metadata': extra_metadata,
}
def construct_metadata(df, column_names, index_levels, index_descriptors,
preserve_index, types):
"""Returns a dictionary containing enough metadata to reconstruct a pandas
DataFrame as an Arrow Table, including index columns.
Parameters
----------
df : pandas.DataFrame
index_levels : List[pd.Index]
index_descriptors : List[Dict]
preserve_index : bool
types : List[pyarrow.DataType]
Returns
-------
dict
"""
num_serialized_index_levels = len([descr for descr in index_descriptors
if not isinstance(descr, dict)])
# Use ntypes instead of Python shorthand notation [:-len(x)] as [:-0]
# behaves differently to what we want.
ntypes = len(types)
df_types = types[:ntypes - num_serialized_index_levels]
index_types = types[ntypes - num_serialized_index_levels:]
column_metadata = []
for col_name, sanitized_name, arrow_type in zip(df.columns, column_names,
df_types):
metadata = get_column_metadata(df[col_name], name=sanitized_name,
arrow_type=arrow_type,
field_name=sanitized_name)
column_metadata.append(metadata)
index_column_metadata = []
if preserve_index is not False:
for level, arrow_type, descriptor in zip(index_levels, index_types,
index_descriptors):
if isinstance(descriptor, dict):
# The index is represented in a non-serialized fashion,
# e.g. RangeIndex
continue
metadata = get_column_metadata(level, name=level.name,
arrow_type=arrow_type,
field_name=descriptor)
index_column_metadata.append(metadata)
column_indexes = []
for level in getattr(df.columns, 'levels', [df.columns]):
metadata = _get_simple_index_descriptor(level)
column_indexes.append(metadata)
else:
index_descriptors = index_column_metadata = column_indexes = []
return {
b'pandas': json.dumps({
'index_columns': index_descriptors,
'column_indexes': column_indexes,
'columns': column_metadata + index_column_metadata,
'creator': {
'library': 'pyarrow',
'version': pa.__version__
},
'pandas_version': _pandas_api.version
}).encode('utf8')
}
def _get_simple_index_descriptor(level):
string_dtype, extra_metadata = get_extension_dtype_info(level)
pandas_type = get_logical_type_from_numpy(level)
if 'mixed' in pandas_type:
warnings.warn(
"The DataFrame has column names of mixed type. They will be "
"converted to strings and not roundtrip correctly.",
UserWarning, stacklevel=4)
if pandas_type == 'unicode':
assert not extra_metadata
extra_metadata = {'encoding': 'UTF-8'}
return {
'name': level.name,
'field_name': level.name,
'pandas_type': pandas_type,
'numpy_type': string_dtype,
'metadata': extra_metadata,
}
def _column_name_to_strings(name):
"""Convert a column name (or level) to either a string or a recursive
collection of strings.
Parameters
----------
name : str or tuple
Returns
-------
value : str or tuple
Examples
--------
>>> name = 'foo'
>>> _column_name_to_strings(name)
'foo'
>>> name = ('foo', 'bar')
>>> _column_name_to_strings(name)
('foo', 'bar')
>>> import pandas as pd
>>> name = (1, pd.Timestamp('2017-02-01 00:00:00'))
>>> _column_name_to_strings(name)
('1', '2017-02-01 00:00:00')
"""
if isinstance(name, six.string_types):
return name
elif isinstance(name, six.binary_type):
# XXX: should we assume that bytes in Python 3 are UTF-8?
return name.decode('utf8')
elif isinstance(name, tuple):
return str(tuple(map(_column_name_to_strings, name)))
elif isinstance(name, Sequence):
raise TypeError("Unsupported type for MultiIndex level")
elif name is None:
return None
return str(name)
def _index_level_name(index, i, column_names):
"""Return the name of an index level or a default name if `index.name` is
None or is already a column name.
Parameters
----------
index : pandas.Index
i : int
Returns
-------
name : str
"""
if index.name is not None and index.name not in column_names:
return index.name
else:
return '__index_level_{:d}__'.format(i)
def _get_columns_to_convert(df, schema, preserve_index, columns):
columns = _resolve_columns_of_interest(df, schema, columns)
column_names = []
index_levels = (
_get_index_level_values(df.index) if preserve_index is not False
else []
)
columns_to_convert = []
convert_fields = []
if not df.columns.is_unique:
raise ValueError(
'Duplicate column names found: {}'.format(list(df.columns))
)
for name in columns:
col = df[name]
name = _column_name_to_strings(name)
if _pandas_api.is_sparse(col):
raise TypeError(
"Sparse pandas data (column {}) not supported.".format(name))
if schema is not None:
field = schema.field_by_name(name)
else:
field = None
columns_to_convert.append(col)
convert_fields.append(field)
column_names.append(name)
index_descriptors = []
index_column_names = []
for i, index_level in enumerate(index_levels):
name = _index_level_name(index_level, i, column_names)
if (isinstance(index_level, _pandas_api.pd.RangeIndex)
and preserve_index is None):
descr = _get_range_index_descriptor(index_level)
else:
columns_to_convert.append(index_level)
convert_fields.append(None)
descr = name
index_column_names.append(name)
index_descriptors.append(descr)
all_names = column_names + index_column_names
# all_names : all of the columns in the resulting table including the data
# columns and serialized index columns
# column_names : the names of the data columns
# index_column_names : the names of the serialized index columns
# index_descriptors : descriptions of each index to be used for
# reconstruction
# index_levels : the extracted index level values
# columns_to_convert : assembled raw data (both data columns and indexes)
# to be converted to Arrow format
# columns_fields : specified column to use for coercion / casting
# during serialization, if a Schema was provided
return (all_names, column_names, index_column_names, index_descriptors,
index_levels, columns_to_convert, convert_fields)
def _get_range_index_descriptor(level):
# public start/stop/step attributes added in pandas 0.25.0
return {
'kind': 'range',
'name': level.name,
'start': _pandas_api.get_rangeindex_attribute(level, 'start'),
'stop': _pandas_api.get_rangeindex_attribute(level, 'stop'),
'step': _pandas_api.get_rangeindex_attribute(level, 'step')
}
def _get_index_level_values(index):
n = len(getattr(index, 'levels', [index]))
return [index.get_level_values(i) for i in range(n)]
def _resolve_columns_of_interest(df, schema, columns):
if schema is not None and columns is not None:
raise ValueError('Schema and columns arguments are mutually '
'exclusive, pass only one of them')
elif schema is not None:
columns = schema.names
elif columns is not None:
columns = [c for c in columns if c in df.columns]
else:
columns = df.columns
return columns
def dataframe_to_types(df, preserve_index, columns=None):
(all_names,
column_names,
_,
index_descriptors,
index_columns,
columns_to_convert,
_) = _get_columns_to_convert(df, None, preserve_index, columns)
types = []
# If pandas knows type, skip conversion
for c in columns_to_convert:
values = c.values
if _pandas_api.is_categorical(values):
type_ = pa.array(c, from_pandas=True).type
else:
values, type_ = get_datetimetz_type(values, c.dtype, None)
type_ = pa.lib._ndarray_to_arrow_type(values, type_)
if type_ is None:
type_ = pa.array(c, from_pandas=True).type
types.append(type_)
metadata = construct_metadata(df, column_names, index_columns,
index_descriptors, preserve_index, types)
return all_names, types, metadata
def dataframe_to_arrays(df, schema, preserve_index, nthreads=1, columns=None,
safe=True):
(all_names,
column_names,
index_column_names,
index_descriptors,
index_columns,
columns_to_convert,
convert_fields) = _get_columns_to_convert(df, schema, preserve_index,
columns)
# NOTE(wesm): If nthreads=None, then we use a heuristic to decide whether
# using a thread pool is worth it. Currently the heuristic is whether the
# nrows > 100 * ncols.
if nthreads is None:
nrows, ncols = len(df), len(df.columns)
if nrows > ncols * 100:
nthreads = pa.cpu_count()
else:
nthreads = 1
def convert_column(col, field):
if field is None:
field_nullable = True
type_ = None
else:
field_nullable = field.nullable
type_ = field.type
try:
result = pa.array(col, type=type_, from_pandas=True, safe=safe)
except (pa.ArrowInvalid,
pa.ArrowNotImplementedError,
pa.ArrowTypeError) as e:
e.args += ("Conversion failed for column {0!s} with type {1!s}"
.format(col.name, col.dtype),)
raise e
if not field_nullable and result.null_count > 0:
raise ValueError("Field {} was non-nullable but pandas column "
"had {} null values".format(str(field),
result.null_count))
return result
if nthreads == 1:
arrays = [convert_column(c, f)
for c, f in zip(columns_to_convert, convert_fields)]
else:
from concurrent import futures
with futures.ThreadPoolExecutor(nthreads) as executor:
arrays = list(executor.map(convert_column, columns_to_convert,
convert_fields))
types = [x.type for x in arrays]
if schema is not None:
# add index columns
index_types = types[len(column_names):]
for name, type_ in zip(index_column_names, index_types):
name = name if name is not None else 'None'
schema = schema.append(pa.field(name, type_))
else:
fields = []
for name, type_ in zip(all_names, types):
name = name if name is not None else 'None'
fields.append(pa.field(name, type_))
schema = pa.schema(fields)
metadata = construct_metadata(df, column_names, index_columns,
index_descriptors, preserve_index,
types)
schema = schema.add_metadata(metadata)
return arrays, schema
def get_datetimetz_type(values, dtype, type_):
if values.dtype.type != np.datetime64:
return values, type_
if _pandas_api.is_datetimetz(dtype) and type_ is None:
# If no user type passed, construct a tz-aware timestamp type
tz = dtype.tz
unit = dtype.unit
type_ = pa.timestamp(unit, tz)
elif type_ is None:
# Trust the NumPy dtype
type_ = pa.from_numpy_dtype(values.dtype)
return values, type_
# ----------------------------------------------------------------------
# Converting pandas.DataFrame to a dict containing only NumPy arrays or other
# objects friendly to pyarrow.serialize
def dataframe_to_serialized_dict(frame):
import pandas.core.internals as _int
block_manager = frame._data
blocks = []
axes = [ax for ax in block_manager.axes]
for block in block_manager.blocks:
values = block.values
block_data = {}
if isinstance(block, _int.DatetimeTZBlock):
block_data['timezone'] = pa.lib.tzinfo_to_string(values.tz)
if hasattr(values, 'values'):
values = values.values
elif isinstance(block, _int.CategoricalBlock):
block_data.update(dictionary=values.categories,
ordered=values.ordered)
values = values.codes
block_data.update(
placement=block.mgr_locs.as_array,
block=values
)
# If we are dealing with an object array, pickle it instead. Note that
# we do not use isinstance here because _int.CategoricalBlock is a
# subclass of _int.ObjectBlock.
if type(block) == _int.ObjectBlock:
block_data['object'] = None
block_data['block'] = builtin_pickle.dumps(
values, protocol=builtin_pickle.HIGHEST_PROTOCOL)
blocks.append(block_data)
return {
'blocks': blocks,
'axes': axes
}
def serialized_dict_to_dataframe(data):
import pandas.core.internals as _int
reconstructed_blocks = [_reconstruct_block(block)
for block in data['blocks']]
block_mgr = _int.BlockManager(reconstructed_blocks, data['axes'])
return _pandas_api.data_frame(block_mgr)
def _reconstruct_block(item):
import pandas.core.internals as _int
# Construct the individual blocks converting dictionary types to pandas
# categorical types and Timestamps-with-timezones types to the proper
# pandas Blocks
block_arr = item['block']
placement = item['placement']
if 'dictionary' in item:
cat = _pandas_api.categorical_type.from_codes(
block_arr, categories=item['dictionary'],
ordered=item['ordered'])
block = _int.make_block(cat, placement=placement,
klass=_int.CategoricalBlock)
elif 'timezone' in item:
dtype = make_datetimetz(item['timezone'])
block = _int.make_block(block_arr, placement=placement,
klass=_int.DatetimeTZBlock,
dtype=dtype)
elif 'object' in item:
block = _int.make_block(builtin_pickle.loads(block_arr),
placement=placement, klass=_int.ObjectBlock)
else:
block = _int.make_block(block_arr, placement=placement)
return block
def make_datetimetz(tz):
tz = pa.lib.string_to_tzinfo(tz)
return _pandas_api.datetimetz_type('ns', tz=tz)
# ----------------------------------------------------------------------
# Converting pyarrow.Table efficiently to pandas.DataFrame
def table_to_blockmanager(options, table, categories=None,
ignore_metadata=False):
from pandas.core.internals import BlockManager
all_columns = []
column_indexes = []
pandas_metadata = table.schema.pandas_metadata
if not ignore_metadata and pandas_metadata is not None:
all_columns = pandas_metadata['columns']
column_indexes = pandas_metadata.get('column_indexes', [])
index_descriptors = pandas_metadata['index_columns']
table = _add_any_metadata(table, pandas_metadata)
table, index = _reconstruct_index(table, index_descriptors,
all_columns)
else:
index = _pandas_api.pd.RangeIndex(table.num_rows)
_check_data_column_metadata_consistency(all_columns)
blocks = _table_to_blocks(options, table, pa.default_memory_pool(),
categories)
columns = _deserialize_column_index(table, all_columns, column_indexes)
axes = [columns, index]
return BlockManager(blocks, axes)
def _check_data_column_metadata_consistency(all_columns):
# It can never be the case in a released version of pyarrow that
# c['name'] is None *and* 'field_name' is not a key in the column metadata,
# because the change to allow c['name'] to be None and the change to add
# 'field_name' are in the same release (0.8.0)
assert all(
(c['name'] is None and 'field_name' in c) or c['name'] is not None
for c in all_columns
)
def _deserialize_column_index(block_table, all_columns, column_indexes):
column_strings = [x.name for x in block_table.itercolumns()]
if all_columns:
columns_name_dict = {
c.get('field_name', _column_name_to_strings(c['name'])): c['name']
for c in all_columns
}
columns_values = [
columns_name_dict.get(name, name) for name in column_strings
]
else:
columns_values = column_strings
# If we're passed multiple column indexes then evaluate with
# ast.literal_eval, since the column index values show up as a list of
# tuples
to_pair = ast.literal_eval if len(column_indexes) > 1 else lambda x: (x,)
# Create the column index
# Construct the base index
if not columns_values:
columns = _pandas_api.pd.Index(columns_values)
else:
columns = _pandas_api.pd.MultiIndex.from_tuples(
list(map(to_pair, columns_values)),
names=[col_index['name'] for col_index in column_indexes] or None,
)
# if we're reconstructing the index
if len(column_indexes) > 0:
columns = _reconstruct_columns_from_metadata(columns, column_indexes)
# ARROW-1751: flatten a single level column MultiIndex for pandas 0.21.0
columns = _flatten_single_level_multiindex(columns)
return columns
def _reconstruct_index(table, index_descriptors, all_columns):
# 0. 'field_name' is the name of the column in the arrow Table
# 1. 'name' is the user-facing name of the column, that is, it came from
# pandas
# 2. 'field_name' and 'name' differ for index columns
# 3. We fall back on c['name'] for backwards compatibility
field_name_to_metadata = {
c.get('field_name', c['name']): c
for c in all_columns
}
# Build up a list of index columns and names while removing those columns
# from the original table
index_arrays = []
index_names = []
result_table = table
for descr in index_descriptors:
if isinstance(descr, six.string_types):
result_table, index_level, index_name = _extract_index_level(
table, result_table, descr, field_name_to_metadata)
if index_level is None:
# ARROW-1883: the serialized index column was not found
continue
elif descr['kind'] == 'range':
index_name = descr['name']
index_level = _pandas_api.pd.RangeIndex(descr['start'],
descr['stop'],
step=descr['step'],
name=index_name)
if len(index_level) != len(table):
# Possibly the result of munged metadata
continue
else:
raise ValueError("Unrecognized index kind: {0}"
.format(descr['kind']))
index_arrays.append(index_level)
index_names.append(index_name)
pd = _pandas_api.pd
# Reconstruct the row index
if len(index_arrays) > 1:
index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
elif len(index_arrays) == 1:
index = index_arrays[0]
if not isinstance(index, pd.Index):
# Box anything that wasn't boxed above
index = pd.Index(index, name=index_names[0])
else:
index = pd.RangeIndex(table.num_rows)
return result_table, index
def _extract_index_level(table, result_table, field_name,
field_name_to_metadata):
logical_name = field_name_to_metadata[field_name]['name']
index_name = _backwards_compatible_index_name(field_name, logical_name)
i = table.schema.get_field_index(field_name)
if i == -1:
# The serialized index column was removed by the user
return table, None, None
col = table.column(i)
col_pandas = col.to_pandas()
values = col_pandas.values
if hasattr(values, 'flags') and not values.flags.writeable:
# ARROW-1054: in pandas 0.19.2, factorize will reject
# non-writeable arrays when calling MultiIndex.from_arrays
values = values.copy()
pd = _pandas_api.pd
if _pandas_api.is_datetimetz(col_pandas.dtype):
index_level = (pd.Series(values).dt.tz_localize('utc')
.dt.tz_convert(col_pandas.dtype.tz))
else:
index_level = pd.Series(values, dtype=col_pandas.dtype)
result_table = result_table.remove_column(
result_table.schema.get_field_index(field_name)
)
return result_table, index_level, index_name
def _backwards_compatible_index_name(raw_name, logical_name):
"""Compute the name of an index column that is compatible with older
versions of :mod:`pyarrow`.
Parameters
----------
raw_name : str
logical_name : str
Returns
-------
result : str
Notes
-----
* Part of :func:`~pyarrow.pandas_compat.table_to_blockmanager`
"""
# Part of table_to_blockmanager
if raw_name == logical_name and _is_generated_index_name(raw_name):
return None
else:
return logical_name
def _is_generated_index_name(name):
pattern = r'^__index_level_\d+__$'
return re.match(pattern, name) is not None
_pandas_logical_type_map = {
'date': 'datetime64[D]',
'unicode': np.unicode_,
'bytes': np.bytes_,
'string': np.str_,
'empty': np.object_,
}
def _pandas_type_to_numpy_type(pandas_type):
"""Get the numpy dtype that corresponds to a pandas type.
Parameters
----------
pandas_type : str
The result of a call to pandas.lib.infer_dtype.
Returns
-------
dtype : np.dtype
The dtype that corresponds to `pandas_type`.
"""
try:
return _pandas_logical_type_map[pandas_type]
except KeyError:
if 'mixed' in pandas_type:
# catching 'mixed', 'mixed-integer' and 'mixed-integer-float'
return np.object_
return np.dtype(pandas_type)
def _get_multiindex_codes(mi):
# compat for pandas < 0.24 (MI labels renamed to codes).
if isinstance(mi, _pandas_api.pd.MultiIndex):
return mi.codes if hasattr(mi, 'codes') else mi.labels
else:
return None
def _reconstruct_columns_from_metadata(columns, column_indexes):
"""Construct a pandas MultiIndex from `columns` and column index metadata
in `column_indexes`.
Parameters
----------
columns : List[pd.Index]
The columns coming from a pyarrow.Table
column_indexes : List[Dict[str, str]]
The column index metadata deserialized from the JSON schema metadata
in a :class:`~pyarrow.Table`.
Returns
-------
result : MultiIndex
The index reconstructed using `column_indexes` metadata with levels of
the correct type.
Notes
-----
* Part of :func:`~pyarrow.pandas_compat.table_to_blockmanager`
"""
pd = _pandas_api.pd
# Get levels and labels, and provide sane defaults if the index has a
# single level to avoid if/else spaghetti.
levels = getattr(columns, 'levels', None) or [columns]
labels = _get_multiindex_codes(columns) or [
pd.RangeIndex(len(level)) for level in levels
]
# Convert each level to the dtype provided in the metadata
levels_dtypes = [
(level, col_index.get('pandas_type', str(level.dtype)))
for level, col_index in zip_longest(
levels, column_indexes, fillvalue={}
)
]
new_levels = []
encoder = operator.methodcaller('encode', 'UTF-8')
for level, pandas_dtype in levels_dtypes:
dtype = _pandas_type_to_numpy_type(pandas_dtype)
# Since our metadata is UTF-8 encoded, Python turns things that were
# bytes into unicode strings when json.loads-ing them. We need to
# convert them back to bytes to preserve metadata.
if dtype == np.bytes_:
level = level.map(encoder)
elif level.dtype != dtype:
level = level.astype(dtype)
new_levels.append(level)
return pd.MultiIndex(new_levels, labels, names=columns.names)
def _table_to_blocks(options, block_table, memory_pool, categories):
# Part of table_to_blockmanager
# Convert an arrow table to Block from the internal pandas API
result = pa.lib.table_to_blocks(options, block_table, memory_pool,
categories)
# Defined above
return [_reconstruct_block(item) for item in result]
def _flatten_single_level_multiindex(index):
pd = _pandas_api.pd
if isinstance(index, pd.MultiIndex) and index.nlevels == 1:
levels, = index.levels
labels, = _get_multiindex_codes(index)
# Cheaply check that we do not somehow have duplicate column names
if not index.is_unique:
raise ValueError('Found non-unique column index')
return pd.Index([levels[_label] if _label != -1 else None
for _label in labels],
name=index.names[0])
return index
def _add_any_metadata(table, pandas_metadata):
modified_columns = {}
schema = table.schema
index_columns = pandas_metadata['index_columns']
n_index_levels = len(index_columns)
n_columns = len(pandas_metadata['columns']) - n_index_levels
# Add time zones
for i, col_meta in enumerate(pandas_metadata['columns']):
raw_name = col_meta.get('field_name')
if not raw_name:
# deal with metadata written with arrow < 0.8
raw_name = col_meta['name']
if i >= n_columns:
# index columns
raw_name = index_columns[i - n_columns]
if raw_name is None:
raw_name = 'None'
idx = schema.get_field_index(raw_name)
if idx != -1:
if col_meta['pandas_type'] == 'datetimetz':
col = table[idx]
converted = col.to_pandas()
tz = col_meta['metadata']['timezone']
tz_aware_type = pa.timestamp('ns', tz=tz)
with_metadata = pa.Array.from_pandas(converted.values,
type=tz_aware_type)
field = pa.field(schema[idx].name, tz_aware_type)
modified_columns[idx] = pa.Column.from_array(field,
with_metadata)
if len(modified_columns) > 0:
columns = []
for i in range(len(table.schema)):
if i in modified_columns:
columns.append(modified_columns[i])
else:
columns.append(table[i])
return pa.Table.from_arrays(columns)
else:
return table