| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import contextlib |
| import os |
import pathlib
import pickle
import posixpath
import tempfile
import textwrap
| import threading |
| import time |
| |
| import numpy as np |
| import pytest |
| |
| import pyarrow as pa |
| import pyarrow.csv |
| import pyarrow.feather |
| import pyarrow.fs as fs |
| from pyarrow.tests.util import (change_cwd, _filesystem_uri, |
| FSProtocolClass, ProxyHandler) |
| |
| try: |
| import pandas as pd |
| except ImportError: |
| pd = None |
| |
| try: |
| import pyarrow.dataset as ds |
| except ImportError: |
| ds = None |
| |
| # Marks all of the tests in this module |
| # Ignore these with pytest ... -m 'not dataset' |
| pytestmark = pytest.mark.dataset |
| |
| |
| def _generate_data(n): |
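    """Build a DataFrame of ``n`` rows: a date sequence in 5-day steps
    starting 2000-01-01, plus index, value and cycling color columns."""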
| import datetime |
| import itertools |
| |
| day = datetime.datetime(2000, 1, 1) |
| interval = datetime.timedelta(days=5) |
| colors = itertools.cycle(['green', 'blue', 'yellow', 'red', 'orange']) |
| |
| data = [] |
| for i in range(n): |
| data.append((day, i, float(i), next(colors))) |
| day += interval |
| |
| return pd.DataFrame(data, columns=['date', 'index', 'value', 'color']) |
| |
| |
| def _table_from_pandas(df): |
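    """Convert the generated DataFrame to an Arrow table with an explicit
    schema, dropping the pandas schema metadata."""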
| schema = pa.schema([ |
| pa.field('date', pa.date32()), |
| pa.field('index', pa.int64()), |
| pa.field('value', pa.float64()), |
| pa.field('color', pa.string()), |
| ]) |
| table = pa.Table.from_pandas(df, schema=schema, preserve_index=False) |
| return table.replace_schema_metadata() |
| |
| |
@pytest.fixture
def mockfs():
    # pytest marks have no effect on fixtures, so instead of
    # @pytest.mark.parquet we skip explicitly when parquet is unavailable
    pq = pytest.importorskip("pyarrow.parquet")
| |
| mockfs = fs._MockFileSystem() |
| |
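    # layout: subdir/<group>/<key>/file<i>.parquet, one 5-row parquet file
    # (columns i64, f64, str, const) per directory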
| directories = [ |
| 'subdir/1/xxx', |
| 'subdir/2/yyy', |
| ] |
| |
| for i, directory in enumerate(directories): |
| path = '{}/file{}.parquet'.format(directory, i) |
| mockfs.create_dir(directory) |
| with mockfs.open_output_stream(path) as out: |
| data = [ |
| list(range(5)), |
| list(map(float, range(5))), |
| list(map(str, range(5))), |
| [i] * 5 |
| ] |
| schema = pa.schema([ |
| pa.field('i64', pa.int64()), |
| pa.field('f64', pa.float64()), |
| pa.field('str', pa.string()), |
| pa.field('const', pa.int64()), |
| ]) |
| batch = pa.record_batch(data, schema=schema) |
| table = pa.Table.from_batches([batch]) |
| |
| pq.write_table(table, out) |
| |
| return mockfs |
| |
| |
| @pytest.fixture |
| def open_logging_fs(monkeypatch): |
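    """Wrap LocalFileSystem in a ProxyHandler that records every path
    passed to open_input_file, and return it together with an
    assert_opens context manager for asserting which files were opened."""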
    from pyarrow.fs import PyFileSystem, LocalFileSystem
| |
| localfs = LocalFileSystem() |
| |
| def normalized(paths): |
| return {localfs.normalize_path(str(p)) for p in paths} |
| |
| opened = set() |
| |
| def open_input_file(self, path): |
| path = localfs.normalize_path(str(path)) |
| opened.add(path) |
| return self._fs.open_input_file(path) |
| |
    # patch ProxyHandler to log calls to open_input_file
| monkeypatch.setattr(ProxyHandler, "open_input_file", open_input_file) |
| fs = PyFileSystem(ProxyHandler(localfs)) |
| |
| @contextlib.contextmanager |
| def assert_opens(expected_opened): |
| opened.clear() |
| try: |
| yield |
| finally: |
| assert normalized(opened) == normalized(expected_opened) |
| |
| return fs, assert_opens |
| |
| |
| @pytest.fixture(scope='module') |
| def multisourcefs(request): |
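    """Mock filesystem holding the same data in four layouts: flat files
    under 'plain', directory-partitioned files under 'schema', and
    hive-partitioned files under 'hive' and 'hive_color'."""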
| request.config.pyarrow.requires('pandas') |
| request.config.pyarrow.requires('parquet') |
| import pyarrow.parquet as pq |
| |
| df = _generate_data(1000) |
| mockfs = fs._MockFileSystem() |
| |
| # simply split the dataframe into four chunks to construct a data source |
| # from each chunk into its own directory |
| df_a, df_b, df_c, df_d = np.array_split(df, 4) |
| |
| # create a directory containing a flat sequence of parquet files without |
| # any partitioning involved |
| mockfs.create_dir('plain') |
| for i, chunk in enumerate(np.array_split(df_a, 10)): |
| path = 'plain/chunk-{}.parquet'.format(i) |
| with mockfs.open_output_stream(path) as out: |
| pq.write_table(_table_from_pandas(chunk), out) |
| |
| # create one with schema partitioning by weekday and color |
| mockfs.create_dir('schema') |
| for part, chunk in df_b.groupby([df_b.date.dt.dayofweek, df_b.color]): |
| folder = 'schema/{}/{}'.format(*part) |
| path = '{}/chunk.parquet'.format(folder) |
| mockfs.create_dir(folder) |
| with mockfs.open_output_stream(path) as out: |
| pq.write_table(_table_from_pandas(chunk), out) |
| |
| # create one with hive partitioning by year and month |
| mockfs.create_dir('hive') |
| for part, chunk in df_c.groupby([df_c.date.dt.year, df_c.date.dt.month]): |
| folder = 'hive/year={}/month={}'.format(*part) |
| path = '{}/chunk.parquet'.format(folder) |
| mockfs.create_dir(folder) |
| with mockfs.open_output_stream(path) as out: |
| pq.write_table(_table_from_pandas(chunk), out) |
| |
| # create one with hive partitioning by color |
| mockfs.create_dir('hive_color') |
| for part, chunk in df_d.groupby(["color"]): |
| folder = 'hive_color/color={}'.format(*part) |
| path = '{}/chunk.parquet'.format(folder) |
| mockfs.create_dir(folder) |
| with mockfs.open_output_stream(path) as out: |
| pq.write_table(_table_from_pandas(chunk), out) |
| |
| return mockfs |
| |
| |
| @pytest.fixture |
| def dataset(mockfs): |
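    """FileSystemDataset over mockfs with group/key directory
    partitioning."""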
| format = ds.ParquetFileFormat() |
| selector = fs.FileSelector('subdir', recursive=True) |
| options = ds.FileSystemFactoryOptions('subdir') |
| options.partitioning = ds.DirectoryPartitioning( |
| pa.schema([ |
| pa.field('group', pa.int32()), |
| pa.field('key', pa.string()) |
| ]) |
| ) |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| return factory.finish() |
| |
| |
| @pytest.fixture(params=[ |
| (True, True), |
| (True, False), |
| (False, True), |
| (False, False) |
| ], ids=['threaded-async', 'threaded-sync', 'serial-async', 'serial-sync']) |
| def dataset_reader(request): |
| ''' |
| Fixture which allows dataset scanning operations to be |
| run with/without threads and with/without async |
| ''' |
| use_threads, use_async = request.param |
| |
| class reader: |
| |
| def __init__(self): |
| self.use_threads = use_threads |
| self.use_async = use_async |
| |
| def _patch_kwargs(self, kwargs): |
| if 'use_threads' in kwargs: |
| raise Exception( |
| ('Invalid use of dataset_reader, do not specify' |
| ' use_threads')) |
| if 'use_async' in kwargs: |
| raise Exception( |
| 'Invalid use of dataset_reader, do not specify use_async') |
| kwargs['use_threads'] = use_threads |
| kwargs['use_async'] = use_async |
| |
| def to_table(self, dataset, **kwargs): |
| self._patch_kwargs(kwargs) |
| return dataset.to_table(**kwargs) |
| |
| def to_batches(self, dataset, **kwargs): |
| self._patch_kwargs(kwargs) |
| return dataset.to_batches(**kwargs) |
| |
| def scanner(self, dataset, **kwargs): |
| self._patch_kwargs(kwargs) |
| return dataset.scanner(**kwargs) |
| |
| def head(self, dataset, num_rows, **kwargs): |
| self._patch_kwargs(kwargs) |
| return dataset.head(num_rows, **kwargs) |
| |
| def take(self, dataset, indices, **kwargs): |
| self._patch_kwargs(kwargs) |
| return dataset.take(indices, **kwargs) |
| |
| def count_rows(self, dataset, **kwargs): |
| self._patch_kwargs(kwargs) |
| return dataset.count_rows(**kwargs) |
| |
| return reader() |
| |
| |
| def test_filesystem_dataset(mockfs): |
| schema = pa.schema([ |
| pa.field('const', pa.int64()) |
| ]) |
| file_format = ds.ParquetFileFormat() |
| paths = ['subdir/1/xxx/file0.parquet', 'subdir/2/yyy/file1.parquet'] |
| partitions = [ds.field('part') == x for x in range(1, 3)] |
| fragments = [file_format.make_fragment(path, mockfs, part) |
| for path, part in zip(paths, partitions)] |
| root_partition = ds.field('level') == ds.scalar(1337) |
| |
| dataset_from_fragments = ds.FileSystemDataset( |
| fragments, schema=schema, format=file_format, |
| filesystem=mockfs, root_partition=root_partition, |
| ) |
| dataset_from_paths = ds.FileSystemDataset.from_paths( |
| paths, schema=schema, format=file_format, filesystem=mockfs, |
| partitions=partitions, root_partition=root_partition, |
| ) |
| |
| for dataset in [dataset_from_fragments, dataset_from_paths]: |
| assert isinstance(dataset, ds.FileSystemDataset) |
| assert isinstance(dataset.format, ds.ParquetFileFormat) |
| assert dataset.partition_expression.equals(root_partition) |
| assert set(dataset.files) == set(paths) |
| |
| fragments = list(dataset.get_fragments()) |
| for fragment, partition, path in zip(fragments, partitions, paths): |
| assert fragment.partition_expression.equals(partition) |
| assert fragment.path == path |
| assert isinstance(fragment.format, ds.ParquetFileFormat) |
| assert isinstance(fragment, ds.ParquetFileFragment) |
| assert fragment.row_groups == [0] |
| assert fragment.num_row_groups == 1 |
| |
| row_group_fragments = list(fragment.split_by_row_group()) |
| assert fragment.num_row_groups == len(row_group_fragments) == 1 |
| assert isinstance(row_group_fragments[0], ds.ParquetFileFragment) |
| assert row_group_fragments[0].path == path |
| assert row_group_fragments[0].row_groups == [0] |
| assert row_group_fragments[0].num_row_groups == 1 |
| |
| fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) |
| assert len(fragments) == 2 |
| |
| # the root_partition keyword has a default |
| dataset = ds.FileSystemDataset( |
| fragments, schema=schema, format=file_format, filesystem=mockfs |
| ) |
| assert dataset.partition_expression.equals(ds.scalar(True)) |
| |
| # from_paths partitions have defaults |
| dataset = ds.FileSystemDataset.from_paths( |
| paths, schema=schema, format=file_format, filesystem=mockfs |
| ) |
| assert dataset.partition_expression.equals(ds.scalar(True)) |
| for fragment in dataset.get_fragments(): |
| assert fragment.partition_expression.equals(ds.scalar(True)) |
| |
| # validation of required arguments |
| with pytest.raises(TypeError, match="incorrect type"): |
| ds.FileSystemDataset(fragments, file_format, schema) |
| # validation of root_partition |
| with pytest.raises(TypeError, match="incorrect type"): |
| ds.FileSystemDataset(fragments, schema=schema, |
| format=file_format, root_partition=1) |
| # missing required argument in from_paths |
| with pytest.raises(TypeError, match="incorrect type"): |
| ds.FileSystemDataset.from_paths(fragments, format=file_format) |
| |
| |
| def test_filesystem_dataset_no_filesystem_interaction(dataset_reader): |
| # ARROW-8283 |
| schema = pa.schema([ |
| pa.field('f1', pa.int64()) |
| ]) |
| file_format = ds.IpcFileFormat() |
| paths = ['nonexistingfile.arrow'] |
| |
| # creating the dataset itself doesn't raise |
| dataset = ds.FileSystemDataset.from_paths( |
| paths, schema=schema, format=file_format, |
| filesystem=fs.LocalFileSystem(), |
| ) |
| |
| # getting fragments also doesn't raise |
| dataset.get_fragments() |
| |
| # scanning does raise |
| with pytest.raises(FileNotFoundError): |
| dataset_reader.to_table(dataset) |
| |
| |
| def test_dataset(dataset, dataset_reader): |
| assert isinstance(dataset, ds.Dataset) |
| assert isinstance(dataset.schema, pa.Schema) |
| |
| # TODO(kszucs): test non-boolean Exprs for filter do raise |
| expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) |
| expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) |
| |
| for batch in dataset_reader.to_batches(dataset): |
| assert isinstance(batch, pa.RecordBatch) |
| assert batch.column(0).equals(expected_i64) |
| assert batch.column(1).equals(expected_f64) |
| |
| for batch in dataset_reader.scanner(dataset).scan_batches(): |
| assert isinstance(batch, ds.TaggedRecordBatch) |
| assert isinstance(batch.fragment, ds.Fragment) |
| |
| table = dataset_reader.to_table(dataset) |
| assert isinstance(table, pa.Table) |
| assert len(table) == 10 |
| |
| condition = ds.field('i64') == 1 |
| result = dataset.to_table(use_threads=True, filter=condition).to_pydict() |
| |
| # don't rely on the scanning order |
| assert result['i64'] == [1, 1] |
| assert result['f64'] == [1., 1.] |
| assert sorted(result['group']) == [1, 2] |
| assert sorted(result['key']) == ['xxx', 'yyy'] |
| |
| |
| def test_scanner(dataset, dataset_reader): |
| scanner = dataset_reader.scanner( |
| dataset, memory_pool=pa.default_memory_pool()) |
| assert isinstance(scanner, ds.Scanner) |
| |
| with pytest.raises(pa.ArrowInvalid): |
| dataset_reader.scanner(dataset, columns=['unknown']) |
| |
| scanner = dataset_reader.scanner(dataset, columns=['i64'], |
| memory_pool=pa.default_memory_pool()) |
| assert scanner.dataset_schema == dataset.schema |
| assert scanner.projected_schema == pa.schema([("i64", pa.int64())]) |
| |
| assert isinstance(scanner, ds.Scanner) |
| table = scanner.to_table() |
| for batch in scanner.to_batches(): |
| assert batch.schema == scanner.projected_schema |
| assert batch.num_columns == 1 |
| assert table == scanner.to_reader().read_all() |
| |
| assert table.schema == scanner.projected_schema |
| for i in range(table.num_rows): |
| indices = pa.array([i]) |
| assert table.take(indices) == scanner.take(indices) |
| with pytest.raises(pa.ArrowIndexError): |
| scanner.take(pa.array([table.num_rows])) |
| |
| assert table.num_rows == scanner.count_rows() |
| |
| |
| def test_head(dataset, dataset_reader): |
| result = dataset_reader.head(dataset, 0) |
| assert result == pa.Table.from_batches([], schema=dataset.schema) |
| |
| result = dataset_reader.head(dataset, 1, columns=['i64']).to_pydict() |
| assert result == {'i64': [0]} |
| |
| result = dataset_reader.head(dataset, 2, columns=['i64'], |
| filter=ds.field('i64') > 1).to_pydict() |
| assert result == {'i64': [2, 3]} |
| |
| result = dataset_reader.head(dataset, 1024, columns=['i64']).to_pydict() |
| assert result == {'i64': list(range(5)) * 2} |
| |
| fragment = next(dataset.get_fragments()) |
| result = fragment.head(1, columns=['i64']).to_pydict() |
| assert result == {'i64': [0]} |
| |
| result = fragment.head(1024, columns=['i64']).to_pydict() |
| assert result == {'i64': list(range(5))} |
| |
| |
| def test_take(dataset, dataset_reader): |
| fragment = next(dataset.get_fragments()) |
| indices = pa.array([1, 3]) |
| assert dataset_reader.take( |
| fragment, indices) == dataset_reader.to_table(fragment).take(indices) |
| with pytest.raises(IndexError): |
| dataset_reader.take(fragment, pa.array([5])) |
| |
| indices = pa.array([1, 7]) |
| assert dataset_reader.take( |
| dataset, indices) == dataset_reader.to_table(dataset).take(indices) |
| with pytest.raises(IndexError): |
| dataset_reader.take(dataset, pa.array([10])) |
| |
| |
| def test_count_rows(dataset, dataset_reader): |
| fragment = next(dataset.get_fragments()) |
| assert dataset_reader.count_rows(fragment) == 5 |
| assert dataset_reader.count_rows( |
| fragment, filter=ds.field("i64") == 4) == 1 |
| |
| assert dataset_reader.count_rows(dataset) == 10 |
| # Filter on partition key |
| assert dataset_reader.count_rows( |
| dataset, filter=ds.field("group") == 1) == 5 |
| # Filter on data |
| assert dataset_reader.count_rows(dataset, filter=ds.field("i64") >= 3) == 4 |
| assert dataset_reader.count_rows(dataset, filter=ds.field("i64") < 0) == 0 |
| |
| |
| def test_abstract_classes(): |
| classes = [ |
| ds.FileFormat, |
| ds.Scanner, |
| ds.Partitioning, |
| ] |
| for klass in classes: |
| with pytest.raises(TypeError): |
| klass() |
| |
| |
| def test_partitioning(): |
| schema = pa.schema([ |
| pa.field('i64', pa.int64()), |
| pa.field('f64', pa.float64()) |
| ]) |
| for klass in [ds.DirectoryPartitioning, ds.HivePartitioning]: |
| partitioning = klass(schema) |
| assert isinstance(partitioning, ds.Partitioning) |
| |
| partitioning = ds.DirectoryPartitioning( |
| pa.schema([ |
| pa.field('group', pa.int64()), |
| pa.field('key', pa.float64()) |
| ]) |
| ) |
| assert partitioning.dictionaries is None |
| expr = partitioning.parse('/3/3.14') |
| assert isinstance(expr, ds.Expression) |
| |
| expected = (ds.field('group') == 3) & (ds.field('key') == 3.14) |
| assert expr.equals(expected) |
| |
| with pytest.raises(pa.ArrowInvalid): |
| partitioning.parse('/prefix/3/aaa') |
| |
| expr = partitioning.parse('/3') |
| expected = ds.field('group') == 3 |
| assert expr.equals(expected) |
| |
| partitioning = ds.HivePartitioning( |
| pa.schema([ |
| pa.field('alpha', pa.int64()), |
| pa.field('beta', pa.int64()) |
| ]), |
| null_fallback='xyz' |
| ) |
| assert partitioning.dictionaries is None |
| expr = partitioning.parse('/alpha=0/beta=3') |
| expected = ( |
| (ds.field('alpha') == ds.scalar(0)) & |
| (ds.field('beta') == ds.scalar(3)) |
| ) |
| assert expr.equals(expected) |
| |
| expr = partitioning.parse('/alpha=xyz/beta=3') |
| expected = ( |
| (ds.field('alpha').is_null() & (ds.field('beta') == ds.scalar(3))) |
| ) |
| assert expr.equals(expected) |
| |
| for shouldfail in ['/alpha=one/beta=2', '/alpha=one', '/beta=two']: |
| with pytest.raises(pa.ArrowInvalid): |
| partitioning.parse(shouldfail) |
| |
| |
| def test_expression_serialization(): |
| a = ds.scalar(1) |
| b = ds.scalar(1.1) |
| c = ds.scalar(True) |
| d = ds.scalar("string") |
| e = ds.scalar(None) |
| f = ds.scalar({'a': 1}) |
| g = ds.scalar(pa.scalar(1)) |
| |
| all_exprs = [a, b, c, d, e, f, g, a == b, a > b, a & b, a | b, ~c, |
                 d.is_valid(), a.cast(pa.int32(), safe=False),
                 a.isin([1, 2, 3]),
| ds.field('i64') > 5, ds.field('i64') == 5, |
| ds.field('i64') == 7, ds.field('i64').is_null()] |
| for expr in all_exprs: |
| assert isinstance(expr, ds.Expression) |
| restored = pickle.loads(pickle.dumps(expr)) |
| assert expr.equals(restored) |
| |
| |
| def test_expression_construction(): |
| zero = ds.scalar(0) |
| one = ds.scalar(1) |
| true = ds.scalar(True) |
| false = ds.scalar(False) |
| string = ds.scalar("string") |
| field = ds.field("field") |
| |
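    # constructing these expressions must not raise; results are discarded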
| zero | one == string |
| ~true == false |
| for typ in ("bool", pa.bool_()): |
| field.cast(typ) == true |
| |
| field.isin([1, 2]) |
| |
| with pytest.raises(TypeError): |
| field.isin(1) |
| |
| with pytest.raises(pa.ArrowInvalid): |
| field != object() |
| |
| |
| def test_expression_boolean_operators(): |
| # https://issues.apache.org/jira/browse/ARROW-11412 |
| true = ds.scalar(True) |
| false = ds.scalar(False) |
| |
| with pytest.raises(ValueError, match="cannot be evaluated to python True"): |
| true and false |
| |
| with pytest.raises(ValueError, match="cannot be evaluated to python True"): |
| true or false |
| |
| with pytest.raises(ValueError, match="cannot be evaluated to python True"): |
| bool(true) |
| |
| with pytest.raises(ValueError, match="cannot be evaluated to python True"): |
| not true |
| |
| |
| def test_expression_arithmetic_operators(): |
| dataset = ds.dataset(pa.table({'a': [1, 2, 3], 'b': [2, 2, 2]})) |
| a = ds.field("a") |
| b = ds.field("b") |
| result = dataset.to_table(columns={ |
| "a+1": a + 1, |
| "b-a": b - a, |
| "a*2": a * 2, |
| "a/b": a.cast("float64") / b, |
| }) |
| expected = pa.table({ |
| "a+1": [2, 3, 4], "b-a": [1, 0, -1], |
| "a*2": [2, 4, 6], "a/b": [0.5, 1.0, 1.5], |
| }) |
| assert result.equals(expected) |
| |
| |
| def test_partition_keys(): |
| a, b, c = [ds.field(f) == f for f in 'abc'] |
| assert ds._get_partition_keys(a) == {'a': 'a'} |
| assert ds._get_partition_keys(a & b & c) == {f: f for f in 'abc'} |
| |
| nope = ds.field('d') >= 3 |
| assert ds._get_partition_keys(nope) == {} |
| assert ds._get_partition_keys(a & nope) == {'a': 'a'} |
| |
| null = ds.field('a').is_null() |
| assert ds._get_partition_keys(null) == {'a': None} |
| |
| |
| def test_parquet_read_options(): |
| opts1 = ds.ParquetReadOptions() |
| opts2 = ds.ParquetReadOptions(dictionary_columns=['a', 'b']) |
| opts3 = ds.ParquetReadOptions(coerce_int96_timestamp_unit="ms") |
| |
| assert opts1.dictionary_columns == set() |
| |
| assert opts2.dictionary_columns == {'a', 'b'} |
| |
| assert opts1.coerce_int96_timestamp_unit == "ns" |
| assert opts3.coerce_int96_timestamp_unit == "ms" |
| |
| assert opts1 == opts1 |
| assert opts1 != opts2 |
| assert opts1 != opts3 |
| |
| |
| def test_parquet_file_format_read_options(): |
| pff1 = ds.ParquetFileFormat() |
| pff2 = ds.ParquetFileFormat(dictionary_columns={'a'}) |
| pff3 = ds.ParquetFileFormat(coerce_int96_timestamp_unit="s") |
| |
| assert pff1.read_options == ds.ParquetReadOptions() |
| assert pff2.read_options == ds.ParquetReadOptions(dictionary_columns=['a']) |
| assert pff3.read_options == ds.ParquetReadOptions( |
| coerce_int96_timestamp_unit="s") |
| |
| |
| def test_parquet_scan_options(): |
| opts1 = ds.ParquetFragmentScanOptions() |
| opts2 = ds.ParquetFragmentScanOptions(buffer_size=4096) |
| opts3 = ds.ParquetFragmentScanOptions( |
| buffer_size=2**13, use_buffered_stream=True) |
| opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=True) |
| |
| assert opts1.use_buffered_stream is False |
| assert opts1.buffer_size == 2**13 |
| assert opts1.pre_buffer is False |
| |
| assert opts2.use_buffered_stream is False |
| assert opts2.buffer_size == 2**12 |
| assert opts2.pre_buffer is False |
| |
| assert opts3.use_buffered_stream is True |
| assert opts3.buffer_size == 2**13 |
| assert opts3.pre_buffer is False |
| |
| assert opts4.use_buffered_stream is False |
| assert opts4.buffer_size == 2**13 |
| assert opts4.pre_buffer is True |
| |
| assert opts1 == opts1 |
| assert opts1 != opts2 |
| assert opts2 != opts3 |
| assert opts3 != opts4 |
| |
| |
| def test_file_format_pickling(): |
| formats = [ |
| ds.IpcFileFormat(), |
| ds.CsvFileFormat(), |
| ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t', |
| ignore_empty_lines=True)), |
| ds.CsvFileFormat(read_options=pa.csv.ReadOptions( |
| skip_rows=3, column_names=['foo'])), |
| ds.CsvFileFormat(read_options=pa.csv.ReadOptions( |
| skip_rows=3, block_size=2**20)), |
| ds.ParquetFileFormat(), |
| ds.ParquetFileFormat(dictionary_columns={'a'}), |
| ds.ParquetFileFormat(use_buffered_stream=True), |
| ds.ParquetFileFormat( |
| use_buffered_stream=True, |
| buffer_size=4096, |
| ) |
| ] |
| try: |
| formats.append(ds.OrcFileFormat()) |
| except (ImportError, AttributeError): |
| # catch AttributeError for Python 3.6 |
| pass |
| |
| for file_format in formats: |
| assert pickle.loads(pickle.dumps(file_format)) == file_format |
| |
| |
| def test_fragment_scan_options_pickling(): |
| options = [ |
| ds.CsvFragmentScanOptions(), |
| ds.CsvFragmentScanOptions( |
| convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)), |
| ds.CsvFragmentScanOptions( |
| read_options=pa.csv.ReadOptions(block_size=2**16)), |
| ds.ParquetFragmentScanOptions(buffer_size=4096), |
| ds.ParquetFragmentScanOptions(pre_buffer=True), |
| ] |
| for option in options: |
| assert pickle.loads(pickle.dumps(option)) == option |
| |
| |
| @pytest.mark.parametrize('paths_or_selector', [ |
| fs.FileSelector('subdir', recursive=True), |
| [ |
| 'subdir/1/xxx/file0.parquet', |
| 'subdir/2/yyy/file1.parquet', |
| ] |
| ]) |
| @pytest.mark.parametrize('pre_buffer', [False, True]) |
| def test_filesystem_factory(mockfs, paths_or_selector, pre_buffer): |
| format = ds.ParquetFileFormat( |
| read_options=ds.ParquetReadOptions(dictionary_columns={"str"}), |
| pre_buffer=pre_buffer |
| ) |
| |
| options = ds.FileSystemFactoryOptions('subdir') |
| options.partitioning = ds.DirectoryPartitioning( |
| pa.schema([ |
| pa.field('group', pa.int32()), |
| pa.field('key', pa.string()) |
| ]) |
| ) |
| assert options.partition_base_dir == 'subdir' |
| assert options.selector_ignore_prefixes == ['.', '_'] |
| assert options.exclude_invalid_files is False |
| |
| factory = ds.FileSystemDatasetFactory( |
| mockfs, paths_or_selector, format, options |
| ) |
| inspected_schema = factory.inspect() |
| |
    assert inspected_schema.equals(pa.schema([
| pa.field('i64', pa.int64()), |
| pa.field('f64', pa.float64()), |
| pa.field('str', pa.dictionary(pa.int32(), pa.string())), |
| pa.field('const', pa.int64()), |
| pa.field('group', pa.int32()), |
| pa.field('key', pa.string()), |
| ]), check_metadata=False) |
| |
| assert isinstance(factory.inspect_schemas(), list) |
| assert isinstance(factory.finish(inspected_schema), |
| ds.FileSystemDataset) |
| assert factory.root_partition.equals(ds.scalar(True)) |
| |
| dataset = factory.finish() |
| assert isinstance(dataset, ds.FileSystemDataset) |
| |
| scanner = dataset.scanner() |
| expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64()) |
| expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64()) |
| expected_str = pa.DictionaryArray.from_arrays( |
| pa.array([0, 1, 2, 3, 4], type=pa.int32()), |
| pa.array("0 1 2 3 4".split(), type=pa.string()) |
| ) |
| iterator = scanner.scan_batches() |
| for (batch, fragment), group, key in zip(iterator, [1, 2], ['xxx', 'yyy']): |
| expected_group = pa.array([group] * 5, type=pa.int32()) |
| expected_key = pa.array([key] * 5, type=pa.string()) |
| expected_const = pa.array([group - 1] * 5, type=pa.int64()) |
| # Can't compare or really introspect expressions from Python |
| assert fragment.partition_expression is not None |
| assert batch.num_columns == 6 |
| assert batch[0].equals(expected_i64) |
| assert batch[1].equals(expected_f64) |
| assert batch[2].equals(expected_str) |
| assert batch[3].equals(expected_const) |
| assert batch[4].equals(expected_group) |
| assert batch[5].equals(expected_key) |
| |
| table = dataset.to_table() |
| assert isinstance(table, pa.Table) |
| assert len(table) == 10 |
| assert table.num_columns == 6 |
| |
| |
| def test_make_fragment(multisourcefs): |
| parquet_format = ds.ParquetFileFormat() |
| dataset = ds.dataset('/plain', filesystem=multisourcefs, |
| format=parquet_format) |
| |
| for path in dataset.files: |
| fragment = parquet_format.make_fragment(path, multisourcefs) |
| assert fragment.row_groups == [0] |
| |
| row_group_fragment = parquet_format.make_fragment(path, multisourcefs, |
| row_groups=[0]) |
| for f in [fragment, row_group_fragment]: |
| assert isinstance(f, ds.ParquetFileFragment) |
| assert f.path == path |
| assert isinstance(f.filesystem, type(multisourcefs)) |
| assert row_group_fragment.row_groups == [0] |
| |
| |
| def test_make_csv_fragment_from_buffer(dataset_reader): |
| content = textwrap.dedent(""" |
| alpha,num,animal |
| a,12,dog |
| b,11,cat |
| c,10,rabbit |
| """) |
| buffer = pa.py_buffer(content.encode('utf-8')) |
| |
| csv_format = ds.CsvFileFormat() |
| fragment = csv_format.make_fragment(buffer) |
| |
| expected = pa.table([['a', 'b', 'c'], |
| [12, 11, 10], |
| ['dog', 'cat', 'rabbit']], |
| names=['alpha', 'num', 'animal']) |
| assert dataset_reader.to_table(fragment).equals(expected) |
| |
| pickled = pickle.loads(pickle.dumps(fragment)) |
| assert dataset_reader.to_table(pickled).equals(fragment.to_table()) |
| |
| |
| @pytest.mark.parquet |
| def test_make_parquet_fragment_from_buffer(dataset_reader): |
| import pyarrow.parquet as pq |
| |
| arrays = [ |
| pa.array(['a', 'b', 'c']), |
| pa.array([12, 11, 10]), |
| pa.array(['dog', 'cat', 'rabbit']) |
| ] |
| dictionary_arrays = [ |
| arrays[0].dictionary_encode(), |
| arrays[1], |
| arrays[2].dictionary_encode() |
| ] |
| dictionary_format = ds.ParquetFileFormat( |
| read_options=ds.ParquetReadOptions( |
| dictionary_columns=['alpha', 'animal'] |
| ), |
| use_buffered_stream=True, |
| buffer_size=4096, |
| ) |
| |
| cases = [ |
| (arrays, ds.ParquetFileFormat()), |
| (dictionary_arrays, dictionary_format) |
| ] |
| for arrays, format_ in cases: |
| table = pa.table(arrays, names=['alpha', 'num', 'animal']) |
| |
| out = pa.BufferOutputStream() |
| pq.write_table(table, out) |
| buffer = out.getvalue() |
| |
| fragment = format_.make_fragment(buffer) |
| assert dataset_reader.to_table(fragment).equals(table) |
| |
| pickled = pickle.loads(pickle.dumps(fragment)) |
| assert dataset_reader.to_table(pickled).equals(table) |
| |
| |
| def _create_dataset_for_fragments(tempdir, chunk_size=None, filesystem=None): |
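    """Write an 8-row table hive-partitioned on 'part' (values 'a'/'b')
    and return both the original table and the discovered dataset."""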
| import pyarrow.parquet as pq |
| |
| table = pa.table( |
| [range(8), [1] * 8, ['a'] * 4 + ['b'] * 4], |
| names=['f1', 'f2', 'part'] |
| ) |
| |
| path = str(tempdir / "test_parquet_dataset") |
| |
| # write_to_dataset currently requires pandas |
| pq.write_to_dataset(table, path, |
| partition_cols=["part"], chunk_size=chunk_size) |
| dataset = ds.dataset( |
| path, format="parquet", partitioning="hive", filesystem=filesystem |
| ) |
| |
| return table, dataset |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments(tempdir, dataset_reader): |
| table, dataset = _create_dataset_for_fragments(tempdir) |
| |
| # list fragments |
| fragments = list(dataset.get_fragments()) |
| assert len(fragments) == 2 |
| f = fragments[0] |
| |
| physical_names = ['f1', 'f2'] |
| # file's schema does not include partition column |
| assert f.physical_schema.names == physical_names |
| assert f.format.inspect(f.path, f.filesystem) == f.physical_schema |
| assert f.partition_expression.equals(ds.field('part') == 'a') |
| |
| # By default, the partition column is not part of the schema. |
| result = dataset_reader.to_table(f) |
| assert result.column_names == physical_names |
| assert result.equals(table.remove_column(2).slice(0, 4)) |
| |
| # scanning fragment includes partition columns when given the proper |
| # schema. |
| result = dataset_reader.to_table(f, schema=dataset.schema) |
| assert result.column_names == ['f1', 'f2', 'part'] |
| assert result.equals(table.slice(0, 4)) |
| assert f.physical_schema == result.schema.remove(2) |
| |
| # scanning fragments follow filter predicate |
| result = dataset_reader.to_table( |
| f, schema=dataset.schema, filter=ds.field('f1') < 2) |
| assert result.column_names == ['f1', 'f2', 'part'] |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_implicit_cast(tempdir): |
| # ARROW-8693 |
| import pyarrow.parquet as pq |
| |
| table = pa.table([range(8), [1] * 4 + [2] * 4], names=['col', 'part']) |
| path = str(tempdir / "test_parquet_dataset") |
| pq.write_to_dataset(table, path, partition_cols=["part"]) |
| |
| part = ds.partitioning(pa.schema([('part', 'int8')]), flavor="hive") |
| dataset = ds.dataset(path, format="parquet", partitioning=part) |
| fragments = dataset.get_fragments(filter=ds.field("part") >= 2) |
| assert len(list(fragments)) == 1 |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_reconstruct(tempdir, dataset_reader): |
| table, dataset = _create_dataset_for_fragments(tempdir) |
| |
| def assert_yields_projected(fragment, row_slice, |
| columns=None, filter=None): |
| actual = fragment.to_table( |
| schema=table.schema, columns=columns, filter=filter) |
| column_names = columns if columns else table.column_names |
| assert actual.column_names == column_names |
| |
| expected = table.slice(*row_slice).select(column_names) |
| assert actual.equals(expected) |
| |
| fragment = list(dataset.get_fragments())[0] |
| parquet_format = fragment.format |
| |
| # test pickle roundtrip |
| pickled_fragment = pickle.loads(pickle.dumps(fragment)) |
| assert dataset_reader.to_table( |
| pickled_fragment) == dataset_reader.to_table(fragment) |
| |
| # manually re-construct a fragment, with explicit schema |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| assert dataset_reader.to_table(new_fragment).equals( |
| dataset_reader.to_table(fragment)) |
| assert_yields_projected(new_fragment, (0, 4)) |
| |
| # filter / column projection, inspected schema |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| assert_yields_projected(new_fragment, (0, 2), filter=ds.field('f1') < 2) |
| |
| # filter requiring cast / column projection, inspected schema |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| assert_yields_projected(new_fragment, (0, 2), |
| columns=['f1'], filter=ds.field('f1') < 2.0) |
| |
| # filter on the partition column |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| assert_yields_projected(new_fragment, (0, 4), |
| filter=ds.field('part') == 'a') |
| |
| # Fragments don't contain the partition's columns if not provided to the |
| # `to_table(schema=...)` method. |
| pattern = (r'No match for FieldRef.Name\(part\) in ' + |
| fragment.physical_schema.to_string(False, False, False)) |
| with pytest.raises(ValueError, match=pattern): |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression) |
| dataset_reader.to_table(new_fragment, filter=ds.field('part') == 'a') |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_row_groups(tempdir, dataset_reader): |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2) |
| |
| fragment = list(dataset.get_fragments())[0] |
| |
| # list and scan row group fragments |
| row_group_fragments = list(fragment.split_by_row_group()) |
| assert len(row_group_fragments) == fragment.num_row_groups == 2 |
| result = dataset_reader.to_table( |
| row_group_fragments[0], schema=dataset.schema) |
| assert result.column_names == ['f1', 'f2', 'part'] |
| assert len(result) == 2 |
| assert result.equals(table.slice(0, 2)) |
| |
| assert row_group_fragments[0].row_groups is not None |
| assert row_group_fragments[0].num_row_groups == 1 |
| assert row_group_fragments[0].row_groups[0].statistics == { |
| 'f1': {'min': 0, 'max': 1}, |
| 'f2': {'min': 1, 'max': 1}, |
| } |
| |
| fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0] |
| row_group_fragments = list(fragment.split_by_row_group(ds.field('f1') < 1)) |
| assert len(row_group_fragments) == 1 |
| result = dataset_reader.to_table( |
| row_group_fragments[0], filter=ds.field('f1') < 1) |
| assert len(result) == 1 |
| |
| |
| @pytest.mark.parquet |
| def test_fragments_parquet_num_row_groups(tempdir): |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': range(8)}) |
| pq.write_table(table, tempdir / "test.parquet", row_group_size=2) |
| dataset = ds.dataset(tempdir / "test.parquet", format="parquet") |
| original_fragment = list(dataset.get_fragments())[0] |
| |
| # create fragment with subset of row groups |
| fragment = original_fragment.format.make_fragment( |
| original_fragment.path, original_fragment.filesystem, |
| row_groups=[1, 3]) |
| assert fragment.num_row_groups == 2 |
| # ensure that parsing metadata preserves correct number of row groups |
| fragment.ensure_complete_metadata() |
| assert fragment.num_row_groups == 2 |
| assert len(fragment.row_groups) == 2 |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_row_groups_dictionary(tempdir, dataset_reader): |
    import pyarrow.parquet as pq

    df = pd.DataFrame(dict(col1=['a', 'b'], col2=[1, 2]))
    df['col1'] = df['col1'].astype("category")

    pq.write_table(pa.table(df), tempdir / "test_filter_dictionary.parquet")

    dataset = ds.dataset(tempdir / 'test_filter_dictionary.parquet')
| result = dataset_reader.to_table(dataset, filter=ds.field("col1") == "a") |
| |
| assert (df.iloc[0] == result.to_pandas()).all().all() |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_ensure_metadata(tempdir, open_logging_fs): |
| fs, assert_opens = open_logging_fs |
| _, dataset = _create_dataset_for_fragments( |
| tempdir, chunk_size=2, filesystem=fs |
| ) |
| fragment = list(dataset.get_fragments())[0] |
| |
| # with default discovery, no metadata loaded |
| with assert_opens([fragment.path]): |
| fragment.ensure_complete_metadata() |
| assert fragment.row_groups == [0, 1] |
| |
| # second time -> use cached / no file IO |
| with assert_opens([]): |
| fragment.ensure_complete_metadata() |
| |
| # recreate fragment with row group ids |
| new_fragment = fragment.format.make_fragment( |
| fragment.path, fragment.filesystem, row_groups=[0, 1] |
| ) |
| assert new_fragment.row_groups == fragment.row_groups |
| |
| # collect metadata |
| new_fragment.ensure_complete_metadata() |
| row_group = new_fragment.row_groups[0] |
| assert row_group.id == 0 |
| assert row_group.num_rows == 2 |
| assert row_group.statistics is not None |
| |
| # pickling preserves row group ids |
| pickled_fragment = pickle.loads(pickle.dumps(new_fragment)) |
| with assert_opens([fragment.path]): |
| assert pickled_fragment.row_groups == [0, 1] |
| row_group = pickled_fragment.row_groups[0] |
| assert row_group.id == 0 |
| assert row_group.statistics is not None |
| |
| |
| def _create_dataset_all_types(tempdir, chunk_size=None): |
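    """Write a single 3-row parquet dataset covering booleans, ints,
    floats, strings, binary, timestamps, dates and times."""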
| import pyarrow.parquet as pq |
| |
| table = pa.table( |
| [ |
| pa.array([True, None, False], pa.bool_()), |
| pa.array([1, 10, 42], pa.int8()), |
| pa.array([1, 10, 42], pa.uint8()), |
| pa.array([1, 10, 42], pa.int16()), |
| pa.array([1, 10, 42], pa.uint16()), |
| pa.array([1, 10, 42], pa.int32()), |
| pa.array([1, 10, 42], pa.uint32()), |
| pa.array([1, 10, 42], pa.int64()), |
| pa.array([1, 10, 42], pa.uint64()), |
| pa.array([1.0, 10.0, 42.0], pa.float32()), |
| pa.array([1.0, 10.0, 42.0], pa.float64()), |
| pa.array(['a', None, 'z'], pa.utf8()), |
| pa.array(['a', None, 'z'], pa.binary()), |
| pa.array([1, 10, 42], pa.timestamp('s')), |
| pa.array([1, 10, 42], pa.timestamp('ms')), |
| pa.array([1, 10, 42], pa.timestamp('us')), |
| pa.array([1, 10, 42], pa.date32()), |
| pa.array([1, 10, 4200000000], pa.date64()), |
| pa.array([1, 10, 42], pa.time32('s')), |
| pa.array([1, 10, 42], pa.time64('us')), |
| ], |
| names=[ |
| 'boolean', |
| 'int8', |
| 'uint8', |
| 'int16', |
| 'uint16', |
| 'int32', |
| 'uint32', |
| 'int64', |
| 'uint64', |
| 'float', |
| 'double', |
| 'utf8', |
| 'binary', |
| 'ts[s]', |
| 'ts[ms]', |
| 'ts[us]', |
| 'date32', |
| 'date64', |
| 'time32', |
| 'time64', |
| ] |
| ) |
| |
| path = str(tempdir / "test_parquet_dataset_all_types") |
| |
| # write_to_dataset currently requires pandas |
| pq.write_to_dataset(table, path, chunk_size=chunk_size) |
| |
| return table, ds.dataset(path, format="parquet", partitioning="hive") |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_parquet_fragment_statistics(tempdir): |
| table, dataset = _create_dataset_all_types(tempdir) |
| |
| fragment = list(dataset.get_fragments())[0] |
| |
    import datetime

    def dt_s(x):
        return datetime.datetime(1970, 1, 1, 0, 0, x)

    def dt_ms(x):
        return datetime.datetime(1970, 1, 1, 0, 0, 0, x * 1000)

    def dt_us(x):
        return datetime.datetime(1970, 1, 1, 0, 0, 0, x)
| date = datetime.date |
| time = datetime.time |
| |
| # list and scan row group fragments |
| row_group_fragments = list(fragment.split_by_row_group()) |
| assert row_group_fragments[0].row_groups is not None |
| row_group = row_group_fragments[0].row_groups[0] |
| assert row_group.num_rows == 3 |
| assert row_group.total_byte_size > 1000 |
| assert row_group.statistics == { |
| 'boolean': {'min': False, 'max': True}, |
| 'int8': {'min': 1, 'max': 42}, |
| 'uint8': {'min': 1, 'max': 42}, |
| 'int16': {'min': 1, 'max': 42}, |
| 'uint16': {'min': 1, 'max': 42}, |
| 'int32': {'min': 1, 'max': 42}, |
| 'uint32': {'min': 1, 'max': 42}, |
| 'int64': {'min': 1, 'max': 42}, |
| 'uint64': {'min': 1, 'max': 42}, |
| 'float': {'min': 1.0, 'max': 42.0}, |
| 'double': {'min': 1.0, 'max': 42.0}, |
| 'utf8': {'min': 'a', 'max': 'z'}, |
| 'binary': {'min': b'a', 'max': b'z'}, |
| 'ts[s]': {'min': dt_s(1), 'max': dt_s(42)}, |
| 'ts[ms]': {'min': dt_ms(1), 'max': dt_ms(42)}, |
| 'ts[us]': {'min': dt_us(1), 'max': dt_us(42)}, |
| 'date32': {'min': date(1970, 1, 2), 'max': date(1970, 2, 12)}, |
| 'date64': {'min': date(1970, 1, 1), 'max': date(1970, 2, 18)}, |
| 'time32': {'min': time(0, 0, 1), 'max': time(0, 0, 42)}, |
| 'time64': {'min': time(0, 0, 0, 1), 'max': time(0, 0, 0, 42)}, |
| } |
| |
| |
| @pytest.mark.parquet |
| def test_parquet_fragment_statistics_nulls(tempdir): |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': [0, 1, None, None], 'b': ['a', 'b', None, None]}) |
| pq.write_table(table, tempdir / "test.parquet", row_group_size=2) |
| |
| dataset = ds.dataset(tempdir / "test.parquet", format="parquet") |
| fragments = list(dataset.get_fragments())[0].split_by_row_group() |
| # second row group has all nulls -> no statistics |
| assert fragments[1].row_groups[0].statistics == {} |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_parquet_empty_row_group_statistics(tempdir): |
| df = pd.DataFrame({"a": ["a", "b", "b"], "b": [4, 5, 6]})[:0] |
| df.to_parquet(tempdir / "test.parquet", engine="pyarrow") |
| |
| dataset = ds.dataset(tempdir / "test.parquet", format="parquet") |
| fragments = list(dataset.get_fragments())[0].split_by_row_group() |
| # Only row group is empty |
| assert fragments[0].row_groups[0].statistics == {} |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_row_groups_predicate(tempdir): |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2) |
| |
| fragment = list(dataset.get_fragments())[0] |
| assert fragment.partition_expression.equals(ds.field('part') == 'a') |
| |
| # predicate may reference a partition field not present in the |
| # physical_schema if an explicit schema is provided to split_by_row_group |
| |
| # filter matches partition_expression: all row groups |
| row_group_fragments = list( |
| fragment.split_by_row_group(filter=ds.field('part') == 'a', |
| schema=dataset.schema)) |
| assert len(row_group_fragments) == 2 |
| |
| # filter contradicts partition_expression: no row groups |
| row_group_fragments = list( |
| fragment.split_by_row_group(filter=ds.field('part') == 'b', |
| schema=dataset.schema)) |
| assert len(row_group_fragments) == 0 |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_row_groups_reconstruct(tempdir, dataset_reader): |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2) |
| |
| fragment = list(dataset.get_fragments())[0] |
| parquet_format = fragment.format |
| row_group_fragments = list(fragment.split_by_row_group()) |
| |
| # test pickle roundtrip |
| pickled_fragment = pickle.loads(pickle.dumps(fragment)) |
| assert dataset_reader.to_table( |
| pickled_fragment) == dataset_reader.to_table(fragment) |
| |
| # manually re-construct row group fragments |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression, |
| row_groups=[0]) |
| result = dataset_reader.to_table(new_fragment) |
| assert result.equals(dataset_reader.to_table(row_group_fragments[0])) |
| |
| # manually re-construct a row group fragment with filter/column projection |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression, |
| row_groups={1}) |
| result = dataset_reader.to_table( |
| new_fragment, schema=table.schema, columns=['f1', 'part'], |
| filter=ds.field('f1') < 3, ) |
| assert result.column_names == ['f1', 'part'] |
| assert len(result) == 1 |
| |
| # out of bounds row group index |
| new_fragment = parquet_format.make_fragment( |
| fragment.path, fragment.filesystem, |
| partition_expression=fragment.partition_expression, |
| row_groups={2}) |
| with pytest.raises(IndexError, match="references row group 2"): |
| dataset_reader.to_table(new_fragment) |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_subset_ids(tempdir, open_logging_fs, |
| dataset_reader): |
| fs, assert_opens = open_logging_fs |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=1, |
| filesystem=fs) |
| fragment = list(dataset.get_fragments())[0] |
| |
| # select with row group ids |
| subfrag = fragment.subset(row_group_ids=[0, 3]) |
| with assert_opens([]): |
| assert subfrag.num_row_groups == 2 |
| assert subfrag.row_groups == [0, 3] |
| assert subfrag.row_groups[0].statistics is not None |
| |
| # check correct scan result of subset |
| result = dataset_reader.to_table(subfrag) |
| assert result.to_pydict() == {"f1": [0, 3], "f2": [1, 1]} |
| |
| # empty list of ids |
| subfrag = fragment.subset(row_group_ids=[]) |
| assert subfrag.num_row_groups == 0 |
| assert subfrag.row_groups == [] |
| result = dataset_reader.to_table(subfrag, schema=dataset.schema) |
| assert result.num_rows == 0 |
| assert result.equals(table[:0]) |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_subset_filter(tempdir, open_logging_fs, |
| dataset_reader): |
| fs, assert_opens = open_logging_fs |
| table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=1, |
| filesystem=fs) |
| fragment = list(dataset.get_fragments())[0] |
| |
| # select with filter |
| subfrag = fragment.subset(ds.field("f1") >= 1) |
| with assert_opens([]): |
| assert subfrag.num_row_groups == 3 |
| assert len(subfrag.row_groups) == 3 |
| assert subfrag.row_groups[0].statistics is not None |
| |
| # check correct scan result of subset |
| result = dataset_reader.to_table(subfrag) |
| assert result.to_pydict() == {"f1": [1, 2, 3], "f2": [1, 1, 1]} |
| |
| # filter that results in empty selection |
| subfrag = fragment.subset(ds.field("f1") > 5) |
| assert subfrag.num_row_groups == 0 |
| assert subfrag.row_groups == [] |
| result = dataset_reader.to_table(subfrag, schema=dataset.schema) |
| assert result.num_rows == 0 |
| assert result.equals(table[:0]) |
| |
| # passing schema to ensure filter on partition expression works |
| subfrag = fragment.subset(ds.field("part") == "a", schema=dataset.schema) |
| assert subfrag.num_row_groups == 4 |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_parquet_subset_invalid(tempdir): |
| _, dataset = _create_dataset_for_fragments(tempdir, chunk_size=1) |
| fragment = list(dataset.get_fragments())[0] |
| |
| # passing none or both of filter / row_group_ids |
| with pytest.raises(ValueError): |
| fragment.subset(ds.field("f1") >= 1, row_group_ids=[1, 2]) |
| |
| with pytest.raises(ValueError): |
| fragment.subset() |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_fragments_repr(tempdir, dataset): |
| # partitioned parquet dataset |
| fragment = list(dataset.get_fragments())[0] |
| assert ( |
| repr(fragment) == |
| "<pyarrow.dataset.ParquetFileFragment path=subdir/1/xxx/file0.parquet " |
| "partition=[key=xxx, group=1]>" |
| ) |
| |
| # single-file parquet dataset (no partition information in repr) |
| table, path = _create_single_file(tempdir) |
| dataset = ds.dataset(path, format="parquet") |
| fragment = list(dataset.get_fragments())[0] |
| assert ( |
| repr(fragment) == |
| "<pyarrow.dataset.ParquetFileFragment path={}>".format( |
| dataset.filesystem.normalize_path(str(path))) |
| ) |
| |
| # non-parquet format |
| path = tempdir / "data.feather" |
| pa.feather.write_feather(table, path) |
| dataset = ds.dataset(path, format="feather") |
| fragment = list(dataset.get_fragments())[0] |
| assert ( |
| repr(fragment) == |
| "<pyarrow.dataset.FileFragment type=ipc path={}>".format( |
| dataset.filesystem.normalize_path(str(path))) |
| ) |
| |
| |
| def test_partitioning_factory(mockfs): |
| paths_or_selector = fs.FileSelector('subdir', recursive=True) |
| format = ds.ParquetFileFormat() |
| |
| options = ds.FileSystemFactoryOptions('subdir') |
| partitioning_factory = ds.DirectoryPartitioning.discover(['group', 'key']) |
| assert isinstance(partitioning_factory, ds.PartitioningFactory) |
| options.partitioning_factory = partitioning_factory |
| |
| factory = ds.FileSystemDatasetFactory( |
| mockfs, paths_or_selector, format, options |
| ) |
| inspected_schema = factory.inspect() |
| # i64/f64 from data, group/key from "/1/xxx" and "/2/yyy" paths |
| expected_schema = pa.schema([ |
| ("i64", pa.int64()), |
| ("f64", pa.float64()), |
| ("str", pa.string()), |
| ("const", pa.int64()), |
| ("group", pa.int32()), |
| ("key", pa.string()), |
| ]) |
| assert inspected_schema.equals(expected_schema) |
| |
| hive_partitioning_factory = ds.HivePartitioning.discover() |
| assert isinstance(hive_partitioning_factory, ds.PartitioningFactory) |
| |
| |
| @pytest.mark.parametrize('infer_dictionary', [False, True]) |
| def test_partitioning_factory_dictionary(mockfs, infer_dictionary): |
| paths_or_selector = fs.FileSelector('subdir', recursive=True) |
| format = ds.ParquetFileFormat() |
| options = ds.FileSystemFactoryOptions('subdir') |
| |
| options.partitioning_factory = ds.DirectoryPartitioning.discover( |
| ['group', 'key'], infer_dictionary=infer_dictionary) |
| |
| factory = ds.FileSystemDatasetFactory( |
| mockfs, paths_or_selector, format, options) |
| |
| inferred_schema = factory.inspect() |
| if infer_dictionary: |
| expected_type = pa.dictionary(pa.int32(), pa.string()) |
| assert inferred_schema.field('key').type == expected_type |
| |
| table = factory.finish().to_table().combine_chunks() |
| actual = table.column('key').chunk(0) |
| expected = pa.array(['xxx'] * 5 + ['yyy'] * 5).dictionary_encode() |
| assert actual.equals(expected) |
| |
| # ARROW-9345 ensure filtering on the partition field works |
| table = factory.finish().to_table(filter=ds.field('key') == 'xxx') |
| actual = table.column('key').chunk(0) |
| expected = expected.slice(0, 5) |
| assert actual.equals(expected) |
| else: |
| assert inferred_schema.field('key').type == pa.string() |
| |
| |
| def test_partitioning_factory_segment_encoding(): |
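    # partition segments contain percent-encoded characters ('%3A' is ':',
    # '%24' is '$'); exercised with default decoding and with "none"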
| mockfs = fs._MockFileSystem() |
| format = ds.IpcFileFormat() |
| schema = pa.schema([("i64", pa.int64())]) |
| table = pa.table([pa.array(range(10))], schema=schema) |
| partition_schema = pa.schema( |
| [("date", pa.timestamp("s")), ("string", pa.string())]) |
| string_partition_schema = pa.schema( |
| [("date", pa.string()), ("string", pa.string())]) |
| full_schema = pa.schema(list(schema) + list(partition_schema)) |
| for directory in [ |
| "directory/2021-05-04 00%3A00%3A00/%24", |
| "hive/date=2021-05-04 00%3A00%3A00/string=%24", |
| ]: |
| mockfs.create_dir(directory) |
| with mockfs.open_output_stream(directory + "/0.feather") as sink: |
| with pa.ipc.new_file(sink, schema) as writer: |
| writer.write_table(table) |
| writer.close() |
| |
| # Directory |
| selector = fs.FileSelector("directory", recursive=True) |
| options = ds.FileSystemFactoryOptions("directory") |
| options.partitioning_factory = ds.DirectoryPartitioning.discover( |
| schema=partition_schema) |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| inferred_schema = factory.inspect() |
| assert inferred_schema == full_schema |
| actual = factory.finish().to_table(columns={ |
| "date_int": ds.field("date").cast(pa.int64()), |
| }) |
| assert actual[0][0].as_py() == 1620086400 |
| |
| options.partitioning_factory = ds.DirectoryPartitioning.discover( |
| ["date", "string"], segment_encoding="none") |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| fragments = list(factory.finish().get_fragments()) |
| assert fragments[0].partition_expression.equals( |
| (ds.field("date") == "2021-05-04 00%3A00%3A00") & |
| (ds.field("string") == "%24")) |
| |
| options.partitioning = ds.DirectoryPartitioning( |
| string_partition_schema, segment_encoding="none") |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| fragments = list(factory.finish().get_fragments()) |
| assert fragments[0].partition_expression.equals( |
| (ds.field("date") == "2021-05-04 00%3A00%3A00") & |
| (ds.field("string") == "%24")) |
| |
| options.partitioning_factory = ds.DirectoryPartitioning.discover( |
| schema=partition_schema, segment_encoding="none") |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| with pytest.raises(pa.ArrowInvalid, |
| match="Could not cast segments for partition field"): |
| inferred_schema = factory.inspect() |
| |
| # Hive |
| selector = fs.FileSelector("hive", recursive=True) |
| options = ds.FileSystemFactoryOptions("hive") |
| options.partitioning_factory = ds.HivePartitioning.discover( |
| schema=partition_schema) |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| inferred_schema = factory.inspect() |
| assert inferred_schema == full_schema |
| actual = factory.finish().to_table(columns={ |
| "date_int": ds.field("date").cast(pa.int64()), |
| }) |
| assert actual[0][0].as_py() == 1620086400 |
| |
| options.partitioning_factory = ds.HivePartitioning.discover( |
| segment_encoding="none") |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| fragments = list(factory.finish().get_fragments()) |
| assert fragments[0].partition_expression.equals( |
| (ds.field("date") == "2021-05-04 00%3A00%3A00") & |
| (ds.field("string") == "%24")) |
| |
| options.partitioning = ds.HivePartitioning( |
| string_partition_schema, segment_encoding="none") |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| fragments = list(factory.finish().get_fragments()) |
| assert fragments[0].partition_expression.equals( |
| (ds.field("date") == "2021-05-04 00%3A00%3A00") & |
| (ds.field("string") == "%24")) |
| |
| options.partitioning_factory = ds.HivePartitioning.discover( |
| schema=partition_schema, segment_encoding="none") |
| factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options) |
| with pytest.raises(pa.ArrowInvalid, |
| match="Could not cast segments for partition field"): |
| inferred_schema = factory.inspect() |
| |
| |
| def test_dictionary_partitioning_outer_nulls_raises(tempdir): |
| table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']}) |
| part = ds.partitioning( |
| pa.schema([pa.field('a', pa.string()), pa.field('b', pa.string())])) |
| with pytest.raises(pa.ArrowInvalid): |
| ds.write_dataset(table, tempdir, format='parquet', partitioning=part) |
| |
| |
| def _has_subdirs(basedir): |
| elements = os.listdir(basedir) |
    return any(os.path.isdir(os.path.join(basedir, el)) for el in elements)
| |
| |
| def _do_list_all_dirs(basedir, path_so_far, result): |
| for f in os.listdir(basedir): |
| true_nested = os.path.join(basedir, f) |
| if os.path.isdir(true_nested): |
| norm_nested = posixpath.join(path_so_far, f) |
| if _has_subdirs(true_nested): |
| _do_list_all_dirs(true_nested, norm_nested, result) |
| else: |
| result.append(norm_nested) |
| |
| |
| def _list_all_dirs(basedir): |
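    """Return all leaf directories under basedir as posix-style relative
    paths."""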
| result = [] |
| _do_list_all_dirs(basedir, '', result) |
| return result |
| |
| |
| def _check_dataset_directories(tempdir, expected_directories): |
| actual_directories = set(_list_all_dirs(tempdir)) |
| assert actual_directories == set(expected_directories) |
| |
| |
| def test_dictionary_partitioning_inner_nulls(tempdir): |
| table = pa.table({'a': ['x', 'y', 'z'], 'b': ['x', 'y', None]}) |
| part = ds.partitioning( |
| pa.schema([pa.field('a', pa.string()), pa.field('b', pa.string())])) |
| ds.write_dataset(table, tempdir, format='parquet', partitioning=part) |
| _check_dataset_directories(tempdir, ['x/x', 'y/y', 'z']) |
| |
| |
| def test_hive_partitioning_nulls(tempdir): |
| table = pa.table({'a': ['x', None, 'z'], 'b': ['x', 'y', None]}) |
| part = ds.HivePartitioning(pa.schema( |
| [pa.field('a', pa.string()), pa.field('b', pa.string())]), None, 'xyz') |
| ds.write_dataset(table, tempdir, format='parquet', partitioning=part) |
| _check_dataset_directories(tempdir, ['a=x/b=x', 'a=xyz/b=y', 'a=z/b=xyz']) |
| |
| |
| def test_partitioning_function(): |
| schema = pa.schema([("year", pa.int16()), ("month", pa.int8())]) |
| names = ["year", "month"] |
| |
| # default DirectoryPartitioning |
| part = ds.partitioning(schema) |
| assert isinstance(part, ds.DirectoryPartitioning) |
| part = ds.partitioning(schema, dictionaries="infer") |
| assert isinstance(part, ds.PartitioningFactory) |
| part = ds.partitioning(field_names=names) |
| assert isinstance(part, ds.PartitioningFactory) |
| # needs schema or list of names |
| with pytest.raises(ValueError): |
| ds.partitioning() |
| with pytest.raises(ValueError, match="Expected list"): |
| ds.partitioning(field_names=schema) |
| with pytest.raises(ValueError, match="Cannot specify both"): |
| ds.partitioning(schema, field_names=schema) |
| |
| # Hive partitioning |
| part = ds.partitioning(schema, flavor="hive") |
| assert isinstance(part, ds.HivePartitioning) |
| part = ds.partitioning(schema, dictionaries="infer", flavor="hive") |
| assert isinstance(part, ds.PartitioningFactory) |
| part = ds.partitioning(flavor="hive") |
| assert isinstance(part, ds.PartitioningFactory) |
| # cannot pass list of names |
| with pytest.raises(ValueError): |
| ds.partitioning(names, flavor="hive") |
| with pytest.raises(ValueError, match="Cannot specify 'field_names'"): |
| ds.partitioning(field_names=names, flavor="hive") |
| |
| # unsupported flavor |
| with pytest.raises(ValueError): |
| ds.partitioning(schema, flavor="unsupported") |
| |
| |
| def test_directory_partitioning_dictionary_key(mockfs): |
| # ARROW-8088 specifying partition key as dictionary type |
| schema = pa.schema([ |
| pa.field('group', pa.dictionary(pa.int8(), pa.int32())), |
| pa.field('key', pa.dictionary(pa.int8(), pa.string())) |
| ]) |
| part = ds.DirectoryPartitioning.discover(schema=schema) |
| |
| dataset = ds.dataset( |
| "subdir", format="parquet", filesystem=mockfs, partitioning=part |
| ) |
| assert dataset.partitioning.schema == schema |
| table = dataset.to_table() |
| |
| assert table.column('group').type.equals(schema.types[0]) |
| assert table.column('group').to_pylist() == [1] * 5 + [2] * 5 |
| assert table.column('key').type.equals(schema.types[1]) |
| assert table.column('key').to_pylist() == ['xxx'] * 5 + ['yyy'] * 5 |
| |
| |
| def test_hive_partitioning_dictionary_key(multisourcefs): |
| # ARROW-8088 specifying partition key as dictionary type |
| schema = pa.schema([ |
| pa.field('year', pa.dictionary(pa.int8(), pa.int16())), |
| pa.field('month', pa.dictionary(pa.int8(), pa.int16())) |
| ]) |
| part = ds.HivePartitioning.discover(schema=schema) |
| |
| dataset = ds.dataset( |
| "hive", format="parquet", filesystem=multisourcefs, partitioning=part |
| ) |
| assert dataset.partitioning.schema == schema |
| table = dataset.to_table() |
| |
| year_dictionary = list(range(2006, 2011)) |
| month_dictionary = list(range(1, 13)) |
| assert table.column('year').type.equals(schema.types[0]) |
| for chunk in table.column('year').chunks: |
| actual = chunk.dictionary.to_pylist() |
| actual.sort() |
| assert actual == year_dictionary |
| assert table.column('month').type.equals(schema.types[1]) |
| for chunk in table.column('month').chunks: |
| actual = chunk.dictionary.to_pylist() |
| actual.sort() |
| assert actual == month_dictionary |
| |
| |
| def _create_single_file(base_dir, table=None, row_group_size=None): |
| import pyarrow.parquet as pq |
| if table is None: |
| table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5}) |
| path = base_dir / "test.parquet" |
| pq.write_table(table, path, row_group_size=row_group_size) |
| return table, path |
| |
| |
| def _create_directory_of_files(base_dir): |
| import pyarrow.parquet as pq |
| table1 = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5}) |
| path1 = base_dir / "test1.parquet" |
| pq.write_table(table1, path1) |
| table2 = pa.table({'a': range(9, 18), 'b': [0.] * 4 + [1.] * 5}) |
| path2 = base_dir / "test2.parquet" |
| pq.write_table(table2, path2) |
| return (table1, table2), (path1, path2) |
| |
| |
| def _check_dataset(dataset, table, dataset_reader): |
| # also test that pickle roundtrip keeps the functionality |
| for d in [dataset, pickle.loads(pickle.dumps(dataset))]: |
        assert d.schema.equals(table.schema)
        assert dataset_reader.to_table(d).equals(table)
| |
| |
| def _check_dataset_from_path(path, table, dataset_reader, **kwargs): |
| # pathlib object |
| assert isinstance(path, pathlib.Path) |
| |
| # accept Path, str, List[Path], List[str] |
| for p in [path, str(path), [path], [str(path)]]: |
| dataset = ds.dataset(path, **kwargs) |
| assert isinstance(dataset, ds.FileSystemDataset) |
| _check_dataset(dataset, table, dataset_reader) |
| |
| # relative string path |
| with change_cwd(path.parent): |
| dataset = ds.dataset(path.name, **kwargs) |
| assert isinstance(dataset, ds.FileSystemDataset) |
| _check_dataset(dataset, table, dataset_reader) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_single_file(tempdir, dataset_reader): |
| table, path = _create_single_file(tempdir) |
| _check_dataset_from_path(path, table, dataset_reader) |
| |
| |
| @pytest.mark.parquet |
| def test_deterministic_row_order(tempdir, dataset_reader): |
| # ARROW-8447 Ensure that dataset.to_table (and Scanner::ToTable) returns a |
| # deterministic row ordering. This is achieved by constructing a single |
| # parquet file with one row per RowGroup. |
| table, path = _create_single_file(tempdir, row_group_size=1) |
| _check_dataset_from_path(path, table, dataset_reader) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_directory(tempdir, dataset_reader): |
| tables, _ = _create_directory_of_files(tempdir) |
| table = pa.concat_tables(tables) |
| _check_dataset_from_path(tempdir, table, dataset_reader) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_list_of_files(tempdir, dataset_reader): |
| tables, (path1, path2) = _create_directory_of_files(tempdir) |
| table = pa.concat_tables(tables) |
| |
| datasets = [ |
| ds.dataset([path1, path2]), |
| ds.dataset([str(path1), str(path2)]) |
| ] |
| datasets += [ |
| pickle.loads(pickle.dumps(d)) for d in datasets |
| ] |
| |
| for dataset in datasets: |
| assert dataset.schema.equals(table.schema) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(table) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_filesystem_fspath(tempdir): |
| # single file |
| table, path = _create_single_file(tempdir) |
| |
| fspath = FSProtocolClass(path) |
| |
| # filesystem inferred from path |
| dataset1 = ds.dataset(fspath) |
| assert dataset1.schema.equals(table.schema) |
| |
| # filesystem specified |
| dataset2 = ds.dataset(fspath, filesystem=fs.LocalFileSystem()) |
| assert dataset2.schema.equals(table.schema) |
| |
| # passing different filesystem |
| with pytest.raises(TypeError): |
| ds.dataset(fspath, filesystem=fs._MockFileSystem()) |
| |
| |
| def test_construct_from_single_file(tempdir, dataset_reader): |
| directory = tempdir / 'single-file' |
| directory.mkdir() |
| table, path = _create_single_file(directory) |
| relative_path = path.relative_to(directory) |
| |
| # instantiate from a single file |
| d1 = ds.dataset(path) |
| # instantiate from a single file with a filesystem object |
| d2 = ds.dataset(path, filesystem=fs.LocalFileSystem()) |
| # instantiate from a single file with prefixed filesystem URI |
| d3 = ds.dataset(str(relative_path), filesystem=_filesystem_uri(directory)) |
| # pickle roundtrip |
| d4 = pickle.loads(pickle.dumps(d1)) |
| |
| assert dataset_reader.to_table(d1) == dataset_reader.to_table( |
| d2) == dataset_reader.to_table(d3) == dataset_reader.to_table(d4) |
| |
| |
| def test_construct_from_single_directory(tempdir, dataset_reader): |
| directory = tempdir / 'single-directory' |
| directory.mkdir() |
| tables, paths = _create_directory_of_files(directory) |
| |
| d1 = ds.dataset(directory) |
| d2 = ds.dataset(directory, filesystem=fs.LocalFileSystem()) |
| d3 = ds.dataset(directory.name, filesystem=_filesystem_uri(tempdir)) |
| t1 = dataset_reader.to_table(d1) |
| t2 = dataset_reader.to_table(d2) |
| t3 = dataset_reader.to_table(d3) |
| assert t1 == t2 == t3 |
| |
| # test pickle roundtrip |
| for d in [d1, d2, d3]: |
| restored = pickle.loads(pickle.dumps(d)) |
| assert dataset_reader.to_table(restored) == t1 |
| |
| |
| def test_construct_from_list_of_files(tempdir, dataset_reader): |
| # instantiate from a list of files |
| directory = tempdir / 'list-of-files' |
| directory.mkdir() |
| tables, paths = _create_directory_of_files(directory) |
| |
| relative_paths = [p.relative_to(tempdir) for p in paths] |
| with change_cwd(tempdir): |
| d1 = ds.dataset(relative_paths) |
| t1 = dataset_reader.to_table(d1) |
| assert len(t1) == sum(map(len, tables)) |
| |
| d2 = ds.dataset(relative_paths, filesystem=_filesystem_uri(tempdir)) |
| t2 = dataset_reader.to_table(d2) |
| d3 = ds.dataset(paths) |
| t3 = dataset_reader.to_table(d3) |
| d4 = ds.dataset(paths, filesystem=fs.LocalFileSystem()) |
| t4 = dataset_reader.to_table(d4) |
| |
| assert t1 == t2 == t3 == t4 |
| |
| |
| def test_construct_from_list_of_mixed_paths_fails(mockfs): |
    # instantiate from a list of paths where one file does not exist
| files = [ |
| 'subdir/1/xxx/file0.parquet', |
| 'subdir/1/xxx/doesnt-exist.parquet', |
| ] |
| with pytest.raises(FileNotFoundError, match='doesnt-exist'): |
| ds.dataset(files, filesystem=mockfs) |
| |
| |
| def test_construct_from_mixed_child_datasets(mockfs): |
    # instantiate a union from a list of child datasets
| a = ds.dataset(['subdir/1/xxx/file0.parquet', |
| 'subdir/2/yyy/file1.parquet'], filesystem=mockfs) |
| b = ds.dataset('subdir', filesystem=mockfs) |
| |
| dataset = ds.dataset([a, b]) |
| |
| assert isinstance(dataset, ds.UnionDataset) |
| assert len(list(dataset.get_fragments())) == 4 |
| |
| table = dataset.to_table() |
| assert len(table) == 20 |
| assert table.num_columns == 4 |
| |
| assert len(dataset.children) == 2 |
| for child in dataset.children: |
| assert child.files == ['subdir/1/xxx/file0.parquet', |
| 'subdir/2/yyy/file1.parquet'] |
| |
| |
| def test_construct_empty_dataset(): |
| empty = ds.dataset([]) |
| table = empty.to_table() |
| assert table.num_rows == 0 |
| assert table.num_columns == 0 |
| |
| |
| def test_construct_dataset_with_invalid_schema(): |
| empty = ds.dataset([], schema=pa.schema([ |
| ('a', pa.int64()), |
| ('a', pa.string()) |
| ])) |
| with pytest.raises(ValueError, match='Multiple matches for .*a.* in '): |
| empty.to_table() |
| |
| |
| def test_construct_from_invalid_sources_raise(multisourcefs): |
| child1 = ds.FileSystemDatasetFactory( |
| multisourcefs, |
| fs.FileSelector('/plain'), |
| format=ds.ParquetFileFormat() |
| ) |
| child2 = ds.FileSystemDatasetFactory( |
| multisourcefs, |
| fs.FileSelector('/schema'), |
| format=ds.ParquetFileFormat() |
| ) |
| batch1 = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"]) |
| batch2 = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["b"]) |
| |
| with pytest.raises(TypeError, match='Expected.*FileSystemDatasetFactory'): |
| ds.dataset([child1, child2]) |
| |
| expected = ( |
| "Expected a list of path-like or dataset objects, or a list " |
| "of batches or tables. The given list contains the following " |
| "types: int" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.dataset([1, 2, 3]) |
| |
| expected = ( |
| "Expected a path-like, list of path-likes or a list of Datasets " |
| "instead of the given type: NoneType" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.dataset(None) |
| |
| expected = ( |
| "Expected a path-like, list of path-likes or a list of Datasets " |
| "instead of the given type: generator" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.dataset((batch1 for _ in range(3))) |
| |
| expected = ( |
| "Must provide schema to construct in-memory dataset from an empty list" |
| ) |
| with pytest.raises(ValueError, match=expected): |
| ds.InMemoryDataset([]) |
| |
| expected = ( |
| "Item has schema\nb: int64\nwhich does not match expected schema\n" |
| "a: int64" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.dataset([batch1, batch2]) |
| |
| expected = ( |
| "Expected a list of path-like or dataset objects, or a list of " |
| "batches or tables. The given list contains the following types:" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.dataset([batch1, 0]) |
| |
| expected = ( |
| "Expected a list of tables or batches. The given list contains a int" |
| ) |
| with pytest.raises(TypeError, match=expected): |
| ds.InMemoryDataset([batch1, 0]) |
| |
| |
| def test_construct_in_memory(dataset_reader): |
| batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"]) |
| table = pa.Table.from_batches([batch]) |
| |
| assert ds.dataset([], schema=pa.schema([])).to_table() == pa.table([]) |
| |
| for source in (batch, table, [batch], [table]): |
| dataset = ds.dataset(source) |
| assert dataset_reader.to_table(dataset) == table |
| assert len(list(dataset.get_fragments())) == 1 |
| assert next(dataset.get_fragments()).to_table() == table |
| assert pa.Table.from_batches(list(dataset.to_batches())) == table |
| |
| |
| @pytest.mark.parametrize('use_threads,use_async', |
| [(False, False), (False, True), |
| (True, False), (True, True)]) |
| def test_scan_iterator(use_threads, use_async): |
| batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"]) |
| table = pa.Table.from_batches([batch]) |
| # When constructed from readers/iterators, should be one-shot |
| match = "OneShotFragment was already scanned" |
| for factory, schema in ( |
| (lambda: pa.ipc.RecordBatchReader.from_batches( |
| batch.schema, [batch]), None), |
| (lambda: (batch for _ in range(1)), batch.schema), |
| ): |
| # Scanning the fragment consumes the underlying iterator |
| scanner = ds.Scanner.from_batches( |
| factory(), schema=schema, use_threads=use_threads, |
| use_async=use_async) |
| assert scanner.to_table() == table |
| with pytest.raises(pa.ArrowInvalid, match=match): |
| scanner.to_table() |
| |
| |
| def _create_partitioned_dataset(basedir): |
| import pyarrow.parquet as pq |
| table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5}) |
| |
| path = basedir / "dataset-partitioned" |
| path.mkdir() |
| |
| for i in range(3): |
| part = path / "part={}".format(i) |
| part.mkdir() |
| pq.write_table(table.slice(3*i, 3), part / "test.parquet") |
| |
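    # expected table including the hive-discovered partition column (int32)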
| full_table = table.append_column( |
| "part", pa.array(np.repeat([0, 1, 2], 3), type=pa.int32())) |
| |
| return full_table, path |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_partitioned_directory(tempdir, dataset_reader): |
| full_table, path = _create_partitioned_dataset(tempdir) |
| |
| # no partitioning specified, just read all individual files |
| table = full_table.select(['a', 'b']) |
| _check_dataset_from_path(path, table, dataset_reader) |
| |
| # specify partition scheme with discovery |
| dataset = ds.dataset( |
| str(path), partitioning=ds.partitioning(flavor="hive")) |
| assert dataset.schema.equals(full_table.schema) |
| |
| # specify partition scheme with discovery and relative path |
| with change_cwd(tempdir): |
| dataset = ds.dataset("dataset-partitioned/", |
| partitioning=ds.partitioning(flavor="hive")) |
| assert dataset.schema.equals(full_table.schema) |
| |
| # specify partition scheme with string short-cut |
| dataset = ds.dataset(str(path), partitioning="hive") |
| assert dataset.schema.equals(full_table.schema) |
| |
| # specify partition scheme with explicit scheme |
| dataset = ds.dataset( |
| str(path), |
| partitioning=ds.partitioning( |
| pa.schema([("part", pa.int8())]), flavor="hive")) |
| expected_schema = table.schema.append(pa.field("part", pa.int8())) |
| assert dataset.schema.equals(expected_schema) |
| |
| result = dataset.to_table() |
| expected = table.append_column( |
| "part", pa.array(np.repeat([0, 1, 2], 3), type=pa.int8())) |
| assert result.equals(expected) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_filesystem(tempdir): |
| # single file |
| table, path = _create_single_file(tempdir) |
| |
| # filesystem inferred from path |
| dataset1 = ds.dataset(str(path)) |
| assert dataset1.schema.equals(table.schema) |
| |
| # filesystem specified |
| dataset2 = ds.dataset(str(path), filesystem=fs.LocalFileSystem()) |
| assert dataset2.schema.equals(table.schema) |
| |
| # local filesystem specified with relative path |
| with change_cwd(tempdir): |
| dataset3 = ds.dataset("test.parquet", filesystem=fs.LocalFileSystem()) |
| assert dataset3.schema.equals(table.schema) |
| |
| # passing different filesystem |
| with pytest.raises(FileNotFoundError): |
| ds.dataset(str(path), filesystem=fs._MockFileSystem()) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_unsupported_format(tempdir): |
| _, path = _create_single_file(tempdir) |
| with pytest.raises(ValueError, match="format 'blabla' is not supported"): |
| ds.dataset([path], format="blabla") |
| |
| |
| @pytest.mark.parquet |
| def test_open_union_dataset(tempdir, dataset_reader): |
| _, path = _create_single_file(tempdir) |
| dataset = ds.dataset(path) |
| |
| union = ds.dataset([dataset, dataset]) |
| assert isinstance(union, ds.UnionDataset) |
| |
| pickled = pickle.loads(pickle.dumps(union)) |
| assert dataset_reader.to_table(pickled) == dataset_reader.to_table(union) |
| |
| |
| def test_open_union_dataset_with_additional_kwargs(multisourcefs): |
| child = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') |
| with pytest.raises(ValueError, match="cannot pass any additional"): |
| ds.dataset([child], format="parquet") |
| |
| |
| def test_open_dataset_non_existing_file(): |
| # ARROW-8213: Opening a dataset with a local incorrect path gives confusing |
| # error message |
| with pytest.raises(FileNotFoundError): |
| ds.dataset('i-am-not-existing.parquet', format='parquet') |
| |
| with pytest.raises(pa.ArrowInvalid, match='cannot be relative'): |
| ds.dataset('file:i-am-not-existing.parquet', format='parquet') |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.parametrize('partitioning', ["directory", "hive"]) |
| @pytest.mark.parametrize('null_fallback', ['xyz', None]) |
| @pytest.mark.parametrize('infer_dictionary', [False, True]) |
| @pytest.mark.parametrize('partition_keys', [ |
| (["A", "B", "C"], [1, 2, 3]), |
| ([1, 2, 3], ["A", "B", "C"]), |
| (["A", "B", "C"], ["D", "E", "F"]), |
| ([1, 2, 3], [4, 5, 6]), |
| ([1, None, 3], ["A", "B", "C"]), |
| ([1, 2, 3], ["A", None, "C"]), |
| ([None, 2, 3], [None, 2, 3]), |
| ]) |
| def test_partition_discovery( |
| tempdir, partitioning, null_fallback, infer_dictionary, partition_keys |
| ): |
| # ARROW-9288 / ARROW-9476 |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': range(9), 'b': [0.0] * 4 + [1.0] * 5}) |
| |
| has_null = None in partition_keys[0] or None in partition_keys[1] |
    if partitioning == "directory" and has_null:
        # Directory partitioning can't represent null partition keys
        pytest.skip("directory partitioning cannot handle null keys")
| |
| if partitioning == "directory": |
| partitioning = ds.DirectoryPartitioning.discover( |
| ["part1", "part2"], infer_dictionary=infer_dictionary) |
| fmt = "{0}/{1}" |
| null_value = None |
| else: |
| if null_fallback: |
| partitioning = ds.HivePartitioning.discover( |
| infer_dictionary=infer_dictionary, null_fallback=null_fallback |
| ) |
| else: |
| partitioning = ds.HivePartitioning.discover( |
| infer_dictionary=infer_dictionary) |
| fmt = "part1={0}/part2={1}" |
| if null_fallback: |
| null_value = null_fallback |
| else: |
| null_value = "__HIVE_DEFAULT_PARTITION__" |
| |
| basepath = tempdir / "dataset" |
| basepath.mkdir() |
| |
| part_keys1, part_keys2 = partition_keys |
| for part1 in part_keys1: |
| for part2 in part_keys2: |
            path = basepath / fmt.format(
                part1 if part1 is not None else null_value,
                part2 if part2 is not None else null_value)
| path.mkdir(parents=True) |
| pq.write_table(table, path / "test.parquet") |
| |
| dataset = ds.dataset(str(basepath), partitioning=partitioning) |
| |
| def expected_type(key): |
| if infer_dictionary: |
| value_type = pa.string() if isinstance(key, str) else pa.int32() |
| return pa.dictionary(pa.int32(), value_type) |
| else: |
| return pa.string() if isinstance(key, str) else pa.int32() |
| expected_schema = table.schema.append( |
| pa.field("part1", expected_type(part_keys1[0])) |
| ).append( |
| pa.field("part2", expected_type(part_keys2[0])) |
| ) |
| assert dataset.schema.equals(expected_schema) |
| |
| |
| @pytest.mark.pandas |
| def test_dataset_partitioned_dictionary_type_reconstruct(tempdir): |
| # https://issues.apache.org/jira/browse/ARROW-11400 |
| table = pa.table({'part': np.repeat(['A', 'B'], 5), 'col': range(10)}) |
| part = ds.partitioning(table.select(['part']).schema, flavor="hive") |
| ds.write_dataset(table, tempdir, partitioning=part, format="feather") |
| |
| dataset = ds.dataset( |
| tempdir, format="feather", |
| partitioning=ds.HivePartitioning.discover(infer_dictionary=True) |
| ) |
| expected = pa.table( |
| {'col': table['col'], 'part': table['part'].dictionary_encode()} |
| ) |
| assert dataset.to_table().equals(expected) |
| fragment = list(dataset.get_fragments())[0] |
| assert fragment.to_table(schema=dataset.schema).equals(expected[:5]) |
| part_expr = fragment.partition_expression |
| |
| restored = pickle.loads(pickle.dumps(dataset)) |
| assert restored.to_table().equals(expected) |
| |
| restored = pickle.loads(pickle.dumps(fragment)) |
| assert restored.to_table(schema=dataset.schema).equals(expected[:5]) |
| # to_pandas call triggers computation of the actual dictionary values |
| assert restored.to_table(schema=dataset.schema).to_pandas().equals( |
| expected[:5].to_pandas() |
| ) |
| assert restored.partition_expression.equals(part_expr) |
| |
| |
| @pytest.fixture |
| def s3_example_simple(s3_server): |
| from pyarrow.fs import FileSystem |
| import pyarrow.parquet as pq |
| |
| host, port, access_key, secret_key = s3_server['connection'] |
| uri = ( |
| "s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}" |
| .format(access_key, secret_key, host, port) |
| ) |
| |
| fs, path = FileSystem.from_uri(uri) |
| |
| fs.create_dir("mybucket") |
| table = pa.table({'a': [1, 2, 3]}) |
| with fs.open_output_stream("mybucket/data.parquet") as out: |
| pq.write_table(table, out) |
| |
| return table, path, fs, uri, host, port, access_key, secret_key |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.s3 |
| def test_open_dataset_from_uri_s3(s3_example_simple, dataset_reader): |
| # open dataset from non-localfs string path |
| table, path, fs, uri, _, _, _, _ = s3_example_simple |
| |
| # full string URI |
| dataset = ds.dataset(uri, format="parquet") |
| assert dataset_reader.to_table(dataset).equals(table) |
| |
| # passing filesystem object |
| dataset = ds.dataset(path, format="parquet", filesystem=fs) |
| assert dataset_reader.to_table(dataset).equals(table) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.s3 # still needed to create the data |
| def test_open_dataset_from_uri_s3_fsspec(s3_example_simple): |
| table, path, _, _, host, port, access_key, secret_key = s3_example_simple |
| s3fs = pytest.importorskip("s3fs") |
| |
| from pyarrow.fs import PyFileSystem, FSSpecHandler |
| |
| fs = s3fs.S3FileSystem( |
| key=access_key, |
| secret=secret_key, |
| client_kwargs={ |
| 'endpoint_url': 'http://{}:{}'.format(host, port) |
| } |
| ) |
| |
| # passing as fsspec filesystem |
| dataset = ds.dataset(path, format="parquet", filesystem=fs) |
| assert dataset.to_table().equals(table) |
| |
    # passing the fsspec handler wrapped in a PyFileSystem
| fs = PyFileSystem(FSSpecHandler(fs)) |
| dataset = ds.dataset(path, format="parquet", filesystem=fs) |
| assert dataset.to_table().equals(table) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.s3 |
| def test_open_dataset_from_s3_with_filesystem_uri(s3_server): |
| from pyarrow.fs import FileSystem |
| import pyarrow.parquet as pq |
| |
| host, port, access_key, secret_key = s3_server['connection'] |
| bucket = 'theirbucket' |
| path = 'nested/folder/data.parquet' |
| uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}".format( |
| access_key, secret_key, bucket, path, host, port |
| ) |
| |
| fs, path = FileSystem.from_uri(uri) |
| assert path == 'theirbucket/nested/folder/data.parquet' |
| |
| fs.create_dir(bucket) |
| |
| table = pa.table({'a': [1, 2, 3]}) |
| with fs.open_output_stream(path) as out: |
| pq.write_table(table, out) |
| |
| # full string URI |
| dataset = ds.dataset(uri, format="parquet") |
| assert dataset.to_table().equals(table) |
| |
    # passing the filesystem as a URI
| template = ( |
| "s3://{}:{}@{{}}?scheme=http&endpoint_override={}:{}".format( |
| access_key, secret_key, host, port |
| ) |
| ) |
| cases = [ |
| ('theirbucket/nested/folder/', '/data.parquet'), |
| ('theirbucket/nested/folder', 'data.parquet'), |
| ('theirbucket/nested/', 'folder/data.parquet'), |
| ('theirbucket/nested', 'folder/data.parquet'), |
| ('theirbucket', '/nested/folder/data.parquet'), |
| ('theirbucket', 'nested/folder/data.parquet'), |
| ] |
| for prefix, path in cases: |
| uri = template.format(prefix) |
| dataset = ds.dataset(path, filesystem=uri, format="parquet") |
| assert dataset.to_table().equals(table) |
| |
| with pytest.raises(pa.ArrowInvalid, match='Missing bucket name'): |
| uri = template.format('/') |
| ds.dataset('/theirbucket/nested/folder/data.parquet', filesystem=uri) |
| |
| error = ( |
| "The path component of the filesystem URI must point to a directory " |
| "but it has a type: `{}`. The path component is `{}` and the given " |
| "filesystem URI is `{}`" |
| ) |
| |
| path = 'theirbucket/doesnt/exist' |
| uri = template.format(path) |
| with pytest.raises(ValueError) as exc: |
| ds.dataset('data.parquet', filesystem=uri) |
| assert str(exc.value) == error.format('NotFound', path, uri) |
| |
| path = 'theirbucket/nested/folder/data.parquet' |
| uri = template.format(path) |
| with pytest.raises(ValueError) as exc: |
| ds.dataset('data.parquet', filesystem=uri) |
| assert str(exc.value) == error.format('File', path, uri) |
| |
| |
| @pytest.mark.parquet |
| def test_open_dataset_from_fsspec(tempdir): |
| table, path = _create_single_file(tempdir) |
| |
| fsspec = pytest.importorskip("fsspec") |
| |
| localfs = fsspec.filesystem("file") |
| dataset = ds.dataset(path, filesystem=localfs) |
| assert dataset.schema.equals(table.schema) |
| |
| |
| @pytest.mark.pandas |
| def test_filter_timestamp(tempdir, dataset_reader): |
| # ARROW-11379 |
| path = tempdir / "test_partition_timestamps" |
| |
| table = pa.table({ |
| "dates": ['2012-01-01', '2012-01-02'] * 5, |
| "id": range(10)}) |
| |
| # write dataset partitioned on dates (as strings) |
| part = ds.partitioning(table.select(['dates']).schema, flavor="hive") |
| ds.write_dataset(table, path, partitioning=part, format="feather") |
| |
| # read dataset partitioned on dates (as timestamps) |
| part = ds.partitioning(pa.schema([("dates", pa.timestamp("s"))]), |
| flavor="hive") |
| dataset = ds.dataset(path, format="feather", partitioning=part) |
| |
| condition = ds.field("dates") > pd.Timestamp("2012-01-01") |
| table = dataset_reader.to_table(dataset, filter=condition) |
| assert table.column('id').to_pylist() == [1, 3, 5, 7, 9] |
| |
| import datetime |
| condition = ds.field("dates") > datetime.datetime(2012, 1, 1) |
| table = dataset_reader.to_table(dataset, filter=condition) |
| assert table.column('id').to_pylist() == [1, 3, 5, 7, 9] |
| |
| |
| @pytest.mark.parquet |
| def test_filter_implicit_cast(tempdir, dataset_reader): |
| # ARROW-7652 |
| table = pa.table({'a': pa.array([0, 1, 2, 3, 4, 5], type=pa.int8())}) |
| _, path = _create_single_file(tempdir, table) |
| dataset = ds.dataset(str(path)) |
| |
| filter_ = ds.field('a') > 2 |
| assert len(dataset_reader.to_table(dataset, filter=filter_)) == 3 |
| |
| |
| def test_dataset_union(multisourcefs): |
| child = ds.FileSystemDatasetFactory( |
| multisourcefs, fs.FileSelector('/plain'), |
| format=ds.ParquetFileFormat() |
| ) |
| factory = ds.UnionDatasetFactory([child]) |
| |
| # TODO(bkietz) reintroduce factory.children property |
| assert len(factory.inspect_schemas()) == 1 |
| assert all(isinstance(s, pa.Schema) for s in factory.inspect_schemas()) |
| assert factory.inspect_schemas()[0].equals(child.inspect()) |
| assert factory.inspect().equals(child.inspect()) |
| assert isinstance(factory.finish(), ds.Dataset) |
| |
| |
| def test_union_dataset_from_other_datasets(tempdir, multisourcefs): |
| child1 = ds.dataset('/plain', filesystem=multisourcefs, format='parquet') |
| child2 = ds.dataset('/schema', filesystem=multisourcefs, format='parquet', |
| partitioning=['week', 'color']) |
| child3 = ds.dataset('/hive', filesystem=multisourcefs, format='parquet', |
| partitioning='hive') |
| |
| assert child1.schema != child2.schema != child3.schema |
| |
| assembled = ds.dataset([child1, child2, child3]) |
| assert isinstance(assembled, ds.UnionDataset) |
| |
| msg = 'cannot pass any additional arguments' |
| with pytest.raises(ValueError, match=msg): |
| ds.dataset([child1, child2], filesystem=multisourcefs) |
| |
| expected_schema = pa.schema([ |
| ('date', pa.date32()), |
| ('index', pa.int64()), |
| ('value', pa.float64()), |
| ('color', pa.string()), |
| ('week', pa.int32()), |
| ('year', pa.int32()), |
| ('month', pa.int32()), |
| ]) |
| assert assembled.schema.equals(expected_schema) |
| assert assembled.to_table().schema.equals(expected_schema) |
| |
| assembled = ds.dataset([child1, child3]) |
| expected_schema = pa.schema([ |
| ('date', pa.date32()), |
| ('index', pa.int64()), |
| ('value', pa.float64()), |
| ('color', pa.string()), |
| ('year', pa.int32()), |
| ('month', pa.int32()), |
| ]) |
| assert assembled.schema.equals(expected_schema) |
| assert assembled.to_table().schema.equals(expected_schema) |
| |
| expected_schema = pa.schema([ |
| ('month', pa.int32()), |
| ('color', pa.string()), |
| ('date', pa.date32()), |
| ]) |
| assembled = ds.dataset([child1, child3], schema=expected_schema) |
| assert assembled.to_table().schema.equals(expected_schema) |
| |
| expected_schema = pa.schema([ |
| ('month', pa.int32()), |
| ('color', pa.string()), |
| ('unknown', pa.string()) # fill with nulls |
| ]) |
| assembled = ds.dataset([child1, child3], schema=expected_schema) |
| assert assembled.to_table().schema.equals(expected_schema) |
| |
| # incompatible schemas, date and index columns have conflicting types |
| table = pa.table([range(9), [0.] * 4 + [1.] * 5, 'abcdefghj'], |
| names=['date', 'value', 'index']) |
| _, path = _create_single_file(tempdir, table=table) |
| child4 = ds.dataset(path) |
| |
| with pytest.raises(pa.ArrowInvalid, match='Unable to merge'): |
| ds.dataset([child1, child4]) |
| |
| |
| def test_dataset_from_a_list_of_local_directories_raises(multisourcefs): |
| msg = 'points to a directory, but only file paths are supported' |
| with pytest.raises(IsADirectoryError, match=msg): |
| ds.dataset(['/plain', '/schema', '/hive'], filesystem=multisourcefs) |
| |
| |
| def test_union_dataset_filesystem_datasets(multisourcefs): |
| # without partitioning |
| dataset = ds.dataset([ |
| ds.dataset('/plain', filesystem=multisourcefs), |
| ds.dataset('/schema', filesystem=multisourcefs), |
| ds.dataset('/hive', filesystem=multisourcefs), |
| ]) |
| expected_schema = pa.schema([ |
| ('date', pa.date32()), |
| ('index', pa.int64()), |
| ('value', pa.float64()), |
| ('color', pa.string()), |
| ]) |
| assert dataset.schema.equals(expected_schema) |
| |
| # with hive partitioning for two hive sources |
| dataset = ds.dataset([ |
| ds.dataset('/plain', filesystem=multisourcefs), |
| ds.dataset('/schema', filesystem=multisourcefs), |
| ds.dataset('/hive', filesystem=multisourcefs, partitioning='hive') |
| ]) |
| expected_schema = pa.schema([ |
| ('date', pa.date32()), |
| ('index', pa.int64()), |
| ('value', pa.float64()), |
| ('color', pa.string()), |
| ('year', pa.int32()), |
| ('month', pa.int32()), |
| ]) |
| assert dataset.schema.equals(expected_schema) |
| |
| |
| @pytest.mark.parquet |
| def test_specified_schema(tempdir, dataset_reader): |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': [1, 2, 3], 'b': [.1, .2, .3]}) |
| pq.write_table(table, tempdir / "data.parquet") |
| |
| def _check_dataset(schema, expected, expected_schema=None): |
| dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema) |
| if expected_schema is not None: |
| assert dataset.schema.equals(expected_schema) |
| else: |
| assert dataset.schema.equals(schema) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(expected) |
| |
| # no schema specified |
| schema = None |
| expected = table |
| _check_dataset(schema, expected, expected_schema=table.schema) |
| |
| # identical schema specified |
| schema = table.schema |
| expected = table |
| _check_dataset(schema, expected) |
| |
| # Specifying schema with change column order |
| schema = pa.schema([('b', 'float64'), ('a', 'int64')]) |
| expected = pa.table([[.1, .2, .3], [1, 2, 3]], names=['b', 'a']) |
| _check_dataset(schema, expected) |
| |
| # Specifying schema with missing column |
| schema = pa.schema([('a', 'int64')]) |
| expected = pa.table([[1, 2, 3]], names=['a']) |
| _check_dataset(schema, expected) |
| |
| # Specifying schema with additional column |
| schema = pa.schema([('a', 'int64'), ('c', 'int32')]) |
| expected = pa.table([[1, 2, 3], |
| pa.array([None, None, None], type='int32')], |
| names=['a', 'c']) |
| _check_dataset(schema, expected) |
| |
| # Specifying with differing field types |
| schema = pa.schema([('a', 'int32'), ('b', 'float64')]) |
| expected = pa.table([table['a'].cast('int32'), |
| table['b']], |
| names=['a', 'b']) |
| _check_dataset(schema, expected) |
| |
| # Specifying with incompatible schema |
| schema = pa.schema([('a', pa.list_(pa.int32())), ('b', 'float64')]) |
| dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema) |
| assert dataset.schema.equals(schema) |
| with pytest.raises(NotImplementedError, |
| match='Unsupported cast from int64 to list'): |
| dataset_reader.to_table(dataset) |
| |
| |
| @pytest.mark.parquet |
| def test_incompatible_schema_hang(tempdir, dataset_reader): |
| # ARROW-13480: deadlock when reading past an errored fragment |
| import pyarrow.parquet as pq |
| |
| fn = tempdir / "data.parquet" |
| table = pa.table({'a': [1, 2, 3]}) |
| pq.write_table(table, fn) |
| |
| schema = pa.schema([('a', pa.null())]) |
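    # scan many copies of the file so the scanner reads ahead past the
    # first failing fragment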
| dataset = ds.dataset([str(fn)] * 100, schema=schema) |
| assert dataset.schema.equals(schema) |
| scanner = dataset_reader.scanner(dataset) |
| reader = scanner.to_reader() |
| with pytest.raises(NotImplementedError, |
| match='Unsupported cast from int64 to null'): |
| reader.read_all() |
| |
| |
| def test_ipc_format(tempdir, dataset_reader): |
| table = pa.table({'a': pa.array([1, 2, 3], type="int8"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| |
| path = str(tempdir / 'test.arrow') |
| with pa.output_stream(path) as sink: |
| writer = pa.RecordBatchFileWriter(sink, table.schema) |
| writer.write_batch(table.to_batches()[0]) |
| writer.close() |
| |
| dataset = ds.dataset(path, format=ds.IpcFileFormat()) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(table) |
| |
| for format_str in ["ipc", "arrow"]: |
| dataset = ds.dataset(path, format=format_str) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(table) |
| |
| |
| @pytest.mark.orc |
| def test_orc_format(tempdir, dataset_reader): |
| from pyarrow import orc |
| table = pa.table({'a': pa.array([1, 2, 3], type="int8"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| |
| path = str(tempdir / 'test.orc') |
| orc.write_table(table, path) |
| |
| dataset = ds.dataset(path, format=ds.OrcFileFormat()) |
| result = dataset_reader.to_table(dataset) |
| result.validate(full=True) |
| assert result.equals(table) |
| |
| dataset = ds.dataset(path, format="orc") |
| result = dataset_reader.to_table(dataset) |
| result.validate(full=True) |
| assert result.equals(table) |
| |
| result = dataset_reader.to_table(dataset, columns=["b"]) |
| result.validate(full=True) |
| assert result.equals(table.select(["b"])) |
| |
| result = dataset_reader.to_table( |
| dataset, columns={"b2": ds.field("b") * 2} |
| ) |
| result.validate(full=True) |
| assert result.equals( |
| pa.table({'b2': pa.array([.2, .4, .6], type="float64")}) |
| ) |
| |
| assert dataset_reader.count_rows(dataset) == 3 |
| assert dataset_reader.count_rows(dataset, filter=ds.field("a") > 2) == 1 |
| |
| |
| @pytest.mark.orc |
| def test_orc_scan_options(tempdir, dataset_reader): |
| from pyarrow import orc |
| table = pa.table({'a': pa.array([1, 2, 3], type="int8"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| |
| path = str(tempdir / 'test.orc') |
| orc.write_table(table, path) |
| |
| dataset = ds.dataset(path, format="orc") |
| result = list(dataset_reader.to_batches(dataset)) |
| assert len(result) == 1 |
| assert result[0].num_rows == 3 |
| assert result[0].equals(table.to_batches()[0]) |
| # TODO batch_size is not yet supported (ARROW-14153) |
| # result = list(dataset_reader.to_batches(dataset, batch_size=2)) |
| # assert len(result) == 2 |
| # assert result[0].num_rows == 2 |
| # assert result[0].equals(table.slice(0, 2).to_batches()[0]) |
| # assert result[1].num_rows == 1 |
| # assert result[1].equals(table.slice(2, 1).to_batches()[0]) |
| |
| |
| def test_orc_format_not_supported(): |
| try: |
| from pyarrow.dataset import OrcFileFormat # noqa |
| except (ImportError, AttributeError): |
| # catch AttributeError for Python 3.6 |
| # ORC is not available, test error message |
| with pytest.raises( |
| ValueError, match="not built with support for the ORC file" |
| ): |
| ds.dataset(".", format="orc") |
| |
| |
| @pytest.mark.pandas |
| def test_csv_format(tempdir, dataset_reader): |
| table = pa.table({'a': pa.array([1, 2, 3], type="int64"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| |
| path = str(tempdir / 'test.csv') |
| table.to_pandas().to_csv(path, index=False) |
| |
| dataset = ds.dataset(path, format=ds.CsvFileFormat()) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(table) |
| |
| dataset = ds.dataset(path, format='csv') |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(table) |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parametrize("compression", [ |
| "bz2", |
| "gzip", |
| "lz4", |
| "zstd", |
| ]) |
| def test_csv_format_compressed(tempdir, compression, dataset_reader): |
    if not pa.Codec.is_available(compression):
| pytest.skip("{} support is not built".format(compression)) |
| table = pa.table({'a': pa.array([1, 2, 3], type="int64"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| filesystem = fs.LocalFileSystem() |
| suffix = compression if compression != 'gzip' else 'gz' |
| path = str(tempdir / f'test.csv.{suffix}') |
| with filesystem.open_output_stream(path, compression=compression) as sink: |
| # https://github.com/pandas-dev/pandas/issues/23854 |
        # With the CI version of pandas (anything < 1.2), pandas tries to
        # write str to the (binary) sink, so encode manually
| csv_str = table.to_pandas().to_csv(index=False) |
| sink.write(csv_str.encode('utf-8')) |
| |
| dataset = ds.dataset(path, format=ds.CsvFileFormat()) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(table) |
| |
| |
| def test_csv_format_options(tempdir, dataset_reader): |
| path = str(tempdir / 'test.csv') |
| with open(path, 'w') as sink: |
| sink.write('skipped\ncol0\nfoo\nbar\n') |
| dataset = ds.dataset(path, format='csv') |
| result = dataset_reader.to_table(dataset) |
| assert result.equals( |
| pa.table({'skipped': pa.array(['col0', 'foo', 'bar'])})) |
| |
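    # skipping the first row makes 'col0' the header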
| dataset = ds.dataset(path, format=ds.CsvFileFormat( |
| read_options=pa.csv.ReadOptions(skip_rows=1))) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(pa.table({'col0': pa.array(['foo', 'bar'])})) |
| |
| dataset = ds.dataset(path, format=ds.CsvFileFormat( |
| read_options=pa.csv.ReadOptions(column_names=['foo']))) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals( |
| pa.table({'foo': pa.array(['skipped', 'col0', 'foo', 'bar'])})) |
| |
| |
| def test_csv_fragment_options(tempdir, dataset_reader): |
| path = str(tempdir / 'test.csv') |
| with open(path, 'w') as sink: |
| sink.write('col0\nfoo\nspam\nMYNULL\n') |
| dataset = ds.dataset(path, format='csv') |
    convert_options = pa.csv.ConvertOptions(null_values=['MYNULL'],
                                            strings_can_be_null=True)
| options = ds.CsvFragmentScanOptions( |
| convert_options=convert_options, |
| read_options=pa.csv.ReadOptions(block_size=2**16)) |
| result = dataset_reader.to_table(dataset, fragment_scan_options=options) |
| assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])})) |
| |
| csv_format = ds.CsvFileFormat(convert_options=convert_options) |
| dataset = ds.dataset(path, format=csv_format) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])})) |
| |
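    # freshly-constructed default scan options override the format-level
    # convert options, so 'MYNULL' is read back as a literal string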
| options = ds.CsvFragmentScanOptions() |
| result = dataset_reader.to_table(dataset, fragment_scan_options=options) |
| assert result.equals( |
| pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])})) |
| |
| |
| def test_feather_format(tempdir, dataset_reader): |
| from pyarrow.feather import write_feather |
| |
| table = pa.table({'a': pa.array([1, 2, 3], type="int8"), |
| 'b': pa.array([.1, .2, .3], type="float64")}) |
| |
| basedir = tempdir / "feather_dataset" |
| basedir.mkdir() |
| write_feather(table, str(basedir / "data.feather")) |
| |
| dataset = ds.dataset(basedir, format=ds.IpcFileFormat()) |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(table) |
| |
| dataset = ds.dataset(basedir, format="feather") |
| result = dataset_reader.to_table(dataset) |
| assert result.equals(table) |
| |
| # ARROW-8641 - column selection order |
| result = dataset_reader.to_table(dataset, columns=["b", "a"]) |
| assert result.column_names == ["b", "a"] |
| result = dataset_reader.to_table(dataset, columns=["a", "a"]) |
| assert result.column_names == ["a", "a"] |
| |
| # error with Feather v1 files |
| write_feather(table, str(basedir / "data1.feather"), version=1) |
| with pytest.raises(ValueError): |
| dataset_reader.to_table(ds.dataset(basedir, format="feather")) |
| |
| |
| def _create_parquet_dataset_simple(root_path): |
| """ |
| Creates a simple (flat files, no nested partitioning) Parquet dataset |
| """ |
| import pyarrow.parquet as pq |
| |
| metadata_collector = [] |
| |
| for i in range(4): |
| table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)}) |
| pq.write_to_dataset( |
| table, str(root_path), metadata_collector=metadata_collector |
| ) |
| |
| metadata_path = str(root_path / '_metadata') |
| # write _metadata file |
| pq.write_metadata( |
| table.schema, metadata_path, |
| metadata_collector=metadata_collector |
| ) |
| return metadata_path, table |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas # write_to_dataset currently requires pandas |
| def test_parquet_dataset_factory(tempdir): |
| root_path = tempdir / "test_parquet_dataset" |
| metadata_path, table = _create_parquet_dataset_simple(root_path) |
| dataset = ds.parquet_dataset(metadata_path) |
| assert dataset.schema.equals(table.schema) |
| assert len(dataset.files) == 4 |
| result = dataset.to_table() |
| assert result.num_rows == 40 |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas # write_to_dataset currently requires pandas |
| @pytest.mark.parametrize('use_legacy_dataset', [False, True]) |
| def test_parquet_dataset_factory_roundtrip(tempdir, use_legacy_dataset): |
| # Simple test to ensure we can roundtrip dataset to |
| # _metadata/common_metadata and back. A more complex test |
| # using partitioning will have to wait for ARROW-13269. The |
| # above test (test_parquet_dataset_factory) will not work |
| # when legacy is False as there is no "append" equivalent in |
| # the new dataset until ARROW-12358 |
| import pyarrow.parquet as pq |
| root_path = tempdir / "test_parquet_dataset" |
| table = pa.table({'f1': [0] * 10, 'f2': np.random.randn(10)}) |
| metadata_collector = [] |
| pq.write_to_dataset( |
| table, str(root_path), metadata_collector=metadata_collector, |
| use_legacy_dataset=use_legacy_dataset |
| ) |
| metadata_path = str(root_path / '_metadata') |
| # write _metadata file |
| pq.write_metadata( |
| table.schema, metadata_path, |
| metadata_collector=metadata_collector |
| ) |
| dataset = ds.parquet_dataset(metadata_path) |
| assert dataset.schema.equals(table.schema) |
| result = dataset.to_table() |
| assert result.num_rows == 10 |
| |
| |
@pytest.mark.parquet
def test_parquet_dataset_factory_order(tempdir):
| # The order of the fragments in the dataset should match the order of the |
| # row groups in the _metadata file. |
| import pyarrow.parquet as pq |
| metadatas = [] |
    # Create a dataset where f1 covers the values 0 through 99, spread
    # across 10 files. Put the row groups in the correct order in _metadata
| for i in range(10): |
| table = pa.table( |
| {'f1': list(range(i*10, (i+1)*10))}) |
| table_path = tempdir / f'{i}.parquet' |
| pq.write_table(table, table_path, metadata_collector=metadatas) |
| metadatas[-1].set_file_path(f'{i}.parquet') |
| metadata_path = str(tempdir / '_metadata') |
| pq.write_metadata(table.schema, metadata_path, metadatas) |
| dataset = ds.parquet_dataset(metadata_path) |
    # Ensure the table contains the values 0 through 99 in the right order
| scanned_table = dataset.to_table() |
| scanned_col = scanned_table.column('f1').to_pylist() |
| assert scanned_col == list(range(0, 100)) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_parquet_dataset_factory_invalid(tempdir): |
| root_path = tempdir / "test_parquet_dataset_invalid" |
| metadata_path, table = _create_parquet_dataset_simple(root_path) |
| # remove one of the files |
| list(root_path.glob("*.parquet"))[0].unlink() |
| dataset = ds.parquet_dataset(metadata_path) |
| assert dataset.schema.equals(table.schema) |
| assert len(dataset.files) == 4 |
| with pytest.raises(FileNotFoundError): |
| dataset.to_table() |
| |
| |
| def _create_metadata_file(root_path): |
| # create _metadata file from existing parquet dataset |
| import pyarrow.parquet as pq |
| |
| parquet_paths = list(sorted(root_path.rglob("*.parquet"))) |
| schema = pq.ParquetFile(parquet_paths[0]).schema.to_arrow_schema() |
| |
| metadata_collector = [] |
| for path in parquet_paths: |
| metadata = pq.ParquetFile(path).metadata |
| metadata.set_file_path(str(path.relative_to(root_path))) |
| metadata_collector.append(metadata) |
| |
| metadata_path = root_path / "_metadata" |
| pq.write_metadata( |
| schema, metadata_path, metadata_collector=metadata_collector |
| ) |
| return metadata_path |
| |
| |
| def _create_parquet_dataset_partitioned(root_path): |
| import pyarrow.parquet as pq |
| |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10))], |
| names=["f1", "f2", "part"] |
| ) |
| table = table.replace_schema_metadata({"key": "value"}) |
| pq.write_to_dataset(table, str(root_path), partition_cols=['part']) |
| return _create_metadata_file(root_path), table |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_parquet_dataset_factory_partitioned(tempdir): |
| root_path = tempdir / "test_parquet_dataset_factory_partitioned" |
| metadata_path, table = _create_parquet_dataset_partitioned(root_path) |
| |
| partitioning = ds.partitioning(flavor="hive") |
| dataset = ds.parquet_dataset(metadata_path, partitioning=partitioning) |
| |
| assert dataset.schema.equals(table.schema) |
| assert len(dataset.files) == 2 |
| result = dataset.to_table() |
| assert result.num_rows == 20 |
| |
| # the partitioned dataset does not preserve order |
| result = result.to_pandas().sort_values("f1").reset_index(drop=True) |
| expected = table.to_pandas() |
| pd.testing.assert_frame_equal(result, expected) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_parquet_dataset_factory_metadata(tempdir): |
| # ensure ParquetDatasetFactory preserves metadata (ARROW-9363) |
| root_path = tempdir / "test_parquet_dataset_factory_metadata" |
| metadata_path, table = _create_parquet_dataset_partitioned(root_path) |
| |
| dataset = ds.parquet_dataset(metadata_path, partitioning="hive") |
| assert dataset.schema.equals(table.schema) |
| assert b"key" in dataset.schema.metadata |
| |
| fragments = list(dataset.get_fragments()) |
| assert b"key" in fragments[0].physical_schema.metadata |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs): |
| fs, assert_opens = open_logging_fs |
| |
| # Test to ensure that no IO happens when filtering a dataset |
| # created with ParquetDatasetFactory from a _metadata file |
| |
| root_path = tempdir / "test_parquet_dataset_lazy_filtering" |
| metadata_path, _ = _create_parquet_dataset_simple(root_path) |
| |
| # creating the dataset should only open the metadata file |
| with assert_opens([metadata_path]): |
| dataset = ds.parquet_dataset( |
| metadata_path, |
| partitioning=ds.partitioning(flavor="hive"), |
| filesystem=fs) |
| |
| # materializing fragments should not open any file |
| with assert_opens([]): |
| fragments = list(dataset.get_fragments()) |
| |
| # filtering fragments should not open any file |
| with assert_opens([]): |
| list(dataset.get_fragments(ds.field("f1") > 15)) |
| |
| # splitting by row group should still not open any file |
| with assert_opens([]): |
| fragments[0].split_by_row_group(ds.field("f1") > 15) |
| |
    # ensuring complete metadata of a split fragment should also not
    # open any file
| with assert_opens([]): |
| rg_fragments = fragments[0].split_by_row_group() |
| rg_fragments[0].ensure_complete_metadata() |
| |
| # FIXME(bkietz) on Windows this results in FileNotFoundErrors. |
| # but actually scanning does open files |
| # with assert_opens([f.path for f in fragments]): |
| # dataset.to_table() |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_dataset_schema_metadata(tempdir, dataset_reader): |
| # ARROW-8802 |
| df = pd.DataFrame({'a': [1, 2, 3]}) |
| path = tempdir / "test.parquet" |
| df.to_parquet(path) |
| dataset = ds.dataset(path) |
| |
| schema = dataset_reader.to_table(dataset).schema |
| projected_schema = dataset_reader.to_table(dataset, columns=["a"]).schema |
| |
| # ensure the pandas metadata is included in the schema |
| assert b"pandas" in schema.metadata |
| # ensure it is still there in a projected schema (with column selection) |
| assert schema.equals(projected_schema, check_metadata=True) |
| |
| |
| @pytest.mark.parquet |
| def test_filter_mismatching_schema(tempdir, dataset_reader): |
| # ARROW-9146 |
| import pyarrow.parquet as pq |
| |
| table = pa.table({"col": pa.array([1, 2, 3, 4], type='int32')}) |
| pq.write_table(table, str(tempdir / "data.parquet")) |
| |
| # specifying explicit schema, but that mismatches the schema of the data |
| schema = pa.schema([("col", pa.int64())]) |
| dataset = ds.dataset( |
| tempdir / "data.parquet", format="parquet", schema=schema) |
| |
| # filtering on a column with such type mismatch should implicitly |
| # cast the column |
| filtered = dataset_reader.to_table(dataset, filter=ds.field("col") > 2) |
| assert filtered["col"].equals(table["col"].cast('int64').slice(2)) |
| |
| fragment = list(dataset.get_fragments())[0] |
| filtered = dataset_reader.to_table( |
| fragment, filter=ds.field("col") > 2, schema=schema) |
| assert filtered["col"].equals(table["col"].cast('int64').slice(2)) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_dataset_project_only_partition_columns(tempdir, dataset_reader): |
| # ARROW-8729 |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'part': 'a a b b'.split(), 'col': list(range(4))}) |
| |
| path = str(tempdir / 'test_dataset') |
| pq.write_to_dataset(table, path, partition_cols=['part']) |
| dataset = ds.dataset(path, partitioning='hive') |
| |
| all_cols = dataset_reader.to_table(dataset) |
| part_only = dataset_reader.to_table(dataset, columns=['part']) |
| |
| assert all_cols.column('part').equals(part_only.column('part')) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_dataset_project_null_column(tempdir, dataset_reader): |
| import pandas as pd |
| df = pd.DataFrame({"col": np.array([None, None, None], dtype='object')}) |
| |
| f = tempdir / "test_dataset_project_null_column.parquet" |
| df.to_parquet(f, engine="pyarrow") |
| |
| dataset = ds.dataset(f, format="parquet", |
| schema=pa.schema([("col", pa.int64())])) |
| expected = pa.table({'col': pa.array([None, None, None], pa.int64())}) |
| assert dataset_reader.to_table(dataset).equals(expected) |
| |
| |
| def test_dataset_project_columns(tempdir, dataset_reader): |
| # basic column re-projection with expressions |
| from pyarrow import feather |
| table = pa.table({"A": [1, 2, 3], "B": [1., 2., 3.], "C": ["a", "b", "c"]}) |
| feather.write_feather(table, tempdir / "data.feather") |
| |
| dataset = ds.dataset(tempdir / "data.feather", format="feather") |
| result = dataset_reader.to_table(dataset, columns={ |
| 'A_renamed': ds.field('A'), |
| 'B_as_int': ds.field('B').cast("int32", safe=False), |
| 'C_is_a': ds.field('C') == 'a' |
| }) |
| expected = pa.table({ |
| "A_renamed": [1, 2, 3], |
| "B_as_int": pa.array([1, 2, 3], type="int32"), |
| "C_is_a": [True, False, False], |
| }) |
| assert result.equals(expected) |
| |
| # raise proper error when not passing an expression |
| with pytest.raises(TypeError, match="Expected an Expression"): |
| dataset_reader.to_table(dataset, columns={"A": "A"}) |
| |
| |
| @pytest.mark.pandas |
| @pytest.mark.parquet |
| def test_dataset_preserved_partitioning(tempdir): |
| # ARROW-8655 |
| |
| # through discovery, but without partitioning |
| _, path = _create_single_file(tempdir) |
| dataset = ds.dataset(path) |
| assert dataset.partitioning is None |
| |
| # through discovery, with hive partitioning but not specified |
| full_table, path = _create_partitioned_dataset(tempdir) |
| dataset = ds.dataset(path) |
| assert dataset.partitioning is None |
| |
| # through discovery, with hive partitioning (from a partitioning factory) |
| dataset = ds.dataset(path, partitioning="hive") |
| part = dataset.partitioning |
| assert part is not None |
| assert isinstance(part, ds.HivePartitioning) |
| assert part.schema == pa.schema([("part", pa.int32())]) |
| assert len(part.dictionaries) == 1 |
| assert part.dictionaries[0] == pa.array([0, 1, 2], pa.int32()) |
| |
| # through discovery, with hive partitioning (from a partitioning object) |
| part = ds.partitioning(pa.schema([("part", pa.int32())]), flavor="hive") |
| assert isinstance(part, ds.HivePartitioning) # not a factory |
| assert part.dictionaries is None |
| dataset = ds.dataset(path, partitioning=part) |
| part = dataset.partitioning |
| assert isinstance(part, ds.HivePartitioning) |
| assert part.schema == pa.schema([("part", pa.int32())]) |
| # TODO is this expected? |
| assert part.dictionaries is None |
| |
| # through manual creation -> not available |
| dataset = ds.dataset(path, partitioning="hive") |
| dataset2 = ds.FileSystemDataset( |
| list(dataset.get_fragments()), schema=dataset.schema, |
| format=dataset.format, filesystem=dataset.filesystem |
| ) |
| assert dataset2.partitioning is None |
| |
| # through discovery with ParquetDatasetFactory |
| root_path = tempdir / "data-partitioned-metadata" |
| metadata_path, _ = _create_parquet_dataset_partitioned(root_path) |
| dataset = ds.parquet_dataset(metadata_path, partitioning="hive") |
| part = dataset.partitioning |
| assert part is not None |
| assert isinstance(part, ds.HivePartitioning) |
| assert part.schema == pa.schema([("part", pa.string())]) |
| assert len(part.dictionaries) == 1 |
| # will be fixed by ARROW-13153 (order is not preserved at the moment) |
| # assert part.dictionaries[0] == pa.array(["a", "b"], pa.string()) |
| assert set(part.dictionaries[0].to_pylist()) == {"a", "b"} |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_to_dataset_given_null_just_works(tempdir): |
| import pyarrow.parquet as pq |
| |
| schema = pa.schema([ |
| pa.field('col', pa.int64()), |
| pa.field('part', pa.dictionary(pa.int32(), pa.string())) |
| ]) |
| table = pa.table({'part': [None, None, 'a', 'a'], |
| 'col': list(range(4))}, schema=schema) |
| |
| path = str(tempdir / 'test_dataset') |
    pq.write_to_dataset(table, path, partition_cols=['part'],
                        use_legacy_dataset=False)
| |
| actual_table = pq.read_table(tempdir / 'test_dataset') |
| # column.equals can handle the difference in chunking but not the fact |
| # that `part` will have different dictionaries for the two chunks |
    assert (actual_table.column('part').to_pylist()
            == table.column('part').to_pylist())
| assert actual_table.column('col').equals(table.column('col')) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_legacy_write_to_dataset_drops_null(tempdir): |
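| # The legacy write_to_dataset implementation silently drops rows whose |
| # partition value is null, hence the expected table below only keeps |
| # the two 'a' rows. |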
| import pyarrow.parquet as pq |
| |
| schema = pa.schema([ |
| pa.field('col', pa.int64()), |
| pa.field('part', pa.dictionary(pa.int32(), pa.string())) |
| ]) |
| table = pa.table({'part': ['a', 'a', None, None], |
| 'col': list(range(4))}, schema=schema) |
| expected = pa.table( |
| {'part': ['a', 'a'], 'col': list(range(2))}, schema=schema) |
| |
| path = str(tempdir / 'test_dataset') |
| pq.write_to_dataset(table, path, partition_cols=['part'], |
| use_legacy_dataset=True) |
| |
| actual = pq.read_table(tempdir / 'test_dataset') |
| assert actual == expected |
| |
| |
| def _sort_table(tab, sort_col): |
| import pyarrow.compute as pc |
| sorted_indices = pc.sort_indices( |
| tab, options=pc.SortOptions([(sort_col, 'ascending')])) |
| return pc.take(tab, sorted_indices) |
| |
| |
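| # Helper: write `dataset` to `base_dir` as feather, check that exactly the |
| # expected files were created, then read it back as a dataset and verify |
| # the contents match (comparing after sorting on `sort_col`, since row |
| # order is not guaranteed). |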
| def _check_dataset_roundtrip(dataset, base_dir, expected_files, sort_col, |
| base_dir_path=None, partitioning=None): |
| base_dir_path = base_dir_path or base_dir |
| |
| ds.write_dataset(dataset, base_dir, format="feather", |
| partitioning=partitioning, use_threads=False) |
| |
| # check that all files are present |
| file_paths = list(base_dir_path.rglob("*")) |
| assert set(file_paths) == set(expected_files) |
| |
| # check that reading back in as dataset gives the same result |
| dataset2 = ds.dataset( |
| base_dir_path, format="feather", partitioning=partitioning) |
| |
| assert _sort_table(dataset2.to_table(), sort_col).equals( |
| _sort_table(dataset.to_table(), sort_col)) |
| |
| |
| @pytest.mark.parquet |
| def test_write_dataset(tempdir): |
| # manually create a written dataset and read as dataset object |
| directory = tempdir / 'single-file' |
| directory.mkdir() |
| _ = _create_single_file(directory) |
| dataset = ds.dataset(directory) |
| |
| # full string path |
| target = tempdir / 'single-file-target' |
| expected_files = [target / "part-0.feather"] |
| _check_dataset_roundtrip(dataset, str(target), expected_files, 'a', target) |
| |
| # pathlib path object |
| target = tempdir / 'single-file-target2' |
| expected_files = [target / "part-0.feather"] |
| _check_dataset_roundtrip(dataset, target, expected_files, 'a', target) |
| |
| # TODO |
| # # relative path |
| # target = tempdir / 'single-file-target3' |
| # expected_files = [target / "part-0.feather"] |
| # _check_dataset_roundtrip( |
| # dataset, './single-file-target3', expected_files, 'a', target) |
| |
| # Directory of files |
| directory = tempdir / 'single-directory' |
| directory.mkdir() |
| _ = _create_directory_of_files(directory) |
| dataset = ds.dataset(directory) |
| |
| target = tempdir / 'single-directory-target' |
| expected_files = [target / "part-0.feather"] |
| _check_dataset_roundtrip(dataset, str(target), expected_files, 'a', target) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_dataset_partitioned(tempdir): |
| directory = tempdir / "partitioned" |
| _ = _create_parquet_dataset_partitioned(directory) |
| partitioning = ds.partitioning(flavor="hive") |
| dataset = ds.dataset(directory, partitioning=partitioning) |
| |
| # hive partitioning |
| target = tempdir / 'partitioned-hive-target' |
| expected_paths = [ |
| target / "part=a", target / "part=a" / "part-0.feather", |
| target / "part=b", target / "part=b" / "part-0.feather" |
| ] |
| partitioning_schema = ds.partitioning( |
| pa.schema([("part", pa.string())]), flavor="hive") |
| _check_dataset_roundtrip( |
| dataset, str(target), expected_paths, 'f1', target, |
| partitioning=partitioning_schema) |
| |
| # directory partitioning |
| target = tempdir / 'partitioned-dir-target' |
| expected_paths = [ |
| target / "a", target / "a" / "part-0.feather", |
| target / "b", target / "b" / "part-0.feather" |
| ] |
| partitioning_schema = ds.partitioning( |
| pa.schema([("part", pa.string())])) |
| _check_dataset_roundtrip( |
| dataset, str(target), expected_paths, 'f1', target, |
| partitioning=partitioning_schema) |
| |
| |
| def test_write_dataset_with_field_names(tempdir): |
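| # Passing a plain list of field names as `partitioning` partitions by |
| # those columns with the default directory flavor, so the values of 'b' |
| # become the directory names ("x", "y", "z") asserted below. |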
| table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']}) |
| |
| ds.write_dataset(table, tempdir, format='parquet', |
| partitioning=["b"]) |
| |
| load_back = ds.dataset(tempdir, partitioning=["b"]) |
| files = load_back.files |
| partitioning_dirs = { |
| str(pathlib.Path(f).relative_to(tempdir).parent) for f in files |
| } |
| assert partitioning_dirs == {"x", "y", "z"} |
| |
| load_back_table = load_back.to_table() |
| assert load_back_table.equals(table) |
| |
| |
| def test_write_dataset_with_field_names_hive(tempdir): |
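| # Same field-name list, but with partitioning_flavor="hive" the |
| # directories use key=value form ("b=x", ...), and reading back only |
| # needs partitioning="hive" since hive paths are self-describing. |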
| table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']}) |
| |
| ds.write_dataset(table, tempdir, format='parquet', |
| partitioning=["b"], partitioning_flavor="hive") |
| |
| load_back = ds.dataset(tempdir, partitioning="hive") |
| files = load_back.files |
| partitioning_dirs = { |
| str(pathlib.Path(f).relative_to(tempdir).parent) for f in files |
| } |
| assert partitioning_dirs == {"b=x", "b=y", "b=z"} |
| |
| load_back_table = load_back.to_table() |
| assert load_back_table.equals(table) |
| |
| |
| def test_write_dataset_with_scanner(tempdir): |
| table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z'], |
| 'c': [1, 2, 3]}) |
| |
| ds.write_dataset(table, tempdir, format='parquet', |
| partitioning=["b"]) |
| |
| dataset = ds.dataset(tempdir, partitioning=["b"]) |
| |
| with tempfile.TemporaryDirectory() as tempdir2: |
| ds.write_dataset(dataset.scanner(columns=["b", "c"], use_async=True), |
| tempdir2, format='parquet', partitioning=["b"]) |
| |
| load_back = ds.dataset(tempdir2, partitioning=["b"]) |
| load_back_table = load_back.to_table() |
| assert (dict(load_back_table.to_pydict()) |
| == table.drop(["a"]).to_pydict()) |
| |
| |
| def test_write_dataset_with_backpressure(tempdir): |
| consumer_gate = threading.Event() |
| |
| # A filesystem that blocks all writes so that we can build |
| # up backpressure. The writes are released at the end of |
| # the test. |
| class GatingFs(ProxyHandler): |
| def open_output_stream(self, path, metadata): |
| # Block until the end of the test |
| consumer_gate.wait() |
| return self._fs.open_output_stream(path, metadata=metadata) |
| gating_fs = fs.PyFileSystem(GatingFs(fs.LocalFileSystem())) |
| |
| schema = pa.schema([pa.field('data', pa.int32())]) |
| # By default, the dataset writer will queue up 64Mi rows, so with |
| # batches of 1M rows it should only fit ~67 batches |
| batch = pa.record_batch([pa.array(list(range(1_000_000)))], schema=schema) |
| batches_read = 0 |
| min_backpressure = 67 |
| end = 200 |
| |
| def counting_generator(): |
| nonlocal batches_read |
| while batches_read < end: |
| time.sleep(0.01) |
| batches_read += 1 |
| yield batch |
| |
| scanner = ds.Scanner.from_batches( |
| counting_generator(), schema=schema, use_threads=True, |
| use_async=True) |
| |
| write_thread = threading.Thread( |
| target=lambda: ds.write_dataset( |
| scanner, str(tempdir), format='parquet', filesystem=gating_fs)) |
| write_thread.start() |
| |
| try: |
| start = time.time() |
| |
| def duration(): |
| return time.time() - start |
| |
| # This test is timing dependent. There is no signal from the C++ |
| # when backpressure has been hit. We don't know exactly when |
| # backpressure will be hit because it may take some time for the |
| # signal to get from the sink to the scanner. |
| # |
| # The test may emit false positives on slow systems. It could |
| # theoretically emit a false negative if the scanner managed to read |
| # and emit all 200 batches before the backpressure signal had a chance |
| # to propagate, but the 0.01s delay in the generator should make that |
| # scenario unlikely. |
| last_value = 0 |
| backpressure_probably_hit = False |
| while duration() < 10: |
| if batches_read > min_backpressure: |
| if batches_read == last_value: |
| backpressure_probably_hit = True |
| break |
| last_value = batches_read |
| time.sleep(0.5) |
| |
| assert backpressure_probably_hit |
| |
| finally: |
| consumer_gate.set() |
| write_thread.join() |
| assert batches_read == end |
| |
| |
| def test_write_dataset_with_dataset(tempdir): |
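| # A Dataset object can itself be passed as the source of write_dataset; |
| # the contents should round-trip unchanged. |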
| table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]}) |
| |
| ds.write_dataset(table, tempdir, format='parquet', |
| partitioning=["b"]) |
| |
| dataset = ds.dataset(tempdir, partitioning=["b"]) |
| |
| with tempfile.TemporaryDirectory() as tempdir2: |
| ds.write_dataset(dataset, tempdir2, |
| format='parquet', partitioning=["b"]) |
| |
| load_back = ds.dataset(tempdir2, partitioning=["b"]) |
| load_back_table = load_back.to_table() |
| assert dict(load_back_table.to_pydict()) == table.to_pydict() |
| |
| |
| @pytest.mark.pandas |
| def test_write_dataset_existing_data(tempdir): |
| directory = tempdir / 'ds' |
| table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]}) |
| partitioning = ds.partitioning(schema=pa.schema( |
| [pa.field('c', pa.int64())]), flavor='hive') |
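| # Exercises the `existing_data_behavior` option: the default ('error') |
| # refuses to write into a non-empty directory, 'overwrite_or_ignore' |
| # overwrites files it produces but leaves other files alone, and |
| # 'delete_matching' first deletes the partition directories it writes to. |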
| |
| def compare_tables_ignoring_order(t1, t2): |
| df1 = t1.to_pandas().sort_values('b').reset_index(drop=True) |
| df2 = t2.to_pandas().sort_values('b').reset_index(drop=True) |
| assert df1.equals(df2) |
| |
| # First write is ok |
| ds.write_dataset(table, directory, partitioning=partitioning, format='ipc') |
| |
| table = pa.table({'b': ['a', 'b', 'c'], 'c': [2, 3, 4]}) |
| |
| # Second write should fail |
| with pytest.raises(pa.ArrowInvalid): |
| ds.write_dataset(table, directory, |
| partitioning=partitioning, format='ipc') |
| |
| extra_table = pa.table({'b': ['e']}) |
| extra_file = directory / 'c=2' / 'foo.arrow' |
| pyarrow.feather.write_feather(extra_table, extra_file) |
| |
| # Should succeed and overwrite with 'overwrite_or_ignore' behavior |
| ds.write_dataset(table, directory, partitioning=partitioning, |
| format='ipc', |
| existing_data_behavior='overwrite_or_ignore') |
| |
| overwritten = pa.table( |
| {'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]}) |
| readback = ds.dataset(tempdir, format='ipc', |
| partitioning=partitioning).to_table() |
| compare_tables_ignoring_order(readback, overwritten) |
| assert extra_file.exists() |
| |
| # Should succeed and delete matching partitions with 'delete_matching' |
| ds.write_dataset(table, directory, partitioning=partitioning, |
| format='ipc', existing_data_behavior='delete_matching') |
| |
| overwritten = pa.table({'b': ['x', 'a', 'b', 'c'], 'c': [1, 2, 3, 4]}) |
| readback = ds.dataset(tempdir, format='ipc', |
| partitioning=partitioning).to_table() |
| compare_tables_ignoring_order(readback, overwritten) |
| assert not extra_file.exists() |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_dataset_partitioned_dict(tempdir): |
| directory = tempdir / "partitioned" |
| _ = _create_parquet_dataset_partitioned(directory) |
| |
| # directory partitioning, dictionary partition columns |
| dataset = ds.dataset( |
| directory, |
| partitioning=ds.HivePartitioning.discover(infer_dictionary=True)) |
| target = tempdir / 'partitioned-dir-target' |
| expected_paths = [ |
| target / "a", target / "a" / "part-0.feather", |
| target / "b", target / "b" / "part-0.feather" |
| ] |
| partitioning = ds.partitioning( |
| pa.schema([dataset.schema.field('part')]), |
| dictionaries={'part': pa.array(['a', 'b'])}) |
| # NB: dictionaries required here since we use partitioning to parse |
| # directories in _check_dataset_roundtrip (not currently required for |
| # the formatting step) |
| _check_dataset_roundtrip( |
| dataset, str(target), expected_paths, 'f1', target, |
| partitioning=partitioning) |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_dataset_use_threads(tempdir): |
| directory = tempdir / "partitioned" |
| _ = _create_parquet_dataset_partitioned(directory) |
| dataset = ds.dataset(directory, partitioning="hive") |
| |
| partitioning = ds.partitioning( |
| pa.schema([("part", pa.string())]), flavor="hive") |
| |
| target1 = tempdir / 'partitioned1' |
| paths_written = [] |
| |
| def file_visitor(written_file): |
| paths_written.append(written_file.path) |
| |
| ds.write_dataset( |
| dataset, target1, format="feather", partitioning=partitioning, |
| use_threads=True, file_visitor=file_visitor |
| ) |
| |
| expected_paths = { |
| target1 / 'part=a' / 'part-0.feather', |
| target1 / 'part=b' / 'part-0.feather' |
| } |
| paths_written_set = set(map(pathlib.Path, paths_written)) |
| assert paths_written_set == expected_paths |
| |
| target2 = tempdir / 'partitioned2' |
| ds.write_dataset( |
| dataset, target2, format="feather", partitioning=partitioning, |
| use_threads=False |
| ) |
| |
| # check that reading in gives same result |
| result1 = ds.dataset(target1, format="feather", partitioning=partitioning) |
| result2 = ds.dataset(target2, format="feather", partitioning=partitioning) |
| assert result1.to_table().equals(result2.to_table()) |
| |
| |
| def test_write_table(tempdir): |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10)) |
| ], names=["f1", "f2", "part"]) |
| |
| base_dir = tempdir / 'single' |
| ds.write_dataset(table, base_dir, |
| basename_template='dat_{i}.arrow', format="feather") |
| # check that all files are present |
| file_paths = list(base_dir.rglob("*")) |
| expected_paths = [base_dir / "dat_0.arrow"] |
| assert set(file_paths) == set(expected_paths) |
| # check Table roundtrip |
| result = ds.dataset(base_dir, format="ipc").to_table() |
| assert result.equals(table) |
| |
| # with partitioning |
| base_dir = tempdir / 'partitioned' |
| expected_paths = [ |
| base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow", |
| base_dir / "part=b", base_dir / "part=b" / "dat_0.arrow" |
| ] |
| |
| visited_paths = [] |
| |
| def file_visitor(written_file): |
| visited_paths.append(written_file.path) |
| |
| partitioning = ds.partitioning( |
| pa.schema([("part", pa.string())]), flavor="hive") |
| ds.write_dataset(table, base_dir, format="feather", |
| basename_template='dat_{i}.arrow', |
| partitioning=partitioning, file_visitor=file_visitor) |
| file_paths = list(base_dir.rglob("*")) |
| assert set(file_paths) == set(expected_paths) |
| result = ds.dataset(base_dir, format="ipc", partitioning=partitioning) |
| assert result.to_table().equals(table) |
| assert len(visited_paths) == 2 |
| for visited_path in visited_paths: |
| assert pathlib.Path(visited_path) in expected_paths |
| |
| |
| def test_write_table_multiple_fragments(tempdir): |
| table = pa.table([ |
| pa.array(range(10)), pa.array(np.random.randn(10)), |
| pa.array(np.repeat(['a', 'b'], 5)) |
| ], names=["f1", "f2", "part"]) |
| table = pa.concat_tables([table]*2) |
| |
| # Table with multiple batches written as single Fragment by default |
| base_dir = tempdir / 'single' |
| ds.write_dataset(table, base_dir, format="feather") |
| assert set(base_dir.rglob("*")) == set([base_dir / "part-0.feather"]) |
| assert ds.dataset(base_dir, format="ipc").to_table().equals(table) |
| |
| # Same for single-element list of Table |
| base_dir = tempdir / 'single-list' |
| ds.write_dataset([table], base_dir, format="feather") |
| assert set(base_dir.rglob("*")) == set([base_dir / "part-0.feather"]) |
| assert ds.dataset(base_dir, format="ipc").to_table().equals(table) |
| |
| # Provide a list of batches (multiple input fragments); they are still |
| # combined into a single output file |
| base_dir = tempdir / 'multiple' |
| ds.write_dataset(table.to_batches(), base_dir, format="feather") |
| assert set(base_dir.rglob("*")) == {base_dir / "part-0.feather"} |
| assert ds.dataset(base_dir, format="ipc").to_table().equals(table) |
| |
| # Provide a list of tables (multiple input fragments); the contents are |
| # concatenated into a single output file |
| base_dir = tempdir / 'multiple-table' |
| ds.write_dataset([table, table], base_dir, format="feather") |
| assert set(base_dir.rglob("*")) == {base_dir / "part-0.feather"} |
| assert ds.dataset(base_dir, format="ipc").to_table().equals( |
| pa.concat_tables([table]*2) |
| ) |
| |
| |
| def test_write_iterable(tempdir): |
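| # A plain generator of batches carries no schema, so `schema=` must be |
| # passed explicitly; a RecordBatchReader (second case) provides its own. |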
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10)) |
| ], names=["f1", "f2", "part"]) |
| |
| base_dir = tempdir / 'inmemory_iterable' |
| ds.write_dataset((batch for batch in table.to_batches()), base_dir, |
| schema=table.schema, |
| basename_template='dat_{i}.arrow', format="feather") |
| result = ds.dataset(base_dir, format="ipc").to_table() |
| assert result.equals(table) |
| |
| base_dir = tempdir / 'inmemory_reader' |
| reader = pa.ipc.RecordBatchReader.from_batches(table.schema, |
| table.to_batches()) |
| ds.write_dataset(reader, base_dir, |
| basename_template='dat_{i}.arrow', format="feather") |
| result = ds.dataset(base_dir, format="ipc").to_table() |
| assert result.equals(table) |
| |
| |
| def test_write_scanner(tempdir, dataset_reader): |
| if not dataset_reader.use_async: |
| pytest.skip( |
| ('ARROW-13338: Write dataset with scanner does not' |
| ' support synchronous scan')) |
| |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10)) |
| ], names=["f1", "f2", "part"]) |
| dataset = ds.dataset(table) |
| |
| base_dir = tempdir / 'dataset_from_scanner' |
| ds.write_dataset(dataset_reader.scanner(dataset), base_dir, |
| format="feather") |
| result = dataset_reader.to_table(ds.dataset(base_dir, format="ipc")) |
| assert result.equals(table) |
| |
| # scanner with different projected_schema |
| base_dir = tempdir / 'dataset_from_scanner2' |
| ds.write_dataset(dataset_reader.scanner(dataset, columns=["f1"]), |
| base_dir, format="feather") |
| result = dataset_reader.to_table(ds.dataset(base_dir, format="ipc")) |
| assert result.equals(table.select(["f1"])) |
| |
| # schema not allowed when writing a scanner |
| with pytest.raises(ValueError, match="Cannot specify a schema"): |
| ds.write_dataset(dataset_reader.scanner(dataset), base_dir, |
| schema=table.schema, format="feather") |
| |
| |
| def test_write_table_partitioned_dict(tempdir): |
| # ensure writing a table partitioned on a dictionary column works |
| # without specifying the dictionary values explicitly |
| table = pa.table([ |
| pa.array(range(20)), |
| pa.array(np.repeat(['a', 'b'], 10)).dictionary_encode(), |
| ], names=['col', 'part']) |
| |
| partitioning = ds.partitioning(table.select(["part"]).schema) |
| |
| base_dir = tempdir / "dataset" |
| ds.write_dataset( |
| table, base_dir, format="feather", partitioning=partitioning |
| ) |
| |
| # check roundtrip |
| partitioning_read = ds.DirectoryPartitioning.discover( |
| ["part"], infer_dictionary=True) |
| result = ds.dataset( |
| base_dir, format="ipc", partitioning=partitioning_read |
| ).to_table() |
| assert result.equals(table) |
| |
| |
| @pytest.mark.parquet |
| def test_write_dataset_parquet(tempdir): |
| import pyarrow.parquet as pq |
| |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10)) |
| ], names=["f1", "f2", "part"]) |
| |
| # using default "parquet" format string |
| |
| base_dir = tempdir / 'parquet_dataset' |
| ds.write_dataset(table, base_dir, format="parquet") |
| # check that all files are present |
| file_paths = list(base_dir.rglob("*")) |
| expected_paths = [base_dir / "part-0.parquet"] |
| assert set(file_paths) == set(expected_paths) |
| # check Table roundtrip |
| result = ds.dataset(base_dir, format="parquet").to_table() |
| assert result.equals(table) |
| |
| # using custom options |
| for version in ["1.0", "2.4", "2.6"]: |
| format = ds.ParquetFileFormat() |
| opts = format.make_write_options(version=version) |
| base_dir = tempdir / 'parquet_dataset_version{0}'.format(version) |
| ds.write_dataset(table, base_dir, format=format, file_options=opts) |
| meta = pq.read_metadata(base_dir / "part-0.parquet") |
| expected_version = "1.0" if version == "1.0" else "2.6" |
| assert meta.format_version == expected_version |
| |
| |
| def test_write_dataset_csv(tempdir): |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10)) |
| ], names=["f1", "f2", "chr1"]) |
| |
| base_dir = tempdir / 'csv_dataset' |
| ds.write_dataset(table, base_dir, format="csv") |
| # check that all files are present |
| file_paths = list(base_dir.rglob("*")) |
| expected_paths = [base_dir / "part-0.csv"] |
| assert set(file_paths) == set(expected_paths) |
| # check Table roundtrip |
| result = ds.dataset(base_dir, format="csv").to_table() |
| assert result.equals(table) |
| |
| # using custom options |
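| # the header is omitted on write, so reading back requires the column |
| # names to be supplied explicitly through ReadOptions |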
| format = ds.CsvFileFormat(read_options=pyarrow.csv.ReadOptions( |
| column_names=table.schema.names)) |
| opts = format.make_write_options(include_header=False) |
| base_dir = tempdir / 'csv_dataset_noheader' |
| ds.write_dataset(table, base_dir, format=format, file_options=opts) |
| result = ds.dataset(base_dir, format=format).to_table() |
| assert result.equals(table) |
| |
| |
| @pytest.mark.parquet |
| def test_write_dataset_parquet_file_visitor(tempdir): |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10)) |
| ], names=["f1", "f2", "part"]) |
| |
| visitor_called = False |
| |
| def file_visitor(written_file): |
| nonlocal visitor_called |
| if (written_file.metadata is not None and |
| written_file.metadata.num_columns == 3): |
| visitor_called = True |
| |
| base_dir = tempdir / 'parquet_dataset' |
| ds.write_dataset(table, base_dir, format="parquet", |
| file_visitor=file_visitor) |
| |
| assert visitor_called |
| |
| |
| def test_partition_dataset_parquet_file_visitor(tempdir): |
| f1_vals = [item for chunk in range(4) for item in [chunk] * 10] |
| f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10] |
| table = pa.table({'f1': f1_vals, 'f2': f2_vals, |
| 'part': np.repeat(['a', 'b'], 20)}) |
| |
| root_path = tempdir / 'partitioned' |
| partitioning = ds.partitioning( |
| pa.schema([("part", pa.string())]), flavor="hive") |
| |
| paths_written = [] |
| |
| sample_metadata = None |
| |
| def file_visitor(written_file): |
| nonlocal sample_metadata |
| if written_file.metadata: |
| sample_metadata = written_file.metadata |
| paths_written.append(written_file.path) |
| |
| ds.write_dataset( |
| table, root_path, format="parquet", partitioning=partitioning, |
| use_threads=True, file_visitor=file_visitor |
| ) |
| |
| expected_paths = { |
| root_path / 'part=a' / 'part-0.parquet', |
| root_path / 'part=b' / 'part-0.parquet' |
| } |
| paths_written_set = set(map(pathlib.Path, paths_written)) |
| assert paths_written_set == expected_paths |
| assert sample_metadata is not None |
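| # only f1 and f2 are stored in the files themselves; the partition |
| # column lives in the directory names |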
| assert sample_metadata.num_columns == 2 |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.pandas |
| def test_write_dataset_arrow_schema_metadata(tempdir): |
| # ensure we serialize the Arrow schema in the Parquet metadata, to get |
| # a correct roundtrip (e.g. to preserve a non-UTC timezone) |
| import pyarrow.parquet as pq |
| |
| table = pa.table({"a": [pd.Timestamp("2012-01-01", tz="Europe/Brussels")]}) |
| assert table["a"].type.tz == "Europe/Brussels" |
| |
| ds.write_dataset(table, tempdir, format="parquet") |
| result = pq.read_table(tempdir / "part-0.parquet") |
| assert result["a"].type.tz == "Europe/Brussels" |
| |
| |
| def test_write_dataset_schema_metadata(tempdir): |
| # ensure that schema metadata gets written |
| from pyarrow import feather |
| |
| table = pa.table({'a': [1, 2, 3]}) |
| table = table.replace_schema_metadata({b'key': b'value'}) |
| ds.write_dataset(table, tempdir, format="feather") |
| |
| schema = feather.read_table(tempdir / "part-0.feather").schema |
| assert schema.metadata == {b'key': b'value'} |
| |
| |
| @pytest.mark.parquet |
| def test_write_dataset_schema_metadata_parquet(tempdir): |
| # ensure that schema metadata gets written |
| import pyarrow.parquet as pq |
| |
| table = pa.table({'a': [1, 2, 3]}) |
| table = table.replace_schema_metadata({b'key': b'value'}) |
| ds.write_dataset(table, tempdir, format="parquet") |
| |
| schema = pq.read_table(tempdir / "part-0.parquet").schema |
| assert schema.metadata == {b'key': b'value'} |
| |
| |
| @pytest.mark.parquet |
| @pytest.mark.s3 |
| def test_write_dataset_s3(s3_example_simple): |
| # write dataset with s3 filesystem |
| _, _, fs, _, host, port, access_key, secret_key = s3_example_simple |
| uri_template = ( |
| "s3://{}:{}@{{}}?scheme=http&endpoint_override={}:{}".format( |
| access_key, secret_key, host, port) |
| ) |
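| # The template embeds the test credentials and endpoint override in the |
| # URI; the bucket/path component is filled in per case below. |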
| |
| table = pa.table([ |
| pa.array(range(20)), pa.array(np.random.randn(20)), |
| pa.array(np.repeat(['a', 'b'], 10))], |
| names=["f1", "f2", "part"] |
| ) |
| part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive") |
| |
| # writing with filesystem object |
| ds.write_dataset( |
| table, "mybucket/dataset", filesystem=fs, format="feather", |
| partitioning=part |
| ) |
| # check roundtrip |
| result = ds.dataset( |
| "mybucket/dataset", filesystem=fs, format="ipc", partitioning="hive" |
| ).to_table() |
| assert result.equals(table) |
| |
| # writing with URI |
| uri = uri_template.format("mybucket/dataset2") |
| ds.write_dataset(table, uri, format="feather", partitioning=part) |
| # check roundtrip |
| result = ds.dataset( |
| "mybucket/dataset2", filesystem=fs, format="ipc", partitioning="hive" |
| ).to_table() |
| assert result.equals(table) |
| |
| # writing with path + URI as filesystem |
| uri = uri_template.format("mybucket") |
| ds.write_dataset( |
| table, "dataset3", filesystem=uri, format="feather", partitioning=part |
| ) |
| # check roundtrip |
| result = ds.dataset( |
| "mybucket/dataset3", filesystem=fs, format="ipc", partitioning="hive" |
| ).to_table() |
| assert result.equals(table) |
| |
| |
| @pytest.mark.parquet |
| def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader): |
| # ARROW-12420 |
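| # reading a column of all nulls through a dataset schema that declares |
| # a dictionary type requires casting null -> dictionary |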
| import pyarrow.parquet as pq |
| |
| table = pa.table({"a": [None, None]}) |
| pq.write_table(table, tempdir / "test.parquet") |
| |
| schema = pa.schema([ |
| pa.field("a", pa.dictionary(pa.int32(), pa.string())) |
| ]) |
| fsds = ds.FileSystemDataset.from_paths( |
| paths=[tempdir / "test.parquet"], |
| schema=schema, |
| format=ds.ParquetFileFormat(), |
| filesystem=fs.LocalFileSystem(), |
| ) |
| table = dataset_reader.to_table(fsds) |
| assert table.schema == schema |