| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import io |
| |
| import numpy as np |
| import pytest |
| |
| import pyarrow as pa |
| from pyarrow.tests import util |
| |
# Shared ``use_legacy_dataset`` parametrizations for the parquet test modules.
# Each decorator runs a test twice: once with ``use_legacy_dataset=True`` and
# once with ``False`` (the new Dataset-API path, gated on the 'dataset' mark).
parametrize_legacy_dataset = pytest.mark.parametrize(
    "use_legacy_dataset",
    [True, pytest.param(False, marks=pytest.mark.dataset)])
# Variant for features the new Dataset API path does not support: the
# ``False`` case is skipped outright.
parametrize_legacy_dataset_not_supported = pytest.mark.parametrize(
    "use_legacy_dataset", [True, pytest.param(False, marks=pytest.mark.skip)])
# Variant for behavior that is expected to fail on the legacy path (xfail)
# but work on the new Dataset API path.
parametrize_legacy_dataset_fixed = pytest.mark.parametrize(
    "use_legacy_dataset", [pytest.param(True, marks=pytest.mark.xfail),
                           pytest.param(False, marks=pytest.mark.dataset)])

# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not parquet'
pytestmark = pytest.mark.parquet
| |
| |
def _write_table(table, path, **kwargs):
    """Write *table* to *path* as parquet, returning the written Table.

    Accepts either a ``pyarrow.Table`` or a pandas DataFrame (which is
    converted first). Extra keyword arguments are forwarded to
    ``pq.write_table``.
    """
    # Import lazily so a missing parquet extension raises inside the test
    # that actually needs it (and the ImportError is visible there).
    import pyarrow.parquet as pq
    from pyarrow.pandas_compat import _pandas_api

    # Convert pandas input up front; pyarrow Tables pass through untouched.
    table = (pa.Table.from_pandas(table)
             if _pandas_api.is_data_frame(table) else table)

    pq.write_table(table, path, **kwargs)
    return table
| |
| |
def _read_table(*args, **kwargs):
    """Read a parquet table via ``pq.read_table`` and validate it fully."""
    # Lazy import keeps the ImportError local to the calling test.
    import pyarrow.parquet as pq

    result = pq.read_table(*args, **kwargs)
    # full validation checks buffer contents, not just metadata consistency
    result.validate(full=True)
    return result
| |
| |
def _roundtrip_table(table, read_table_kwargs=None,
                     write_table_kwargs=None, use_legacy_dataset=True):
    """Write *table* to an in-memory parquet buffer and read it back."""
    write_kwargs = {} if write_table_kwargs is None else write_table_kwargs
    read_kwargs = {} if read_table_kwargs is None else read_table_kwargs

    sink = pa.BufferOutputStream()
    _write_table(table, sink, **write_kwargs)
    source = pa.BufferReader(sink.getvalue())
    return _read_table(source, use_legacy_dataset=use_legacy_dataset,
                       **read_kwargs)
| |
| |
def _check_roundtrip(table, expected=None, read_table_kwargs=None,
                     use_legacy_dataset=True, **write_table_kwargs):
    """Assert *table* round-trips through parquet equal to *expected*.

    Round-trips twice on purpose: the second pass exercises writing a
    table that itself came out of the reader.
    """
    expected = table if expected is None else expected
    read_table_kwargs = read_table_kwargs or {}

    result = table
    for _ in range(2):
        result = _roundtrip_table(result,
                                  read_table_kwargs=read_table_kwargs,
                                  write_table_kwargs=write_table_kwargs,
                                  use_legacy_dataset=use_legacy_dataset)
        assert result.equals(expected)
| |
| |
def _roundtrip_pandas_dataframe(df, write_kwargs, use_legacy_dataset=True):
    """Round-trip a pandas DataFrame through parquet, back to pandas."""
    roundtripped = _roundtrip_table(
        pa.Table.from_pandas(df),
        write_table_kwargs=write_kwargs,
        use_legacy_dataset=use_legacy_dataset)
    return roundtripped.to_pandas()
