| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import contextlib |
| import os |
| import posixpath |
| import pathlib |
| import pickle |
| import textwrap |
| import threading |
| |
| import numpy as np |
| import pytest |
| |
| import pyarrow as pa |
| import pyarrow.csv |
| import pyarrow.fs as fs |
| from pyarrow.tests.util import change_cwd, _filesystem_uri |
| |
| try: |
| import pandas as pd |
| except ImportError: |
| pd = None |
| |
| try: |
| import pyarrow.dataset as ds |
| except ImportError: |
| ds = None |
| |
| # Marks all of the tests in this module |
| # Ignore these with pytest ... -m 'not dataset' |
| pytestmark = pytest.mark.dataset |
| |
| |
| def _generate_data(n): |
| import datetime |
| import itertools |
| |
| day = datetime.datetime(2000, 1, 1) |
| interval = datetime.timedelta(days=5) |
| colors = itertools.cycle(['green', 'blue', 'yellow', 'red', 'orange']) |
| |
| data = [] |
| for i in range(n): |
| data.append((day, i, float(i), next(colors))) |
| day += interval |
| |
| return pd.DataFrame(data, columns=['date', 'index', 'value', 'color']) |
| |
| |
| def _table_from_pandas(df): |
| schema = pa.schema([ |
| pa.field('date', pa.date32()), |
| pa.field('index', pa.int64()), |
| pa.field('value', pa.float64()), |
| pa.field('color', pa.string()), |
| ]) |
| table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) |
| return table.replace_schema_metadata() |
| |
| |
| @pytest.fixture |
| @pytest.mark.parquet |
| def mockfs(): |
| import pyarrow.parquet as pq |
| |
| mockfs = fs._MockFileSystem() |
| |
| directories = [ |
| 'subdir/1/xxx', |
| 'subdir/2/yyy', |
| ] |
| |
| for i, directory in enumerate(directories): |
| path = '{}/file{}.parquet'.format(directory, i) |
| mockfs.create_dir(directory) |
| with mockfs.open_output_stream(path) as out: |
| data = [ |
| list(range(5)), |
| list(map(float, range(5))), |
| list(map(str, range(5))), |
| [i] * 5 |
| ] |
| schema = pa.schema([ |
| pa.field('i64', pa.int64()), |
| pa.field('f64', pa.float64()), |
| pa.field('str', pa.string()), |
| pa.field('const', pa.int64()), |
| ]) |
| batch = pa.record_batch(data, schema=schema) |
| table = pa.Table.from_batches([batch]) |
| |
| pq.write_table(table, out) |
| |
| return mockfs |
| |
| |
| @pytest.fixture |
| def open_logging_fs(monkeypatch): |
| from pyarrow.fs import PyFileSystem, LocalFileSystem |
| from .test_fs import ProxyHandler |
| |
| localfs = LocalFileSystem() |
| |
| def normalized(paths): |
| return {localfs.normalize_path(str(p)) for p in paths} |
| |
| opened = set() |
| |
| def open_input_file(self, path): |
| path = localfs.normalize_path(str(path)) |
| opened.add(path) |
| return self._fs.open_input_file(path) |
| |
| # patch proxyhandler to log calls to open_input_file |
| monkeypatch.setattr(ProxyHandler, "open_input_file", open_input_file) |
| fs = PyFileSystem(ProxyHandler(localfs)) |
| |
| @contextlib.contextmanager |
| def assert_opens(expected_opened): |
| opened.clear() |
| try: |
| yield |
| finally: |
| assert normalized(opened) == normalized(expected_opened) |
| |
| return fs, assert_opens |
| |
| |
| @pytest.fixture(scope='module') |
| def multisourcefs(request): |
| request.config.pyarrow.requires('pandas') |
| request.config.pyarrow.requires('parquet') |
| import pyarrow.parquet as pq |
| |
| df = _generate_data(1000) |
| mockfs = fs._MockFileSystem() |
| |
| # simply split the dataframe into three chunks to construct a data source |
| # from each chunk into its own directory |
| df_a, df_b, df_c, df_d = np.array_split(df, 4) |
| |
| # create a directory containing a flat sequence of parquet files without |
| # any partitioning involved |
| mockfs.create_dir('plain') |
| for i, chunk in enumerate(np.array_split(df_a, 10)): |
| path = 'plain/chunk-{}.parquet'.format(i) |
| with mockfs.open_output_stream(path) as out: |
| pq.write_table(_table_from_pandas(chunk), out) |
| |
| # create one with schema partitioning by weekday and color |
| mockfs.create_dir('schema') |
| for part, chunk in df_b.groupby([df_b.date.dt.dayofweek, df_b.color]): |
| folder = 'schema/{}/{}'.format(*part) |
| path = '{}/chunk.parquet'.format(folder) |
| mockfs.create_dir(folder) |
| with mockfs.open_output_stream(path) as out: |
| pq.write_table(_table_from_pandas(chunk), out) |
| |
| # create one with hive partitioning by year and month |
| mockfs.create_dir('hive') |
| for part, chunk in df_c.groupby([df_c.date.dt.year, df_c.date.dt.month]): |
| folder = 'hive/year={}/month={}'.format(*part) |
| path = '{}/chunk.parquet'.format(folder) |
| mockfs.create_dir(folder) |
| with mockfs.open_output_stream(path) as out: |
| pq.write_table(_table_from_pandas(chunk), out) |
| |
| # create one with hive partitioning by color |
| mockfs.create_dir('hive_color') |
| for part, chunk in df_d.groupby(["color"]): |
| folder = 'hive_color/color={}'.format(*part) |
| path = '{}/chunk.parquet'.format(folder) |
| mockfs.create_dir(folder) |
| with mockfs.open_output_stream(path) as out: |
| pq.write_table(_table_from_pandas(chunk), out) |
| |
| return mockfs |
| |
| |
| @pytest.fixture |
| def dataset(mockfs): |
| format = ds.ParquetFileFormat() |
| selector = fs.FileSelector('subdir', recursive=True) |
| options = ds.FileSystemFactoryOptions('subdir') |
| options.partitioning = ds.DirectoryPartitioning( |
| pa.schema([ |
| pa.field('group', pa.int32()), |
| pa.field('key', pa.string()) |
| ]) |
| ) |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| return factory.finish() |
| |
| |
| def test_filesystem_dataset(mockfs): |
| schema = pa.schema([ |
| pa.field('const', pa.int64()) |
| ]) |
| file_format = ds.ParquetFileFormat() |
| paths = ['subdir/1/xxx/file0.parquet', 'subdir/2/yyy/file1.parquet'] |
| partitions = [ds.field('part') == x for x in range(1, 3)] |
| fragments = [file_format.make_fragment(path, mockfs, part) |
| for path, part in zip(paths, partitions)] |
| root_partition = ds.field('level') == ds.scalar(1337) |
| |
| dataset_from_fragments = ds.FileSystemDataset( |
| fragments, schema=schema, format=file_format, |
| filesystem=mockfs, root_partition=root_partition, |
| ) |
| dataset_from_paths = ds.FileSystemDataset.from_paths( |
| paths, schema=schema, format=file_format, filesystem=mockfs, |
| partitions=partitions, root_partition=root_partition, |
| ) |
| |
| for dataset in [dataset_from_fragments, dataset_from_paths]: |
| assert isinstance(dataset, ds.FileSystemDataset) |
| assert isinstance(dataset.format, ds.ParquetFileFormat) |
| assert dataset.partition_expression.equals(root_partition) |
| assert set(dataset.files) == set(paths) |
| |
| fragments = list(dataset.get_fragments()) |
| for fragment, partition, path in zip(fragments, partitions, paths): |
| assert fragment.partition_expression.equals(partition) |
| assert fragment.path == path |
| assert isinstance(fragment.format, ds.ParquetFileFormat) |
| assert isinstance(fragment, ds.ParquetFileFragment) |
| assert fragment.row_groups == [0] |
| assert fragment.num_row_groups == 1 |
| |
| row_group_fragments = list(fragment.split_by_row_group()) |
| assert fragment.num_row_groups == len(row_group_fragments) == 1 |
| assert isinstance(row_group_fragments[0], ds.ParquetFileFragment) |
| assert row_group_fragments[0].path == path |
| assert row_group_fragments[0].row_groups == [0] |
| assert row_group_fragments[0].num_row_groups == 1 |
| |
| fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) |
| assert len(fragments) == 2 |
| |
| # the root_partition keyword has a default |
| dataset = ds.FileSystemDataset( |
| fragments, schema=schema, format=file_format, filesystem=mockfs |
| ) |
| assert dataset.partition_expression.equals(ds.scalar(True)) |
| |
| # from_paths partitions have defaults |
| dataset = ds.FileSystemDataset.from_paths( |
| paths, schema=schema, format=file_format, filesystem=mockfs |
| ) |
| assert dataset.partition_expression.equals(ds.scalar(True)) |
| for fragment in dataset.get_fragments(): |
| assert fragment.partition_expression.equals(ds.scalar(True)) |
| |
| # validation of required arguments |
| with pytest.raises(TypeError, match="incorrect type"): |
| ds.FileSystemDataset(fragments, file_format, schema) |
| # validation of root_partition |
| with pytest.raises(TypeError, match="incorrect type"): |
| ds.FileSystemDataset(fragments, schema=schema, |
| format=file_format, root_partition=1) |
| # missing required argument in from_paths |
| with pytest.raises(TypeError, match="incorrect type"): |
| ds.FileSystemDataset.from_paths(fragments, format=file_format) |
| |
| |
| def test_filesystem_dataset_no_filesystem_interaction(): |
| # ARROW-8283 |
| schema = pa.schema([ |
| pa.field('f1', pa.int64()) |
| ]) |
| file_format = ds.IpcFileFormat() |
| paths = ['nonexistingfile.arrow'] |
| |
| # creating the dataset itself doesn't raise |
| dataset = ds.FileSystemDataset.from_paths( |
| paths, schema=schema, format=file_format, |
| filesystem=fs.LocalFileSystem(), |
| ) |
| |
| # getting fragments also doesn't raise |
| dataset.get_fragments() |
| |
| # scanning does raise |
| with pytest.raises(FileNotFoundError): |
| dataset.to_table() |
| |
| |
| def test_dataset(dataset): |
| assert isinstance(dataset, ds.Dataset) |
| assert isinstance(dataset.schema, pa.Schema) |
| |
| # TODO(kszucs): test non-boolean Exprs for filter do raise |
| expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) |
| expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) |
| for task in dataset.scan(): |
| assert isinstance(task, ds.ScanTask) |
| for batch in task.execute(): |
| assert batch.column(0).equals(expected_i64) |
| assert batch.column(1).equals(expected_f64) |
| |
| batches = dataset.to_batches() |
| assert all(isinstance(batch, pa.RecordBatch) for batch in batches) |
| |
| table = dataset.to_table() |
| assert isinstance(table, pa.Table) |
| assert len(table) == 10 |
| |
| condition = ds.field('i64') == 1 |
| result = dataset.to_table(use_threads=True, filter=condition).to_pydict() |
| |
| # don't rely on the scanning order |
| assert result['i64'] == [1, 1] |
| assert result['f64'] == [1., 1.] |
| assert sorted(result['group']) == [1, 2] |
| assert sorted(result['key']) == ['xxx', 'yyy'] |
| |
| |
| def test_dataset_execute_iterator(dataset): |
| # ARROW-11596: this would segfault due to Cython raising |
| # StopIteration without holding the GIL. (Fixed on Cython master, |
| # post 3.0a6) |
| tasks = dataset.scan() |
| task = next(tasks) |
| iterator = task.execute() |
| thread = threading.Thread(target=lambda: next(iterator)) |
| thread.start() |
| thread.join() |
| with pytest.raises(StopIteration): |
| next(iterator) |
| |
| |
| def test_scanner(dataset): |
| scanner = ds.Scanner.from_dataset(dataset, |
| memory_pool=pa.default_memory_pool()) |
| assert isinstance(scanner, ds.Scanner) |
| assert len(list(scanner.scan())) == 2 |
| |
| with pytest.raises(pa.ArrowInvalid): |
| ds.Scanner.from_dataset(dataset, columns=['unknown']) |
| |
| scanner = ds.Scanner.from_dataset(dataset, columns=['i64'], |
| memory_pool=pa.default_memory_pool()) |
| |
| assert isinstance(scanner, ds.Scanner) |
| assert len(list(scanner.scan())) == 2 |
| for task in scanner.scan(): |
| for batch in task.execute(): |
| assert batch.num_columns == 1 |
| |
| |
| def test_abstract_classes(): |
| classes = [ |
| ds.FileFormat, |
| ds.Scanner, |
| ds.Partitioning, |
| ] |
| for klass in classes: |
| with pytest.raises(TypeError): |
| klass() |
| |
| |
| def test_partitioning(): |
| schema = pa.schema([ |
| pa.field('i64', pa.int64()), |
| pa.field('f64', pa.float64()) |
| ]) |
| for klass in [ds.DirectoryPartitioning, ds.HivePartitioning]: |
| partitioning = klass(schema) |
| assert isinstance(partitioning, ds.Partitioning) |
| |
| partitioning = ds.DirectoryPartitioning( |
| pa.schema([ |
| pa.field('group', pa.int64()), |
| pa.field('key', pa.float64()) |
| ]) |
| ) |
| expr = partitioning.parse('/3/3.14') |
| assert isinstance(expr, ds.Expression) |
| |
| expected = (ds.field('group') == 3) & (ds.field('key') == 3.14) |
| assert expr.equals(expected) |
| |
| with pytest.raises(pa.ArrowInvalid): |
| partitioning.parse('/prefix/3/aaa') |
| |
| expr = partitioning.parse('/3') |
| expected = ds.field('group') == 3 |
| assert expr.equals(expected) |
| |
| partitioning = ds.HivePartitioning( |
| pa.schema([ |
| pa.field('alpha', pa.int64()), |
| pa.field('beta', pa.int64()) |
| ]), |
| null_fallback='xyz' |
| ) |
| expr = partitioning.parse('/alpha=0/beta=3') |
| expected = ( |
| (ds.field('alpha') == ds.scalar(0)) & |
| (ds.field('beta') == ds.scalar(3)) |
| ) |
| assert expr.equals(expected) |
| |
| expr = partitioning.parse('/alpha=xyz/beta=3') |
| expected = ( |
| (ds.field('alpha').is_null() & (ds.field('beta') == ds.scalar(3))) |
| ) |
| assert expr.equals(expected) |
| |
| for shouldfail in ['/alpha=one/beta=2', '/alpha=one', '/beta=two']: |
| with pytest.raises(pa.ArrowInvalid): |
| partitioning.parse(shouldfail) |
| |
| |
| def test_expression_serialization(): |
| a = ds.scalar(1) |
| b = ds.scalar(1.1) |
| c = ds.scalar(True) |
| d = ds.scalar("string") |
| e = ds.scalar(None) |
| f = ds.scalar({'a': 1}) |
| g = ds.scalar(pa.scalar(1)) |
| |
| all_exprs = [a, b, c, d, e, f, g, a == b, a > b, a & b, a | b, ~c, |
| d.is_valid(), a.cast(pa.int32(), safe=False), |
| a.cast(pa.int32(), safe=False), a.isin([1, 2, 3]), |
| ds.field('i64') > 5, ds.field('i64') == 5, |
| ds.field('i64') == 7, ds.field('i64').is_null()] |
| for expr in all_exprs: |
| assert isinstance(expr, ds.Expression) |
| restored = pickle.loads(pickle.dumps(expr)) |
| assert expr.equals(restored) |
| |
| |
| def test_expression_construction(): |
| zero = ds.scalar(0) |
| one = ds.scalar(1) |
| true = ds.scalar(True) |
| false = ds.scalar(False) |
| string = ds.scalar("string") |
| field = ds.field("field") |
| |
| zero | one == string |
| ~true == false |
| for typ in ("bool", pa.bool_()): |
| field.cast(typ) == true |
| |
| field.isin([1, 2]) |
| |
| with pytest.raises(TypeError): |
| field.isin(1) |
| |
| with pytest.raises(pa.ArrowInvalid): |
| field != {1} |
| |
| |
| def test_expression_boolean_operators(): |
| # https://issues.apache.org/jira/browse/ARROW-11412 |
| true = ds.scalar(True) |
| false = ds.scalar(False) |
| |
| with pytest.raises(ValueError, match="cannot be evaluated to python True"): |
| true and false |
| |
| with pytest.raises(ValueError, match="cannot be evaluated to python True"): |
| true or false |
| |
| with pytest.raises(ValueError, match="cannot be evaluated to python True"): |
| bool(true) |
| |
| with pytest.raises(ValueError, match="cannot be evaluated to python True"): |
| not true |
| |
| |
| def test_partition_keys(): |
| a, b, c = [ds.field(f) == f for f in 'abc'] |
| assert ds._get_partition_keys(a) == {'a': 'a'} |
| assert ds._get_partition_keys(a & b & c) == {f: f for f in 'abc'} |
| |
| nope = ds.field('d') >= 3 |
| assert ds._get_partition_keys(nope) == {} |
| assert ds._get_partition_keys(a & nope) == {'a': 'a'} |
| |
| null = ds.field('a').is_null() |
| assert ds._get_partition_keys(null) == {'a': None} |
| |
| |
| def test_parquet_read_options(): |
| opts1 = ds.ParquetReadOptions() |
| opts2 = ds.ParquetReadOptions(buffer_size=4096, |
| dictionary_columns=['a', 'b']) |
| opts3 = ds.ParquetReadOptions(buffer_size=2**13, use_buffered_stream=True, |
| dictionary_columns={'a', 'b'}) |
| opts4 = ds.ParquetReadOptions(buffer_size=2**13, pre_buffer=True, |
| dictionary_columns={'a', 'b'}) |
| |
| assert opts1.use_buffered_stream is False |
| assert opts1.buffer_size == 2**13 |
| assert opts1.pre_buffer is False |
| assert opts1.dictionary_columns == set() |
| |
| assert opts2.use_buffered_stream is False |
| assert opts2.buffer_size == 2**12 |
| assert opts2.pre_buffer is False |
| assert opts2.dictionary_columns == {'a', 'b'} |
| |
| assert opts3.use_buffered_stream is True |
| assert opts3.buffer_size == 2**13 |
| assert opts3.pre_buffer is False |
| assert opts3.dictionary_columns == {'a', 'b'} |
| |
| assert opts4.use_buffered_stream is False |
| assert opts4.buffer_size == 2**13 |
| assert opts4.pre_buffer is True |
| assert opts4.dictionary_columns == {'a', 'b'} |
| |
| assert opts1 == opts1 |
| assert opts1 != opts2 |
| assert opts2 != opts3 |
| assert opts3 != opts4 |
| |
| |
| def test_file_format_pickling(): |
| formats = [ |
| ds.IpcFileFormat(), |
| ds.CsvFileFormat(), |
| ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t', |
| ignore_empty_lines=True)), |
| ds.CsvFileFormat(read_options=pa.csv.ReadOptions( |
| skip_rows=3, column_names=['foo'])), |
| ds.CsvFileFormat(read_options=pa.csv.ReadOptions( |
| skip_rows=3, block_size=2**20)), |
| ds.ParquetFileFormat(), |
| ds.ParquetFileFormat( |
| read_options=ds.ParquetReadOptions(use_buffered_stream=True) |
| ), |
| ds.ParquetFileFormat( |
| read_options={ |
| 'use_buffered_stream': True, |
| 'buffer_size': 4096, |
| } |
| ) |
| ] |
| for file_format in formats: |
| assert pickle.loads(pickle.dumps(file_format)) == file_format |
| |
| |
| def test_fragment_scan_options_pickling(): |
| options = [ |
| ds.CsvFragmentScanOptions(), |
| ds.CsvFragmentScanOptions( |
| convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)), |
| ds.CsvFragmentScanOptions( |
| read_options=pa.csv.ReadOptions(block_size=2**16)), |
| ] |
| for option in options: |
| assert pickle.loads(pickle.dumps(option)) == option |
| |
| |
| @pytest.mark.parametrize('paths_or_selector', [ |
| fs.FileSelector('subdir', recursive=True), |
| [ |
| 'subdir/1/xxx/file0.parquet', |
| 'subdir/2/yyy/file1.parquet', |
| ] |
| ]) |
| @pytest.mark.parametrize('pre_buffer', [False, True]) |
| def test_filesystem_factory(mockfs, paths_or_selector, pre_buffer): |
| format = ds.ParquetFileFormat( |
| read_options=ds.ParquetReadOptions(dictionary_columns={"str"}, |
| pre_buffer=pre_buffer) |
| ) |
| |
| options = ds.FileSystemFactoryOptions('subdir') |
| options.partitioning = ds.DirectoryPartitioning( |
| pa.schema([ |
| pa.field('group', pa.int32()), |
| pa.field('key', pa.string()) |
| ]) |
| ) |
| assert options.partition_base_dir == 'subdir' |
| assert options.selector_ignore_prefixes == ['.', '_'] |
| assert options.exclude_invalid_files is False |
| |
| factory = ds.FileSystemDatasetFactory( |
| mockfs, paths_or_selector, format, options |
| ) |
| inspected_schema = factory.inspect() |
| |
| assert factory.inspect().equals(pa.schema([ |
| pa.field('i64', pa.int64()), |
| pa.field('f64', pa.float64()), |
| pa.field('str', pa.dictionary(pa.int32(), pa.string())), |
| pa.field('const', pa.int64()), |
| pa.field('group', pa.int32()), |
| pa.field('key', pa.string()), |
| ]), check_metadata=False) |
| |
| assert isinstance(factory.inspect_schemas(), list) |
| assert isinstance(factory.finish(inspected_schema), |
| ds.FileSystemDataset) |
| assert factory.root_partition.equals(ds.scalar(True)) |
| |
| dataset = factory.finish() |
| assert isinstance(dataset, ds.FileSystemDataset) |
| assert len(list(dataset.scan())) == 2 |
| |
| scanner = ds.Scanner.from_dataset(dataset) |
| expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) |
| expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) |
| expected_str = pa.DictionaryArray.from_arrays( |
| pa.array([0, 1, 2, 3, 4], type=pa.int32()), |
| pa.array("0 1 2 3 4".split(), type=pa.string()) |
| ) |
| for task, group, key in zip(scanner.scan(), [1, 2], ['xxx', 'yyy']): |
| expected_group = pa.array([group] * 5, type=pa.int32()) |
| expected_key = pa.array([key] * 5, type=pa.string()) |
| expected_const = pa.array([group - 1] * 5, type=pa.int64()) |
| for batch in task.execute(): |
| assert batch.num_columns == 6 |
| assert batch[0].equals(expected_i64) |
| assert batch[1].equals(expected_f64) |
| assert batch[2].equals(expected_str) |
| assert batch[3].equals(expected_const) |
| assert batch[4].equals(expected_group) |
| assert batch[5].equals(expected_key) |
| |
| table = dataset.to_table() |
| assert isinstance(table, pa.Table) |
| assert len(table) == 10 |
| assert table.num_columns == 6 |
| |
| |
| def test_make_fragment(multisourcefs): |
| parquet_format = ds.ParquetFileFormat() |
| dataset = ds.dataset('/plain', filesystem=multisourcefs, |
| format=parquet_format) |
| |
| for path in dataset.files: |
| fragment = parquet_format.make_fragment(path, multisourcefs) |
| assert fragment.row_groups == [0] |
| |
| row_group_fragment = parquet_format.make_fragment(path, multisourcefs, |
| row_groups=[0]) |
| for f in [fragment, row_group_fragment]: |
| assert isinstance(f, ds.ParquetFileFragment) |
| assert f.path == path |
| assert isinstance(f.filesystem, type(multisourcefs)) |
| assert row_group_fragment.row_groups == [0] |
| |
| |
| def test_make_csv_fragment_from_buffer(): |
| content = textwrap.dedent(""" |
| alpha,num,animal |
| a,12,dog |
| b,11,cat |
| c,10,rabbit |
| """) |
| buffer = pa.py_buffer(content.encode('utf-8')) |
| |
| csv_format = ds.CsvFileFormat() |
| fragment = csv_format.make_fragment(buffer) |
| |
| expected = pa.table([['a', 'b', 'c'], |
| [12, 11, 10], |
| ['dog', 'cat', 'rabbit']], |
| names=['alpha', 'num', 'animal']) |
| assert fragment.to_table().equals(expected) |
| |
| pickled = pickle.loads(pickle.dumps(fragment)) |
| assert pickled.to_table().equals(fragment.to_table()) |
| |
| |
| @pytest.mark.parquet |
| def test_make_parquet_fragment_from_buffer(): |
| import pyarrow.parquet as pq |
| |
| arrays = [ |
| pa.array(['a', 'b', 'c']), |
| pa.array([12, 11, 10]), |
| pa.array(['dog', 'cat', 'rabbit']) |
| ] |
| dictionary_arrays = [ |
| arrays[0].dictionary_encode(), |
| arrays[1], |
| arrays[2].dictionary_encode() |
| ] |
| dictionary_format = ds.ParquetFileFormat( |
| read_options=ds.ParquetReadOptions( |
| use_buffered_stream=True, |
| buffer_size=4096, |
| dictionary_columns=['alpha', 'animal'] |
| ) |
| ) |
| |
| cases = [ |
| (arrays, ds.ParquetFileFormat()), |
| (dictionary_arrays, dictionary_format) |
| ] |
| for arrays, format_ in cases: |
| table = pa.table(arrays, names=['alpha', 'num', 'animal']) |
| |
| out = pa.BufferOutputStream() |
| pq.write_table(table, out) |
| buffer = out.getvalue() |
| |
| fragment = format_.make_fragment(buffer) |
| assert fragment.to_table().equals(table) |
| |
| pickled = pickle.loads(pickle.dumps(fragment)) |
| assert pickled.to_table().equals(table) |
| |
| |
| def _create_dataset_for_fragments(tempdir, chunk_size=None, filesystem=None): |
| import pyarrow.parquet as pq |
| |
| table = pa.table( |
| [range(8), [1] * 8, ['a'] * 4 + ['b'] * 4], |
| names=['f1', 'f2', 'part'] |
| ) |
| |
| path = str(tempdir / "test_parquet_dataset") |
| |
| # write_to_dataset currently requires pandas |
| pq.write_to_dataset(table, path, |
| partition_cols=["part"], chunk_size=chunk_size) |
| dataset = ds.dataset( |
| path, format="parquet", partitioning="hive", filesystem=filesystem |
| ) |
| |
| return table, dataset |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments(tempdir): |
| table, dataset = _create_dataset_for_fragments(tempdir) |
| |
| # list fragments |
| fragments = list(dataset.get_fragments()) |
| assert len(fragments) == 2 |
| f = fragments[0] |
| |
| physical_names = ['f1', 'f2'] |
| # file's schema does not include partition column |
| assert f.physical_schema.names == physical_names |
| assert f.format.inspect(f.path, f.filesystem) == f.physical_schema |
| assert f.partition_expression.equals(ds.field('part') == 'a') |
| |
| # By default, the partition column is not part of the schema. |
| result = f.to_table() |
| assert result.column_names == physical_names |
| assert result.equals(table.remove_column(2).slice(0, 4)) |
| |
| # scanning fragment includes partition columns when given the proper |
| # schema. |
| result = f.to_table(schema=dataset.schema) |
| assert result.column_names == ['f1', 'f2', 'part'] |
| assert result.equals(table.slice(0, 4)) |
| assert f.physical_schema == result.schema.remove(2) |
| |
| # scanning fragments follow filter predicate |
| result = f.to_table(schema=dataset.schema, filter=ds.field('f1') < 2) |
| assert result.column_names == ['f1', 'f2', 'part'] |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_implicit_cast(tempdir): |
| # ARROW-8693 |
| import pyarrow.parquet as pq |
| |
| table = pa.table([range(8), [1] * 4 + [2] * 4], names=['col', 'part']) |
| path = str(tempdir / "test_parquet_dataset") |
| pq.write_to_dataset(table, path, partition_cols=["part"]) |
| |
| part = ds.partitioning(pa.schema([('part', 'int8')]), flavor="hive") |
| dataset = ds.dataset(path, format="parquet", partitioning=part) |
| fragments = dataset.get_fragments(filter=ds.field("part") >= 2) |
| assert len(list(fragments)) == 1 |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_reconstruct(tempdir): |
| table, dataset = _create_dataset_for_fragments(tempdir) |
| |
| def assert_yields_projected(fragment, row_slice, |
| columns=None, filter=None): |
| actual = fragment.to_table( |
| schema=table.schema, columns=columns, filter=filter) |
| column_names = columns if columns else table.column_names |
| assert actual.column_names == column_names |
| |
| expected = table.slice(*row_slice).select(column_names) |
| assert actual.equals(expected) |
| |
| fragment = list(dataset.get_fragments())[0] |
| parquet_format = fragment.format |
| |
| # test pickle roundtrip |
| pickled_fragment = pickle.loads(pickle.dumps(fragment)) |
| assert pickled_fragment.to_table() == fragment.to_table() |
| |
| # manually re-construct a fragment, with explicit schema |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| assert new_fragment.to_table().equals(fragment.to_table()) |
| assert_yields_projected(new_fragment, (0, 4)) |
| |
| # filter / column projection, inspected schema |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| assert_yields_projected(new_fragment, (0, 2), filter=ds.field('f1') < 2) |
| |
| # filter requiring cast / column projection, inspected schema |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| assert_yields_projected(new_fragment, (0, 2), |
| columns=['f1'], filter=ds.field('f1') < 2.0) |
| |
| # filter on the partition column |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| assert_yields_projected(new_fragment, (0, 4), |
| filter=ds.field('part') == 'a') |
| |
| # Fragments don't contain the partition's columns if not provided to the |
| # `to_table(schema=...)` method. |
| pattern = (r'No match for FieldRef.Name\(part\) in ' + |
| fragment.physical_schema.to_string(False, False, False)) |
| with pytest.raises(ValueError, match=pattern): |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| new_fragment.to_table(filter=ds.field('part') == 'a') |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_row_groups(tempdir): |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2) |
| |
| fragment = list(dataset.get_fragments())[0] |
| |
| # list and scan row group fragments |
| row_group_fragments = list(fragment.split_by_row_group()) |
| assert len(row_group_fragments) == fragment.num_row_groups == 2 |
| result = row_group_fragments[0].to_table(schema=dataset.schema) |
| assert result.column_names == ['f1', 'f2', 'part'] |
| assert len(result) == 2 |
| assert result.equals(table.slice(0, 2)) |
| |
| assert row_group_fragments[0].row_groups is not None |
| assert row_group_fragments[0].num_row_groups == 1 |
| assert row_group_fragments[0].row_groups[0].statistics == { |
| 'f1': {'min': 0, 'max': 1}, |
| 'f2': {'min': 1, 'max': 1}, |
| } |
| |
| fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0] |
| row_group_fragments = list(fragment.split_by_row_group(ds.field('f1') < 1)) |
| assert len(row_group_fragments) == 1 |
| result = row_group_fragments[0].to_table(filter=ds.field('f1') < 1) |
| assert len(result) == 1 |
| |
| |
| @pytest.mark.parquet |
| def test_fragments_parquet_num_row_groups(tempdir): |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': range(8)}) |
| pq.write_table(table, tempdir / "test.parquet", row_group_size=2) |
| dataset = ds.dataset(tempdir / "test.parquet", format="parquet") |
| original_fragment = list(dataset.get_fragments())[0] |
| |
| # create fragment with subset of row groups |
| fragment = original_fragment.format.make_fragment( |
| original_fragment.path, original_fragment.filesystem, |
| row_groups=[1, 3]) |
| assert fragment.num_row_groups == 2 |
| # ensure that parsing metadata preserves correct number of row groups |
| fragment.ensure_complete_metadata() |
| assert fragment.num_row_groups == 2 |
| assert len(fragment.row_groups) == 2 |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_row_groups_dictionary(tempdir): |
| import pandas as pd |
| |
| df = pd.DataFrame(dict(col1=['a', 'b'], col2=[1, 2])) |
| df['col1'] = df['col1'].astype("category") |
| |
| import pyarrow.parquet as pq |
| pq.write_table(pa.table(df), tempdir / "test_filter_dictionary.parquet") |
| |
| import pyarrow.dataset as ds |
| dataset = ds.dataset(tempdir / 'test_filter_dictionary.parquet') |
| result = dataset.to_table(filter=ds.field("col1") == "a") |
| |
| assert (df.iloc[0] == result.to_pandas()).all().all() |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_ensure_metadata(tempdir, open_logging_fs): |
| fs, assert_opens = open_logging_fs |
| _, dataset = _create_dataset_for_fragments( |
| tempdir, chunk_size=2, filesystem=fs |
| ) |
| fragment = list(dataset.get_fragments())[0] |
| |
| # with default discovery, no metadata loaded |
| with assert_opens([fragment.path]): |
| fragment.ensure_complete_metadata() |
| assert fragment.row_groups == [0, 1] |
| |
| # second time -> use cached / no file IO |
| with assert_opens([]): |
| fragment.ensure_complete_metadata() |
| |
| # recreate fragment with row group ids |
| new_fragment = fragment.format.make_fragment( |
| fragment.path, fragment.filesystem, row_groups=[0, 1] |
| ) |
| assert new_fragment.row_groups == fragment.row_groups |
| |
| # collect metadata |
| new_fragment.ensure_complete_metadata() |
| row_group = new_fragment.row_groups[0] |
| assert row_group.id == 0 |
| assert row_group.num_rows == 2 |
| assert row_group.statistics is not None |
| |
| # pickling preserves row group ids |
| pickled_fragment = pickle.loads(pickle.dumps(new_fragment)) |
| with assert_opens([fragment.path]): |
| assert pickled_fragment.row_groups == [0, 1] |
| row_group = pickled_fragment.row_groups[0] |
| assert row_group.id == 0 |
| assert row_group.statistics is not None |
| |
| |
| def _create_dataset_all_types(tempdir, chunk_size=None): |
| import pyarrow.parquet as pq |
| |
| table = pa.table( |
| [ |
| pa.array([True, None, False], pa.bool_()), |
| pa.array([1, 10, 42], pa.int8()), |
| pa.array([1, 10, 42], pa.uint8()), |
| pa.array([1, 10, 42], pa.int16()), |
| pa.array([1, 10, 42], pa.uint16()), |
| pa.array([1, 10, 42], pa.int32()), |
| pa.array([1, 10, 42], pa.uint32()), |
| pa.array([1, 10, 42], pa.int64()), |
| pa.array([1, 10, 42], pa.uint64()), |
| pa.array([1.0, 10.0, 42.0], pa.float32()), |
| pa.array([1.0, 10.0, 42.0], pa.float64()), |
| pa.array(['a', None, 'z'], pa.utf8()), |
| pa.array(['a', None, 'z'], pa.binary()), |
| pa.array([1, 10, 42], pa.timestamp('s')), |
| pa.array([1, 10, 42], pa.timestamp('ms')), |
| pa.array([1, 10, 42], pa.timestamp('us')), |
| pa.array([1, 10, 42], pa.date32()), |
| pa.array([1, 10, 4200000000], pa.date64()), |
| pa.array([1, 10, 42], pa.time32('s')), |
| pa.array([1, 10, 42], pa.time64('us')), |
| ], |
| names=[ |
| 'boolean', |
| 'int8', |
| 'uint8', |
| 'int16', |
| 'uint16', |
| 'int32', |
| 'uint32', |
| 'int64', |
| 'uint64', |
| 'float', |
| 'double', |
| 'utf8', |
| 'binary', |
| 'ts[s]', |
| 'ts[ms]', |
| 'ts[us]', |
| 'date32', |
| 'date64', |
| 'time32', |
| 'time64', |
| ] |
| ) |
| |
| path = str(tempdir / "test_parquet_dataset_all_types") |
| |
| # write_to_dataset currently requires pandas |
| pq.write_to_dataset(table, path, chunk_size=chunk_size) |
| |
| return table, ds.dataset(path, format="parquet", partitioning="hive") |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_parquet_fragment_statistics(tempdir): |
| table, dataset = _create_dataset_all_types(tempdir) |
| |
| fragment = list(dataset.get_fragments())[0] |
| |
| import datetime |
| def dt_s(x): return datetime.datetime(1970, 1, 1, 0, 0, x) |
| def dt_ms(x): return datetime.datetime(1970, 1, 1, 0, 0, 0, x*1000) |
| def dt_us(x): return datetime.datetime(1970, 1, 1, 0, 0, 0, x) |
| date = datetime.date |
| time = datetime.time |
| |
| # list and scan row group fragments |
| row_group_fragments = list(fragment.split_by_row_group()) |
| assert row_group_fragments[0].row_groups is not None |
| row_group = row_group_fragments[0].row_groups[0] |
| assert row_group.num_rows == 3 |
| assert row_group.total_byte_size > 1000 |
| assert row_group.statistics == { |
| 'boolean': {'min': False, 'max': True}, |
| 'int8': {'min': 1, 'max': 42}, |
| 'uint8': {'min': 1, 'max': 42}, |
| 'int16': {'min': 1, 'max': 42}, |
| 'uint16': {'min': 1, 'max': 42}, |
| 'int32': {'min': 1, 'max': 42}, |
| 'uint32': {'min': 1, 'max': 42}, |
| 'int64': {'min': 1, 'max': 42}, |
| 'uint64': {'min': 1, 'max': 42}, |
| 'float': {'min': 1.0, 'max': 42.0}, |
| 'double': {'min': 1.0, 'max': 42.0}, |
| 'utf8': {'min': 'a', 'max': 'z'}, |
| 'binary': {'min': b'a', 'max': b'z'}, |
| 'ts[s]': {'min': dt_s(1), 'max': dt_s(42)}, |
| 'ts[ms]': {'min': dt_ms(1), 'max': dt_ms(42)}, |
| 'ts[us]': {'min': dt_us(1), 'max': dt_us(42)}, |
| 'date32': {'min': date(1970, 1, 2), 'max': date(1970, 2, 12)}, |
| 'date64': {'min': date(1970, 1, 1), 'max': date(1970, 2, 18)}, |
| 'time32': {'min': time(0, 0, 1), 'max': time(0, 0, 42)}, |
| 'time64': {'min': time(0, 0, 0, 1), 'max': time(0, 0, 0, 42)}, |
| } |
| |
| |
| @pytest.mark.parquet |
| def test_parquet_fragment_statistics_nulls(tempdir): |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': [0, 1, None, None], 'b': ['a', 'b', None, None]}) |
| pq.write_table(table, tempdir / "test.parquet", row_group_size=2) |
| |
| dataset = ds.dataset(tempdir / "test.parquet", format="parquet") |
| fragments = list(dataset.get_fragments())[0].split_by_row_group() |
| # second row group has all nulls -> no statistics |
| assert fragments[1].row_groups[0].statistics == {} |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_parquet_empty_row_group_statistics(tempdir): |
| df = pd.DataFrame({"a": ["a", "b", "b"], "b": [4, 5, 6]})[:0] |
| df.to_parquet(tempdir / "test.parquet", engine="pyarrow") |
| |
| dataset = ds.dataset(tempdir / "test.parquet", format="parquet") |
| fragments = list(dataset.get_fragments())[0].split_by_row_group() |
| # Only row group is empty |
| assert fragments[0].row_groups[0].statistics == {} |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_row_groups_predicate(tempdir): |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2) |
| |
| fragment = list(dataset.get_fragments())[0] |
| assert fragment.partition_expression.equals(ds.field('part') == 'a') |
| |
| # predicate may reference a partition field not present in the |
| # physical_schema if an explicit schema is provided to split_by_row_group |
| |
| # filter matches partition_expression: all row groups |
| row_group_fragments = list( |
| fragment.split_by_row_group(filter=ds.field('part') == 'a', |
| schema=dataset.schema)) |
| assert len(row_group_fragments) == 2 |
| |
| # filter contradicts partition_expression: no row groups |
| row_group_fragments = list( |
| fragment.split_by_row_group(filter=ds.field('part') == 'b', |
| schema=dataset.schema)) |
| assert len(row_group_fragments) == 0 |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_row_groups_reconstruct(tempdir): |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2) |
| |
| fragment = list(dataset.get_fragments())[0] |
| parquet_format = fragment.format |
| row_group_fragments = list(fragment.split_by_row_group()) |
| |
| # test pickle roundtrip |
| pickled_fragment = pickle.loads(pickle.dumps(fragment)) |
| assert pickled_fragment.to_table() == fragment.to_table() |
| |
| # manually re-construct row group fragments |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression, |
| row_groups=[0]) |
| result = new_fragment.to_table() |
| assert result.equals(row_group_fragments[0].to_table()) |
| |
| # manually re-construct a row group fragment with filter/column projection |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression, |
| row_groups={1}) |
| result = new_fragment.to_table(schema=table.schema, columns=['f1', 'part'], |
| filter=ds.field('f1') < 3, ) |
| assert result.column_names == ['f1', 'part'] |
| assert len(result) == 1 |
| |
| # out of bounds row group index |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression, |
| row_groups={2}) |
| with pytest.raises(IndexError, match="references row group 2"): |
| new_fragment.to_table() |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_subset_ids(tempdir, open_logging_fs): |
| fs, assert_opens = open_logging_fs |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=1, |
| filesystem=fs) |
| fragment = list(dataset.get_fragments())[0] |
| |
| # select with row group ids |
| subfrag = fragment.subset(row_group_ids=[0, 3]) |
| with assert_opens([]): |
| assert subfrag.num_row_groups == 2 |
| assert subfrag.row_groups == [0, 3] |
| assert subfrag.row_groups[0].statistics is not None |
| |
| # check correct scan result of subset |
| result = subfrag.to_table() |
| assert result.to_pydict() == {"f1": [0, 3], "f2": [1, 1]} |
| |
| # empty list of ids |
| subfrag = fragment.subset(row_group_ids=[]) |
| assert subfrag.num_row_groups == 0 |
| assert subfrag.row_groups == [] |
| result = subfrag.to_table(schema=dataset.schema) |
| assert result.num_rows == 0 |
| assert result.equals(table[:0]) |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_subset_filter(tempdir, open_logging_fs): |
| fs, assert_opens = open_logging_fs |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=1, |
| filesystem=fs) |
| fragment = list(dataset.get_fragments())[0] |
| |
| # select with filter |
| subfrag = fragment.subset(ds.field("f1") >= 1) |
| with assert_opens([]): |
| assert subfrag.num_row_groups == 3 |
| assert len(subfrag.row_groups) == 3 |
| assert subfrag.row_groups[0].statistics is not None |
| |
| # check correct scan result of subset |
| result = subfrag.to_table() |
| assert result.to_pydict() == {"f1": [1, 2, 3], "f2": [1, 1, 1]} |
| |
| # filter that results in empty selection |
| subfrag = fragment.subset(ds.field("f1") > 5) |
| assert subfrag.num_row_groups == 0 |
| assert subfrag.row_groups == [] |
| result = subfrag.to_table(schema=dataset.schema) |
| assert result.num_rows == 0 |
| assert result.equals(table[:0]) |
| |
| # passing schema to ensure filter on partition expression works |
| subfrag = fragment.subset(ds.field("part") == "a", schema=dataset.schema) |
| assert subfrag.num_row_groups == 4 |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_subset_invalid(tempdir): |
| _, dataset = _create_dataset_for_fragments(tempdir, chunk_size=1) |
| fragment = list(dataset.get_fragments())[0] |
| |
| # passing none or both of filter / row_group_ids |
| with pytest.raises(ValueError): |
| fragment.subset(ds.field("f1") >= 1, row_group_ids=[1, 2]) |
| |
| with pytest.raises(ValueError): |
| fragment.subset() |
| |
| |
| def test_partitioning_factory(mockfs): |
| paths_or_selector = fs.FileSelector('subdir', recursive=True) |
| format = ds.ParquetFileFormat() |
| |
| options = ds.FileSystemFactoryOptions('subdir') |
| partitioning_factory = ds.DirectoryPartitioning.discover(['group', 'key']) |
| assert isinstance(partitioning_factory, ds.PartitioningFactory) |
| options.partitioning_factory = partitioning_factory |
| |
| factory = ds.FileSystemDatasetFactory( |
| mockfs, paths_or_selector, format, options |
| ) |
| inspected_schema = factory.inspect() |
| # i64/f64 from data, group/key from "/1/xxx" and "/2/yyy" paths |
| expected_schema = pa.schema([ |
| ("i64", pa.int64()), |
| ("f64", pa.float64()), |
| ("str", pa.string()), |
| ("const", pa.int64()), |
| ("group", pa.int32()), |
| ("key", pa.string()), |
| ]) |
| assert inspected_schema.equals(expected_schema) |
| |
| hive_partitioning_factory = ds.HivePartitioning.discover() |
| assert isinstance(hive_partitioning_factory, ds.PartitioningFactory) |
| |
| |
| @pytest.mark.parametrize('infer_dictionary', [False, True]) |
| def test_partitioning_factory_dictionary(mockfs, infer_dictionary): |
| paths_or_selector = fs.FileSelector('subdir', recursive=True) |
| format = ds.ParquetFileFormat() |
| options = ds.FileSystemFactoryOptions('subdir') |
| |
| options.partitioning_factory = ds.DirectoryPartitioning.discover( |
| ['group', 'key'], infer_dictionary=infer_dictionary) |
| |
| factory = ds.FileSystemDatasetFactory( |
| mockfs, paths_or_selector, format, options) |
| |
| inferred_schema = factory.inspect() |
| if infer_dictionary: |
| expected_type = pa.dictionary(pa.int32(), pa.string()) |
| assert inferred_schema.field('key').type == expected_type |
| |
| table = factory.finish().to_table().combine_chunks() |
| actual = table.column('key').chunk(0) |
| expected = pa.array(['xxx'] * 5 + ['yyy'] * 5).dictionary_encode() |
| assert actual.equals(expected) |
| |
| # ARROW-9345 ensure filtering on the partition field works |
| table = factory.finish().to_table(filter=ds.field('key') == 'xxx') |
| actual = table.column('key').chunk(0) |
| expected = expected.slice(0, 5) |
| assert actual.equals(expected) |
| else: |
| assert inferred_schema.field('key').type == pa.string() |
| |
| |
| def test_dictionary_partitioning_outer_nulls_raises(tempdir): |
| table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']}) |
| part = ds.partitioning( |
| pa.schema([pa.field('a', pa.string()), pa.field('b', pa.string())])) |
| with pytest.raises(pa.ArrowInvalid): |
| ds.write_dataset(table, tempdir, format='parquet', partitioning=part) |
| |
| |
| def _has_subdirs(basedir): |
| elements = os.listdir(basedir) |
| return any([os.path.isdir(os.path.join(basedir, el)) for el in elements]) |
| |
| |
| def _do_list_all_dirs(basedir, path_so_far, result): |
| for f in os.listdir(basedir): |
| true_nested = os.path.join(basedir, f) |
| if os.path.isdir(true_nested): |
| norm_nested = posixpath.join(path_so_far, f) |
| if _has_subdirs(true_nested): |
| _do_list_all_dirs(true_nested, norm_nested, result) |
| else: |
| result.append(norm_nested) |
| |
| |
| def _list_all_dirs(basedir): |
| result = [] |
| _do_list_all_dirs(basedir, '', result) |
| return result |
| |
| |
| def _check_dataset_directories(tempdir, expected_directories): |
| actual_directories = set(_list_all_dirs(tempdir)) |
| assert actual_directories == set(expected_directories) |
| |
| |
| def test_dictionary_partitioning_inner_nulls(tempdir): |
| table = pa.table({'a': ['x', 'y', 'z'], 'b': ['x', 'y', None]}) |
| part = ds.partitioning( |
| pa.schema([pa.field('a', pa.string()), pa.field('b', pa.string())])) |
| ds.write_dataset(table, tempdir, format='parquet', partitioning=part) |
| _check_dataset_directories(tempdir, ['x/x', 'y/y', 'z']) |
| |
| |
| def test_hive_partitioning_nulls(tempdir): |
| table = pa.table({'a': ['x', None, 'z'], 'b': ['x', 'y', None]}) |
| part = ds.HivePartitioning(pa.schema( |
| [pa.field('a', pa.string()), pa.field('b', pa.string())]), None, 'xyz') |
| ds.write_dataset(table, tempdir, format='parquet', partitioning=part) |
| _check_dataset_directories(tempdir, ['a=x/b=x', 'a=xyz/b=y', 'a=z/b=xyz']) |
| |
| |
| def test_partitioning_function(): |
| schema = pa.schema([("year", pa.int16()), ("month", pa.int8())]) |
| names = ["year", "month"] |
| |
| # default DirectoryPartitioning |
| part = ds.partitioning(schema) |
| assert isinstance(part, ds.DirectoryPartitioning) |
| part = ds.partitioning(schema, dictionaries="infer") |
| assert isinstance(part, ds.PartitioningFactory) |
| part = ds.partitioning(field_names=names) |
| assert isinstance(part, ds.PartitioningFactory) |
| # needs schema or list of names |
| with pytest.raises(ValueError): |
| ds.partitioning() |
| with pytest.raises(ValueError, match="Expected list"): |
| ds.partitioning(field_names=schema) |
| with pytest.raises(ValueError, match="Cannot specify both"): |
| ds.partitioning(schema, field_names=schema) |
| |
| # Hive partitioning |
| part = ds.partitioning(schema, flavor="hive") |
| assert isinstance(part, ds.HivePartitioning) |
| part = ds.partitioning(schema, dictionaries="infer", flavor="hive") |
| assert isinstance(part, ds.PartitioningFactory) |
| part = ds.partitioning(flavor="hive") |
| assert isinstance(part, ds.PartitioningFactory) |
| # cannot pass list of names |
| with pytest.raises(ValueError): |
| ds.partitioning(names, flavor="hive") |
| with pytest.raises(ValueError, match="Cannot specify 'field_names'"): |
| ds.partitioning(field_names=names, flavor="hive") |
| |
| # unsupported flavor |
| with pytest.raises(ValueError): |
| ds.partitioning(schema, flavor="unsupported") |
| |
| |
| def test_directory_partitioning_dictionary_key(mockfs): |
| # ARROW-8088 specifying partition key as dictionary type |
| schema = pa.schema([ |
| pa.field('group', pa.dictionary(pa.int8(), pa.int32())), |
| pa.field('key', pa.dictionary(pa.int8(), pa.string())) |
| ]) |
| part = ds.DirectoryPartitioning.discover(schema=schema) |
| |
| dataset = ds.dataset( |
| "subdir", format="parquet", filesystem=mockfs, partitioning=part |
| ) |
| table = dataset.to_table() |
| |
| assert table.column('group').type.equals(schema.types[0]) |
| assert table.column('group').to_pylist() == [1] * 5 + [2] * 5 |
| assert table.column('key').type.equals(schema.types[1]) |
| assert table.column('key').to_pylist() == ['xxx'] * 5 + ['yyy'] * 5 |
| |
| |
| def test_hive_partitioning_dictionary_key(multisourcefs): |
| # ARROW-8088 specifying partition key as dictionary type |
| schema = pa.schema([ |
| pa.field('year', pa.dictionary(pa.int8(), pa.int16())), |
| pa.field('month', pa.dictionary(pa.int8(), pa.int16())) |
| ]) |
| part = ds.HivePartitioning.discover(schema=schema) |
| |
| dataset = ds.dataset( |
| "hive", format="parquet", filesystem=multisourcefs, partitioning=part |
| ) |
| table = dataset.to_table() |
| |
| year_dictionary = list(range(2006, 2011)) |
| month_dictionary = list(range(1, 13)) |
| assert table.column('year').type.equals(schema.types[0]) |
| for chunk in table.column('year').chunks: |
| actual = chunk.dictionary.to_pylist() |
| actual.sort() |
| assert actual == year_dictionary |
| assert table.column('month').type.equals(schema.types[1]) |
| for chunk in table.column('month').chunks: |
| actual = chunk.dictionary.to_pylist() |
| actual.sort() |
| assert actual == month_dictionary |
| |
| |
| def _create_single_file(base_dir, table=None, row_group_size=None): |
| import pyarrow.parquet as pq |
| if table is None: |
| table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5}) |
| path = base_dir / "test.parquet" |
| pq.write_table(table, path, row_group_size=row_group_size) |
| return table, path |
| |
| |
| def _create_directory_of_files(base_dir): |
| import pyarrow.parquet as pq |
| table1 = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5}) |
| path1 = base_dir / "test1.parquet" |
| pq.write_table(table1, path1) |
| table2 = pa.table({'a': range(9, 18), 'b': [0.] * 4 + [1.] * 5}) |
| path2 = base_dir / "test2.parquet" |
| pq.write_table(table2, path2) |
| return (table1, table2), (path1, path2) |
| |
| |
| def _check_dataset(dataset, table): |
| # also test that pickle roundtrip keeps the functionality |
| for d in [dataset, pickle.loads(pickle.dumps(dataset))]: |
| assert dataset.schema.equals(table.schema) |
| assert dataset.to_table().equals(table) |
| |
| |
| def _check_dataset_from_path(path, table, **kwargs): |
| # pathlib object |
| assert isinstance(path, pathlib.Path) |
| |
| # accept Path, str, List[Path], List[str] |
| for p in [path, str(path), [path], [str(path)]]: |
| dataset = ds.dataset(path, **kwargs) |
| assert isinstance(dataset, ds.FileSystemDataset) |
| _check_dataset(dataset, table) |
| |
| # relative string path |
| with change_cwd(path.parent): |
| dataset = ds.dataset(path.name, **kwargs) |
| assert isinstance(dataset, ds.FileSystemDataset) |
| _check_dataset(dataset, table) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_single_file(tempdir): |
| table, path = _create_single_file(tempdir) |
| _check_dataset_from_path(path, table) |
| |
| |
| @pytest.mark.parquet |
| def test_deterministic_row_order(tempdir): |
| # ARROW-8447 Ensure that dataset.to_table (and Scanner::ToTable) returns a |
| # deterministic row ordering. This is achieved by constructing a single |
| # parquet file with one row per RowGroup. |
| table, path = _create_single_file(tempdir, row_group_size=1) |
| _check_dataset_from_path(path, table) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_directory(tempdir): |
| tables, _ = _create_directory_of_files(tempdir) |
| table = pa.concat_tables(tables) |
| _check_dataset_from_path(tempdir, table) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_list_of_files(tempdir): |
| tables, (path1, path2) = _create_directory_of_files(tempdir) |
| table = pa.concat_tables(tables) |
| |
| datasets = [ |
| ds.dataset([path1, path2]), |
| ds.dataset([str(path1), str(path2)]) |
| ] |
| datasets += [ |
| pickle.loads(pickle.dumps(d)) for d in datasets |
| ] |
| |
| for dataset in datasets: |
| assert dataset.schema.equals(table.schema) |
| result = dataset.to_table() |
| assert result.equals(table) |
| |
| |
| def test_construct_from_single_file(tempdir): |
| directory = tempdir / 'single-file' |
| directory.mkdir() |
| table, path = _create_single_file(directory) |
| relative_path = path.relative_to(directory) |
| |
| # instantiate from a single file |
| d1 = ds.dataset(path) |
| # instantiate from a single file with a filesystem object |
| d2 = ds.dataset(path, filesystem=fs.LocalFileSystem()) |
| # instantiate from a single file with prefixed filesystem URI |
| d3 = ds.dataset(str(relative_path), filesystem=_filesystem_uri(directory)) |
| # pickle roundtrip |
| d4 = pickle.loads(pickle.dumps(d1)) |
| |
| assert d1.to_table() == d2.to_table() == d3.to_table() == d4.to_table() |
| |
| |
| def test_construct_from_single_directory(tempdir): |
| directory = tempdir / 'single-directory' |
| directory.mkdir() |
| tables, paths = _create_directory_of_files(directory) |
| |
| d1 = ds.dataset(directory) |
| d2 = ds.dataset(directory, filesystem=fs.LocalFileSystem()) |
| d3 = ds.dataset(directory.name, filesystem=_filesystem_uri(tempdir)) |
| t1 = d1.to_table() |
| t2 = d2.to_table() |
| t3 = d3.to_table() |
| assert t1 == t2 == t3 |
| |
| # test pickle roundtrip |
| for d in [d1, d2, d3]: |
| restored = pickle.loads(pickle.dumps(d)) |
| assert restored.to_table() == t1 |
| |
| |
| def test_construct_from_list_of_files(tempdir): |
| # instantiate from a list of files |
| directory = tempdir / 'list-of-files' |
| directory.mkdir() |
| tables, paths = _create_directory_of_files(directory) |
| |
| relative_paths = [p.relative_to(tempdir) for p in paths] |
| with change_cwd(tempdir): |
| d1 = ds.dataset(relative_paths) |
| t1 = d1.to_table() |
| assert len(t1) == sum(map(len, tables)) |
| |
| d2 = ds.dataset(relative_paths, filesystem=_filesystem_uri(tempdir)) |
| t2 = d2.to_table() |
| d3 = ds.dataset(paths) |
| t3 = d3.to_table() |
| d4 = ds.dataset(paths, filesystem=fs.LocalFileSystem()) |
| t4 = d4.to_table() |
| |
| assert t1 == t2 == t3 == t4 |
| |
| |
| def test_construct_from_list_of_mixed_paths_fails(mockfs): |
| # isntantiate from a list of mixed paths |
| files = [ |
| 'subdir/1/xxx/file0.parquet', |
| 'subdir/1/xxx/doesnt-exist.parquet', |
| ] |
| with pytest.raises(FileNotFoundError, match='doesnt-exist'): |
| ds.dataset(files, filesystem=mockfs) |
| |
| |
| def test_construct_from_mixed_child_datasets(mockfs): |
| # isntantiate from a list of mixed paths |
| a = ds.dataset(['subdir/1/xxx/file0.parquet', |
| 'subdir/2/yyy/file1.parquet'], filesystem=mockfs) |
| b = ds.dataset('subdir', filesystem=mockfs) |
| |
| dataset = ds.dataset([a, b]) |
| |
| assert isinstance(dataset, ds.UnionDataset) |
| assert len(list(dataset.get_fragments())) == 4 |
| |
| table = dataset.to_table() |
| assert len(table) == 20 |
| assert table.num_columns == 4 |
| |
| assert len(dataset.children) == 2 |
| for child in dataset.children: |
| assert child.files == ['subdir/1/xxx/file0.parquet', |
| 'subdir/2/yyy/file1.parquet'] |
| |
| |
| def test_construct_empty_dataset(): |
| empty = ds.dataset([]) |
| table = empty.to_table() |
| assert table.num_rows == 0 |
| assert table.num_columns == 0 |
| |
| |
| def test_construct_dataset_with_invalid_schema(): |
| empty = ds.dataset([], schema=pa.schema([ |
| ('a', pa.int64()), |
| ('a', pa.string()) |
| ])) |
| with pytest.raises(ValueError, match='Multiple matches for .*a.* in '): |
| empty.to_table() |
| |
| |
| def test_construct_from_invalid_sources_raise(multisourcefs): |
| child1 = ds.FileSystemDatasetFactory( |
| multisourcefs, |
| fs.FileSelector('/plain'), |
| format=ds.ParquetFileFormat() |
| ) |
| child2 = ds.FileSystemDatasetFactory( |
| multisourcefs, |
| fs.FileSelector('/schema'), |
| format=ds.ParquetFileFormat() |
| ) |
| batch1 = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"]) |
| batch2 = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["b"]) |
| |
| with pytest.raises(TypeError, match='Expected.*FileSystemDatasetFactory'): |
| ds.dataset([child1, child2]) |
| |
| expected = ( |
| "Expected a list of path-like or dataset objects, or a list " |
| "of batches or tables. The given list contains the following " |
| "types: int" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.dataset([1, 2, 3]) |
| |
| expected = ( |
| "Expected a path-like, list of path-likes or a list of Datasets " |
| "instead of the given type: NoneType" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.dataset(None) |
| |
| expected = ( |
| "Must provide schema to construct in-memory dataset from an iterable" |
| ) |
| with pytest.raises(ValueError, match=expected): |
| ds.dataset((batch1 for _ in range(3))) |
| |
| expected = ( |
| "Must provide schema to construct in-memory dataset from an empty list" |
| ) |
| with pytest.raises(ValueError, match=expected): |
| ds.InMemoryDataset([]) |
| |
| expected = ( |
| "Item has schema\nb: int64\nwhich does not match expected schema\n" |
| "a: int64" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.dataset([batch1, batch2]) |
| |
| expected = ( |
| "Expected a list of path-like or dataset objects, or a list of " |
| "batches or tables. The given list contains the following types:" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.dataset([batch1, 0]) |
| |
| expected = ( |
| "Expected a list of tables or batches. The given list contains a int" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.InMemoryDataset([batch1, 0]) |
| |
| |
| def test_construct_in_memory(): |
| batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"]) |
| table = pa.Table.from_batches([batch]) |
| reader = pa.ipc.RecordBatchReader.from_batches(batch.schema, [batch]) |
| iterable = (batch for _ in range(1)) |
| |
| for source in (batch, table, reader, [batch], [table]): |
| dataset = ds.dataset(source) |
| assert dataset.to_table() == table |
| |
| assert ds.dataset(iterable, schema=batch.schema).to_table().equals(table) |
| assert ds.dataset([], schema=pa.schema([])).to_table() == pa.table([]) |
| |
| # When constructed from batches/tables, should be reusable |
| for source in (batch, table, [batch], [table]): |
| dataset = ds.dataset(source) |
| assert len(list(dataset.get_fragments())) == 1 |
| assert len(list(dataset.get_fragments())) == 1 |
| assert dataset.to_table() == table |
| assert dataset.to_table() == table |
| assert next(dataset.get_fragments()).to_table() == table |
| |
| # When constructed from readers/iterators, should be one-shot |
| match = "InMemoryDataset was already consumed" |
| for factory in ( |
| lambda: pa.ipc.RecordBatchReader.from_batches( |
| batch.schema, [batch]), |
| lambda: (batch for _ in range(1)), |
| ): |
| dataset = ds.dataset(factory(), schema=batch.schema) |
| # Getting fragments consumes the underlying iterator |
| fragments = list(dataset.get_fragments()) |
| assert len(fragments) == 1 |
| assert fragments[0].to_table() == table |
| with pytest.raises(pa.ArrowInvalid, match=match): |
| list(dataset.get_fragments()) |
| with pytest.raises(pa.ArrowInvalid, match=match): |
| dataset.to_table() |
| # Materializing consumes the underlying iterator |
| dataset = ds.dataset(factory(), schema=batch.schema) |
| assert dataset.to_table() == table |
| with pytest.raises(pa.ArrowInvalid, match=match): |
| list(dataset.get_fragments()) |
| with pytest.raises(pa.ArrowInvalid, match=match): |
| dataset.to_table() |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_partitioned_directory(tempdir): |
| import pyarrow.parquet as pq |
| table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5}) |
| |
| path = tempdir / "dataset" |
| path.mkdir() |
| |
| for part in range(3): |
| part = path / "part={}".format(part) |
| part.mkdir() |
| pq.write_table(table, part / "test.parquet") |
| |
| # no partitioning specified, just read all individual files |
| full_table = pa.concat_tables([table] * 3) |
| _check_dataset_from_path(path, full_table) |
| |
| # specify partition scheme with discovery |
| dataset = ds.dataset( |
| str(path), partitioning=ds.partitioning(flavor="hive")) |
| expected_schema = table.schema.append(pa.field("part", pa.int32())) |
| assert dataset.schema.equals(expected_schema) |
| |
| # specify partition scheme with discovery and relative path |
| with change_cwd(tempdir): |
| dataset = ds.dataset( |
| "dataset/", partitioning=ds.partitioning(flavor="hive")) |
| expected_schema = table.schema.append(pa.field("part", pa.int32())) |
| assert dataset.schema.equals(expected_schema) |
| |
| # specify partition scheme with string short-cut |
| dataset = ds.dataset(str(path), partitioning="hive") |
| assert dataset.schema.equals(expected_schema) |
| |
| # specify partition scheme with explicit scheme |
| dataset = ds.dataset( |
| str(path), |
| partitioning=ds.partitioning( |
| pa.schema([("part", pa.int8())]), flavor="hive")) |
| expected_schema = table.schema.append(pa.field("part", pa.int8())) |
| assert dataset.schema.equals(expected_schema) |
| |
| result = dataset.to_table() |
| expected = full_table.append_column( |
| "part", pa.array(np.repeat([0, 1, 2], 9), type=pa.int8())) |
| assert result.equals(expected) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_filesystem(tempdir): |
| # single file |
| table, path = _create_single_file(tempdir) |
| |
| # filesystem inferred from path |
| dataset1 = ds.dataset(str(path)) |
| assert dataset1.schema.equals(table.schema) |
| |
| # filesystem specified |
| dataset2 = ds.dataset(str(path), filesystem=fs.LocalFileSystem()) |
| assert dataset2.schema.equals(table.schema) |
| |
| # local filesystem specified with relative path |
| with change_cwd(tempdir): |
| dataset3 = ds.dataset("test.parquet", filesystem=fs.LocalFileSystem()) |
| assert dataset3.schema.equals(table.schema) |
| |
| # passing different filesystem |
| with pytest.raises(FileNotFoundError): |
| ds.dataset(str(path), filesystem=fs._MockFileSystem()) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_unsupported_format(tempdir): |
| _, path = _create_single_file(tempdir) |
| with pytest.raises(ValueError, match="format 'blabla' is not supported"): |
| ds.dataset([path], format="blabla") |
| |
| |
| @pytest.mark.parquet |
| def test_open_union_dataset(tempdir): |
| _, path = _create_single_file(tempdir) |
| dataset = ds.dataset(path) |
| |
| union = ds.dataset([dataset, dataset]) |
| assert isinstance(union, ds.UnionDataset) |
| |
| pickled = pickle.loads(pickle.dumps(union)) |
| assert pickled.to_table() == union.to_table() |
| |
| |
| def test_open_union_dataset_with_additional_kwargs(multisourcefs): |
| child = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') |
| with pytest.raises(ValueError, match="cannot pass any additional"): |
| ds.dataset([child], format="parquet") |
| |
| |
| def test_open_dataset_non_existing_file(): |
| # ARROW-8213: Opening a dataset with a local incorrect path gives confusing |
| # error message |
| with pytest.raises(FileNotFoundError): |
| ds.dataset('i-am-not-existing.parquet', format='parquet') |
| |
| with pytest.raises(pa.ArrowInvalid, match='cannot be relative'): |
| ds.dataset('file:i-am-not-existing.parquet', format='parquet') |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.parametrize('partitioning', ["directory", "hive"]) |
| @pytest.mark.parametrize('null_fallback', ['xyz', None]) |
| @pytest.mark.parametrize('infer_dictionary', [False, True]) |
| @pytest.mark.parametrize('partition_keys', [ |
| (["A", "B", "C"], [1, 2, 3]), |
| ([1, 2, 3], ["A", "B", "C"]), |
| (["A", "B", "C"], ["D", "E", "F"]), |
| ([1, 2, 3], [4, 5, 6]), |
| ([1, None, 3], ["A", "B", "C"]), |
| ([1, 2, 3], ["A", None, "C"]), |
| ([None, 2, 3], [None, 2, 3]), |
| ]) |
| def test_partition_discovery( |
| tempdir, partitioning, null_fallback, infer_dictionary, partition_keys |
| ): |
| # ARROW-9288 / ARROW-9476 |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': range(9), 'b': [0.0] * 4 + [1.0] * 5}) |
| |
| has_null = None in partition_keys[0] or None in partition_keys[1] |
| if partitioning == "directory" and has_null: |
| # Directory partitioning can't handle the first part being null |
| return |
| |
| if partitioning == "directory": |
| partitioning = ds.DirectoryPartitioning.discover( |
| ["part1", "part2"], infer_dictionary=infer_dictionary) |
| fmt = "{0}/{1}" |
| null_value = None |
| else: |
| if null_fallback: |
| partitioning = ds.HivePartitioning.discover( |
| infer_dictionary=infer_dictionary, null_fallback=null_fallback |
| ) |
| else: |
| partitioning = ds.HivePartitioning.discover( |
| infer_dictionary=infer_dictionary) |
| fmt = "part1={0}/part2={1}" |
| if null_fallback: |
| null_value = null_fallback |
| else: |
| null_value = "__HIVE_DEFAULT_PARTITION__" |
| |
| basepath = tempdir / "dataset" |
| basepath.mkdir() |
| |
| part_keys1, part_keys2 = partition_keys |
| for part1 in part_keys1: |
| for part2 in part_keys2: |
| path = basepath / \ |
| fmt.format(part1 or null_value, part2 or null_value) |
| path.mkdir(parents=True) |
| pq.write_table(table, path / "test.parquet") |
| |
| dataset = ds.dataset(str(basepath), partitioning=partitioning) |
| |
| def expected_type(key): |
| if infer_dictionary: |
| value_type = pa.string() if isinstance(key, str) else pa.int32() |
| return pa.dictionary(pa.int32(), value_type) |
| else: |
| return pa.string() if isinstance(key, str) else pa.int32() |
| expected_schema = table.schema.append( |
| pa.field("part1", expected_type(part_keys1[0])) |
| ).append( |
| pa.field("part2", expected_type(part_keys2[0])) |
| ) |
| assert dataset.schema.equals(expected_schema) |
| |
| |
| @pytest.mark.pandas |
| def test_dataset_partitioned_dictionary_type_reconstruct(tempdir): |
| # https://issues.apache.org/jira/browse/ARROW-11400 |
| table = pa.table({'part': np.repeat(['A', 'B'], 5), 'col': range(10)}) |
| part = ds.partitioning(table.select(['part']).schema, flavor="hive") |
| ds.write_dataset(table, tempdir, partitioning=part, format="feather") |
| |
| dataset = ds.dataset( |
| tempdir, format="feather", |
| partitioning=ds.HivePartitioning.discover(infer_dictionary=True) |
| ) |
| expected = pa.table( |
| {'col': table['col'], 'part': table['part'].dictionary_encode()} |
| ) |
| assert dataset.to_table().equals(expected) |
| fragment = list(dataset.get_fragments())[0] |
| assert fragment.to_table(schema=dataset.schema).equals(expected[:5]) |
| part_expr = fragment.partition_expression |
| |
| restored = pickle.loads(pickle.dumps(dataset)) |
| assert restored.to_table().equals(expected) |
| |
| restored = pickle.loads(pickle.dumps(fragment)) |
| assert restored.to_table(schema=dataset.schema).equals(expected[:5]) |
| # to_pandas call triggers computation of the actual dictionary values |
| assert restored.to_table(schema=dataset.schema).to_pandas().equals( |
| expected[:5].to_pandas() |
| ) |
| assert restored.partition_expression.equals(part_expr) |
| |
| |
| @pytest.fixture |
| def s3_example_simple(s3_connection, s3_server): |
| from pyarrow.fs import FileSystem |
| import pyarrow.parquet as pq |
| |
| host, port, access_key, secret_key = s3_connection |
| uri = ( |
| "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}" |
| .format(access_key, secret_key, host, port) |
| ) |
| |
| fs, path = FileSystem.from_uri(uri) |
| |
| fs.create_dir("mybucket") |
| table = pa.table({'a': [1, 2, 3]}) |
| with fs.open_output_stream("mybucket/data.parquet") as out: |
| pq.write_table(table, out) |
| |
| return table, path, fs, uri, host, port, access_key, secret_key |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.s3 |
| def test_open_dataset_from_uri_s3(s3_example_simple): |
| # open dataset from non-localfs string path |
| table, path, fs, uri, _, _, _, _ = s3_example_simple |
| |
| # full string URI |
| dataset = ds.dataset(uri, format="parquet") |
| assert dataset.to_table().equals(table) |
| |
| # passing filesystem object |
| dataset = ds.dataset(path, format="parquet", filesystem=fs) |
| assert dataset.to_table().equals(table) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.s3 # still needed to create the data |
| def test_open_dataset_from_uri_s3_fsspec(s3_example_simple): |
| table, path, _, _, host, port, access_key, secret_key = s3_example_simple |
| s3fs = pytest.importorskip("s3fs") |
| |
| from pyarrow.fs import PyFileSystem, FSSpecHandler |
| |
| fs = s3fs.S3FileSystem( |
| key=access_key, |
| secret=secret_key, |
| client_kwargs={ |
| 'endpoint_url': 'http://{}:{}'.format(host, port) |
| } |
| ) |
| |
| # passing as fsspec filesystem |
| dataset = ds.dataset(path, format="parquet", filesystem=fs) |
| assert dataset.to_table().equals(table) |
| |
| # directly passing the fsspec-handler |
| fs = PyFileSystem(FSSpecHandler(fs)) |
| dataset = ds.dataset(path, format="parquet", filesystem=fs) |
| assert dataset.to_table().equals(table) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.s3 |
| def test_open_dataset_from_s3_with_filesystem_uri(s3_connection, s3_server): |
| from pyarrow.fs import FileSystem |
| import pyarrow.parquet as pq |
| |
| host, port, access_key, secret_key = s3_connection |
| bucket = 'theirbucket' |
| path = 'nested/folder/data.parquet' |
| uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}".format( |
| access_key, secret_key, bucket, path, host, port |
| ) |
| |
| fs, path = FileSystem.from_uri(uri) |
| assert path == 'theirbucket/nested/folder/data.parquet' |
| |
| fs.create_dir(bucket) |
| |
| table = pa.table({'a': [1, 2, 3]}) |
| with fs.open_output_stream(path) as out: |
| pq.write_table(table, out) |
| |
| # full string URI |
| dataset = ds.dataset(uri, format="parquet") |
| assert dataset.to_table().equals(table) |
| |
| # passing filesystem as an uri |
| template = ( |
| "s3://{}:{}@{{}}?scheme=http&endpoint_override={}:{}".format( |
| access_key, secret_key, host, port |
| ) |
| ) |
| cases = [ |
| ('theirbucket/nested/folder/', '/data.parquet'), |
| ('theirbucket/nested/folder', 'data.parquet'), |
| ('theirbucket/nested/', 'folder/data.parquet'), |
| ('theirbucket/nested', 'folder/data.parquet'), |
| ('theirbucket', '/nested/folder/data.parquet'), |
| ('theirbucket', 'nested/folder/data.parquet'), |
| ] |
| for prefix, path in cases: |
| uri = template.format(prefix) |
| dataset = ds.dataset(path, filesystem=uri, format="parquet") |
| assert dataset.to_table().equals(table) |
| |
| with pytest.raises(pa.ArrowInvalid, match='Missing bucket name'): |
| uri = template.format('/') |
| ds.dataset('/theirbucket/nested/folder/data.parquet', filesystem=uri) |
| |
| error = ( |
| "The path component of the filesystem URI must point to a directory " |
| "but it has a type: `{}`. The path component is `{}` and the given " |
| "filesystem URI is `{}`" |
| ) |
| |
| path = 'theirbucket/doesnt/exist' |
| uri = template.format(path) |
| with pytest.raises(ValueError) as exc: |
| ds.dataset('data.parquet', filesystem=uri) |
| assert str(exc.value) == error.format('NotFound', path, uri) |
| |
| path = 'theirbucket/nested/folder/data.parquet' |
| uri = template.format(path) |
| with pytest.raises(ValueError) as exc: |
| ds.dataset('data.parquet', filesystem=uri) |
| assert str(exc.value) == error.format('File', path, uri) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_from_fsspec(tempdir): |
| table, path = _create_single_file(tempdir) |
| |
| fsspec = pytest.importorskip("fsspec") |
| |
| localfs = fsspec.filesystem("file") |
| dataset = ds.dataset(path, filesystem=localfs) |
| assert dataset.schema.equals(table.schema) |
| |
| |
| @pytest.mark.pandas |
| def test_filter_timestamp(tempdir): |
| # ARROW-11379 |
| path = tempdir / "test_partition_timestamps" |
| |
| table = pa.table({ |
| "dates": ['2012-01-01', '2012-01-02'] * 5, |
| "id": range(10)}) |
| |
| # write dataset partitioned on dates (as strings) |
| part = ds.partitioning(table.select(['dates']).schema, flavor="hive") |
| ds.write_dataset(table, path, partitioning=part, format="feather") |
| |
| # read dataset partitioned on dates (as timestamps) |
| part = ds.partitioning(pa.schema([("dates", pa.timestamp("s"))]), |
| flavor="hive") |
| dataset = ds.dataset(path, format="feather", partitioning=part) |
| |
| condition = ds.field("dates") > pd.Timestamp("2012-01-01") |
| table = dataset.to_table(filter=condition) |
| assert table.column('id').to_pylist() == [1, 3, 5, 7, 9] |
| |
| import datetime |
| condition = ds.field("dates") > datetime.datetime(2012, 1, 1) |
| table = dataset.to_table(filter=condition) |
| assert table.column('id').to_pylist() == [1, 3, 5, 7, 9] |
| |
| |
| @pytest.mark.parquet |
| def test_filter_implicit_cast(tempdir): |
| # ARROW-7652 |
| table = pa.table({'a': pa.array([0, 1, 2, 3, 4, 5], type=pa.int8())}) |
| _, path = _create_single_file(tempdir, table) |
| dataset = ds.dataset(str(path)) |
| |
| filter_ = ds.field('a') > 2 |
| assert len(dataset.to_table(filter=filter_)) == 3 |
| |
| |
| def test_dataset_union(multisourcefs): |
| child = ds.FileSystemDatasetFactory( |
| multisourcefs, fs.FileSelector('/plain'), |
| format=ds.ParquetFileFormat() |
| ) |
| factory = ds.UnionDatasetFactory([child]) |
| |
| # TODO(bkietz) reintroduce factory.children property |
| assert len(factory.inspect_schemas()) == 1 |
| assert all(isinstance(s, pa.Schema) for s in factory.inspect_schemas()) |
| assert factory.inspect_schemas()[0].equals(child.inspect()) |
| assert factory.inspect().equals(child.inspect()) |
| assert isinstance(factory.finish(), ds.Dataset) |
| |
| |
| def test_union_dataset_from_other_datasets(tempdir, multisourcefs): |
| child1 = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') |
| child2 = ds.dataset('/schema', filesystem=multisourcefs, format='parquet', |
| partitioning=['week', 'color']) |
| child3 = ds.dataset('/hive', filesystem=multisourcefs, format='parquet', |
| partitioning='hive') |
| |
| assert child1.schema != child2.schema != child3.schema |
| |
| assembled = ds.dataset([child1, child2, child3]) |
| assert isinstance(assembled, ds.UnionDataset) |
| |
| msg = 'cannot pass any additional arguments' |
| with pytest.raises(ValueError, match=msg): |
| ds.dataset([child1, child2], filesystem=multisourcefs) |
| |
| expected_schema = pa.schema([ |
| ('date', pa.date32()), |
| ('index', pa.int64()), |
| ('value', pa.float64()), |
| ('color', pa.string()), |
| ('week', pa.int32()), |
| ('year', pa.int32()), |
| ('month', pa.int32()), |
| ]) |
| assert assembled.schema.equals(expected_schema) |
| assert assembled.to_table().schema.equals(expected_schema) |
| |
| assembled = ds.dataset([child1, child3]) |
| expected_schema = pa.schema([ |
| ('date', pa.date32()), |
| ('index', pa.int64()), |
| ('value', pa.float64()), |
| ('color', pa.string()), |
| ('year', pa.int32()), |
| ('month', pa.int32()), |
| ]) |
| assert assembled.schema.equals(expected_schema) |
| assert assembled.to_table().schema.equals(expected_schema) |
| |
| expected_schema = pa.schema([ |
| ('month', pa.int32()), |
| ('color', pa.string()), |
| ('date', pa.date32()), |
| ]) |
| assembled = ds.dataset([child1, child3], schema=expected_schema) |
| assert assembled.to_table().schema.equals(expected_schema) |
| |
| expected_schema = pa.schema([ |
| ('month', pa.int32()), |
| ('color', pa.string()), |
| ('unknown', pa.string()) # fill with nulls |
| ]) |
| assembled = ds.dataset([child1, child3], schema=expected_schema) |
| assert assembled.to_table().schema.equals(expected_schema) |
| |
| # incompatible schemas, date and index columns have conflicting types |
| table = pa.table([range(9), [0.] * 4 + [1.] * 5, 'abcdefghj'], |
| names=['date', 'value', 'index']) |
| _, path = _create_single_file(tempdir, table=table) |
| child4 = ds.dataset(path) |
| |
| with pytest.raises(pa.ArrowInvalid, match='Unable to merge'): |
| ds.dataset([child1, child4]) |
| |
| |
| def test_dataset_from_a_list_of_local_directories_raises(multisourcefs): |
| msg = 'points to a directory, but only file paths are supported' |
| with pytest.raises(IsADirectoryError, match=msg): |
| ds.dataset(['/plain', '/schema', '/hive'], filesystem=multisourcefs) |
| |
| |
| def test_union_dataset_filesystem_datasets(multisourcefs): |
| # without partitioning |
| dataset = ds.dataset([ |
| ds.dataset('/plain', filesystem=multisourcefs), |
| ds.dataset('/schema', filesystem=multisourcefs), |
| ds.dataset('/hive', filesystem=multisourcefs), |
| ]) |
| expected_schema = pa.schema([ |
| ('date', pa.date32()), |
| ('index', pa.int64()), |
| ('value', pa.float64()), |
| ('color', pa.string()), |
| ]) |
| assert dataset.schema.equals(expected_schema) |
| |
| # with hive partitioning for two hive sources |
| dataset = ds.dataset([ |
| ds.dataset('/plain', filesystem=multisourcefs), |
| ds.dataset('/schema', filesystem=multisourcefs), |
| ds.dataset('/hive', filesystem=multisourcefs, partitioning='hive') |
| ]) |
| expected_schema = pa.schema([ |
| ('date', pa.date32()), |
| ('index', pa.int64()), |
| ('value', pa.float64()), |
| ('color', pa.string()), |
| ('year', pa.int32()), |
| ('month', pa.int32()), |
| ]) |
| assert dataset.schema.equals(expected_schema) |
| |
| |
| @pytest.mark.parquet |
| def test_specified_schema(tempdir): |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': [1, 2, 3], 'b': [.1, .2, .3]}) |
| pq.write_table(table, tempdir / "data.parquet") |
| |
| def _check_dataset(schema, expected, expected_schema=None): |
| dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema) |
| if expected_schema is not None: |
| assert dataset.schema.equals(expected_schema) |
| else: |
| assert dataset.schema.equals(schema) |
| result = dataset.to_table() |
| assert result.equals(expected) |
| |
| # no schema specified |
| schema = None |
| expected = table |
| _check_dataset(schema, expected, expected_schema=table.schema) |
| |
| # identical schema specified |
| schema = table.schema |
| expected = table |
| _check_dataset(schema, expected) |
| |
| # Specifying schema with change column order |
| schema = pa.schema([('b', 'float64'), ('a', 'int64')]) |
| expected = pa.table([[.1, .2, .3], [1, 2, 3]], names=['b', 'a']) |
| _check_dataset(schema, expected) |
| |
| # Specifying schema with missing column |
| schema = pa.schema([('a', 'int64')]) |
| expected = pa.table([[1, 2, 3]], names=['a']) |
| _check_dataset(schema, expected) |
| |
| # Specifying schema with additional column |
| schema = pa.schema([('a', 'int64'), ('c', 'int32')]) |
| expected = pa.table([[1, 2, 3], |
| pa.array([None, None, None], type='int32')], |
| names=['a', 'c']) |
| _check_dataset(schema, expected) |
| |
| # Specifying with differing field types |
| schema = pa.schema([('a', 'int32'), ('b', 'float64')]) |
| dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema) |
| expected = pa.table([table['a'].cast('int32'), |
| table['b']], |
| names=['a', 'b']) |
| _check_dataset(schema, expected) |
| |
| # Specifying with incompatible schema |
| schema = pa.schema([('a', pa.list_(pa.int32())), ('b', 'float64')]) |
| dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema) |
| assert dataset.schema.equals(schema) |
| with pytest.raises(NotImplementedError, |
| match='Unsupported cast from int64 to list'): |
| dataset.to_table() |
| |
| |
| def test_ipc_format(tempdir): |
| table = pa.table({'a': pa.array([1, 2, 3], type="int8"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| |
| path = str(tempdir / 'test.arrow') |
| with pa.output_stream(path) as sink: |
| writer = pa.RecordBatchFileWriter(sink, table.schema) |
| writer.write_batch(table.to_batches()[0]) |
| writer.close() |
| |
| dataset = ds.dataset(path, format=ds.IpcFileFormat()) |
| result = dataset.to_table() |
| assert result.equals(table) |
| |
| for format_str in ["ipc", "arrow"]: |
| dataset = ds.dataset(path, format=format_str) |
| result = dataset.to_table() |
| assert result.equals(table) |
| |
| |
| @pytest.mark.pandas |
| def test_csv_format(tempdir): |
| table = pa.table({'a': pa.array([1, 2, 3], type="int64"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| |
| path = str(tempdir / 'test.csv') |
| table.to_pandas().to_csv(path, index=False) |
| |
| dataset = ds.dataset(path, format=ds.CsvFileFormat()) |
| result = dataset.to_table() |
| assert result.equals(table) |
| |
| dataset = ds.dataset(path, format='csv') |
| result = dataset.to_table() |
| assert result.equals(table) |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parametrize("compression", [ |
| "bz2", |
| "gzip", |
| "lz4", |
| "zstd", |
| ]) |
| def test_csv_format_compressed(tempdir, compression): |
| if not pyarrow.Codec.is_available(compression): |
| pytest.skip("{} support is not built".format(compression)) |
| table = pa.table({'a': pa.array([1, 2, 3], type="int64"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| filesystem = fs.LocalFileSystem() |
| suffix = compression if compression != 'gzip' else 'gz' |
| path = str(tempdir / f'test.csv.{suffix}') |
| with filesystem.open_output_stream(path, compression=compression) as sink: |
| # https://github.com/pandas-dev/pandas/issues/23854 |
| # With CI version of Pandas (anything < 1.2), Pandas tries to write |
| # str to the sink |
| csv_str = table.to_pandas().to_csv(index=False) |
| sink.write(csv_str.encode('utf-8')) |
| |
| dataset = ds.dataset(path, format=ds.CsvFileFormat()) |
| result = dataset.to_table() |
| assert result.equals(table) |
| |
| |
| def test_csv_format_options(tempdir): |
| path = str(tempdir / 'test.csv') |
| with open(path, 'w') as sink: |
| sink.write('skipped\ncol0\nfoo\nbar\n') |
| dataset = ds.dataset(path, format='csv') |
| result = dataset.to_table() |
| assert result.equals( |
| pa.table({'skipped': pa.array(['col0', 'foo', 'bar'])})) |
| |
| dataset = ds.dataset(path, format=ds.CsvFileFormat( |
| read_options=pa.csv.ReadOptions(skip_rows=1))) |
| result = dataset.to_table() |
| assert result.equals(pa.table({'col0': pa.array(['foo', 'bar'])})) |
| |
| dataset = ds.dataset(path, format=ds.CsvFileFormat( |
| read_options=pa.csv.ReadOptions(column_names=['foo']))) |
| result = dataset.to_table() |
| assert result.equals( |
| pa.table({'foo': pa.array(['skipped', 'col0', 'foo', 'bar'])})) |
| |
| |
| def test_csv_fragment_options(tempdir): |
| path = str(tempdir / 'test.csv') |
| with open(path, 'w') as sink: |
| sink.write('col0\nfoo\nspam\nMYNULL\n') |
| dataset = ds.dataset(path, format='csv') |
| convert_options = pyarrow.csv.ConvertOptions(null_values=['MYNULL'], |
| strings_can_be_null=True) |
| options = ds.CsvFragmentScanOptions( |
| convert_options=convert_options, |
| read_options=pa.csv.ReadOptions(block_size=2**16)) |
| result = dataset.to_table(fragment_scan_options=options) |
| assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])})) |
| |
| csv_format = ds.CsvFileFormat(convert_options=convert_options) |
| dataset = ds.dataset(path, format=csv_format) |
| result = dataset.to_table() |
| assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])})) |
| |
| options = ds.CsvFragmentScanOptions() |
| result = dataset.to_table(fragment_scan_options=options) |
| assert result.equals( |
| pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])})) |
| |
| |
| def test_feather_format(tempdir): |
| from pyarrow.feather import write_feather |
| |
| table = pa.table({'a': pa.array([1, 2, 3], type="int8"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| |
| basedir = tempdir / "feather_dataset" |
| basedir.mkdir() |
| write_feather(table, str(basedir / "data.feather")) |
| |
| dataset = ds.dataset(basedir, format=ds.IpcFileFormat()) |
| result = dataset.to_table() |
| assert result.equals(table) |
| |
| dataset = ds.dataset(basedir, format="feather") |
| result = dataset.to_table() |
| assert result.equals(table) |
| |
| # ARROW-8641 - column selection order |
| result = dataset.to_table(columns=["b", "a"]) |
| assert result.column_names == ["b", "a"] |
| result = dataset.to_table(columns=["a", "a"]) |
| assert result.column_names == ["a", "a"] |
| |
| # error with Feather v1 files |
| write_feather(table, str(basedir / "data1.feather"), version=1) |
| with pytest.raises(ValueError): |
| ds.dataset(basedir, format="feather").to_table() |
| |
| |
| def _create_parquet_dataset_simple(root_path): |
| import pyarrow.parquet as pq |
| |
| metadata_collector = [] |
| |
| for i in range(4): |
| table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)}) |
| pq.write_to_dataset( |
| table, str(root_path), metadata_collector=metadata_collector |
| ) |
| |
| metadata_path = str(root_path / '_metadata') |
| # write _metadata file |
| pq.write_metadata( |
| table.schema, metadata_path, |
| metadata_collector=metadata_collector |
| ) |
| return metadata_path, table |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas # write_to_dataset currently requires pandas |
| def test_parquet_dataset_factory(tempdir): |
| root_path = tempdir / "test_parquet_dataset" |
| metadata_path, table = _create_parquet_dataset_simple(root_path) |
| dataset = ds.parquet_dataset(metadata_path) |
| assert dataset.schema.equals(table.schema) |
| assert len(dataset.files) == 4 |
| result = dataset.to_table() |
| assert result.num_rows == 40 |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_parquet_dataset_factory_invalid(tempdir): |
| root_path = tempdir / "test_parquet_dataset_invalid" |
| metadata_path, table = _create_parquet_dataset_simple(root_path) |
| # remove one of the files |
| list(root_path.glob("*.parquet"))[0].unlink() |
| dataset = ds.parquet_dataset(metadata_path) |
| assert dataset.schema.equals(table.schema) |
| assert len(dataset.files) == 4 |
| with pytest.raises(FileNotFoundError): |
| dataset.to_table() |
| |
| |
| def _create_metadata_file(root_path): |
| # create _metadata file from existing parquet dataset |
| import pyarrow.parquet as pq |
| |
| parquet_paths = list(sorted(root_path.rglob("*.parquet"))) |
| schema = pq.ParquetFile(parquet_paths[0]).schema.to_arrow_schema() |
| |
| metadata_collector = [] |
| for path in parquet_paths: |
| metadata = pq.ParquetFile(path).metadata |
| metadata.set_file_path(str(path.relative_to(root_path))) |
| metadata_collector.append(metadata) |
| |
| metadata_path = root_path / "_metadata" |
| pq.write_metadata( |
| schema, metadata_path, metadata_collector=metadata_collector |
| ) |
| return metadata_path |
| |
| |
| def _create_parquet_dataset_partitioned(root_path): |
| import pyarrow.parquet as pq |
| |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10))], |
| names=["f1", "f2", "part"] |
| ) |
| table = table.replace_schema_metadata({"key": "value"}) |
| pq.write_to_dataset(table, str(root_path), partition_cols=['part']) |
| return _create_metadata_file(root_path), table |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_parquet_dataset_factory_partitioned(tempdir): |
| root_path = tempdir / "test_parquet_dataset_factory_partitioned" |
| metadata_path, table = _create_parquet_dataset_partitioned(root_path) |
| |
| partitioning = ds.partitioning(flavor="hive") |
| dataset = ds.parquet_dataset(metadata_path, partitioning=partitioning) |
| |
| assert dataset.schema.equals(table.schema) |
| assert len(dataset.files) == 2 |
| result = dataset.to_table() |
| assert result.num_rows == 20 |
| |
| # the partitioned dataset does not preserve order |
| result = result.to_pandas().sort_values("f1").reset_index(drop=True) |
| expected = table.to_pandas() |
| pd.testing.assert_frame_equal(result, expected) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_parquet_dataset_factory_metadata(tempdir): |
| # ensure ParquetDatasetFactory preserves metadata (ARROW-9363) |
| root_path = tempdir / "test_parquet_dataset_factory_metadata" |
| metadata_path, table = _create_parquet_dataset_partitioned(root_path) |
| |
| dataset = ds.parquet_dataset(metadata_path, partitioning="hive") |
| assert dataset.schema.equals(table.schema) |
| assert b"key" in dataset.schema.metadata |
| |
| fragments = list(dataset.get_fragments()) |
| assert b"key" in fragments[0].physical_schema.metadata |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs): |
| fs, assert_opens = open_logging_fs |
| |
| # Test to ensure that no IO happens when filtering a dataset |
| # created with ParquetDatasetFactory from a _metadata file |
| |
| root_path = tempdir / "test_parquet_dataset_lazy_filtering" |
| metadata_path, _ = _create_parquet_dataset_simple(root_path) |
| |
| # creating the dataset should only open the metadata file |
| with assert_opens([metadata_path]): |
| dataset = ds.parquet_dataset( |
| metadata_path, |
| partitioning=ds.partitioning(flavor="hive"), |
| filesystem=fs) |
| |
| # materializing fragments should not open any file |
| with assert_opens([]): |
| fragments = list(dataset.get_fragments()) |
| |
| # filtering fragments should not open any file |
| with assert_opens([]): |
| list(dataset.get_fragments(ds.field("f1") > 15)) |
| |
| # splitting by row group should still not open any file |
| with assert_opens([]): |
| fragments[0].split_by_row_group(ds.field("f1") > 15) |
| |
| # ensuring metadata of splitted fragment should also not open any file |
| with assert_opens([]): |
| rg_fragments = fragments[0].split_by_row_group() |
| rg_fragments[0].ensure_complete_metadata() |
| |
| # FIXME(bkietz) on Windows this results in FileNotFoundErrors. |
| # but actually scanning does open files |
| # with assert_opens([f.path for f in fragments]): |
| # dataset.to_table() |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_dataset_schema_metadata(tempdir): |
| # ARROW-8802 |
| df = pd.DataFrame({'a': [1, 2, 3]}) |
| path = tempdir / "test.parquet" |
| df.to_parquet(path) |
| dataset = ds.dataset(path) |
| |
| schema = dataset.to_table().schema |
| projected_schema = dataset.to_table(columns=["a"]).schema |
| |
| # ensure the pandas metadata is included in the schema |
| assert b"pandas" in schema.metadata |
| # ensure it is still there in a projected schema (with column selection) |
| assert schema.equals(projected_schema, check_metadata=True) |
| |
| |
| @pytest.mark.parquet |
| def test_filter_mismatching_schema(tempdir): |
| # ARROW-9146 |
| import pyarrow.parquet as pq |
| |
| table = pa.table({"col": pa.array([1, 2, 3, 4], type='int32')}) |
| pq.write_table(table, str(tempdir / "data.parquet")) |
| |
| # specifying explicit schema, but that mismatches the schema of the data |
| schema = pa.schema([("col", pa.int64())]) |
| dataset = ds.dataset( |
| tempdir / "data.parquet", format="parquet", schema=schema) |
| |
| # filtering on a column with such type mismatch should implicitly |
| # cast the column |
| filtered = dataset.to_table(filter=ds.field("col") > 2) |
| assert filtered["col"].equals(table["col"].cast('int64').slice(2)) |
| |
| fragment = list(dataset.get_fragments())[0] |
| filtered = fragment.to_table(filter=ds.field("col") > 2, schema=schema) |
| assert filtered["col"].equals(table["col"].cast('int64').slice(2)) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_dataset_project_only_partition_columns(tempdir): |
| # ARROW-8729 |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'part': 'a a b b'.split(), 'col': list(range(4))}) |
| |
| path = str(tempdir / 'test_dataset') |
| pq.write_to_dataset(table, path, partition_cols=['part']) |
| dataset = ds.dataset(path, partitioning='hive') |
| |
| all_cols = dataset.to_table(use_threads=False) |
| part_only = dataset.to_table(columns=['part'], use_threads=False) |
| |
| assert all_cols.column('part').equals(part_only.column('part')) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_dataset_project_null_column(tempdir): |
| import pandas as pd |
| df = pd.DataFrame({"col": np.array([None, None, None], dtype='object')}) |
| |
| f = tempdir / "test_dataset_project_null_column.parquet" |
| df.to_parquet(f, engine="pyarrow") |
| |
| dataset = ds.dataset(f, format="parquet", |
| schema=pa.schema([("col", pa.int64())])) |
| expected = pa.table({'col': pa.array([None, None, None], pa.int64())}) |
| assert dataset.to_table().equals(expected) |
| |
| |
| def test_dataset_project_columns(tempdir): |
| # basic column re-projection with expressions |
| from pyarrow import feather |
| table = pa.table({"A": [1, 2, 3], "B": [1., 2., 3.], "C": ["a", "b", "c"]}) |
| feather.write_feather(table, tempdir / "data.feather") |
| |
| dataset = ds.dataset(tempdir / "data.feather", format="feather") |
| result = dataset.to_table(columns={ |
| 'A_renamed': ds.field('A'), |
| 'B_as_int': ds.field('B').cast("int32", safe=False), |
| 'C_is_a': ds.field('C') == 'a' |
| }) |
| expected = pa.table({ |
| "A_renamed": [1, 2, 3], |
| "B_as_int": pa.array([1, 2, 3], type="int32"), |
| "C_is_a": [True, False, False], |
| }) |
| assert result.equals(expected) |
| |
| # raise proper error when not passing an expression |
| with pytest.raises(TypeError, match="Expected an Expression"): |
| dataset.to_table(columns={"A": "A"}) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_to_dataset_given_null_just_works(tempdir): |
| import pyarrow.parquet as pq |
| |
| schema = pa.schema([ |
| pa.field('col', pa.int64()), |
| pa.field('part', pa.dictionary(pa.int32(), pa.string())) |
| ]) |
| table = pa.table({'part': [None, None, 'a', 'a'], |
| 'col': list(range(4))}, schema=schema) |
| |
| path = str(tempdir / 'test_dataset') |
| pq.write_to_dataset(table, path, partition_cols=[ |
| 'part'], use_legacy_dataset=False) |
| |
| actual_table = pq.read_table(tempdir / 'test_dataset') |
| # column.equals can handle the difference in chunking but not the fact |
| # that `part` will have different dictionaries for the two chunks |
| assert actual_table.column('part').to_pylist( |
| ) == table.column('part').to_pylist() |
| assert actual_table.column('col').equals(table.column('col')) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_legacy_write_to_dataset_drops_null(tempdir): |
| import pyarrow.parquet as pq |
| |
| schema = pa.schema([ |
| pa.field('col', pa.int64()), |
| pa.field('part', pa.dictionary(pa.int32(), pa.string())) |
| ]) |
| table = pa.table({'part': ['a', 'a', None, None], |
| 'col': list(range(4))}, schema=schema) |
| expected = pa.table( |
| {'part': ['a', 'a'], 'col': list(range(2))}, schema=schema) |
| |
| path = str(tempdir / 'test_dataset') |
| pq.write_to_dataset(table, path, partition_cols=[ |
| 'part'], use_legacy_dataset=True) |
| |
| actual = pq.read_table(tempdir / 'test_dataset') |
| assert actual == expected |
| |
| |
| def _check_dataset_roundtrip(dataset, base_dir, expected_files, |
| base_dir_path=None, partitioning=None): |
| base_dir_path = base_dir_path or base_dir |
| |
| ds.write_dataset(dataset, base_dir, format="feather", |
| partitioning=partitioning, use_threads=False) |
| |
| # check that all files are present |
| file_paths = list(base_dir_path.rglob("*")) |
| assert set(file_paths) == set(expected_files) |
| |
| # check that reading back in as dataset gives the same result |
| dataset2 = ds.dataset( |
| base_dir_path, format="feather", partitioning=partitioning) |
| assert dataset2.to_table().equals(dataset.to_table()) |
| |
| |
| @pytest.mark.parquet |
| def test_write_dataset(tempdir): |
| # manually create a written dataset and read as dataset object |
| directory = tempdir / 'single-file' |
| directory.mkdir() |
| _ = _create_single_file(directory) |
| dataset = ds.dataset(directory) |
| |
| # full string path |
| target = tempdir / 'single-file-target' |
| expected_files = [target / "part-0.feather"] |
| _check_dataset_roundtrip(dataset, str(target), expected_files, target) |
| |
| # pathlib path object |
| target = tempdir / 'single-file-target2' |
| expected_files = [target / "part-0.feather"] |
| _check_dataset_roundtrip(dataset, target, expected_files, target) |
| |
| # TODO |
| # # relative path |
| # target = tempdir / 'single-file-target3' |
| # expected_files = [target / "part-0.ipc"] |
| # _check_dataset_roundtrip( |
| # dataset, './single-file-target3', expected_files, target) |
| |
| # Directory of files |
| directory = tempdir / 'single-directory' |
| directory.mkdir() |
| _ = _create_directory_of_files(directory) |
| dataset = ds.dataset(directory) |
| |
| target = tempdir / 'single-directory-target' |
| expected_files = [target / "part-0.feather"] |
| _check_dataset_roundtrip(dataset, str(target), expected_files, target) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_dataset_partitioned(tempdir): |
| directory = tempdir / "partitioned" |
| _ = _create_parquet_dataset_partitioned(directory) |
| partitioning = ds.partitioning(flavor="hive") |
| dataset = ds.dataset(directory, partitioning=partitioning) |
| |
| # hive partitioning |
| target = tempdir / 'partitioned-hive-target' |
| expected_paths = [ |
| target / "part=a", target / "part=a" / "part-0.feather", |
| target / "part=b", target / "part=b" / "part-1.feather" |
| ] |
| partitioning_schema = ds.partitioning( |
| pa.schema([("part", pa.string())]), flavor="hive") |
| _check_dataset_roundtrip( |
| dataset, str(target), expected_paths, target, |
| partitioning=partitioning_schema) |
| |
| # directory partitioning |
| target = tempdir / 'partitioned-dir-target' |
| expected_paths = [ |
| target / "a", target / "a" / "part-0.feather", |
| target / "b", target / "b" / "part-1.feather" |
| ] |
| partitioning_schema = ds.partitioning( |
| pa.schema([("part", pa.string())])) |
| _check_dataset_roundtrip( |
| dataset, str(target), expected_paths, target, |
| partitioning=partitioning_schema) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_dataset_partitioned_dict(tempdir): |
| directory = tempdir / "partitioned" |
| _ = _create_parquet_dataset_partitioned(directory) |
| |
| # directory partitioning, dictionary partition columns |
| dataset = ds.dataset( |
| directory, |
| partitioning=ds.HivePartitioning.discover(infer_dictionary=True)) |
| target = tempdir / 'partitioned-dir-target' |
| expected_paths = [ |
| target / "a", target / "a" / "part-0.feather", |
| target / "b", target / "b" / "part-1.feather" |
| ] |
| partitioning = ds.partitioning(pa.schema([ |
| dataset.schema.field('part')]), |
| dictionaries={'part': pa.array(['a', 'b'])}) |
| # NB: dictionaries required here since we use partitioning to parse |
| # directories in _check_dataset_roundtrip (not currently required for |
| # the formatting step) |
| _check_dataset_roundtrip( |
| dataset, str(target), expected_paths, target, |
| partitioning=partitioning) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_dataset_use_threads(tempdir): |
| directory = tempdir / "partitioned" |
| _ = _create_parquet_dataset_partitioned(directory) |
| dataset = ds.dataset(directory, partitioning="hive") |
| |
| partitioning = ds.partitioning( |
| pa.schema([("part", pa.string())]), flavor="hive") |
| |
| target1 = tempdir / 'partitioned1' |
| ds.write_dataset( |
| dataset, target1, format="feather", partitioning=partitioning, |
| use_threads=True |
| ) |
| target2 = tempdir / 'partitioned2' |
| ds.write_dataset( |
| dataset, target2, format="feather", partitioning=partitioning, |
| use_threads=False |
| ) |
| |
| # check that reading in gives same result |
| result1 = ds.dataset(target1, format="feather", partitioning=partitioning) |
| result2 = ds.dataset(target2, format="feather", partitioning=partitioning) |
| assert result1.to_table().equals(result2.to_table()) |
| |
| |
| def test_write_table(tempdir): |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10)) |
| ], names=["f1", "f2", "part"]) |
| |
| base_dir = tempdir / 'single' |
| ds.write_dataset(table, base_dir, |
| basename_template='dat_{i}.arrow', format="feather") |
| # check that all files are present |
| file_paths = list(base_dir.rglob("*")) |
| expected_paths = [base_dir / "dat_0.arrow"] |
| assert set(file_paths) == set(expected_paths) |
| # check Table roundtrip |
| result = ds.dataset(base_dir, format="ipc").to_table() |
| assert result.equals(table) |
| |
| # with partitioning |
| base_dir = tempdir / 'partitioned' |
| partitioning = ds.partitioning( |
| pa.schema([("part", pa.string())]), flavor="hive") |
| ds.write_dataset(table, base_dir, format="feather", |
| basename_template='dat_{i}.arrow', |
| partitioning=partitioning) |
| file_paths = list(base_dir.rglob("*")) |
| expected_paths = [ |
| base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow", |
| base_dir / "part=b", base_dir / "part=b" / "dat_1.arrow" |
| ] |
| assert set(file_paths) == set(expected_paths) |
| result = ds.dataset(base_dir, format="ipc", partitioning=partitioning) |
| assert result.to_table().equals(table) |
| |
| |
| def test_write_table_multiple_fragments(tempdir): |
| table = pa.table([ |
| pa.array(range(10)), pa.array(np.random.randn(10)), |
| pa.array(np.repeat(['a', 'b'], 5)) |
| ], names=["f1", "f2", "part"]) |
| table = pa.concat_tables([table]*2) |
| |
| # Table with multiple batches written as single Fragment by default |
| base_dir = tempdir / 'single' |
| ds.write_dataset(table, base_dir, format="feather") |
| assert set(base_dir.rglob("*")) == set([base_dir / "part-0.feather"]) |
| assert ds.dataset(base_dir, format="ipc").to_table().equals(table) |
| |
| # Same for single-element list of Table |
| base_dir = tempdir / 'single-list' |
| ds.write_dataset([table], base_dir, format="feather") |
| assert set(base_dir.rglob("*")) == set([base_dir / "part-0.feather"]) |
| assert ds.dataset(base_dir, format="ipc").to_table().equals(table) |
| |
| # Provide list of batches to write multiple fragments |
| base_dir = tempdir / 'multiple' |
| ds.write_dataset(table.to_batches(), base_dir, format="feather") |
| assert set(base_dir.rglob("*")) == set( |
| [base_dir / "part-0.feather"]) |
| assert ds.dataset(base_dir, format="ipc").to_table().equals(table) |
| |
| # Provide list of tables to write multiple fragments |
| base_dir = tempdir / 'multiple-table' |
| ds.write_dataset([table, table], base_dir, format="feather") |
| assert set(base_dir.rglob("*")) == set( |
| [base_dir / "part-0.feather"]) |
| assert ds.dataset(base_dir, format="ipc").to_table().equals( |
| pa.concat_tables([table]*2) |
| ) |
| |
| |
| def test_write_iterable(tempdir): |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10)) |
| ], names=["f1", "f2", "part"]) |
| |
| base_dir = tempdir / 'inmemory_iterable' |
| ds.write_dataset((batch for batch in table.to_batches()), base_dir, |
| schema=table.schema, |
| basename_template='dat_{i}.arrow', format="feather") |
| result = ds.dataset(base_dir, format="ipc").to_table() |
| assert result.equals(table) |
| |
| base_dir = tempdir / 'inmemory_reader' |
| reader = pa.ipc.RecordBatchReader.from_batches(table.schema, |
| table.to_batches()) |
| ds.write_dataset(reader, base_dir, |
| basename_template='dat_{i}.arrow', format="feather") |
| result = ds.dataset(base_dir, format="ipc").to_table() |
| assert result.equals(table) |
| |
| |
| def test_write_table_partitioned_dict(tempdir): |
| # ensure writing table partitioned on a dictionary column works without |
| # specifying the dictionary values explicitly |
| table = pa.table([ |
| pa.array(range(20)), |
| pa.array(np.repeat(['a', 'b'], 10)).dictionary_encode(), |
| ], names=['col', 'part']) |
| |
| partitioning = ds.partitioning(table.select(["part"]).schema) |
| |
| base_dir = tempdir / "dataset" |
| ds.write_dataset( |
| table, base_dir, format="feather", partitioning=partitioning |
| ) |
| |
| # check roundtrip |
| partitioning_read = ds.DirectoryPartitioning.discover( |
| ["part"], infer_dictionary=True) |
| result = ds.dataset( |
| base_dir, format="ipc", partitioning=partitioning_read |
| ).to_table() |
| assert result.equals(table) |
| |
| |
| @pytest.mark.parquet |
| def test_write_dataset_parquet(tempdir): |
| import pyarrow.parquet as pq |
| |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10)) |
| ], names=["f1", "f2", "part"]) |
| |
| # using default "parquet" format string |
| |
| base_dir = tempdir / 'parquet_dataset' |
| ds.write_dataset(table, base_dir, format="parquet") |
| # check that all files are present |
| file_paths = list(base_dir.rglob("*")) |
| expected_paths = [base_dir / "part-0.parquet"] |
| assert set(file_paths) == set(expected_paths) |
| # check Table roundtrip |
| result = ds.dataset(base_dir, format="parquet").to_table() |
| assert result.equals(table) |
| |
| # using custom options |
| for version in ["1.0", "2.0"]: |
| format = ds.ParquetFileFormat() |
| opts = format.make_write_options(version=version) |
| base_dir = tempdir / 'parquet_dataset_version{0}'.format(version) |
| ds.write_dataset(table, base_dir, format=format, file_options=opts) |
| meta = pq.read_metadata(base_dir / "part-0.parquet") |
| assert meta.format_version == version |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_dataset_arrow_schema_metadata(tempdir): |
| # ensure we serialize ARROW schema in the parquet metadata, to have a |
| # correct roundtrip (e.g. preserve non-UTC timezone) |
| import pyarrow.parquet as pq |
| |
| table = pa.table({"a": [pd.Timestamp("2012-01-01", tz="Europe/Brussels")]}) |
| assert table["a"].type.tz == "Europe/Brussels" |
| |
| ds.write_dataset(table, tempdir, format="parquet") |
| result = pq.read_table(tempdir / "part-0.parquet") |
| assert result["a"].type.tz == "Europe/Brussels" |
| |
| |
| def test_write_dataset_schema_metadata(tempdir): |
| # ensure that schema metadata gets written |
| from pyarrow import feather |
| |
| table = pa.table({'a': [1, 2, 3]}) |
| table = table.replace_schema_metadata({b'key': b'value'}) |
| ds.write_dataset(table, tempdir, format="feather") |
| |
| schema = feather.read_table(tempdir / "part-0.feather").schema |
| assert schema.metadata == {b'key': b'value'} |
| |
| |
| @pytest.mark.parquet |
| def test_write_dataset_schema_metadata_parquet(tempdir): |
| # ensure that schema metadata gets written |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': [1, 2, 3]}) |
| table = table.replace_schema_metadata({b'key': b'value'}) |
| ds.write_dataset(table, tempdir, format="parquet") |
| |
| schema = pq.read_table(tempdir / "part-0.parquet").schema |
| assert schema.metadata == {b'key': b'value'} |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.s3 |
| def test_write_dataset_s3(s3_example_simple): |
| # write dataset with s3 filesystem |
| _, _, fs, _, host, port, access_key, secret_key = s3_example_simple |
| uri_template = ( |
| "s3://{}:{}@{{}}?scheme=http&endpoint_override={}:{}".format( |
| access_key, secret_key, host, port) |
| ) |
| |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10))], |
| names=["f1", "f2", "part"] |
| ) |
| part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive") |
| |
| # writing with filesystem object |
| ds.write_dataset( |
| table, "mybucket/dataset", filesystem=fs, format="feather", |
| partitioning=part |
| ) |
| # check rountrip |
| result = ds.dataset( |
| "mybucket/dataset", filesystem=fs, format="ipc", partitioning="hive" |
| ).to_table() |
| assert result.equals(table) |
| |
| # writing with URI |
| uri = uri_template.format("mybucket/dataset2") |
| ds.write_dataset(table, uri, format="feather", partitioning=part) |
| # check rountrip |
| result = ds.dataset( |
| "mybucket/dataset2", filesystem=fs, format="ipc", partitioning="hive" |
| ).to_table() |
| assert result.equals(table) |
| |
| # writing with path + URI as filesystem |
| uri = uri_template.format("mybucket") |
| ds.write_dataset( |
| table, "dataset3", filesystem=uri, format="feather", partitioning=part |
| ) |
| # check rountrip |
| result = ds.dataset( |
| "mybucket/dataset3", filesystem=fs, format="ipc", partitioning="hive" |
| ).to_table() |
| assert result.equals(table) |