| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import datetime |
| import decimal |
import io
from collections import OrderedDict
| |
| try: |
| import numpy as np |
| except ImportError: |
| np = None |
| import pytest |
| |
| import pyarrow as pa |
| from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file |
| from pyarrow.fs import LocalFileSystem |
| from pyarrow.tests import util |
| |
| try: |
| import pyarrow.parquet as pq |
| from pyarrow.tests.parquet.common import _write_table |
| except ImportError: |
| pq = None |
| |
| |
| try: |
| import pandas as pd |
| import pandas.testing as tm |
| |
| from pyarrow.tests.parquet.common import alltypes_sample |
| except ImportError: |
| pd = tm = None |
| |
| |
| # Marks all of the tests in this module |
| # Ignore these with pytest ... -m 'not parquet' |
| pytestmark = pytest.mark.parquet |
| |
| |
| @pytest.mark.pandas |
| def test_parquet_metadata_api(): |
| df = alltypes_sample(size=10000) |
| df = df.reindex(columns=sorted(df.columns)) |
| df.index = np.random.randint(0, 1000000, size=len(df)) |
| |
| fileh = make_sample_file(df) |
| ncols = len(df.columns) |
| |
| # Series of sniff tests |
| meta = fileh.metadata |
| repr(meta) |
| assert meta.num_rows == len(df) |
| assert meta.num_columns == ncols + 1 # +1 for index |
| assert meta.num_row_groups == 1 |
| assert meta.format_version == '2.6' |
| assert 'parquet-cpp' in meta.created_by |
| assert isinstance(meta.serialized_size, int) |
| assert isinstance(meta.metadata, dict) |
| |
| # Schema |
| schema = fileh.schema |
| assert meta.schema is schema |
| assert len(schema) == ncols + 1 # +1 for index |
| repr(schema) |
| |
| col = schema[0] |
| repr(col) |
| assert col.name == df.columns[0] |
| assert col.max_definition_level == 1 |
| assert col.max_repetition_level == 0 |
| assert col.physical_type == 'BOOLEAN' |
| assert col.converted_type == 'NONE' |
| |
| col_float16 = schema[5] |
| assert col_float16.logical_type.type == 'FLOAT16' |
| |
| with pytest.raises(IndexError): |
| schema[ncols + 1] # +1 for index |
| |
    with pytest.raises(IndexError):
        schema[-1]  # negative indexing is not supported
| |
| # Row group |
| for rg in range(meta.num_row_groups): |
| rg_meta = meta.row_group(rg) |
| assert isinstance(rg_meta, pq.RowGroupMetaData) |
| repr(rg_meta) |
| |
| for col in range(rg_meta.num_columns): |
| col_meta = rg_meta.column(col) |
| assert isinstance(col_meta, pq.ColumnChunkMetaData) |
| repr(col_meta) |
| |
| with pytest.raises(IndexError): |
| meta.row_group(-1) |
| |
| with pytest.raises(IndexError): |
| meta.row_group(meta.num_row_groups + 1) |
| |
| rg_meta = meta.row_group(0) |
| assert rg_meta.num_rows == len(df) |
| assert rg_meta.num_columns == ncols + 1 # +1 for index |
| assert rg_meta.total_byte_size > 0 |
| |
| with pytest.raises(IndexError): |
| col_meta = rg_meta.column(-1) |
| |
| with pytest.raises(IndexError): |
| col_meta = rg_meta.column(ncols + 2) |
| |
| col_meta = rg_meta.column(0) |
| assert col_meta.file_offset == 0 |
| assert col_meta.file_path == '' # created from BytesIO |
| assert col_meta.physical_type == 'BOOLEAN' |
| assert col_meta.num_values == 10000 |
| assert col_meta.path_in_schema == 'bool' |
| assert col_meta.is_stats_set is True |
| assert isinstance(col_meta.statistics, pq.Statistics) |
| assert col_meta.compression == 'SNAPPY' |
| assert set(col_meta.encodings) == {'PLAIN', 'RLE'} |
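    # BOOLEAN columns are not dictionary-encoded, hence no dictionary page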
| assert col_meta.has_dictionary_page is False |
| assert col_meta.dictionary_page_offset is None |
| assert col_meta.data_page_offset > 0 |
| assert col_meta.total_compressed_size > 0 |
| assert col_meta.total_uncompressed_size > 0 |
| with pytest.raises(NotImplementedError): |
| col_meta.has_index_page |
| with pytest.raises(NotImplementedError): |
| col_meta.index_page_offset |
| |
| |
| def test_parquet_metadata_lifetime(tempdir): |
| # ARROW-6642 - ensure that chained access keeps parent objects alive |
| table = pa.table({'a': [1, 2, 3]}) |
| pq.write_table(table, tempdir / 'test_metadata_segfault.parquet') |
| parquet_file = pq.ParquetFile(tempdir / 'test_metadata_segfault.parquet') |
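    # The bare chained expression is intentional: the intermediate
    # metadata objects are never bound to names, so this would access
    # freed memory if they did not keep their parents alive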
| parquet_file.metadata.row_group(0).column(0).statistics |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parametrize( |
| ( |
| 'data', |
| 'type', |
| 'physical_type', |
| 'min_value', |
| 'max_value', |
| 'null_count', |
| 'num_values', |
| 'distinct_count' |
| ), |
| [ |
| ([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, None), |
| ([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, None), |
| ([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, None), |
| ([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, None), |
| ([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, None), |
| ([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, None), |
| ([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, None), |
| ([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, None), |
| ( |
| [-1.1, 2.2, 2.3, None, 4.4], pa.float32(), |
| 'FLOAT', -1.1, 4.4, 1, 4, None |
| ), |
| ( |
| [-1.1, 2.2, 2.3, None, 4.4], pa.float64(), |
| 'DOUBLE', -1.1, 4.4, 1, 4, None |
| ), |
| ( |
| ['', 'b', chr(1000), None, 'aaa'], pa.binary(), |
| 'BYTE_ARRAY', b'', chr(1000).encode('utf-8'), 1, 4, None |
| ), |
| ( |
| [True, False, False, True, True], pa.bool_(), |
| 'BOOLEAN', False, True, 0, 5, None |
| ), |
| ( |
| [b'\x00', b'b', b'12', None, b'aaa'], pa.binary(), |
| 'BYTE_ARRAY', b'\x00', b'b', 1, 4, None |
| ), |
| ] |
| ) |
| def test_parquet_column_statistics_api(data, type, physical_type, min_value, |
| max_value, null_count, num_values, |
| distinct_count): |
| df = pd.DataFrame({'data': data}) |
| schema = pa.schema([pa.field('data', type)]) |
| table = pa.Table.from_pandas(df, schema=schema, safe=False) |
| fileh = make_sample_file(table) |
| |
| meta = fileh.metadata |
| |
| rg_meta = meta.row_group(0) |
| col_meta = rg_meta.column(0) |
| |
| stat = col_meta.statistics |
| assert stat.has_min_max |
| assert _close(type, stat.min, min_value) |
| assert _close(type, stat.max, max_value) |
| assert stat.null_count == null_count |
| assert stat.num_values == num_values |
    # TODO(kszucs): the parquet-cpp API does not expose a HasDistinctCount
    # method yet, so a missing distinct_count is reported as None
| assert stat.distinct_count == distinct_count |
| assert stat.physical_type == physical_type |
| |
| |
| def _close(type, left, right): |
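    # Compare expected and actual statistics values, with a tolerance for
    # floating point types (the Python literals are doubles, so float32
    # values differ slightly after the round-trip)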
| if type == pa.float32(): |
| return abs(left - right) < 1E-7 |
| elif type == pa.float64(): |
| return abs(left - right) < 1E-13 |
| else: |
| return left == right |
| |
| |
| # ARROW-6339 |
| @pytest.mark.pandas |
| def test_parquet_raise_on_unset_statistics(): |
| df = pd.DataFrame({"t": pd.Series([pd.NaT], dtype="datetime64[ns]")}) |
| meta = make_sample_file(pa.Table.from_pandas(df)).metadata |
| |
| assert not meta.row_group(0).column(0).statistics.has_min_max |
| assert meta.row_group(0).column(0).statistics.max is None |
| |
| |
| def test_statistics_convert_logical_types(tempdir): |
| # ARROW-5166, ARROW-4139 |
| |
| # (min, max, type) |
| cases = [(10, 11164359321221007157, pa.uint64()), |
| (10, 4294967295, pa.uint32()), |
| ("ähnlich", "öffentlich", pa.utf8()), |
| (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000), |
| pa.time32('ms')), |
| (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000), |
| pa.time64('us')), |
| (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000), |
| datetime.datetime(2019, 6, 25, 0, 0, 0, 1000), |
| pa.timestamp('ms')), |
| (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000), |
| datetime.datetime(2019, 6, 25, 0, 0, 0, 1000), |
| pa.timestamp('us')), |
| (datetime.date(2019, 6, 24), |
| datetime.date(2019, 6, 25), |
| pa.date32()), |
| (decimal.Decimal("20.123"), |
| decimal.Decimal("20.124"), |
| pa.decimal128(12, 5))] |
| |
| for i, (min_val, max_val, typ) in enumerate(cases): |
| t = pa.Table.from_arrays([pa.array([min_val, max_val], type=typ)], |
| ['col']) |
| path = str(tempdir / f'example{i}.parquet') |
| pq.write_table(t, path, version='2.6') |
| pf = pq.ParquetFile(path) |
| stats = pf.metadata.row_group(0).column(0).statistics |
| assert stats.min == min_val |
| assert stats.max == max_val |
| |
| |
| def test_parquet_write_disable_statistics(tempdir): |
| table = pa.Table.from_pydict( |
| OrderedDict([ |
| ('a', pa.array([1, 2, 3])), |
| ('b', pa.array(['a', 'b', 'c'])) |
| ]) |
| ) |
| _write_table(table, tempdir / 'data.parquet') |
| meta = pq.read_metadata(tempdir / 'data.parquet') |
| for col in [0, 1]: |
| cc = meta.row_group(0).column(col) |
| assert cc.is_stats_set is True |
| assert cc.statistics is not None |
| |
| _write_table(table, tempdir / 'data2.parquet', write_statistics=False) |
| meta = pq.read_metadata(tempdir / 'data2.parquet') |
| for col in [0, 1]: |
| cc = meta.row_group(0).column(col) |
| assert cc.is_stats_set is False |
| assert cc.statistics is None |
| |
| _write_table(table, tempdir / 'data3.parquet', write_statistics=['a']) |
| meta = pq.read_metadata(tempdir / 'data3.parquet') |
| cc_a = meta.row_group(0).column(0) |
| cc_b = meta.row_group(0).column(1) |
| assert cc_a.is_stats_set is True |
| assert cc_b.is_stats_set is False |
| assert cc_a.statistics is not None |
| assert cc_b.statistics is None |
| |
| |
| def test_parquet_sorting_column(): |
| sorting_col = pq.SortingColumn(10) |
| assert sorting_col.to_dict() == { |
| 'column_index': 10, |
| 'descending': False, |
| 'nulls_first': False |
| } |
| |
| sorting_col = pq.SortingColumn(0, descending=True, nulls_first=True) |
| assert sorting_col.to_dict() == { |
| 'column_index': 0, |
| 'descending': True, |
| 'nulls_first': True |
| } |
| |
| schema = pa.schema([('a', pa.int64()), ('b', pa.int64())]) |
| sorting_cols = ( |
| pq.SortingColumn(1, descending=True), |
| pq.SortingColumn(0, descending=False), |
| ) |
| sort_order, null_placement = pq.SortingColumn.to_ordering(schema, sorting_cols) |
| assert sort_order == (('b', "descending"), ('a', "ascending")) |
| assert null_placement == "at_end" |
| |
| sorting_cols_roundtripped = pq.SortingColumn.from_ordering( |
| schema, sort_order, null_placement) |
| assert sorting_cols_roundtripped == sorting_cols |
| |
| sorting_cols = pq.SortingColumn.from_ordering( |
| schema, ('a', ('b', "descending")), null_placement="at_start") |
| expected = ( |
| pq.SortingColumn(0, descending=False, nulls_first=True), |
| pq.SortingColumn(1, descending=True, nulls_first=True), |
| ) |
| assert sorting_cols == expected |
| |
| # Conversions handle empty tuples |
| empty_sorting_cols = pq.SortingColumn.from_ordering(schema, ()) |
| assert empty_sorting_cols == () |
| |
| assert pq.SortingColumn.to_ordering(schema, ()) == ((), "at_end") |
| |
| with pytest.raises(ValueError): |
| pq.SortingColumn.from_ordering(schema, (("a", "not a valid sort order"))) |
| |
| with pytest.raises(ValueError, match="inconsistent null placement"): |
| sorting_cols = ( |
| pq.SortingColumn(1, nulls_first=True), |
| pq.SortingColumn(0, nulls_first=False), |
| ) |
| pq.SortingColumn.to_ordering(schema, sorting_cols) |
| |
| |
| def test_parquet_sorting_column_nested(): |
| schema = pa.schema({ |
| 'a': pa.struct([('x', pa.int64()), ('y', pa.int64())]), |
| 'b': pa.int64() |
| }) |
| |
    # SortingColumn indices refer to leaf columns in schema order:
    # 'a.x' -> 0, 'a.y' -> 1, 'b' -> 2
    sorting_columns = [
| pq.SortingColumn(0, descending=True), # a.x |
| pq.SortingColumn(2, descending=False) # b |
| ] |
| |
| sort_order, null_placement = pq.SortingColumn.to_ordering(schema, sorting_columns) |
| assert null_placement == "at_end" |
| assert len(sort_order) == 2 |
| assert sort_order[0] == ("a.x", "descending") |
| assert sort_order[1] == ("b", "ascending") |
| |
| |
| def test_parquet_file_sorting_columns(): |
| table = pa.table({'a': [1, 2, 3], 'b': ['a', 'b', 'c']}) |
| |
| sorting_columns = ( |
| pq.SortingColumn(column_index=0, descending=True, nulls_first=True), |
| pq.SortingColumn(column_index=1, descending=False), |
| ) |
| writer = pa.BufferOutputStream() |
| _write_table(table, writer, sorting_columns=sorting_columns) |
| reader = pa.BufferReader(writer.getvalue()) |
| |
| # Can retrieve sorting columns from metadata |
| metadata = pq.read_metadata(reader) |
| assert sorting_columns == metadata.row_group(0).sorting_columns |
| |
| metadata_dict = metadata.to_dict() |
| assert metadata_dict.get('num_columns') == 2 |
| assert metadata_dict.get('num_rows') == 3 |
| assert metadata_dict.get('num_row_groups') == 1 |
| |
| |
| def test_field_id_metadata(): |
| # ARROW-7080 |
| field_id = b'PARQUET:field_id' |
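    # Fields tagged with this reserved metadata key are written with an
    # explicit Parquet field_id, and the key is restored on read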
| inner = pa.field('inner', pa.int32(), metadata={field_id: b'100'}) |
| middle = pa.field('middle', pa.struct( |
| [inner]), metadata={field_id: b'101'}) |
| fields = [ |
| pa.field('basic', pa.int32(), metadata={ |
| b'other': b'abc', field_id: b'1'}), |
| pa.field( |
| 'list', |
| pa.list_(pa.field('list-inner', pa.int32(), |
| metadata={field_id: b'10'})), |
| metadata={field_id: b'11'}), |
| pa.field('struct', pa.struct([middle]), metadata={field_id: b'102'}), |
| pa.field('no-metadata', pa.int32()), |
| pa.field('non-integral-field-id', pa.int32(), |
| metadata={field_id: b'xyz'}), |
| pa.field('negative-field-id', pa.int32(), |
| metadata={field_id: b'-1000'}) |
| ] |
| arrs = [[] for _ in fields] |
| table = pa.table(arrs, schema=pa.schema(fields)) |
| |
| bio = pa.BufferOutputStream() |
| pq.write_table(table, bio) |
| contents = bio.getvalue() |
| |
| pf = pq.ParquetFile(pa.BufferReader(contents)) |
| schema = pf.schema_arrow |
| |
| assert schema[0].metadata[field_id] == b'1' |
| assert schema[0].metadata[b'other'] == b'abc' |
| |
| list_field = schema[1] |
| assert list_field.metadata[field_id] == b'11' |
| |
| list_item_field = list_field.type.value_field |
| assert list_item_field.metadata[field_id] == b'10' |
| |
| struct_field = schema[2] |
| assert struct_field.metadata[field_id] == b'102' |
| |
| struct_middle_field = struct_field.type[0] |
| assert struct_middle_field.metadata[field_id] == b'101' |
| |
| struct_inner_field = struct_middle_field.type[0] |
| assert struct_inner_field.metadata[field_id] == b'100' |
| |
| assert schema[3].metadata is None |
    # Invalid field_id values are passed through into the Arrow schema
    # metadata (ok), but do not get a field_id in Parquet (not tested)
| assert schema[4].metadata[field_id] == b'xyz' |
| assert schema[5].metadata[field_id] == b'-1000' |
| |
| |
| def test_parquet_file_page_index(): |
| for write_page_index in (False, True): |
| table = pa.table({'a': [1, 2, 3]}) |
| |
| writer = pa.BufferOutputStream() |
| _write_table(table, writer, write_page_index=write_page_index) |
| reader = pa.BufferReader(writer.getvalue()) |
| |
        # Can check whether the page index was written from the metadata
| metadata = pq.read_metadata(reader) |
| cc = metadata.row_group(0).column(0) |
| assert cc.has_offset_index is write_page_index |
| assert cc.has_column_index is write_page_index |
| |
| |
| @pytest.mark.pandas |
| def test_multi_dataset_metadata(tempdir): |
| filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"] |
| metapath = str(tempdir / "_metadata") |
| |
| # create a test dataset |
| df = pd.DataFrame({ |
| 'one': [1, 2, 3], |
| 'two': [-1, -2, -3], |
| 'three': [[1, 2], [2, 3], [3, 4]], |
| }) |
| table = pa.Table.from_pandas(df) |
| |
| # write dataset twice and collect/merge metadata |
| _meta = None |
| for filename in filenames: |
| meta = [] |
| pq.write_table(table, str(tempdir / filename), |
| metadata_collector=meta) |
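        # Record the data file's path relative to the _metadata file so
        # readers can locate the referenced row groups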
| meta[0].set_file_path(filename) |
| if _meta is None: |
| _meta = meta[0] |
| else: |
| _meta.append_row_groups(meta[0]) |
| |
| # Write merged metadata-only file |
| with open(metapath, "wb") as f: |
| _meta.write_metadata_file(f) |
| |
| # Read back the metadata |
| meta = pq.read_metadata(metapath) |
| md = meta.to_dict() |
| _md = _meta.to_dict() |
| for key in _md: |
| if key != 'serialized_size': |
| assert _md[key] == md[key] |
| assert _md['num_columns'] == 3 |
| assert _md['num_rows'] == 6 |
| assert _md['num_row_groups'] == 2 |
| assert _md['serialized_size'] == 0 |
| assert md['serialized_size'] > 0 |
| |
| |
| def test_metadata_hashing(tempdir): |
| path1 = str(tempdir / "metadata1") |
| schema1 = pa.schema([("a", "int64"), ("b", "float64")]) |
| pq.write_metadata(schema1, path1) |
| parquet_meta1 = pq.read_metadata(path1) |
| |
| # Same as 1, just different path |
| path2 = str(tempdir / "metadata2") |
| schema2 = pa.schema([("a", "int64"), ("b", "float64")]) |
| pq.write_metadata(schema2, path2) |
| parquet_meta2 = pq.read_metadata(path2) |
| |
| # different schema |
| path3 = str(tempdir / "metadata3") |
| schema3 = pa.schema([("a", "int64"), ("b", "float32")]) |
| pq.write_metadata(schema3, path3) |
| parquet_meta3 = pq.read_metadata(path3) |
| |
| # Deterministic |
| assert hash(parquet_meta1) == hash(parquet_meta1) # equal w/ same instance |
| assert hash(parquet_meta1) == hash(parquet_meta2) # equal w/ different instance |
| |
| # Not the same as other metadata with different schema |
| assert hash(parquet_meta1) != hash(parquet_meta3) |
| |
| |
| @pytest.mark.filterwarnings("ignore:Parquet format:FutureWarning") |
| def test_write_metadata(tempdir): |
| path = str(tempdir / "metadata") |
| schema = pa.schema([("a", "int64"), ("b", "float64")]) |
| |
| # write a pyarrow schema |
| pq.write_metadata(schema, path) |
| parquet_meta = pq.read_metadata(path) |
| schema_as_arrow = parquet_meta.schema.to_arrow_schema() |
| assert schema_as_arrow.equals(schema) |
| |
| # ARROW-8980: Check that the ARROW:schema metadata key was removed |
| if schema_as_arrow.metadata: |
| assert b'ARROW:schema' not in schema_as_arrow.metadata |
| |
| # pass through writer keyword arguments |
| for version in ["1.0", "2.4", "2.6"]: |
| pq.write_metadata(schema, path, version=version) |
| parquet_meta = pq.read_metadata(path) |
| # The version is stored as a single integer in the Parquet metadata, |
| # so it cannot correctly express dotted format versions |
| expected_version = "1.0" if version == "1.0" else "2.6" |
| assert parquet_meta.format_version == expected_version |
| |
| # metadata_collector: list of FileMetaData objects |
| table = pa.table({'a': [1, 2], 'b': [.1, .2]}, schema=schema) |
| pq.write_table(table, tempdir / "data.parquet") |
| parquet_meta = pq.read_metadata(str(tempdir / "data.parquet")) |
| pq.write_metadata( |
| schema, path, metadata_collector=[parquet_meta, parquet_meta] |
| ) |
| parquet_meta_mult = pq.read_metadata(path) |
| assert parquet_meta_mult.num_row_groups == 2 |
| |
| # append metadata with different schema raises an error |
| msg = ("AppendRowGroups requires equal schemas.\n" |
| "The two columns with index 0 differ.") |
| with pytest.raises(RuntimeError, match=msg): |
| pq.write_metadata( |
| pa.schema([("a", "int32"), ("b", "null")]), |
| path, metadata_collector=[parquet_meta, parquet_meta] |
| ) |
| |
| |
| def test_table_large_metadata(): |
| # ARROW-8694 |
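    # ~10 MB of schema metadata must survive the write/read round-trip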
| my_schema = pa.schema([pa.field('f0', 'double')], |
| metadata={'large': 'x' * 10000000}) |
| |
| table = pa.table([range(10)], schema=my_schema) |
| _check_roundtrip(table) |
| |
| |
| @pytest.mark.pandas |
| def test_compare_schemas(): |
| df = alltypes_sample(size=10000) |
| |
| fileh = make_sample_file(df) |
| fileh2 = make_sample_file(df) |
| fileh3 = make_sample_file(df[df.columns[::2]]) |
| |
| # ParquetSchema |
| assert isinstance(fileh.schema, pq.ParquetSchema) |
| assert fileh.schema.equals(fileh.schema) |
| assert fileh.schema == fileh.schema |
| assert fileh.schema.equals(fileh2.schema) |
| assert fileh.schema == fileh2.schema |
| assert fileh.schema != 'arbitrary object' |
| assert not fileh.schema.equals(fileh3.schema) |
| assert fileh.schema != fileh3.schema |
| |
| # ColumnSchema |
| assert isinstance(fileh.schema[0], pq.ColumnSchema) |
| assert fileh.schema[0].equals(fileh.schema[0]) |
| assert fileh.schema[0] == fileh.schema[0] |
| assert not fileh.schema[0].equals(fileh.schema[1]) |
| assert fileh.schema[0] != fileh.schema[1] |
| assert fileh.schema[0] != 'arbitrary object' |
| |
| |
| @pytest.mark.pandas |
| def test_read_schema(tempdir): |
| N = 100 |
| df = pd.DataFrame({ |
| 'index': np.arange(N), |
| 'values': np.random.randn(N) |
| }, columns=['index', 'values']) |
| |
| data_path = tempdir / 'test.parquet' |
| |
| table = pa.Table.from_pandas(df) |
| _write_table(table, data_path) |
| |
| read1 = pq.read_schema(data_path) |
| read2 = pq.read_schema(data_path, memory_map=True) |
| assert table.schema.equals(read1) |
| assert table.schema.equals(read2) |
| |
| assert table.schema.metadata[b'pandas'] == read1.metadata[b'pandas'] |
| |
| |
| def test_parquet_metadata_empty_to_dict(tempdir): |
| # https://issues.apache.org/jira/browse/ARROW-10146 |
| table = pa.table({"a": pa.array([], type="int64")}) |
| pq.write_table(table, tempdir / "data.parquet") |
| metadata = pq.read_metadata(tempdir / "data.parquet") |
    # ensure to_dict() doesn't error and the statistics are set to None
| metadata_dict = metadata.to_dict() |
| assert len(metadata_dict["row_groups"]) == 1 |
| assert len(metadata_dict["row_groups"][0]["columns"]) == 1 |
| assert metadata_dict["row_groups"][0]["columns"][0]["statistics"] is None |
| |
| |
| @pytest.mark.slow |
| @pytest.mark.large_memory |
| def test_metadata_exceeds_message_size(): |
| # ARROW-13655: Thrift may enable a default message size that limits |
| # the size of Parquet metadata that can be written. |
| NCOLS = 1000 |
| NREPEATS = 4000 |
| |
| table = pa.table({str(i): np.random.randn(10) for i in range(NCOLS)}) |
| |
| with pa.BufferOutputStream() as out: |
| pq.write_table(table, out) |
| buf = out.getvalue() |
| |
| original_metadata = pq.read_metadata(pa.BufferReader(buf)) |
| metadata = pq.read_metadata(pa.BufferReader(buf)) |
| for i in range(NREPEATS): |
| metadata.append_row_groups(original_metadata) |
| |
| with pa.BufferOutputStream() as out: |
| metadata.write_metadata_file(out) |
| buf = out.getvalue() |
| |
| metadata = pq.read_metadata(pa.BufferReader(buf)) |
| |
| |
| def test_metadata_schema_filesystem(tempdir): |
| table = pa.table({"a": [1, 2, 3]}) |
| |
| # URI writing to local file. |
| fname = "data.parquet" |
| file_path = str(tempdir / fname) |
| file_uri = 'file:///' + file_path |
| |
| pq.write_table(table, file_path) |
| |
| # Get expected `metadata` from path. |
| metadata = pq.read_metadata(tempdir / fname) |
| schema = table.schema |
| |
| assert pq.read_metadata(file_uri).equals(metadata) |
| assert pq.read_metadata( |
| file_path, filesystem=LocalFileSystem()).equals(metadata) |
| assert pq.read_metadata( |
| fname, filesystem=f'file:///{tempdir}').equals(metadata) |
| |
| assert pq.read_schema(file_uri).equals(schema) |
| assert pq.read_schema( |
| file_path, filesystem=LocalFileSystem()).equals(schema) |
| assert pq.read_schema( |
| fname, filesystem=f'file:///{tempdir}').equals(schema) |
| |
| with util.change_cwd(tempdir): |
| # Pass `filesystem` arg |
| assert pq.read_metadata( |
| fname, filesystem=LocalFileSystem()).equals(metadata) |
| |
| assert pq.read_schema( |
| fname, filesystem=LocalFileSystem()).equals(schema) |
| |
| |
| def test_metadata_equals(): |
| table = pa.table({"a": [1, 2, 3]}) |
| with pa.BufferOutputStream() as out: |
| pq.write_table(table, out) |
| buf = out.getvalue() |
| |
| original_metadata = pq.read_metadata(pa.BufferReader(buf)) |
| match = "Argument 'other' has incorrect type" |
| with pytest.raises(TypeError, match=match): |
| original_metadata.equals(None) |
| |
| |
| @pytest.mark.parametrize("t1,t2,expected_error", ( |
| ({'col1': range(10)}, {'col1': range(10)}, None), |
| ({'col1': range(10)}, {'col2': range(10)}, |
| "The two columns with index 0 differ."), |
| ({'col1': range(10), 'col2': range(10)}, {'col3': range(10)}, |
| "This schema has 2 columns, other has 1") |
| )) |
| def test_metadata_append_row_groups_diff(t1, t2, expected_error): |
| table1 = pa.table(t1) |
| table2 = pa.table(t2) |
| |
| buf1 = io.BytesIO() |
| buf2 = io.BytesIO() |
| pq.write_table(table1, buf1) |
| pq.write_table(table2, buf2) |
| buf1.seek(0) |
| buf2.seek(0) |
| |
| meta1 = pq.ParquetFile(buf1).metadata |
| meta2 = pq.ParquetFile(buf2).metadata |
| |
| if expected_error: |
        # The error message makes clear it occurred in append_row_groups
| prefix = "AppendRowGroups requires equal schemas.\n" |
| with pytest.raises(RuntimeError, match=prefix + expected_error): |
| meta1.append_row_groups(meta2) |
| else: |
| meta1.append_row_groups(meta2) |
| |
| |
| @pytest.mark.s3 |
| def test_write_metadata_fs_file_combinations(tempdir, s3_example_s3fs): |
| s3_fs, s3_path = s3_example_s3fs |
| |
| meta1 = tempdir / "meta1" |
| meta2 = tempdir / "meta2" |
| meta3 = tempdir / "meta3" |
| meta4 = tempdir / "meta4" |
| meta5 = f"{s3_path}/meta5" |
| |
| table = pa.table({"col": range(5)}) |
| |
| # plain local path |
| pq.write_metadata(table.schema, meta1, []) |
| |
    # Use LocalFileSystem to resolve and open the output stream
| pq.write_metadata(table.schema, meta2, [], filesystem=LocalFileSystem()) |
| |
| # Can resolve local file URI |
| pq.write_metadata(table.schema, meta3.as_uri(), []) |
| |
    # Pass a file-like object all the way through
| with meta4.open('wb+') as meta4_stream: |
| pq.write_metadata(table.schema, meta4_stream, []) |
| |
| # S3FileSystem |
| pq.write_metadata(table.schema, meta5, [], filesystem=s3_fs) |
| |
| assert meta1.read_bytes() == meta2.read_bytes() \ |
| == meta3.read_bytes() == meta4.read_bytes() \ |
| == s3_fs.open(meta5).read() |
| |
| |
| def test_column_chunk_key_value_metadata(parquet_test_datadir): |
| metadata = pq.read_metadata(parquet_test_datadir / |
| 'column_chunk_key_value_metadata.parquet') |
| key_value_metadata1 = metadata.row_group(0).column(0).metadata |
| assert key_value_metadata1 == {b'foo': b'bar', b'thisiskeywithoutvalue': b''} |
| key_value_metadata2 = metadata.row_group(0).column(1).metadata |
| assert key_value_metadata2 is None |
| |
| |
| def test_internal_class_instantiation(): |
| def msg(c): |
| return f"Do not call {c}'s constructor directly" |
| |
| with pytest.raises(TypeError, match=msg("Statistics")): |
| pq.Statistics() |
| |
| with pytest.raises(TypeError, match=msg("ParquetLogicalType")): |
| pq.ParquetLogicalType() |
| |
| with pytest.raises(TypeError, match=msg("ColumnChunkMetaData")): |
| pq.ColumnChunkMetaData() |
| |
| with pytest.raises(TypeError, match=msg("RowGroupMetaData")): |
| pq.RowGroupMetaData() |
| |
| with pytest.raises(TypeError, match=msg("FileMetaData")): |
| pq.FileMetaData() |