blob: 436d5bfc51582e4253ece2b4d52425c454426499 [file] [log] [blame]
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from collections import OrderedDict
import datetime
import decimal
import io
import json
import os
import six
import pickle
import pytest
import numpy as np
import pyarrow as pa
from pyarrow.compat import guid, u, BytesIO, unichar, PY2
from pyarrow.pandas_compat import _pandas_api
from pyarrow.tests import util
from pyarrow.filesystem import LocalFileSystem, FileSystem
try:
import pyarrow.parquet as pq
except ImportError:
pq = None
try:
import pandas as pd
import pandas.util.testing as tm
from .pandas_examples import dataframe_with_arrays, dataframe_with_lists
except ImportError:
pd = tm = None
# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not parquet'
pytestmark = pytest.mark.parquet
@pytest.fixture(scope='module')
def datadir(datadir):
return datadir / 'parquet'
def _write_table(table, path, **kwargs):
# So we see the ImportError somewhere
import pyarrow.parquet as pq
if _pandas_api.is_data_frame(table):
table = pa.Table.from_pandas(table)
pq.write_table(table, path, **kwargs)
return table
def _read_table(*args, **kwargs):
return pq.read_table(*args, **kwargs)
def _roundtrip_table(table, read_table_kwargs=None,
write_table_kwargs=None):
read_table_kwargs = read_table_kwargs or {}
write_table_kwargs = write_table_kwargs or {}
buf = io.BytesIO()
_write_table(table, buf, **write_table_kwargs)
buf.seek(0)
return _read_table(buf, **read_table_kwargs)
def _check_roundtrip(table, expected=None, read_table_kwargs=None,
**write_table_kwargs):
if expected is None:
expected = table
read_table_kwargs = read_table_kwargs or {}
# intentionally check twice
result = _roundtrip_table(table, read_table_kwargs=read_table_kwargs,
write_table_kwargs=write_table_kwargs)
assert result.equals(expected)
result = _roundtrip_table(result, read_table_kwargs=read_table_kwargs,
write_table_kwargs=write_table_kwargs)
assert result.equals(expected)
def _roundtrip_pandas_dataframe(df, write_kwargs):
table = pa.Table.from_pandas(df)
buf = io.BytesIO()
_write_table(table, buf, **write_kwargs)
buf.seek(0)
table1 = _read_table(buf)
return table1.to_pandas()
@pytest.mark.parametrize('dtype', [int, float])
def test_single_pylist_column_roundtrip(tempdir, dtype):
filename = tempdir / 'single_{}_column.parquet'.format(dtype.__name__)
data = [pa.array(list(map(dtype, range(5))))]
table = pa.Table.from_arrays(data, names=['a'])
_write_table(table, filename)
table_read = _read_table(filename)
for col_written, col_read in zip(table.itercolumns(),
table_read.itercolumns()):
assert col_written.name == col_read.name
assert col_read.data.num_chunks == 1
data_written = col_written.data.chunk(0)
data_read = col_read.data.chunk(0)
assert data_written.equals(data_read)
def alltypes_sample(size=10000, seed=0, categorical=False):
np.random.seed(seed)
arrays = {
'uint8': np.arange(size, dtype=np.uint8),
'uint16': np.arange(size, dtype=np.uint16),
'uint32': np.arange(size, dtype=np.uint32),
'uint64': np.arange(size, dtype=np.uint64),
'int8': np.arange(size, dtype=np.int16),
'int16': np.arange(size, dtype=np.int16),
'int32': np.arange(size, dtype=np.int32),
'int64': np.arange(size, dtype=np.int64),
'float32': np.arange(size, dtype=np.float32),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0,
# TODO(wesm): Test other timestamp resolutions now that arrow supports
# them
'datetime': np.arange("2016-01-01T00:00:00.001", size,
dtype='datetime64[ms]'),
'str': pd.Series([str(x) for x in range(size)]),
'empty_str': [''] * size,
'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
'null': [None] * size,
'null_list': [None] * 2 + [[None] * (x % 4) for x in range(size - 2)],
}
if categorical:
arrays['str_category'] = arrays['str'].astype('category')
return pd.DataFrame(arrays)
@pytest.mark.pandas
@pytest.mark.parametrize('chunk_size', [None, 1000])
def test_pandas_parquet_2_0_rountrip(tempdir, chunk_size):
df = alltypes_sample(size=10000, categorical=True)
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df)
assert arrow_table.schema.pandas_metadata is not None
_write_table(arrow_table, filename, version="2.0",
coerce_timestamps='ms', chunk_size=chunk_size)
table_read = pq.read_pandas(filename)
assert table_read.schema.pandas_metadata is not None
assert arrow_table.schema.metadata == table_read.schema.metadata
df_read = table_read.to_pandas(categories=['str_category'])
tm.assert_frame_equal(df, df_read, check_categorical=False)
def test_set_data_page_size():
arr = pa.array([1, 2, 3] * 1000000)
t = pa.Table.from_arrays([arr], names=['f0'])
# 128K, 256K, 512K
page_sizes = [2 << 16, 2 << 17, 2 << 18]
for target_page_size in page_sizes:
_check_roundtrip(t, data_page_size=target_page_size)
@pytest.mark.pandas
def test_chunked_table_write():
# ARROW-232
df = alltypes_sample(size=10)
batch = pa.RecordBatch.from_pandas(df)
table = pa.Table.from_batches([batch] * 3)
_check_roundtrip(table, version='2.0')
df, _ = dataframe_with_lists()
batch = pa.RecordBatch.from_pandas(df)
table = pa.Table.from_batches([batch] * 3)
_check_roundtrip(table, version='2.0')
@pytest.mark.pandas
def test_no_memory_map(tempdir):
df = alltypes_sample(size=10)
table = pa.Table.from_pandas(df)
_check_roundtrip(table, read_table_kwargs={'memory_map': False},
version='2.0')
filename = str(tempdir / 'tmp_file')
with open(filename, 'wb') as f:
_write_table(table, f, version='2.0')
table_read = pq.read_pandas(filename, memory_map=False)
assert table_read.equals(table)
def test_special_chars_filename(tempdir):
table = pa.Table.from_arrays([pa.array([42])], ["ints"])
filename = "foo # bar"
path = tempdir / filename
assert not path.exists()
_write_table(table, str(path))
assert path.exists()
table_read = _read_table(str(path))
assert table_read.equals(table)
@pytest.mark.pandas
def test_empty_table_roundtrip():
df = alltypes_sample(size=10)
# Create a non-empty table to infer the types correctly, then slice to 0
table = pa.Table.from_pandas(df)
table = pa.Table.from_arrays(
[col.data.chunk(0)[:0] for col in table.itercolumns()],
names=table.schema.names)
assert table.schema.field_by_name('null').type == pa.null()
assert table.schema.field_by_name('null_list').type == pa.list_(pa.null())
_check_roundtrip(table, version='2.0')
@pytest.mark.pandas
def test_empty_table_no_columns():
df = pd.DataFrame()
empty = pa.Table.from_pandas(df, preserve_index=False)
_check_roundtrip(empty)
def test_empty_lists_table_roundtrip():
# ARROW-2744: Shouldn't crash when writing an array of empty lists
arr = pa.array([[], []], type=pa.list_(pa.int32()))
table = pa.Table.from_arrays([arr], ["A"])
_check_roundtrip(table)
@pytest.mark.pandas
def test_pandas_parquet_datetime_tz():
s = pd.Series([datetime.datetime(2017, 9, 6)])
s = s.dt.tz_localize('utc')
s.index = s
# Both a column and an index to hit both use cases
df = pd.DataFrame({'tz_aware': s,
'tz_eastern': s.dt.tz_convert('US/Eastern')},
index=s)
f = BytesIO()
arrow_table = pa.Table.from_pandas(df)
_write_table(arrow_table, f, coerce_timestamps='ms')
f.seek(0)
table_read = pq.read_pandas(f)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
@pytest.mark.skipif(six.PY2, reason='datetime.timezone is available since '
'python version 3.2')
def test_datetime_timezone_tzinfo():
value = datetime.datetime(2018, 1, 1, 1, 23, 45,
tzinfo=datetime.timezone.utc)
df = pd.DataFrame({'foo': [value]})
_roundtrip_pandas_dataframe(df, write_kwargs={})
@pytest.mark.pandas
def test_pandas_parquet_custom_metadata(tempdir):
df = alltypes_sample(size=10000)
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df)
assert b'pandas' in arrow_table.schema.metadata
_write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
metadata = pq.read_metadata(filename).metadata
assert b'pandas' in metadata
js = json.loads(metadata[b'pandas'].decode('utf8'))
assert js['index_columns'] == [{'kind': 'range',
'name': None,
'start': 0, 'stop': 10000,
'step': 1}]
@pytest.mark.pandas
def test_pandas_parquet_column_multiindex(tempdir):
df = alltypes_sample(size=10)
df.columns = pd.MultiIndex.from_tuples(
list(zip(df.columns, df.columns[::-1])),
names=['level_1', 'level_2']
)
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df)
assert arrow_table.schema.pandas_metadata is not None
_write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
table_read = pq.read_pandas(filename)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
def test_pandas_parquet_2_0_rountrip_read_pandas_no_index_written(tempdir):
df = alltypes_sample(size=10000)
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
js = arrow_table.schema.pandas_metadata
assert not js['index_columns']
# ARROW-2170
# While index_columns should be empty, columns needs to be filled still.
assert js['columns']
_write_table(arrow_table, filename, version='2.0', coerce_timestamps='ms')
table_read = pq.read_pandas(filename)
js = table_read.schema.pandas_metadata
assert not js['index_columns']
assert arrow_table.schema.metadata == table_read.schema.metadata
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
def test_pandas_parquet_1_0_rountrip(tempdir):
size = 10000
np.random.seed(0)
df = pd.DataFrame({
'uint8': np.arange(size, dtype=np.uint8),
'uint16': np.arange(size, dtype=np.uint16),
'uint32': np.arange(size, dtype=np.uint32),
'uint64': np.arange(size, dtype=np.uint64),
'int8': np.arange(size, dtype=np.int16),
'int16': np.arange(size, dtype=np.int16),
'int32': np.arange(size, dtype=np.int32),
'int64': np.arange(size, dtype=np.int64),
'float32': np.arange(size, dtype=np.float32),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0,
'str': [str(x) for x in range(size)],
'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
'empty_str': [''] * size
})
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df)
_write_table(arrow_table, filename, version='1.0')
table_read = _read_table(filename)
df_read = table_read.to_pandas()
# We pass uint32_t as int64_t if we write Parquet version 1.0
df['uint32'] = df['uint32'].values.astype(np.int64)
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
def test_multiple_path_types(tempdir):
# Test compatibility with PEP 519 path-like objects
path = tempdir / 'zzz.parquet'
df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
_write_table(df, path)
table_read = _read_table(path)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
# Test compatibility with plain string paths
path = str(tempdir) + 'zzz.parquet'
df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)})
_write_table(df, path)
table_read = _read_table(path)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
def test_pandas_column_selection(tempdir):
size = 10000
np.random.seed(0)
df = pd.DataFrame({
'uint8': np.arange(size, dtype=np.uint8),
'uint16': np.arange(size, dtype=np.uint16)
})
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df)
_write_table(arrow_table, filename)
table_read = _read_table(filename, columns=['uint8'])
df_read = table_read.to_pandas()
tm.assert_frame_equal(df[['uint8']], df_read)
# ARROW-4267: Selection of duplicate columns still leads to these columns
# being read uniquely.
table_read = _read_table(filename, columns=['uint8', 'uint8'])
df_read = table_read.to_pandas()
tm.assert_frame_equal(df[['uint8']], df_read)
def _random_integers(size, dtype):
# We do not generate integers outside the int64 range
platform_int_info = np.iinfo('int_')
iinfo = np.iinfo(dtype)
return np.random.randint(max(iinfo.min, platform_int_info.min),
min(iinfo.max, platform_int_info.max),
size=size).astype(dtype)
def _test_dataframe(size=10000, seed=0):
np.random.seed(seed)
df = pd.DataFrame({
'uint8': _random_integers(size, np.uint8),
'uint16': _random_integers(size, np.uint16),
'uint32': _random_integers(size, np.uint32),
'uint64': _random_integers(size, np.uint64),
'int8': _random_integers(size, np.int8),
'int16': _random_integers(size, np.int16),
'int32': _random_integers(size, np.int32),
'int64': _random_integers(size, np.int64),
'float32': np.random.randn(size).astype(np.float32),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0,
'strings': [tm.rands(10) for i in range(size)],
'all_none': [None] * size,
'all_none_category': [None] * size
})
# TODO(PARQUET-1015)
# df['all_none_category'] = df['all_none_category'].astype('category')
return df
@pytest.mark.pandas
def test_pandas_parquet_native_file_roundtrip(tempdir):
df = _test_dataframe(10000)
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
_write_table(arrow_table, imos, version="2.0")
buf = imos.getvalue()
reader = pa.BufferReader(buf)
df_read = _read_table(reader).to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
def test_parquet_incremental_file_build(tempdir):
df = _test_dataframe(100)
df['unique_id'] = 0
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
out = pa.BufferOutputStream()
writer = pq.ParquetWriter(out, arrow_table.schema, version='2.0')
frames = []
for i in range(10):
df['unique_id'] = i
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
writer.write_table(arrow_table)
frames.append(df.copy())
writer.close()
buf = out.getvalue()
result = _read_table(pa.BufferReader(buf))
expected = pd.concat(frames, ignore_index=True)
tm.assert_frame_equal(result.to_pandas(), expected)
@pytest.mark.pandas
def test_read_pandas_column_subset(tempdir):
df = _test_dataframe(10000)
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
_write_table(arrow_table, imos, version="2.0")
buf = imos.getvalue()
reader = pa.BufferReader(buf)
df_read = pq.read_pandas(reader, columns=['strings', 'uint8']).to_pandas()
tm.assert_frame_equal(df[['strings', 'uint8']], df_read)
@pytest.mark.pandas
def test_pandas_parquet_empty_roundtrip(tempdir):
df = _test_dataframe(0)
arrow_table = pa.Table.from_pandas(df)
imos = pa.BufferOutputStream()
_write_table(arrow_table, imos, version="2.0")
buf = imos.getvalue()
reader = pa.BufferReader(buf)
df_read = _read_table(reader).to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
def test_pandas_parquet_pyfile_roundtrip(tempdir):
filename = tempdir / 'pandas_pyfile_roundtrip.parquet'
size = 5
df = pd.DataFrame({
'int64': np.arange(size, dtype=np.int64),
'float32': np.arange(size, dtype=np.float32),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0,
'strings': ['foo', 'bar', None, 'baz', 'qux']
})
arrow_table = pa.Table.from_pandas(df)
with filename.open('wb') as f:
_write_table(arrow_table, f, version="1.0")
data = io.BytesIO(filename.read_bytes())
table_read = _read_table(data)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
def test_pandas_parquet_configuration_options(tempdir):
size = 10000
np.random.seed(0)
df = pd.DataFrame({
'uint8': np.arange(size, dtype=np.uint8),
'uint16': np.arange(size, dtype=np.uint16),
'uint32': np.arange(size, dtype=np.uint32),
'uint64': np.arange(size, dtype=np.uint64),
'int8': np.arange(size, dtype=np.int16),
'int16': np.arange(size, dtype=np.int16),
'int32': np.arange(size, dtype=np.int32),
'int64': np.arange(size, dtype=np.int64),
'float32': np.arange(size, dtype=np.float32),
'float64': np.arange(size, dtype=np.float64),
'bool': np.random.randn(size) > 0
})
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df)
for use_dictionary in [True, False]:
_write_table(arrow_table, filename, version='2.0',
use_dictionary=use_dictionary)
table_read = _read_table(filename)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
for write_statistics in [True, False]:
_write_table(arrow_table, filename, version='2.0',
write_statistics=write_statistics)
table_read = _read_table(filename)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
for compression in ['NONE', 'SNAPPY', 'GZIP', 'LZ4', 'ZSTD']:
_write_table(arrow_table, filename, version='2.0',
compression=compression)
table_read = _read_table(filename)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
def make_sample_file(table_or_df):
if isinstance(table_or_df, pa.Table):
a_table = table_or_df
else:
a_table = pa.Table.from_pandas(table_or_df)
buf = io.BytesIO()
_write_table(a_table, buf, compression='SNAPPY', version='2.0',
coerce_timestamps='ms')
buf.seek(0)
return pq.ParquetFile(buf)
@pytest.mark.pandas
def test_parquet_metadata_api():
df = alltypes_sample(size=10000)
df = df.reindex(columns=sorted(df.columns))
df.index = np.random.randint(0, 1000000, size=len(df))
fileh = make_sample_file(df)
ncols = len(df.columns)
# Series of sniff tests
meta = fileh.metadata
repr(meta)
assert meta.num_rows == len(df)
assert meta.num_columns == ncols + 1 # +1 for index
assert meta.num_row_groups == 1
assert meta.format_version == '2.0'
assert 'parquet-cpp' in meta.created_by
assert isinstance(meta.serialized_size, int)
assert isinstance(meta.metadata, dict)
# Schema
schema = fileh.schema
assert meta.schema is schema
assert len(schema) == ncols + 1 # +1 for index
repr(schema)
col = schema[0]
repr(col)
assert col.name == df.columns[0]
assert col.max_definition_level == 1
assert col.max_repetition_level == 0
assert col.max_repetition_level == 0
assert col.physical_type == 'BOOLEAN'
assert col.converted_type == 'NONE'
with pytest.raises(IndexError):
schema[ncols + 1] # +1 for index
with pytest.raises(IndexError):
schema[-1]
# Row group
for rg in range(meta.num_row_groups):
rg_meta = meta.row_group(rg)
assert isinstance(rg_meta, pq.RowGroupMetaData)
repr(rg_meta)
for col in range(rg_meta.num_columns):
col_meta = rg_meta.column(col)
assert isinstance(col_meta, pq.ColumnChunkMetaData)
repr(col_meta)
rg_meta = meta.row_group(0)
assert rg_meta.num_rows == len(df)
assert rg_meta.num_columns == ncols + 1 # +1 for index
assert rg_meta.total_byte_size > 0
col_meta = rg_meta.column(0)
assert col_meta.file_offset > 0
assert col_meta.file_path == '' # created from BytesIO
assert col_meta.physical_type == 'BOOLEAN'
assert col_meta.num_values == 10000
assert col_meta.path_in_schema == 'bool'
assert col_meta.is_stats_set is True
assert isinstance(col_meta.statistics, pq.Statistics)
assert col_meta.compression == 'SNAPPY'
assert col_meta.encodings == ('PLAIN', 'RLE')
assert col_meta.has_dictionary_page is False
assert col_meta.dictionary_page_offset is None
assert col_meta.data_page_offset > 0
assert col_meta.total_compressed_size > 0
assert col_meta.total_uncompressed_size > 0
with pytest.raises(NotImplementedError):
col_meta.has_index_page
with pytest.raises(NotImplementedError):
col_meta.index_page_offset
@pytest.mark.pandas
@pytest.mark.parametrize(
(
'data',
'type',
'physical_type',
'min_value',
'max_value',
'null_count',
'num_values',
'distinct_count'
),
[
([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, 0),
([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, 0),
([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, 0),
([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, 0),
([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, 0),
([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, 0),
([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, 0),
([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, 0),
(
[-1.1, 2.2, 2.3, None, 4.4], pa.float32(),
'FLOAT', -1.1, 4.4, 1, 4, 0
),
(
[-1.1, 2.2, 2.3, None, 4.4], pa.float64(),
'DOUBLE', -1.1, 4.4, 1, 4, 0
),
(
[u'', u'b', unichar(1000), None, u'aaa'], pa.binary(),
'BYTE_ARRAY', b'', unichar(1000).encode('utf-8'), 1, 4, 0
),
(
[True, False, False, True, True], pa.bool_(),
'BOOLEAN', False, True, 0, 5, 0
),
(
[b'\x00', b'b', b'12', None, b'aaa'], pa.binary(),
'BYTE_ARRAY', b'\x00', b'b', 1, 4, 0
),
]
)
def test_parquet_column_statistics_api(data, type, physical_type, min_value,
max_value, null_count, num_values,
distinct_count):
df = pd.DataFrame({'data': data})
schema = pa.schema([pa.field('data', type)])
table = pa.Table.from_pandas(df, schema=schema, safe=False)
fileh = make_sample_file(table)
meta = fileh.metadata
rg_meta = meta.row_group(0)
col_meta = rg_meta.column(0)
stat = col_meta.statistics
assert stat.has_min_max
assert _close(type, stat.min, min_value)
assert _close(type, stat.max, max_value)
assert stat.null_count == null_count
assert stat.num_values == num_values
# TODO(kszucs) until parquet-cpp API doesn't expose HasDistinctCount
# method, missing distinct_count is represented as zero instead of None
assert stat.distinct_count == distinct_count
assert stat.physical_type == physical_type
def _close(type, left, right):
if type == pa.float32():
return abs(left - right) < 1E-7
elif type == pa.float64():
return abs(left - right) < 1E-13
else:
return left == right
def test_statistics_convert_logical_types(tempdir):
# ARROW-5166, ARROW-4139
# (min, max, type)
cases = [(10, 11164359321221007157, pa.uint64()),
(10, 4294967295, pa.uint32()),
(u"ähnlich", u"öffentlich", pa.utf8()),
(datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
pa.time32('ms')),
(datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
pa.time64('us')),
(datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
pa.timestamp('ms')),
(datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
pa.timestamp('us'))]
for i, (min_val, max_val, typ) in enumerate(cases):
t = pa.Table.from_arrays([pa.array([min_val, max_val], type=typ)],
['col'])
path = str(tempdir / ('example{}.parquet'.format(i)))
pq.write_table(t, path, version='2.0')
pf = pq.ParquetFile(path)
stats = pf.metadata.row_group(0).column(0).statistics
assert stats.min == min_val
assert stats.max == max_val
def test_parquet_write_disable_statistics(tempdir):
table = pa.Table.from_pydict(
{'a': pa.array([1, 2, 3]), 'b': pa.array(['a', 'b', 'c'])})
_write_table(table, tempdir / 'data.parquet')
meta = pq.read_metadata(tempdir / 'data.parquet')
for col in [0, 1]:
cc = meta.row_group(0).column(col)
assert cc.is_stats_set is True
assert cc.statistics is not None
_write_table(table, tempdir / 'data2.parquet', write_statistics=False)
meta = pq.read_metadata(tempdir / 'data2.parquet')
for col in [0, 1]:
cc = meta.row_group(0).column(col)
assert cc.is_stats_set is False
assert cc.statistics is None
_write_table(table, tempdir / 'data3.parquet', write_statistics=['a'])
meta = pq.read_metadata(tempdir / 'data3.parquet')
cc_a = meta.row_group(0).column(0)
assert cc_a.is_stats_set is True
assert cc_a.statistics is not None
cc_b = meta.row_group(0).column(1)
assert cc_b.is_stats_set is False
assert cc_b.statistics is None
@pytest.mark.pandas
def test_compare_schemas():
df = alltypes_sample(size=10000)
fileh = make_sample_file(df)
fileh2 = make_sample_file(df)
fileh3 = make_sample_file(df[df.columns[::2]])
# ParquetSchema
assert isinstance(fileh.schema, pq.ParquetSchema)
assert fileh.schema.equals(fileh.schema)
assert fileh.schema == fileh.schema
assert fileh.schema.equals(fileh2.schema)
assert fileh.schema == fileh2.schema
assert fileh.schema != 'arbitrary object'
assert not fileh.schema.equals(fileh3.schema)
assert fileh.schema != fileh3.schema
# ColumnSchema
assert isinstance(fileh.schema[0], pq.ColumnSchema)
assert fileh.schema[0].equals(fileh.schema[0])
assert fileh.schema[0] == fileh.schema[0]
assert not fileh.schema[0].equals(fileh.schema[1])
assert fileh.schema[0] != fileh.schema[1]
assert fileh.schema[0] != 'arbitrary object'
def test_validate_schema_write_table(tempdir):
# ARROW-2926
simple_fields = [
pa.field('POS', pa.uint32()),
pa.field('desc', pa.string())
]
simple_schema = pa.schema(simple_fields)
# simple_table schema does not match simple_schema
simple_from_array = [pa.array([1]), pa.array(['bla'])]
simple_table = pa.Table.from_arrays(simple_from_array, ['POS', 'desc'])
path = tempdir / 'simple_validate_schema.parquet'
with pq.ParquetWriter(path, simple_schema,
version='2.0',
compression='snappy', flavor='spark') as w:
with pytest.raises(ValueError):
w.write_table(simple_table)
@pytest.mark.pandas
def test_column_of_arrays(tempdir):
df, schema = dataframe_with_arrays()
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df, schema=schema)
_write_table(arrow_table, filename, version="2.0", coerce_timestamps='ms')
table_read = _read_table(filename)
df_read = table_read.to_pandas()
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
def test_coerce_timestamps(tempdir):
from collections import OrderedDict
# ARROW-622
arrays = OrderedDict()
fields = [pa.field('datetime64',
pa.list_(pa.timestamp('ms')))]
arrays['datetime64'] = [
np.array(['2007-07-13T01:23:34.123456789',
None,
'2010-08-13T05:46:57.437699912'],
dtype='datetime64[ms]'),
None,
None,
np.array(['2007-07-13T02',
None,
'2010-08-13T05:46:57.437699912'],
dtype='datetime64[ms]'),
]
df = pd.DataFrame(arrays)
schema = pa.schema(fields)
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df, schema=schema)
_write_table(arrow_table, filename, version="2.0", coerce_timestamps='us')
table_read = _read_table(filename)
df_read = table_read.to_pandas()
df_expected = df.copy()
for i, x in enumerate(df_expected['datetime64']):
if isinstance(x, np.ndarray):
df_expected['datetime64'][i] = x.astype('M8[us]')
tm.assert_frame_equal(df_expected, df_read)
with pytest.raises(ValueError):
_write_table(arrow_table, filename, version='2.0',
coerce_timestamps='unknown')
@pytest.mark.pandas
def test_coerce_timestamps_truncated(tempdir):
"""
ARROW-2555: Test that we can truncate timestamps when coercing if
explicitly allowed.
"""
dt_us = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
second=1, microsecond=1)
dt_ms = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
second=1)
fields_us = [pa.field('datetime64', pa.timestamp('us'))]
arrays_us = {'datetime64': [dt_us, dt_ms]}
df_us = pd.DataFrame(arrays_us)
schema_us = pa.schema(fields_us)
filename = tempdir / 'pandas_truncated.parquet'
table_us = pa.Table.from_pandas(df_us, schema=schema_us)
_write_table(table_us, filename, version="2.0", coerce_timestamps='ms',
allow_truncated_timestamps=True)
table_ms = _read_table(filename)
df_ms = table_ms.to_pandas()
arrays_expected = {'datetime64': [dt_ms, dt_ms]}
df_expected = pd.DataFrame(arrays_expected)
tm.assert_frame_equal(df_expected, df_ms)
@pytest.mark.pandas
def test_column_of_lists(tempdir):
df, schema = dataframe_with_lists(parquet_compatible=True)
filename = tempdir / 'pandas_rountrip.parquet'
arrow_table = pa.Table.from_pandas(df, schema=schema)
_write_table(arrow_table, filename, version='2.0')
table_read = _read_table(filename)
df_read = table_read.to_pandas()
if PY2:
# assert_frame_equal fails when comparing datetime.date and
# np.datetime64, even with check_datetimelike_compat=True so
# convert the values to np.datetime64 instead
for col in ['date32[day]_list', 'date64[ms]_list']:
df[col] = df[col].apply(
lambda x: list(map(np.datetime64, x)) if x else x
)
tm.assert_frame_equal(df, df_read)
@pytest.mark.pandas
def test_date_time_types(tempdir):
t1 = pa.date32()
data1 = np.array([17259, 17260, 17261], dtype='int32')
a1 = pa.array(data1, type=t1)
t2 = pa.date64()
data2 = data1.astype('int64') * 86400000
a2 = pa.array(data2, type=t2)
t3 = pa.timestamp('us')
start = pd.Timestamp('2001-01-01').value / 1000
data3 = np.array([start, start + 1, start + 2], dtype='int64')
a3 = pa.array(data3, type=t3)
t4 = pa.time32('ms')
data4 = np.arange(3, dtype='i4')
a4 = pa.array(data4, type=t4)
t5 = pa.time64('us')
a5 = pa.array(data4.astype('int64'), type=t5)
t6 = pa.time32('s')
a6 = pa.array(data4, type=t6)
ex_t6 = pa.time32('ms')
ex_a6 = pa.array(data4 * 1000, type=ex_t6)
t7 = pa.timestamp('ns')
start = pd.Timestamp('2001-01-01').value
data7 = np.array([start, start + 1000, start + 2000],
dtype='int64')
a7 = pa.array(data7, type=t7)
table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7],
['date32', 'date64', 'timestamp[us]',
'time32[s]', 'time64[us]',
'time32_from64[s]',
'timestamp[ns]'])
# date64 as date32
# time32[s] to time32[ms]
expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
['date32', 'date64', 'timestamp[us]',
'time32[s]', 'time64[us]',
'time32_from64[s]',
'timestamp[ns]'])
_check_roundtrip(table, expected=expected, version='2.0')
t0 = pa.timestamp('ms')
data0 = np.arange(4, dtype='int64')
a0 = pa.array(data0, type=t0)
t1 = pa.timestamp('us')
data1 = np.arange(4, dtype='int64')
a1 = pa.array(data1, type=t1)
t2 = pa.timestamp('ns')
data2 = np.arange(4, dtype='int64')
a2 = pa.array(data2, type=t2)
table = pa.Table.from_arrays([a0, a1, a2],
['ts[ms]', 'ts[us]', 'ts[ns]'])
expected = pa.Table.from_arrays([a0, a1, a2],
['ts[ms]', 'ts[us]', 'ts[ns]'])
# int64 for all timestamps supported by default
filename = tempdir / 'int64_timestamps.parquet'
_write_table(table, filename, version='2.0')
parquet_schema = pq.ParquetFile(filename).schema
for i in range(3):
assert parquet_schema.column(i).physical_type == 'INT64'
read_table = _read_table(filename)
assert read_table.equals(expected)
t0_ns = pa.timestamp('ns')
data0_ns = np.array(data0 * 1000000, dtype='int64')
a0_ns = pa.array(data0_ns, type=t0_ns)
t1_ns = pa.timestamp('ns')
data1_ns = np.array(data1 * 1000, dtype='int64')
a1_ns = pa.array(data1_ns, type=t1_ns)
expected = pa.Table.from_arrays([a0_ns, a1_ns, a2],
['ts[ms]', 'ts[us]', 'ts[ns]'])
# int96 nanosecond timestamps produced upon request
filename = tempdir / 'explicit_int96_timestamps.parquet'
_write_table(table, filename, version='2.0',
use_deprecated_int96_timestamps=True)
parquet_schema = pq.ParquetFile(filename).schema
for i in range(3):
assert parquet_schema.column(i).physical_type == 'INT96'
read_table = _read_table(filename)
assert read_table.equals(expected)
# int96 nanosecond timestamps implied by flavor 'spark'
filename = tempdir / 'spark_int96_timestamps.parquet'
_write_table(table, filename, version='2.0',
flavor='spark')
parquet_schema = pq.ParquetFile(filename).schema
for i in range(3):
assert parquet_schema.column(i).physical_type == 'INT96'
read_table = _read_table(filename)
assert read_table.equals(expected)
@pytest.mark.pandas
def test_list_of_datetime_time_roundtrip():
# ARROW-4135
times = pd.to_datetime(['09:00', '09:30', '10:00', '10:30', '11:00',
'11:30', '12:00'])
df = pd.DataFrame({'time': [times.time]})
_roundtrip_pandas_dataframe(df, write_kwargs={})
@pytest.mark.pandas
def test_parquet_version_timestamp_differences():
i_s = pd.Timestamp('2010-01-01').value / 1000000000 # := 1262304000
d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
d_ms = d_s * 1000
d_us = d_ms * 1000
d_ns = d_us * 1000
a_s = pa.array(d_s, type=pa.timestamp('s'))
a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
a_us = pa.array(d_us, type=pa.timestamp('us'))
a_ns = pa.array(d_ns, type=pa.timestamp('ns'))
names = ['ts:s', 'ts:ms', 'ts:us', 'ts:ns']
table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)
# Using Parquet version 1.0, seconds should be coerced to milliseconds
# and nanoseconds should be coerced to microseconds by default
expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_us], names)
_check_roundtrip(table, expected)
# Using Parquet version 2.0, seconds should be coerced to milliseconds
# and nanoseconds should be retained by default
expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_ns], names)
_check_roundtrip(table, expected, version='2.0')
# Using Parquet version 1.0, coercing to milliseconds or microseconds
# is allowed
expected = pa.Table.from_arrays([a_ms, a_ms, a_ms, a_ms], names)
_check_roundtrip(table, expected, coerce_timestamps='ms')
# Using Parquet version 2.0, coercing to milliseconds or microseconds
# is allowed
expected = pa.Table.from_arrays([a_us, a_us, a_us, a_us], names)
_check_roundtrip(table, expected, version='2.0', coerce_timestamps='us')
# TODO: after pyarrow allows coerce_timestamps='ns', tests like the
# following should pass ...
# Using Parquet version 1.0, coercing to nanoseconds is not allowed
# expected = None
# with pytest.raises(NotImplementedError):
# _roundtrip_table(table, coerce_timestamps='ns')
# Using Parquet version 2.0, coercing to nanoseconds is allowed
# expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
# _check_roundtrip(table, expected, version='2.0', coerce_timestamps='ns')
# For either Parquet version, coercing to nanoseconds is allowed
# if Int96 storage is used
expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
_check_roundtrip(table, expected,
use_deprecated_int96_timestamps=True)
_check_roundtrip(table, expected, version='2.0',
use_deprecated_int96_timestamps=True)
def test_large_list_records():
# This was fixed in PARQUET-1100
list_lengths = np.random.randint(0, 500, size=50)
list_lengths[::10] = 0
list_values = [list(map(int, np.random.randint(0, 100, size=x)))
if i % 8 else None
for i, x in enumerate(list_lengths)]
a1 = pa.array(list_values)
table = pa.Table.from_arrays([a1], ['int_lists'])
_check_roundtrip(table)
def test_sanitized_spark_field_names():
a0 = pa.array([0, 1, 2, 3, 4])
name = 'prohib; ,\t{}'
table = pa.Table.from_arrays([a0], [name])
result = _roundtrip_table(table, write_table_kwargs={'flavor': 'spark'})
expected_name = 'prohib______'
assert result.schema[0].name == expected_name
@pytest.mark.pandas
def test_spark_flavor_preserves_pandas_metadata():
df = _test_dataframe(size=100)
df.index = np.arange(0, 10 * len(df), 10)
df.index.name = 'foo'
result = _roundtrip_pandas_dataframe(df, {'version': '2.0',
'flavor': 'spark'})
tm.assert_frame_equal(result, df)
def test_fixed_size_binary():
t0 = pa.binary(10)
data = [b'fooooooooo', None, b'barooooooo', b'quxooooooo']
a0 = pa.array(data, type=t0)
table = pa.Table.from_arrays([a0],
['binary[10]'])
_check_roundtrip(table)
@pytest.mark.pandas
def test_multithreaded_read():
df = alltypes_sample(size=10000)
table = pa.Table.from_pandas(df)
buf = io.BytesIO()
_write_table(table, buf, compression='SNAPPY', version='2.0')
buf.seek(0)
table1 = _read_table(buf, use_threads=True)
buf.seek(0)
table2 = _read_table(buf, use_threads=False)
assert table1.equals(table2)
@pytest.mark.pandas
def test_min_chunksize():
data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D'])
table = pa.Table.from_pandas(data.reset_index())
buf = io.BytesIO()
_write_table(table, buf, chunk_size=-1)
buf.seek(0)
result = _read_table(buf)
assert result.equals(table)
with pytest.raises(ValueError):
_write_table(table, buf, chunk_size=0)
@pytest.mark.pandas
def test_pass_separate_metadata():
# ARROW-471
df = alltypes_sample(size=10000)
a_table = pa.Table.from_pandas(df)
buf = io.BytesIO()
_write_table(a_table, buf, compression='snappy', version='2.0')
buf.seek(0)
metadata = pq.read_metadata(buf)
buf.seek(0)
fileh = pq.ParquetFile(buf, metadata=metadata)
tm.assert_frame_equal(df, fileh.read().to_pandas())
@pytest.mark.pandas
def test_read_single_row_group():
# ARROW-471
N, K = 10000, 4
df = alltypes_sample(size=N)
a_table = pa.Table.from_pandas(df)
buf = io.BytesIO()
_write_table(a_table, buf, row_group_size=N / K,
compression='snappy', version='2.0')
buf.seek(0)
pf = pq.ParquetFile(buf)
assert pf.num_row_groups == K
row_groups = [pf.read_row_group(i) for i in range(K)]
result = pa.concat_tables(row_groups)
tm.assert_frame_equal(df, result.to_pandas())
@pytest.mark.pandas
def test_read_single_row_group_with_column_subset():
N, K = 10000, 4
df = alltypes_sample(size=N)
a_table = pa.Table.from_pandas(df)
buf = io.BytesIO()
_write_table(a_table, buf, row_group_size=N / K,
compression='snappy', version='2.0')
buf.seek(0)
pf = pq.ParquetFile(buf)
cols = list(df.columns[:2])
row_groups = [pf.read_row_group(i, columns=cols) for i in range(K)]
result = pa.concat_tables(row_groups)
tm.assert_frame_equal(df[cols], result.to_pandas())
# ARROW-4267: Selection of duplicate columns still leads to these columns
# being read uniquely.
row_groups = [pf.read_row_group(i, columns=cols + cols) for i in range(K)]
result = pa.concat_tables(row_groups)
tm.assert_frame_equal(df[cols], result.to_pandas())
@pytest.mark.pandas
def test_scan_contents():
N, K = 10000, 4
df = alltypes_sample(size=N)
a_table = pa.Table.from_pandas(df)
buf = io.BytesIO()
_write_table(a_table, buf, row_group_size=N / K,
compression='snappy', version='2.0')
buf.seek(0)
pf = pq.ParquetFile(buf)
assert pf.scan_contents() == 10000
assert pf.scan_contents(df.columns[:4]) == 10000
@pytest.mark.pandas
def test_parquet_piece_read(tempdir):
df = _test_dataframe(1000)
table = pa.Table.from_pandas(df)
path = tempdir / 'parquet_piece_read.parquet'
_write_table(table, path, version='2.0')
piece1 = pq.ParquetDatasetPiece(path)
result = piece1.read()
assert result.equals(table)
@pytest.mark.pandas
def test_parquet_piece_open_and_get_metadata(tempdir):
df = _test_dataframe(100)
table = pa.Table.from_pandas(df)
path = tempdir / 'parquet_piece_read.parquet'
_write_table(table, path, version='2.0')
piece = pq.ParquetDatasetPiece(path)
table1 = piece.read()
assert isinstance(table1, pa.Table)
meta1 = piece.get_metadata()
assert isinstance(meta1, pq.FileMetaData)
def open_parquet(fn):
return pq.ParquetFile(open(fn, mode='rb'))
# test deprecated open_file_func
with pytest.warns(DeprecationWarning):
table2 = piece.read(open_file_func=open_parquet)
assert isinstance(table2, pa.Table)
with pytest.warns(DeprecationWarning):
meta2 = piece.get_metadata(open_file_func=open_parquet)
assert isinstance(meta2, pq.FileMetaData)
assert table == table1 == table2
assert meta1 == meta2
def test_parquet_piece_basics():
path = '/baz.parq'
piece1 = pq.ParquetDatasetPiece(path)
piece2 = pq.ParquetDatasetPiece(path, row_group=1)
piece3 = pq.ParquetDatasetPiece(
path, row_group=1, partition_keys=[('foo', 0), ('bar', 1)])
assert str(piece1) == path
assert str(piece2) == '/baz.parq | row_group=1'
assert str(piece3) == 'partition[foo=0, bar=1] /baz.parq | row_group=1'
assert piece1 == piece1
assert piece2 == piece2
assert piece3 == piece3
assert piece1 != piece3
def test_partition_set_dictionary_type():
set1 = pq.PartitionSet('key1', [u('foo'), u('bar'), u('baz')])
set2 = pq.PartitionSet('key2', [2007, 2008, 2009])
assert isinstance(set1.dictionary, pa.StringArray)
assert isinstance(set2.dictionary, pa.IntegerArray)
set3 = pq.PartitionSet('key2', [datetime.datetime(2007, 1, 1)])
with pytest.raises(TypeError):
set3.dictionary
@pytest.mark.pandas
def test_read_partitioned_directory(tempdir):
fs = LocalFileSystem.get_instance()
_partition_test_for_filesystem(fs, tempdir)
@pytest.mark.pandas
def test_create_parquet_dataset_multi_threaded(tempdir):
fs = LocalFileSystem.get_instance()
base_path = tempdir
_partition_test_for_filesystem(fs, base_path)
manifest = pq.ParquetManifest(base_path, filesystem=fs,
metadata_nthreads=1)
dataset = pq.ParquetDataset(base_path, filesystem=fs, metadata_nthreads=16)
assert len(dataset.pieces) > 0
partitions = dataset.partitions
assert len(partitions.partition_names) > 0
assert partitions.partition_names == manifest.partitions.partition_names
assert len(partitions.levels) == len(manifest.partitions.levels)
@pytest.mark.pandas
def test_equivalency(tempdir):
fs = LocalFileSystem.get_instance()
base_path = tempdir
integer_keys = [0, 1]
string_keys = ['a', 'b', 'c']
boolean_keys = [True, False]
partition_spec = [
['integer', integer_keys],
['string', string_keys],
['boolean', boolean_keys]
]
df = pd.DataFrame({
'integer': np.array(integer_keys, dtype='i4').repeat(15),
'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
3),
}, columns=['integer', 'string', 'boolean'])
_generate_partition_directories(fs, base_path, partition_spec, df)
# Old filters syntax:
# integer == 1 AND string != b AND boolean == True
dataset = pq.ParquetDataset(
base_path, filesystem=fs,
filters=[('integer', '=', 1), ('string', '!=', 'b'),
('boolean', '==', True)]
)
table = dataset.read()
result_df = (table.to_pandas().reset_index(drop=True))
assert 0 not in result_df['integer'].values
assert 'b' not in result_df['string'].values
assert False not in result_df['boolean'].values
# filters in disjunctive normal form:
# (integer == 1 AND string != b AND boolean == True) OR
# (integer == 2 AND boolean == False)
# TODO(ARROW-3388): boolean columns are reconstructed as string
filters = [
[
('integer', '=', 1),
('string', '!=', 'b'),
('boolean', '==', 'True')
],
[('integer', '=', 0), ('boolean', '==', 'False')]
]
dataset = pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
table = dataset.read()
result_df = table.to_pandas().reset_index(drop=True)
# Check that all rows in the DF fulfill the filter
# Pandas 0.23.x has problems with indexing constant memoryviews in
# categoricals. Thus we need to make an explicity copy here with np.array.
df_filter_1 = (np.array(result_df['integer']) == 1) \
& (np.array(result_df['string']) != 'b') \
& (np.array(result_df['boolean']) == 'True')
df_filter_2 = (np.array(result_df['integer']) == 0) \
& (np.array(result_df['boolean']) == 'False')
assert df_filter_1.sum() > 0
assert df_filter_2.sum() > 0
assert result_df.shape[0] == (df_filter_1.sum() + df_filter_2.sum())
# Check for \0 in predicate values. Until they are correctly implemented
# in ARROW-3391, they would otherwise lead to weird results with the
# current code.
with pytest.raises(NotImplementedError):
filters = [[('string', '==', b'1\0a')]]
pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
with pytest.raises(NotImplementedError):
filters = [[('string', '==', u'1\0a')]]
pq.ParquetDataset(base_path, filesystem=fs, filters=filters)
@pytest.mark.pandas
def test_cutoff_exclusive_integer(tempdir):
fs = LocalFileSystem.get_instance()
base_path = tempdir
integer_keys = [0, 1, 2, 3, 4]
partition_spec = [
['integers', integer_keys],
]
N = 5
df = pd.DataFrame({
'index': np.arange(N),
'integers': np.array(integer_keys, dtype='i4'),
}, columns=['index', 'integers'])
_generate_partition_directories(fs, base_path, partition_spec, df)
dataset = pq.ParquetDataset(
base_path, filesystem=fs,
filters=[
('integers', '<', 4),
('integers', '>', 1),
]
)
table = dataset.read()
result_df = (table.to_pandas()
.sort_values(by='index')
.reset_index(drop=True))
result_list = [x for x in map(int, result_df['integers'].values)]
assert result_list == [2, 3]
@pytest.mark.pandas
@pytest.mark.xfail(
raises=TypeError,
reason='Loss of type information in creation of categoricals.'
)
def test_cutoff_exclusive_datetime(tempdir):
fs = LocalFileSystem.get_instance()
base_path = tempdir
date_keys = [
datetime.date(2018, 4, 9),
datetime.date(2018, 4, 10),
datetime.date(2018, 4, 11),
datetime.date(2018, 4, 12),
datetime.date(2018, 4, 13)
]
partition_spec = [
['dates', date_keys]
]
N = 5
df = pd.DataFrame({
'index': np.arange(N),
'dates': np.array(date_keys, dtype='datetime64'),
}, columns=['index', 'dates'])
_generate_partition_directories(fs, base_path, partition_spec, df)
dataset = pq.ParquetDataset(
base_path, filesystem=fs,
filters=[
('dates', '<', "2018-04-12"),
('dates', '>', "2018-04-10")
]
)
table = dataset.read()
result_df = (table.to_pandas()
.sort_values(by='index')
.reset_index(drop=True))
expected = pd.Categorical(
np.array([datetime.date(2018, 4, 11)], dtype='datetime64'),
categories=np.array(date_keys, dtype='datetime64'))
assert result_df['dates'].values == expected
@pytest.mark.pandas
def test_inclusive_integer(tempdir):
fs = LocalFileSystem.get_instance()
base_path = tempdir
integer_keys = [0, 1, 2, 3, 4]
partition_spec = [
['integers', integer_keys],
]
N = 5
df = pd.DataFrame({
'index': np.arange(N),
'integers': np.array(integer_keys, dtype='i4'),
}, columns=['index', 'integers'])
_generate_partition_directories(fs, base_path, partition_spec, df)
dataset = pq.ParquetDataset(
base_path, filesystem=fs,
filters=[
('integers', '<=', 3),
('integers', '>=', 2),
]
)
table = dataset.read()
result_df = (table.to_pandas()
.sort_values(by='index')
.reset_index(drop=True))
result_list = [int(x) for x in map(int, result_df['integers'].values)]
assert result_list == [2, 3]
@pytest.mark.pandas
def test_inclusive_set(tempdir):
fs = LocalFileSystem.get_instance()
base_path = tempdir
integer_keys = [0, 1]
string_keys = ['a', 'b', 'c']
boolean_keys = [True, False]
partition_spec = [
['integer', integer_keys],
['string', string_keys],
['boolean', boolean_keys]
]
df = pd.DataFrame({
'integer': np.array(integer_keys, dtype='i4').repeat(15),
'string': np.tile(np.tile(np.array(string_keys, dtype=object), 5), 2),
'boolean': np.tile(np.tile(np.array(boolean_keys, dtype='bool'), 5),
3),
}, columns=['integer', 'string', 'boolean'])
_generate_partition_directories(fs, base_path, partition_spec, df)
dataset = pq.ParquetDataset(
base_path, filesystem=fs,
filters=[('integer', 'in', {1}), ('string', 'in', {'a', 'b'}),
('boolean', 'in', {True})]
)
table = dataset.read()
result_df = (table.to_pandas().reset_index(drop=True))
assert 0 not in result_df['integer'].values
assert 'c' not in result_df['string'].values
assert False not in result_df['boolean'].values
@pytest.mark.pandas
def test_invalid_pred_op(tempdir):
fs = LocalFileSystem.get_instance()
base_path = tempdir
integer_keys = [0, 1, 2, 3, 4]
partition_spec = [
['integers', integer_keys],
]
N = 5
df = pd.DataFrame({
'index': np.arange(N),
'integers': np.array(integer_keys, dtype='i4'),
}, columns=['index', 'integers'])
_generate_partition_directories(fs, base_path, partition_spec, df)
with pytest.raises(ValueError):
pq.ParquetDataset(base_path,
filesystem=fs,
filters=[
('integers', '=<', 3),
])
with pytest.raises(ValueError):
pq.ParquetDataset(base_path,
filesystem=fs,
filters=[
('integers', 'in', set()),
])
with pytest.raises(ValueError):
pq.ParquetDataset(base_path,
filesystem=fs,
filters=[
('integers', '!=', {3}),
])
@pytest.mark.pandas
def test_filters_read_table(tempdir):
# test that filters keyword is passed through in read_table
fs = LocalFileSystem.get_instance()
base_path = tempdir
integer_keys = [0, 1, 2, 3, 4]
partition_spec = [
['integers', integer_keys],
]
N = 5
df = pd.DataFrame({
'index': np.arange(N),
'integers': np.array(integer_keys, dtype='i4'),
}, columns=['index', 'integers'])
_generate_partition_directories(fs, base_path, partition_spec, df)
table = pq.read_table(
base_path, filesystem=fs, filters=[('integers', '<', 3)])
assert table.num_rows == 3
table = pq.read_table(
base_path, filesystem=fs, filters=[[('integers', '<', 3)]])
assert table.num_rows == 3
table = pq.read_pandas(
base_path, filters=[('integers', '<', 3)])
assert table.num_rows == 3
@pytest.yield_fixture
def s3_example():
access_key = os.environ['PYARROW_TEST_S3_ACCESS_KEY']
secret_key = os.environ['PYARROW_TEST_S3_SECRET_KEY']
bucket_name = os.environ['PYARROW_TEST_S3_BUCKET']
import s3fs
fs = s3fs.S3FileSystem(key=access_key, secret=secret_key)
test_dir = guid()
bucket_uri = 's3://{0}/{1}'.format(bucket_name, test_dir)
fs.mkdir(bucket_uri)
yield fs, bucket_uri
fs.rm(bucket_uri, recursive=True)
@pytest.mark.pandas
@pytest.mark.s3
def test_read_partitioned_directory_s3fs(s3_example):
from pyarrow.filesystem import S3FSWrapper
fs, bucket_uri = s3_example
wrapper = S3FSWrapper(fs)
_partition_test_for_filesystem(wrapper, bucket_uri)
# Check that we can auto-wrap
dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
dataset.read()
def _partition_test_for_filesystem(fs, base_path):
foo_keys = [0, 1]
bar_keys = ['a', 'b', 'c']
partition_spec = [
['foo', foo_keys],
['bar', bar_keys]
]
N = 30
df = pd.DataFrame({
'index': np.arange(N),
'foo': np.array(foo_keys, dtype='i4').repeat(15),
'bar': np.tile(np.tile(np.array(bar_keys, dtype=object), 5), 2),
'values': np.random.randn(N)
}, columns=['index', 'foo', 'bar', 'values'])
_generate_partition_directories(fs, base_path, partition_spec, df)
dataset = pq.ParquetDataset(base_path, filesystem=fs)
table = dataset.read()
result_df = (table.to_pandas()
.sort_values(by='index')
.reset_index(drop=True))
expected_df = (df.sort_values(by='index')
.reset_index(drop=True)
.reindex(columns=result_df.columns))
expected_df['foo'] = pd.Categorical(df['foo'], categories=foo_keys)
expected_df['bar'] = pd.Categorical(df['bar'], categories=bar_keys)
assert (result_df.columns == ['index', 'values', 'foo', 'bar']).all()
tm.assert_frame_equal(result_df, expected_df)
def _generate_partition_directories(fs, base_dir, partition_spec, df):
# partition_spec : list of lists, e.g. [['foo', [0, 1, 2],
# ['bar', ['a', 'b', 'c']]
# part_table : a pyarrow.Table to write to each partition
DEPTH = len(partition_spec)
def _visit_level(base_dir, level, part_keys):
name, values = partition_spec[level]
for value in values:
this_part_keys = part_keys + [(name, value)]
level_dir = base_dir / '{0}={1}'.format(name, value)
fs.mkdir(level_dir)
if level == DEPTH - 1:
# Generate example data
file_path = level_dir / guid()
filtered_df = _filter_partition(df, this_part_keys)
part_table = pa.Table.from_pandas(filtered_df)
with fs.open(file_path, 'wb') as f:
_write_table(part_table, f)
assert fs.exists(file_path)
(level_dir / '_SUCCESS').touch()
else:
_visit_level(level_dir, level + 1, this_part_keys)
(level_dir / '_SUCCESS').touch()
_visit_level(base_dir, 0, [])
def _test_read_common_metadata_files(fs, base_path):
N = 100
df = pd.DataFrame({
'index': np.arange(N),
'values': np.random.randn(N)
}, columns=['index', 'values'])
base_path = str(base_path)
data_path = os.path.join(base_path, 'data.parquet')
table = pa.Table.from_pandas(df)
with fs.open(data_path, 'wb') as f:
_write_table(table, f)
metadata_path = os.path.join(base_path, '_common_metadata')
with fs.open(metadata_path, 'wb') as f:
pq.write_metadata(table.schema, f)
dataset = pq.ParquetDataset(base_path, filesystem=fs)
assert dataset.common_metadata_path == str(metadata_path)
with fs.open(data_path) as f:
common_schema = pq.read_metadata(f).schema
assert dataset.schema.equals(common_schema)
# handle list of one directory
dataset2 = pq.ParquetDataset([base_path], filesystem=fs)
assert dataset2.schema.equals(dataset.schema)
@pytest.mark.pandas
def test_read_common_metadata_files(tempdir):
fs = LocalFileSystem.get_instance()
_test_read_common_metadata_files(fs, tempdir)
@pytest.mark.pandas
def test_read_metadata_files(tempdir):
fs = LocalFileSystem.get_instance()
N = 100
df = pd.DataFrame({
'index': np.arange(N),
'values': np.random.randn(N)
}, columns=['index', 'values'])
data_path = tempdir / 'data.parquet'
table = pa.Table.from_pandas(df)
with fs.open(data_path, 'wb') as f:
_write_table(table, f)
metadata_path = tempdir / '_metadata'
with fs.open(metadata_path, 'wb') as f:
pq.write_metadata(table.schema, f)
dataset = pq.ParquetDataset(tempdir, filesystem=fs)
assert dataset.metadata_path == str(metadata_path)
with fs.open(data_path) as f:
metadata_schema = pq.read_metadata(f).schema
assert dataset.schema.equals(metadata_schema)
@pytest.mark.pandas
def test_read_schema(tempdir):
N = 100
df = pd.DataFrame({
'index': np.arange(N),
'values': np.random.randn(N)
}, columns=['index', 'values'])
data_path = tempdir / 'test.parquet'
table = pa.Table.from_pandas(df)
_write_table(table, data_path)
assert table.schema.equals(pq.read_schema(data_path))
assert table.schema.equals(pq.read_schema(data_path, memory_map=True))
def _filter_partition(df, part_keys):
predicate = np.ones(len(df), dtype=bool)
to_drop = []
for name, value in part_keys:
to_drop.append(name)
# to avoid pandas warning
if isinstance(value, (datetime.date, datetime.datetime)):
value = pd.Timestamp(value)
predicate &= df[name] == value
return df[predicate].drop(to_drop, axis=1)
@pytest.mark.pandas
def test_read_multiple_files(tempdir):
nfiles = 10
size = 5
dirpath = tempdir / guid()
dirpath.mkdir()
test_data = []
paths = []
for i in range(nfiles):
df = _test_dataframe(size, seed=i)
# Hack so that we don't have a dtype cast in v1 files
df['uint32'] = df['uint32'].astype(np.int64)
path = dirpath / '{}.parquet'.format(i)
table = pa.Table.from_pandas(df)
_write_table(table, path)
test_data.append(table)
paths.append(path)
# Write a _SUCCESS.crc file
(dirpath / '_SUCCESS.crc').touch()
def read_multiple_files(paths, columns=None, use_threads=True, **kwargs):
dataset = pq.ParquetDataset(paths, **kwargs)
return dataset.read(columns=columns, use_threads=use_threads)
result = read_multiple_files(paths)
expected = pa.concat_tables(test_data)
assert result.equals(expected)
# Read with provided metadata
metadata = pq.read_metadata(paths[0])
result2 = read_multiple_files(paths, metadata=metadata)
assert result2.equals(expected)
result3 = pa.localfs.read_parquet(dirpath, schema=metadata.schema)
assert result3.equals(expected)
# Read column subset
to_read = [result[0], result[2], result[6], result[result.num_columns - 1]]
result = pa.localfs.read_parquet(
dirpath, columns=[c.name for c in to_read])
expected = pa.Table.from_arrays(to_read, metadata=result.schema.metadata)
assert result.equals(expected)
# Read with multiple threads
pa.localfs.read_parquet(dirpath, use_threads=True)
# Test failure modes with non-uniform metadata
bad_apple = _test_dataframe(size, seed=i).iloc[:, :4]
bad_apple_path = tempdir / '{}.parquet'.format(guid())
t = pa.Table.from_pandas(bad_apple)
_write_table(t, bad_apple_path)
bad_meta = pq.read_metadata(bad_apple_path)
with pytest.raises(ValueError):
read_multiple_files(paths + [bad_apple_path])
with pytest.raises(ValueError):
read_multiple_files(paths, metadata=bad_meta)
mixed_paths = [bad_apple_path, paths[0]]
with pytest.raises(ValueError):
read_multiple_files(mixed_paths, schema=bad_meta.schema)
with pytest.raises(ValueError):
read_multiple_files(mixed_paths)
@pytest.mark.pandas
def test_dataset_read_pandas(tempdir):
nfiles = 5
size = 5
dirpath = tempdir / guid()
dirpath.mkdir()
test_data = []
frames = []
paths = []
for i in range(nfiles):
df = _test_dataframe(size, seed=i)
df.index = np.arange(i * size, (i + 1) * size)
df.index.name = 'index'
path = dirpath / '{}.parquet'.format(i)
table = pa.Table.from_pandas(df)
_write_table(table, path)
test_data.append(table)
frames.append(df)
paths.append(path)
dataset = pq.ParquetDataset(dirpath)
columns = ['uint8', 'strings']
result = dataset.read_pandas(columns=columns).to_pandas()
expected = pd.concat([x[columns] for x in frames])
tm.assert_frame_equal(result, expected)
@pytest.mark.pandas
def test_dataset_no_memory_map(tempdir):
# ARROW-2627: Check that we can use ParquetDataset without memory-mapping
dirpath = tempdir / guid()
dirpath.mkdir()
df = _test_dataframe(10, seed=0)
path = dirpath / '{}.parquet'.format(0)
table = pa.Table.from_pandas(df)
_write_table(table, path, version='2.0')
# TODO(wesm): Not sure how to easily check that memory mapping is _not_
# used. Mocking is not especially easy for pa.memory_map
dataset = pq.ParquetDataset(dirpath, memory_map=False)
assert dataset.pieces[0].read().equals(table)
@pytest.mark.pandas
@pytest.mark.parametrize('preserve_index', [True, False, None])
def test_dataset_read_pandas_common_metadata(tempdir, preserve_index):
# ARROW-1103
nfiles = 5
size = 5
dirpath = tempdir / guid()
dirpath.mkdir()
test_data = []
frames = []
paths = []
for i in range(nfiles):
df = _test_dataframe(size, seed=i)
df.index = pd.Index(np.arange(i * size, (i + 1) * size), name='index')
path = dirpath / '{}.parquet'.format(i)
table = pa.Table.from_pandas(df, preserve_index=preserve_index)
# Obliterate metadata
table = table.replace_schema_metadata(None)
assert table.schema.metadata is None
_write_table(table, path)
test_data.append(table)
frames.append(df)
paths.append(path)
# Write _metadata common file
table_for_metadata = pa.Table.from_pandas(
df, preserve_index=preserve_index
)
pq.write_metadata(table_for_metadata.schema, dirpath / '_metadata')
dataset = pq.ParquetDataset(dirpath)
columns = ['uint8', 'strings']
result = dataset.read_pandas(columns=columns).to_pandas()
expected = pd.concat([x[columns] for x in frames])
expected.index.name = (
df.index.name if preserve_index is not False else None)
tm.assert_frame_equal(result, expected)
def _make_example_multifile_dataset(base_path, nfiles=10, file_nrows=5):
test_data = []
paths = []
for i in range(nfiles):
df = _test_dataframe(file_nrows, seed=i)
path = base_path / '{}.parquet'.format(i)
test_data.append(_write_table(df, path))
paths.append(path)
return paths
@pytest.mark.pandas
def test_ignore_private_directories(tempdir):
dirpath = tempdir / guid()
dirpath.mkdir()
paths = _make_example_multifile_dataset(dirpath, nfiles=10,
file_nrows=5)
# private directory
(dirpath / '_impala_staging').mkdir()
dataset = pq.ParquetDataset(dirpath)
assert set(map(str, paths)) == set(x.path for x in dataset.pieces)
@pytest.mark.pandas
def test_ignore_hidden_files_dot(tempdir):
dirpath = tempdir / guid()
dirpath.mkdir()
paths = _make_example_multifile_dataset(dirpath, nfiles=10,
file_nrows=5)
with (dirpath / '.DS_Store').open('wb') as f:
f.write(b'gibberish')
with (dirpath / '.private').open('wb') as f:
f.write(b'gibberish')
dataset = pq.ParquetDataset(dirpath)
assert set(map(str, paths)) == set(x.path for x in dataset.pieces)
@pytest.mark.pandas
def test_ignore_hidden_files_underscore(tempdir):
dirpath = tempdir / guid()
dirpath.mkdir()
paths = _make_example_multifile_dataset(dirpath, nfiles=10,
file_nrows=5)
with (dirpath / '_committed_123').open('wb') as f:
f.write(b'abcd')
with (dirpath / '_started_321').open('wb') as f:
f.write(b'abcd')
dataset = pq.ParquetDataset(dirpath)
assert set(map(str, paths)) == set(x.path for x in dataset.pieces)
@pytest.mark.pandas
def test_multiindex_duplicate_values(tempdir):
num_rows = 3
numbers = list(range(num_rows))
index = pd.MultiIndex.from_arrays(
[['foo', 'foo', 'bar'], numbers],
names=['foobar', 'some_numbers'],
)
df = pd.DataFrame({'numbers': numbers}, index=index)
table = pa.Table.from_pandas(df)
filename = tempdir / 'dup_multi_index_levels.parquet'
_write_table(table, filename)
result_table = _read_table(filename)
assert table.equals(result_table)
result_df = result_table.to_pandas()
tm.assert_frame_equal(result_df, df)
@pytest.mark.pandas
def test_write_error_deletes_incomplete_file(tempdir):
# ARROW-1285
df = pd.DataFrame({'a': list('abc'),
'b': list(range(1, 4)),
'c': np.arange(3, 6).astype('u1'),
'd': np.arange(4.0, 7.0, dtype='float64'),
'e': [True, False, True],
'f': pd.Categorical(list('abc')),
'g': pd.date_range('20130101', periods=3),
'h': pd.date_range('20130101', periods=3,
tz='US/Eastern'),
'i': pd.date_range('20130101', periods=3, freq='ns')})
pdf = pa.Table.from_pandas(df)
filename = tempdir / 'tmp_file'
try:
_write_table(pdf, filename)
except pa.ArrowException:
pass
assert not filename.exists()
@pytest.mark.pandas
def test_noncoerced_nanoseconds_written_without_exception(tempdir):
# ARROW-1957: the Parquet version 2.0 writer preserves Arrow
# nanosecond timestamps by default
n = 9
df = pd.DataFrame({'x': range(n)},
index=pd.DatetimeIndex(start='2017-01-01',
freq='1n',
periods=n))
tb = pa.Table.from_pandas(df)
filename = tempdir / 'written.parquet'
try:
pq.write_table(tb, filename, version='2.0')
except Exception:
pass
assert filename.exists()
recovered_table = pq.read_table(filename)
assert tb.equals(recovered_table)
# Loss of data thru coercion (without explicit override) still an error
filename = tempdir / 'not_written.parquet'
with pytest.raises(ValueError):
pq.write_table(tb, filename, coerce_timestamps='ms', version='2.0')
def test_read_non_existent_file(tempdir):
path = 'non-existent-file.parquet'
try:
pq.read_table(path)
except Exception as e:
assert path in e.args[0]
def test_read_table_doesnt_warn(datadir):
with pytest.warns(None) as record:
pq.read_table(datadir / 'v0.7.1.parquet')
assert len(record) == 0
def _test_write_to_dataset_with_partitions(base_path,
filesystem=None,
schema=None,
index_name=None):
# ARROW-1400
output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
'group2': list('eefeffgeee'),
'num': list(range(10)),
'nan': [pd.np.nan] * 10,
'date': np.arange('2017-01-01', '2017-01-11',
dtype='datetime64[D]')})
cols = output_df.columns.tolist()
partition_by = ['group1', 'group2']
output_table = pa.Table.from_pandas(output_df, schema=schema, safe=False,
preserve_index=False)
pq.write_to_dataset(output_table, base_path, partition_by,
filesystem=filesystem)
metadata_path = os.path.join(base_path, '_common_metadata')
if filesystem is not None:
with filesystem.open(metadata_path, 'wb') as f:
pq.write_metadata(output_table.schema, f)
else:
pq.write_metadata(output_table.schema, metadata_path)
# ARROW-2891: Ensure the output_schema is preserved when writing a
# partitioned dataset
dataset = pq.ParquetDataset(base_path,
filesystem=filesystem,
validate_schema=True)
# ARROW-2209: Ensure the dataset schema also includes the partition columns
dataset_cols = set(dataset.schema.to_arrow_schema().names)
assert dataset_cols == set(output_table.schema.names)
input_table = dataset.read()
input_df = input_table.to_pandas()
# Read data back in and compare with original DataFrame
# Partitioned columns added to the end of the DataFrame when read
input_df_cols = input_df.columns.tolist()
assert partition_by == input_df_cols[-1 * len(partition_by):]
# Partitioned columns become 'categorical' dtypes
input_df = input_df[cols]
for col in partition_by:
output_df[col] = output_df[col].astype('category')
assert output_df.equals(input_df)
def _test_write_to_dataset_no_partitions(base_path, filesystem=None):
# ARROW-1400
output_df = pd.DataFrame({'group1': list('aaabbbbccc'),
'group2': list('eefeffgeee'),
'num': list(range(10)),
'date': np.arange('2017-01-01', '2017-01-11',
dtype='datetime64[D]')})
cols = output_df.columns.tolist()
output_table = pa.Table.from_pandas(output_df)
if filesystem is None:
filesystem = LocalFileSystem.get_instance()
# Without partitions, append files to root_path
n = 5
for i in range(n):
pq.write_to_dataset(output_table, base_path,
filesystem=filesystem)
output_files = [file for file in filesystem.ls(base_path)
if file.endswith(".parquet")]
assert len(output_files) == n
# Deduplicated incoming DataFrame should match
# original outgoing Dataframe
input_table = pq.ParquetDataset(base_path,
filesystem=filesystem).read()
input_df = input_table.to_pandas()
input_df = input_df.drop_duplicates()
input_df = input_df[cols]
assert output_df.equals(input_df)
@pytest.mark.pandas
def test_write_to_dataset_with_partitions(tempdir):
_test_write_to_dataset_with_partitions(str(tempdir))
@pytest.mark.pandas
def test_write_to_dataset_with_partitions_and_schema(tempdir):
schema = pa.schema([pa.field('group1', type=pa.string()),
pa.field('group2', type=pa.string()),
pa.field('num', type=pa.int64()),
pa.field('nan', type=pa.int32()),
pa.field('date', type=pa.timestamp(unit='us'))])
_test_write_to_dataset_with_partitions(str(tempdir), schema=schema)
@pytest.mark.pandas
def test_write_to_dataset_with_partitions_and_index_name(tempdir):
_test_write_to_dataset_with_partitions(str(tempdir),
index_name='index_name')
@pytest.mark.pandas
def test_write_to_dataset_no_partitions(tempdir):
_test_write_to_dataset_no_partitions(str(tempdir))
@pytest.mark.large_memory
def test_large_table_int32_overflow():
size = np.iinfo('int32').max + 1
arr = np.ones(size, dtype='uint8')
parr = pa.array(arr, type=pa.uint8())
table = pa.Table.from_arrays([parr], names=['one'])
f = io.BytesIO()
_write_table(table, f)
def _simple_table_roundtrip(table):
stream = pa.BufferOutputStream()
_write_table(table, stream)
buf = stream.getvalue()
return _read_table(buf)
@pytest.mark.pandas
@pytest.mark.large_memory
def test_binary_array_overflow_to_chunked():
# ARROW-3762
# 2^31 + 1 bytes
values = [b'x'] + [
b'x' * (1 << 20)
] * 2 * (1 << 10)
df = pd.DataFrame({'byte_col': values})
tbl = pa.Table.from_pandas(df, preserve_index=False)
read_tbl = _simple_table_roundtrip(tbl)
col0_data = read_tbl[0].data
assert isinstance(col0_data, pa.ChunkedArray)
# Split up into 2GB chunks
assert col0_data.num_chunks == 2
assert tbl.equals(read_tbl)
@pytest.mark.pandas
@pytest.mark.large_memory
def test_list_of_binary_large_cell():
# ARROW-4688
data = []
# TODO(wesm): handle chunked children
# 2^31 - 1 bytes in a single cell
# data.append([b'x' * (1 << 20)] * 2047 + [b'x' * ((1 << 20) - 1)])
# A little under 2GB in cell each containing approximately 10MB each
data.extend([[b'x' * 1000000] * 10] * 214)
arr = pa.array(data)
table = pa.Table.from_arrays([arr], ['chunky_cells'])
read_table = _simple_table_roundtrip(table)
assert table.equals(read_table)
@pytest.mark.pandas
def test_index_column_name_duplicate(tempdir):
data = {
'close': {
pd.Timestamp('2017-06-30 01:31:00'): 154.99958999999998,
pd.Timestamp('2017-06-30 01:32:00'): 154.99958999999998,
},
'time': {
pd.Timestamp('2017-06-30 01:31:00'): pd.Timestamp(
'2017-06-30 01:31:00'
),
pd.Timestamp('2017-06-30 01:32:00'): pd.Timestamp(
'2017-06-30 01:32:00'
),
}
}
path = str(tempdir / 'data.parquet')
dfx = pd.DataFrame(data).set_index('time', drop=False)
tdfx = pa.Table.from_pandas(dfx)
_write_table(tdfx, path)
arrow_table = _read_table(path)
result_df = arrow_table.to_pandas()
tm.assert_frame_equal(result_df, dfx)
@pytest.mark.pandas
def test_parquet_nested_convenience(tempdir):
# ARROW-1684
df = pd.DataFrame({
'a': [[1, 2, 3], None, [4, 5], []],
'b': [[1.], None, None, [6., 7.]],
})
path = str(tempdir / 'nested_convenience.parquet')
table = pa.Table.from_pandas(df, preserve_index=False)
_write_table(table, path)
read = pq.read_table(path, columns=['a'])
tm.assert_frame_equal(read.to_pandas(), df[['a']])
read = pq.read_table(path, columns=['a', 'b'])
tm.assert_frame_equal(read.to_pandas(), df)
@pytest.mark.pandas
def test_backwards_compatible_index_naming(datadir):
expected_string = b"""\
carat cut color clarity depth table price x y z
0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
0.21 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31
0.23 Good E VS1 56.9 65.0 327 4.05 4.07 2.31
0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63
0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75
0.24 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48
0.24 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47
0.26 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53
0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49
0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39"""
expected = pd.read_csv(io.BytesIO(expected_string), sep=r'\s{2,}',
index_col=None, header=0, engine='python')
table = _read_table(datadir / 'v0.7.1.parquet')
result = table.to_pandas()
tm.assert_frame_equal(result, expected)
@pytest.mark.pandas
def test_backwards_compatible_index_multi_level_named(datadir):
expected_string = b"""\
carat cut color clarity depth table price x y z
0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
0.21 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31
0.23 Good E VS1 56.9 65.0 327 4.05 4.07 2.31
0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63
0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75
0.24 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48
0.24 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47
0.26 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53
0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49
0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39"""
expected = pd.read_csv(
io.BytesIO(expected_string), sep=r'\s{2,}',
index_col=['cut', 'color', 'clarity'],
header=0, engine='python'
).sort_index()
table = _read_table(datadir / 'v0.7.1.all-named-index.parquet')
result = table.to_pandas()
tm.assert_frame_equal(result, expected)
@pytest.mark.pandas
def test_backwards_compatible_index_multi_level_some_named(datadir):
expected_string = b"""\
carat cut color clarity depth table price x y z
0.23 Ideal E SI2 61.5 55.0 326 3.95 3.98 2.43
0.21 Premium E SI1 59.8 61.0 326 3.89 3.84 2.31
0.23 Good E VS1 56.9 65.0 327 4.05 4.07 2.31
0.29 Premium I VS2 62.4 58.0 334 4.20 4.23 2.63
0.31 Good J SI2 63.3 58.0 335 4.34 4.35 2.75
0.24 Very Good J VVS2 62.8 57.0 336 3.94 3.96 2.48
0.24 Very Good I VVS1 62.3 57.0 336 3.95 3.98 2.47
0.26 Very Good H SI1 61.9 55.0 337 4.07 4.11 2.53
0.22 Fair E VS2 65.1 61.0 337 3.87 3.78 2.49
0.23 Very Good H VS1 59.4 61.0 338 4.00 4.05 2.39"""
expected = pd.read_csv(
io.BytesIO(expected_string),
sep=r'\s{2,}', index_col=['cut', 'color', 'clarity'],
header=0, engine='python'
).sort_index()
expected.index = expected.index.set_names(['cut', None, 'clarity'])
table = _read_table(datadir / 'v0.7.1.some-named-index.parquet')
result = table.to_pandas()
tm.assert_frame_equal(result, expected)
@pytest.mark.pandas
def test_backwards_compatible_column_metadata_handling(datadir):
expected = pd.DataFrame(
{'a': [1, 2, 3], 'b': [.1, .2, .3],
'c': pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')})
expected.index = pd.MultiIndex.from_arrays(
[['a', 'b', 'c'],
pd.date_range("2017-01-01", periods=3, tz='Europe/Brussels')],
names=['index', None])
path = datadir / 'v0.7.1.column-metadata-handling.parquet'
table = _read_table(path)
result = table.to_pandas()
tm.assert_frame_equal(result, expected)
table = _read_table(path, columns=['a'])
result = table.to_pandas()
tm.assert_frame_equal(result, expected[['a']].reset_index(drop=True))
def _make_dataset_for_pickling(tempdir, N=100):
path = tempdir / 'data.parquet'
fs = LocalFileSystem.get_instance()
df = pd.DataFrame({
'index': np.arange(N),
'values': np.random.randn(N)
}, columns=['index', 'values'])
table = pa.Table.from_pandas(df)
num_groups = 3
with pq.ParquetWriter(path, table.schema) as writer:
for i in range(num_groups):
writer.write_table(table)
reader = pq.ParquetFile(path)
assert reader.metadata.num_row_groups == num_groups
metadata_path = tempdir / '_metadata'
with fs.open(metadata_path, 'wb') as f:
pq.write_metadata(table.schema, f)
dataset = pq.ParquetDataset(tempdir, filesystem=fs)
assert dataset.metadata_path == str(metadata_path)
return dataset
@pytest.mark.pandas
@pytest.mark.parametrize('pickler', [
pytest.param(pickle, id='builtin'),
pytest.param(pytest.importorskip('cloudpickle'), id='cloudpickle')
])
def test_pickle_dataset(tempdir, datadir, pickler):
def is_pickleable(obj):
return obj == pickler.loads(pickler.dumps(obj))
dataset = _make_dataset_for_pickling(tempdir)
assert is_pickleable(dataset)
assert is_pickleable(dataset.metadata)
assert is_pickleable(dataset.metadata.schema)
assert len(dataset.metadata.schema)
for column in dataset.metadata.schema:
assert is_pickleable(column)
for piece in dataset.pieces:
assert is_pickleable(piece)
metadata = piece.get_metadata()
assert metadata.num_row_groups
for i in range(metadata.num_row_groups):
assert is_pickleable(metadata.row_group(i))
@pytest.mark.pandas
def test_decimal_roundtrip(tempdir):
num_values = 10
columns = {}
for precision in range(1, 39):
for scale in range(0, precision + 1):
with util.random_seed(0):
random_decimal_values = [
util.randdecimal(precision, scale)
for _ in range(num_values)
]
column_name = ('dec_precision_{:d}_scale_{:d}'
.format(precision, scale))
columns[column_name] = random_decimal_values
expected = pd.DataFrame(columns)
filename = tempdir / 'decimals.parquet'
string_filename = str(filename)
table = pa.Table.from_pandas(expected)
_write_table(table, string_filename)
result_table = _read_table(string_filename)
result = result_table.to_pandas()
tm.assert_frame_equal(result, expected)
@pytest.mark.pandas
@pytest.mark.xfail(
raises=pa.ArrowException, reason='Parquet does not support negative scale'
)
def test_decimal_roundtrip_negative_scale(tempdir):
expected = pd.DataFrame({'decimal_num': [decimal.Decimal('1.23E4')]})
filename = tempdir / 'decimals.parquet'
string_filename = str(filename)
t = pa.Table.from_pandas(expected)
_write_table(t, string_filename)
result_table = _read_table(string_filename)
result = result_table.to_pandas()
tm.assert_frame_equal(result, expected)
@pytest.mark.pandas
def test_parquet_writer_context_obj(tempdir):
df = _test_dataframe(100)
df['unique_id'] = 0
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
out = pa.BufferOutputStream()
with pq.ParquetWriter(out, arrow_table.schema, version='2.0') as writer:
frames = []
for i in range(10):
df['unique_id'] = i
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
writer.write_table(arrow_table)
frames.append(df.copy())
buf = out.getvalue()
result = _read_table(pa.BufferReader(buf))
expected = pd.concat(frames, ignore_index=True)
tm.assert_frame_equal(result.to_pandas(), expected)
@pytest.mark.pandas
def test_parquet_writer_context_obj_with_exception(tempdir):
df = _test_dataframe(100)
df['unique_id'] = 0
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
out = pa.BufferOutputStream()
error_text = 'Artificial Error'
try:
with pq.ParquetWriter(out,
arrow_table.schema,
version='2.0') as writer:
frames = []
for i in range(10):
df['unique_id'] = i
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
writer.write_table(arrow_table)
frames.append(df.copy())
if i == 5:
raise ValueError(error_text)
except Exception as e:
assert str(e) == error_text
buf = out.getvalue()
result = _read_table(pa.BufferReader(buf))
expected = pd.concat(frames, ignore_index=True)
tm.assert_frame_equal(result.to_pandas(), expected)
@pytest.mark.pandas
def test_zlib_compression_bug():
# ARROW-3514: "zlib deflate failed, output buffer too small"
table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col'])
f = io.BytesIO()
pq.write_table(table, f, compression='gzip')
f.seek(0)
roundtrip = pq.read_table(f)
tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas())
@pytest.mark.pandas
def test_merging_parquet_tables_with_different_pandas_metadata(tempdir):
# ARROW-3728: Merging Parquet Files - Pandas Meta in Schema Mismatch
schema = pa.schema([
pa.field('int', pa.int16()),
pa.field('float', pa.float32()),
pa.field('string', pa.string())
])
df1 = pd.DataFrame({
'int': np.arange(3, dtype=np.uint8),
'float': np.arange(3, dtype=np.float32),
'string': ['ABBA', 'EDDA', 'ACDC']
})
df2 = pd.DataFrame({
'int': [4, 5],
'float': [1.1, None],
'string': [None, None]
})
table1 = pa.Table.from_pandas(df1, schema=schema, preserve_index=False)
table2 = pa.Table.from_pandas(df2, schema=schema, preserve_index=False)
assert not table1.schema.equals(table2.schema)
assert table1.schema.equals(table2.schema, check_metadata=False)
writer = pq.ParquetWriter(tempdir / 'merged.parquet', schema=schema)
writer.write_table(table1)
writer.write_table(table2)
def test_empty_row_groups(tempdir):
# ARROW-3020
table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0'])
path = tempdir / 'empty_row_groups.parquet'
num_groups = 3
with pq.ParquetWriter(path, table.schema) as writer:
for i in range(num_groups):
writer.write_table(table)
reader = pq.ParquetFile(path)
assert reader.metadata.num_row_groups == num_groups
for i in range(num_groups):
assert reader.read_row_group(i).equals(table)
@pytest.mark.pandas
def test_parquet_writer_with_caller_provided_filesystem():
out = pa.BufferOutputStream()
class CustomFS(FileSystem):
def __init__(self):
self.path = None
self.mode = None
def open(self, path, mode='rb'):
self.path = path
self.mode = mode
return out
fs = CustomFS()
fname = 'expected_fname.parquet'
df = _test_dataframe(100)
table = pa.Table.from_pandas(df, preserve_index=False)
with pq.ParquetWriter(fname, table.schema, filesystem=fs, version='2.0') \
as writer:
writer.write_table(table)
assert fs.path == fname
assert fs.mode == 'wb'
assert out.closed
buf = out.getvalue()
table_read = _read_table(pa.BufferReader(buf))
df_read = table_read.to_pandas()
tm.assert_frame_equal(df_read, df)
# Should raise ValueError when filesystem is passed with file-like object
with pytest.raises(ValueError) as err_info:
pq.ParquetWriter(pa.BufferOutputStream(), table.schema, filesystem=fs)
expected_msg = ("filesystem passed but where is file-like, so"
" there is nothing to open with filesystem.")
assert str(err_info) == expected_msg
def test_writing_empty_lists():
# ARROW-2591: [Python] Segmentation fault issue in pq.write_table
arr1 = pa.array([[], []], pa.list_(pa.int32()))
table = pa.Table.from_arrays([arr1], ['list(int32)'])
_check_roundtrip(table)
def test_write_nested_zero_length_array_chunk_failure():
# Bug report in ARROW-3792
cols = OrderedDict(
int32=pa.int32(),
list_string=pa.list_(pa.string())
)
data = [[], [OrderedDict(int32=1, list_string=('G',)), ]]
# This produces a table with a column like
# <Column name='list_string' type=ListType(list<item: string>)>
# [
# [],
# [
# [
# "G"
# ]
# ]
# ]
#
# Each column is a ChunkedArray with 2 elements
my_arrays = [pa.array(batch, type=pa.struct(cols)).flatten()
for batch in data]
my_batches = [pa.RecordBatch.from_arrays(batch, pa.schema(cols))
for batch in my_arrays]
tbl = pa.Table.from_batches(my_batches, pa.schema(cols))
_check_roundtrip(tbl)
@pytest.mark.pandas
def test_partitioned_dataset(tempdir):
# ARROW-3208: Segmentation fault when reading a Parquet partitioned dataset
# to a Parquet file
path = tempdir / "ARROW-3208"
df = pd.DataFrame({
'one': [-1, 10, 2.5, 100, 1000, 1, 29.2],
'two': [-1, 10, 2, 100, 1000, 1, 11],
'three': [0, 0, 0, 0, 0, 0, 0]
})
table = pa.Table.from_pandas(df)
pq.write_to_dataset(table, root_path=str(path),
partition_cols=['one', 'two'])
table = pq.ParquetDataset(path).read()
pq.write_table(table, path / "output.parquet")
def test_read_column_invalid_index():
table = pa.Table.from_arrays([pa.array([4, 5]), pa.array(["foo", "bar"])],
['ints', 'strs'])
bio = pa.BufferOutputStream()
pq.write_table(table, bio)
f = pq.ParquetFile(bio.getvalue())
assert f.reader.read_column(0).to_pylist() == [4, 5]
assert f.reader.read_column(1).to_pylist() == ["foo", "bar"]
for index in (-1, 2):
with pytest.raises((ValueError, IndexError)):
f.reader.read_column(index)
@pytest.mark.pandas
def test_dataset_metadata(tempdir):
path = tempdir / "ARROW-1983-dataset"
# create and write a test dataset
df = pd.DataFrame({
'one': [1, 2, 3],
'two': [-1, -2, -3],
'three': [[1, 2], [2, 3], [3, 4]],
})
table = pa.Table.from_pandas(df)
metadata_list = []
pq.write_to_dataset(table, root_path=str(path),
partition_cols=['one', 'two'],
metadata_collector=metadata_list)
# open the dataset and collect metadata from pieces:
dataset = pq.ParquetDataset(path)
metadata_list2 = [p.get_metadata() for p in dataset.pieces]
# compare metadata list content:
assert len(metadata_list) == len(metadata_list2)
for md, md2 in zip(metadata_list, metadata_list2):
d = md.to_dict()
d2 = md2.to_dict()
# serialized_size is initialized in the reader:
assert d.pop('serialized_size') == 0
assert d2.pop('serialized_size') > 0
assert d == d2
def test_parquet_file_too_small(tempdir):
path = str(tempdir / "test.parquet")
with pytest.raises(pa.ArrowIOError,
match='size is 0 bytes'):
with open(path, 'wb') as f:
pass
pq.read_table(path)
with pytest.raises(pa.ArrowIOError,
match='size is 4 bytes'):
with open(path, 'wb') as f:
f.write(b'ffff')
pq.read_table(path)
@pytest.mark.pandas
def test_multi_dataset_metadata(tempdir):
filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]
metapath = str(tempdir / "_metadata")
# create a test dataset
df = pd.DataFrame({
'one': [1, 2, 3],
'two': [-1, -2, -3],
'three': [[1, 2], [2, 3], [3, 4]],
})
table = pa.Table.from_pandas(df)
# write dataset twice and collect/merge metadata
_meta = None
for filename in filenames:
meta = []
pq.write_table(table, str(tempdir / filename),
metadata_collector=meta)
meta[0].set_file_path(filename)
if _meta is None:
_meta = meta[0]
else:
_meta.append_row_groups(meta[0])
# Write merged metadata-only file
with open(metapath, "wb") as f:
_meta.write_metadata_file(f)
# Read back the metadata
meta = pq.read_metadata(metapath)
md = meta.to_dict()
_md = _meta.to_dict()
for key in _md:
if key != 'serialized_size':
assert _md[key] == md[key]
assert _md['num_columns'] == 3
assert _md['num_rows'] == 6
assert _md['num_row_groups'] == 2
assert _md['serialized_size'] == 0
assert md['serialized_size'] > 0
@pytest.mark.pandas
def test_filter_before_validate_schema(tempdir):
# ARROW-4076 apply filter before schema validation
# to avoid checking unneeded schemas
# create partitioned dataset with mismatching schemas which would
# otherwise raise if first validation all schemas
dir1 = tempdir / 'A=0'
dir1.mkdir()
table1 = pa.Table.from_pandas(pd.DataFrame({'B': [1, 2, 3]}))
pq.write_table(table1, dir1 / 'data.parquet')
dir2 = tempdir / 'A=1'
dir2.mkdir()
table2 = pa.Table.from_pandas(pd.DataFrame({'B': ['a', 'b', 'c']}))
pq.write_table(table2, dir2 / 'data.parquet')
# read single file using filter
table = pq.read_table(tempdir, filters=[[('A', '==', 0)]])
assert table.column('B').equals(pa.column('B', pa.array([1, 2, 3])))