blob: 9711b722c2f8ef2b23276bb0d1f34927e6ac66e4 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import re
import json
import pandas as pd
import six
from pyarrow.compat import PY2
# Pattern for auto-generated index column names, e.g. '__index_level_0__'.
INDEX_LEVEL_NAME_REGEX = re.compile(r'^__index_level_\d+__$')


def is_unnamed_index_level(name):
    """Return True if *name* is an auto-generated index-level column name."""
    return bool(INDEX_LEVEL_NAME_REGEX.match(name))
def infer_dtype(column):
    """Return the pandas-inferred dtype string for *column*.

    Uses the public ``pd.api.types.infer_dtype`` when available and falls
    back to the pre-0.20 location ``pd.lib.infer_dtype`` on older pandas.

    Parameters
    ----------
    column : pandas.Series or array-like
        The values whose logical type should be inferred.

    Returns
    -------
    str
        A pandas inferred-dtype name such as 'integer', 'floating',
        'string', or 'datetime64'.
    """
    # Resolve the function first so that only a missing attribute triggers
    # the legacy fallback; an AttributeError raised while inferring the
    # data itself must propagate instead of being masked by a retry
    # against pd.lib (which does not exist on newer pandas).
    try:
        infer = pd.api.types.infer_dtype
    except AttributeError:
        # pandas < 0.20
        infer = pd.lib.infer_dtype
    return infer(column)
def get_column_metadata(column, name):
    """Build the per-column entry of the ``'pandas'`` schema metadata.

    Parameters
    ----------
    column : pandas.Series
        The column whose type information is recorded.
    name : str
        The column's name; must be a string.

    Returns
    -------
    dict
        Keys: 'name', 'pandas_type', 'numpy_type', 'metadata'.

    Raises
    ------
    TypeError
        If *name* is not a string.
    """
    pandas_type = infer_dtype(column)
    dtype = column.dtype

    # Categorical and tz-aware datetime dtypes carry extra information
    # that cannot be reconstructed from the dtype string alone.
    extra_metadata = None
    if hasattr(dtype, 'categories'):
        extra_metadata = {
            'num_categories': len(column.cat.categories),
            'ordered': column.cat.ordered,
        }
    elif hasattr(dtype, 'tz'):
        extra_metadata = {'timezone': str(dtype.tz)}

    if not isinstance(name, six.string_types):
        raise TypeError(
            'Column name must be a string. Got column {} of type {}'.format(
                name, type(name).__name__
            )
        )

    # Map the inferred dtype onto the logical pandas_type stored in the
    # metadata; anything not special-cased passes through unchanged.
    if pandas_type == 'string':
        logical_type = 'bytes' if PY2 else 'unicode'
    elif pandas_type == 'datetime64':
        logical_type = 'datetimetz' if hasattr(dtype, 'tz') else 'datetime'
    elif pandas_type in ('integer', 'floating'):
        logical_type = str(dtype)
    else:
        logical_type = pandas_type

    return {
        'name': name,
        'pandas_type': logical_type,
        'numpy_type': str(dtype),
        'metadata': extra_metadata,
    }
def index_level_name(index, i):
    """Return the column name under which index level *i* is stored.

    A named index keeps its own name; an unnamed index gets the sentinel
    ``__index_level_{i}__`` so it can be recognized when reading back.

    Parameters
    ----------
    index : pandas.Index
        The index level being serialized.
    i : int
        The position of this level among the index levels.

    Returns
    -------
    str
        The index's own name, or the generated sentinel name.
    """
    # ``index.name or ...`` would wrongly discard falsy-but-valid names
    # such as the empty string; compare against None explicitly.
    if index.name is not None:
        return index.name
    return '__index_level_{:d}__'.format(i)