| |
| |
| def _random_integers(size, dtype): |
| # We do not generate integers outside the int64 range |
| platform_int_info = np.iinfo('int_') |
| iinfo = np.iinfo(dtype) |
| return np.random.randint(max(iinfo.min, platform_int_info.min), |
| min(iinfo.max, platform_int_info.max), |
| size=size).astype(dtype) |
| |
| |
def _test_dataframe(size=10000, seed=0):
    """Build a deterministic pandas DataFrame covering common dtypes.

    NOTE: the order of the random draws below is significant — it fixes
    the RNG stream (and the resulting column values) for a given seed.
    """
    import pandas as pd

    np.random.seed(seed)

    data = {}
    # Integer columns first, in fixed-width order, matching the RNG stream.
    for name in ('uint8', 'uint16', 'uint32', 'uint64',
                 'int8', 'int16', 'int32', 'int64'):
        data[name] = _random_integers(size, getattr(np, name))
    data['float32'] = np.random.randn(size).astype(np.float32)
    data['float64'] = np.arange(size, dtype=np.float64)
    data['bool'] = np.random.randn(size) > 0
    data['strings'] = [util.rands(10) for _ in range(size)]
    data['all_none'] = [None] * size
    data['all_none_category'] = [None] * size

    df = pd.DataFrame(data)
    # TODO(PARQUET-1015)
    # df['all_none_category'] = df['all_none_category'].astype('category')
    return df
| |
| |
def make_sample_file(table_or_df):
    """Serialize *table_or_df* to an in-memory parquet file.

    Accepts a ``pyarrow.Table`` or a pandas DataFrame and returns an open
    ``pq.ParquetFile`` positioned at the start of the buffer.
    """
    import pyarrow.parquet as pq

    if not isinstance(table_or_df, pa.Table):
        table_or_df = pa.Table.from_pandas(table_or_df)

    buf = io.BytesIO()
    _write_table(table_or_df, buf, compression='SNAPPY', version='2.6',
                 coerce_timestamps='ms')
    buf.seek(0)
    return pq.ParquetFile(buf)
| |
| |
def alltypes_sample(size=10000, seed=0, categorical=False):
    """Create a pandas DataFrame exercising many parquet-relevant dtypes.

    Parameters
    ----------
    size : int, default 10000
        Number of rows in every column.
    seed : int, default 0
        Seed for numpy's RNG (only the 'bool' column is random).
    categorical : bool, default False
        If True, add a 'str_category' column: the 'str' column converted
        to a pandas categorical.

    Returns
    -------
    pandas.DataFrame
    """
    import pandas as pd

    np.random.seed(seed)
    arrays = {
        'uint8': np.arange(size, dtype=np.uint8),
        'uint16': np.arange(size, dtype=np.uint16),
        'uint32': np.arange(size, dtype=np.uint32),
        'uint64': np.arange(size, dtype=np.uint64),
        # BUG FIX: this column was previously created with dtype=np.int16
        # (copy-paste typo), so the 'int8' column never actually tested int8.
        'int8': np.arange(size, dtype=np.int8),
        'int16': np.arange(size, dtype=np.int16),
        'int32': np.arange(size, dtype=np.int32),
        'int64': np.arange(size, dtype=np.int64),
        'float32': np.arange(size, dtype=np.float32),
        'float64': np.arange(size, dtype=np.float64),
        'bool': np.random.randn(size) > 0,
        # TODO(wesm): Test other timestamp resolutions now that arrow supports
        # them
        'datetime': np.arange("2016-01-01T00:00:00.001", size,
                              dtype='datetime64[ms]'),
        'str': pd.Series([str(x) for x in range(size)]),
        'empty_str': [''] * size,
        'str_with_nulls': [None] + [str(x) for x in range(size - 2)] + [None],
        'null': [None] * size,
        'null_list': [None] * 2 + [[None] * (x % 4) for x in range(size - 2)],
    }
    if categorical:
        arrays['str_category'] = arrays['str'].astype('category')
    return pd.DataFrame(arrays)