def construct_metadata(df, index_levels, preserve_index):
    """Assemble the ``b'pandas'`` schema-metadata entry for *df*.

    Parameters
    ----------
    df : pandas.DataFrame
        The frame being serialized.
    index_levels : list
        The individual levels of the frame's index.
    preserve_index : bool
        Whether index levels are stored as extra columns.

    Returns
    -------
    dict
        A single-entry dict mapping ``b'pandas'`` to UTF-8 encoded JSON.
    """
    index_column_names = []
    index_column_metadata = []
    if preserve_index:
        index_column_names = [
            index_level_name(level, i)
            for i, level in enumerate(index_levels)
        ]
        index_column_metadata = [
            get_column_metadata(level, name=index_level_name(level, i))
            for i, level in enumerate(index_levels)
        ]

    column_metadata = [
        get_column_metadata(df[name], name=name) for name in df.columns
    ]

    pandas_metadata = {
        'index_columns': index_column_names,
        # Index-level descriptors follow the regular columns.
        'columns': column_metadata + index_column_metadata,
        'pandas_version': pd.__version__,
    }
    return {b'pandas': json.dumps(pandas_metadata).encode('utf8')}
def table_to_blockmanager(table, nthreads=1):
    """Convert a pyarrow Table into a pandas ``BlockManager``.

    If the table's schema carries ``b'pandas'`` metadata (as written by
    ``construct_metadata``), the index columns recorded there are removed
    from the data columns and reconstructed as the resulting frame's index.

    Parameters
    ----------
    table : pyarrow.Table
        The table to convert.
    nthreads : int, default 1
        Number of threads passed through to ``lib.table_to_blocks``.

    Returns
    -------
    pandas.core.internals.BlockManager
    """
    # Imported lazily to avoid a hard module-level dependency cycle.
    import pandas.core.internals as _int
    from pyarrow.compat import DatetimeTZDtype
    import pyarrow.lib as lib
    block_table = table
    index_columns = []
    index_arrays = []
    index_names = []
    schema = table.schema
    row_count = table.num_rows
    metadata = schema.metadata
    if metadata is not None and b'pandas' in metadata:
        # Restore the index columns recorded when the table was written.
        pandas_metadata = json.loads(metadata[b'pandas'].decode('utf8'))
        index_columns = pandas_metadata['index_columns']
        for name in index_columns:
            i = schema.get_field_index(name)
            if i != -1:
                col = table.column(i)
                # Auto-generated sentinel names map back to an unnamed level.
                index_name = (None if is_unnamed_index_level(name)
                              else name)
                values = col.to_pandas().values
                if not values.flags.writeable:
                    # ARROW-1054: in pandas 0.19.2, factorize will reject
                    # non-writeable arrays when calling MultiIndex.from_arrays
                    values = values.copy()
                index_arrays.append(values)
                index_names.append(index_name)
                # Re-query the field index against the shrinking table, since
                # earlier removals shift subsequent column positions.
                block_table = block_table.remove_column(
                    block_table.schema.get_field_index(name)
                )
    # Materialize the remaining columns as pandas blocks.
    result = lib.table_to_blocks(block_table, nthreads)
    blocks = []
    for item in result:
        block_arr = item['block']
        placement = item['placement']
        if 'dictionary' in item:
            # Dictionary-encoded column -> pandas Categorical block.
            cat = pd.Categorical(block_arr,
                                 categories=item['dictionary'],
                                 ordered=False, fastpath=True)
            block = _int.make_block(cat, placement=placement,
                                    klass=_int.CategoricalBlock,
                                    fastpath=True)
        elif 'timezone' in item:
            # Timezone-aware timestamps need a DatetimeTZ block.
            dtype = DatetimeTZDtype('ns', tz=item['timezone'])
            block = _int.make_block(block_arr, placement=placement,
                                    klass=_int.DatetimeTZBlock,
                                    dtype=dtype, fastpath=True)
        else:
            block = _int.make_block(block_arr, placement=placement)
        blocks.append(block)
    # Rebuild the index: MultiIndex for several levels, a plain Index for
    # one, and a default RangeIndex when no index columns were stored.
    if len(index_arrays) > 1:
        index = pd.MultiIndex.from_arrays(index_arrays, names=index_names)
    elif len(index_arrays) == 1:
        index = pd.Index(index_arrays[0], name=index_names[0])
    else:
        index = pd.RangeIndex(row_count)
    axes = [
        [column.name for column in block_table.itercolumns()],
        index
    ]
    return _int.BlockManager(blocks, axes)