blob: 20b12316b66130c1c3b3d0446b48f2276a7cb2d1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import os
import posixpath
import pathlib
import pickle
import textwrap
import tempfile
import threading
import time
import numpy as np
import pytest
import pyarrow as pa
import pyarrow.csv
import pyarrow.feather
import pyarrow.fs as fs
from pyarrow.tests.util import (change_cwd, _filesystem_uri,
FSProtocolClass, ProxyHandler)
try:
import pandas as pd
except ImportError:
pd = None
try:
import pyarrow.dataset as ds
except ImportError:
ds = None
# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not dataset'
pytestmark = pytest.mark.dataset
def _generate_data(n):
import datetime
import itertools
day = datetime.datetime(2000, 1, 1)
interval = datetime.timedelta(days=5)
colors = itertools.cycle(['green', 'blue', 'yellow', 'red', 'orange'])
data = []
for i in range(n):
data.append((day, i, float(i), next(colors)))
day += interval
return pd.DataFrame(data, columns=['date', 'index', 'value', 'color'])
def _table_from_pandas(df):
schema = pa.schema([
pa.field('date', pa.date32()),
pa.field('index', pa.int64()),
pa.field('value', pa.float64()),
pa.field('color', pa.string()),
])
table = pa.Table.from_pandas(df, schema=schema, preserve_index=False)
return table.replace_schema_metadata()
@pytest.fixture
@pytest.mark.parquet
def mockfs():
import pyarrow.parquet as pq
mockfs = fs._MockFileSystem()
directories = [
'subdir/1/xxx',
'subdir/2/yyy',
]
for i, directory in enumerate(directories):
path = '{}/file{}.parquet'.format(directory, i)
mockfs.create_dir(directory)
with mockfs.open_output_stream(path) as out:
data = [
list(range(5)),
list(map(float, range(5))),
list(map(str, range(5))),
[i] * 5
]
schema = pa.schema([
pa.field('i64', pa.int64()),
pa.field('f64', pa.float64()),
pa.field('str', pa.string()),
pa.field('const', pa.int64()),
])
batch = pa.record_batch(data, schema=schema)
table = pa.Table.from_batches([batch])
pq.write_table(table, out)
return mockfs
@pytest.fixture
def open_logging_fs(monkeypatch):
from pyarrow.fs import PyFileSystem, LocalFileSystem
from .test_fs import ProxyHandler
localfs = LocalFileSystem()
def normalized(paths):
return {localfs.normalize_path(str(p)) for p in paths}
opened = set()
def open_input_file(self, path):
path = localfs.normalize_path(str(path))
opened.add(path)
return self._fs.open_input_file(path)
# patch proxyhandler to log calls to open_input_file
monkeypatch.setattr(ProxyHandler, "open_input_file", open_input_file)
fs = PyFileSystem(ProxyHandler(localfs))
@contextlib.contextmanager
def assert_opens(expected_opened):
opened.clear()
try:
yield
finally:
assert normalized(opened) == normalized(expected_opened)
return fs, assert_opens
@pytest.fixture(scope='module')
def multisourcefs(request):
request.config.pyarrow.requires('pandas')
request.config.pyarrow.requires('parquet')
import pyarrow.parquet as pq
df = _generate_data(1000)
mockfs = fs._MockFileSystem()
# simply split the dataframe into four chunks to construct a data source
# from each chunk into its own directory
df_a, df_b, df_c, df_d = np.array_split(df, 4)
# create a directory containing a flat sequence of parquet files without
# any partitioning involved
mockfs.create_dir('plain')
for i, chunk in enumerate(np.array_split(df_a, 10)):
path = 'plain/chunk-{}.parquet'.format(i)
with mockfs.open_output_stream(path) as out:
pq.write_table(_table_from_pandas(chunk), out)
# create one with schema partitioning by weekday and color
mockfs.create_dir('schema')
for part, chunk in df_b.groupby([df_b.date.dt.dayofweek, df_b.color]):
folder = 'schema/{}/{}'.format(*part)
path = '{}/chunk.parquet'.format(folder)
mockfs.create_dir(folder)
with mockfs.open_output_stream(path) as out:
pq.write_table(_table_from_pandas(chunk), out)
# create one with hive partitioning by year and month
mockfs.create_dir('hive')
for part, chunk in df_c.groupby([df_c.date.dt.year, df_c.date.dt.month]):
folder = 'hive/year={}/month={}'.format(*part)
path = '{}/chunk.parquet'.format(folder)
mockfs.create_dir(folder)
with mockfs.open_output_stream(path) as out:
pq.write_table(_table_from_pandas(chunk), out)
# create one with hive partitioning by color
mockfs.create_dir('hive_color')
for part, chunk in df_d.groupby(["color"]):
folder = 'hive_color/color={}'.format(*part)
path = '{}/chunk.parquet'.format(folder)
mockfs.create_dir(folder)
with mockfs.open_output_stream(path) as out:
pq.write_table(_table_from_pandas(chunk), out)
return mockfs
@pytest.fixture
def dataset(mockfs):
format = ds.ParquetFileFormat()
selector = fs.FileSelector('subdir', recursive=True)
options = ds.FileSystemFactoryOptions('subdir')
options.partitioning = ds.DirectoryPartitioning(
pa.schema([
pa.field('group', pa.int32()),
pa.field('key', pa.string())
])
)
factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
return factory.finish()
@pytest.fixture(params=[
(True, True),
(True, False),
(False, True),
(False, False)
], ids=['threaded-async', 'threaded-sync', 'serial-async', 'serial-sync'])
def dataset_reader(request):
'''
Fixture which allows dataset scanning operations to be
run with/without threads and with/without async
'''
use_threads, use_async = request.param
class reader:
def __init__(self):
self.use_threads = use_threads
self.use_async = use_async
def _patch_kwargs(self, kwargs):
if 'use_threads' in kwargs:
raise Exception(
('Invalid use of dataset_reader, do not specify'
' use_threads'))
if 'use_async' in kwargs:
raise Exception(
'Invalid use of dataset_reader, do not specify use_async')
kwargs['use_threads'] = use_threads
kwargs['use_async'] = use_async
def to_table(self, dataset, **kwargs):
self._patch_kwargs(kwargs)
return dataset.to_table(**kwargs)
def to_batches(self, dataset, **kwargs):
self._patch_kwargs(kwargs)
return dataset.to_batches(**kwargs)
def scanner(self, dataset, **kwargs):
self._patch_kwargs(kwargs)
return dataset.scanner(**kwargs)
def head(self, dataset, num_rows, **kwargs):
self._patch_kwargs(kwargs)
return dataset.head(num_rows, **kwargs)
def take(self, dataset, indices, **kwargs):
self._patch_kwargs(kwargs)
return dataset.take(indices, **kwargs)
def count_rows(self, dataset, **kwargs):
self._patch_kwargs(kwargs)
return dataset.count_rows(**kwargs)
return reader()
def test_filesystem_dataset(mockfs):
schema = pa.schema([
pa.field('const', pa.int64())
])
file_format = ds.ParquetFileFormat()
paths = ['subdir/1/xxx/file0.parquet', 'subdir/2/yyy/file1.parquet']
partitions = [ds.field('part') == x for x in range(1, 3)]
fragments = [file_format.make_fragment(path, mockfs, part)
for path, part in zip(paths, partitions)]
root_partition = ds.field('level') == ds.scalar(1337)
dataset_from_fragments = ds.FileSystemDataset(
fragments, schema=schema, format=file_format,
filesystem=mockfs, root_partition=root_partition,
)
dataset_from_paths = ds.FileSystemDataset.from_paths(
paths, schema=schema, format=file_format, filesystem=mockfs,
partitions=partitions, root_partition=root_partition,
)
for dataset in [dataset_from_fragments, dataset_from_paths]:
assert isinstance(dataset, ds.FileSystemDataset)
assert isinstance(dataset.format, ds.ParquetFileFormat)
assert dataset.partition_expression.equals(root_partition)
assert set(dataset.files) == set(paths)
fragments = list(dataset.get_fragments())
for fragment, partition, path in zip(fragments, partitions, paths):
assert fragment.partition_expression.equals(partition)
assert fragment.path == path
assert isinstance(fragment.format, ds.ParquetFileFormat)
assert isinstance(fragment, ds.ParquetFileFragment)
assert fragment.row_groups == [0]
assert fragment.num_row_groups == 1
row_group_fragments = list(fragment.split_by_row_group())
assert fragment.num_row_groups == len(row_group_fragments) == 1
assert isinstance(row_group_fragments[0], ds.ParquetFileFragment)
assert row_group_fragments[0].path == path
assert row_group_fragments[0].row_groups == [0]
assert row_group_fragments[0].num_row_groups == 1
fragments = list(dataset.get_fragments(filter=ds.field("const") == 0))
assert len(fragments) == 2
# the root_partition keyword has a default
dataset = ds.FileSystemDataset(
fragments, schema=schema, format=file_format, filesystem=mockfs
)
assert dataset.partition_expression.equals(ds.scalar(True))
# from_paths partitions have defaults
dataset = ds.FileSystemDataset.from_paths(
paths, schema=schema, format=file_format, filesystem=mockfs
)
assert dataset.partition_expression.equals(ds.scalar(True))
for fragment in dataset.get_fragments():
assert fragment.partition_expression.equals(ds.scalar(True))
# validation of required arguments
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(fragments, file_format, schema)
# validation of root_partition
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(fragments, schema=schema,
format=file_format, root_partition=1)
# missing required argument in from_paths
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset.from_paths(fragments, format=file_format)
def test_filesystem_dataset_no_filesystem_interaction(dataset_reader):
# ARROW-8283
schema = pa.schema([
pa.field('f1', pa.int64())
])
file_format = ds.IpcFileFormat()
paths = ['nonexistingfile.arrow']
# creating the dataset itself doesn't raise
dataset = ds.FileSystemDataset.from_paths(
paths, schema=schema, format=file_format,
filesystem=fs.LocalFileSystem(),
)
# getting fragments also doesn't raise
dataset.get_fragments()
# scanning does raise
with pytest.raises(FileNotFoundError):
dataset_reader.to_table(dataset)
def test_dataset(dataset, dataset_reader):
assert isinstance(dataset, ds.Dataset)
assert isinstance(dataset.schema, pa.Schema)
# TODO(kszucs): test non-boolean Exprs for filter do raise
expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64())
expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64())
for batch in dataset_reader.to_batches(dataset):
assert isinstance(batch, pa.RecordBatch)
assert batch.column(0).equals(expected_i64)
assert batch.column(1).equals(expected_f64)
for batch in dataset_reader.scanner(dataset).scan_batches():
assert isinstance(batch, ds.TaggedRecordBatch)
assert isinstance(batch.fragment, ds.Fragment)
table = dataset_reader.to_table(dataset)
assert isinstance(table, pa.Table)
assert len(table) == 10
condition = ds.field('i64') == 1
result = dataset.to_table(use_threads=True, filter=condition).to_pydict()
# don't rely on the scanning order
assert result['i64'] == [1, 1]
assert result['f64'] == [1., 1.]
assert sorted(result['group']) == [1, 2]
assert sorted(result['key']) == ['xxx', 'yyy']
def test_scanner(dataset, dataset_reader):
scanner = dataset_reader.scanner(
dataset, memory_pool=pa.default_memory_pool())
assert isinstance(scanner, ds.Scanner)
with pytest.raises(pa.ArrowInvalid):
dataset_reader.scanner(dataset, columns=['unknown'])
scanner = dataset_reader.scanner(dataset, columns=['i64'],
memory_pool=pa.default_memory_pool())
assert scanner.dataset_schema == dataset.schema
assert scanner.projected_schema == pa.schema([("i64", pa.int64())])
assert isinstance(scanner, ds.Scanner)
table = scanner.to_table()
for batch in scanner.to_batches():
assert batch.schema == scanner.projected_schema
assert batch.num_columns == 1
assert table == scanner.to_reader().read_all()
assert table.schema == scanner.projected_schema
for i in range(table.num_rows):
indices = pa.array([i])
assert table.take(indices) == scanner.take(indices)
with pytest.raises(pa.ArrowIndexError):
scanner.take(pa.array([table.num_rows]))
assert table.num_rows == scanner.count_rows()
def test_head(dataset, dataset_reader):
result = dataset_reader.head(dataset, 0)
assert result == pa.Table.from_batches([], schema=dataset.schema)
result = dataset_reader.head(dataset, 1, columns=['i64']).to_pydict()
assert result == {'i64': [0]}
result = dataset_reader.head(dataset, 2, columns=['i64'],
filter=ds.field('i64') > 1).to_pydict()
assert result == {'i64': [2, 3]}
result = dataset_reader.head(dataset, 1024, columns=['i64']).to_pydict()
assert result == {'i64': list(range(5)) * 2}
fragment = next(dataset.get_fragments())
result = fragment.head(1, columns=['i64']).to_pydict()
assert result == {'i64': [0]}
result = fragment.head(1024, columns=['i64']).to_pydict()
assert result == {'i64': list(range(5))}
def test_take(dataset, dataset_reader):
fragment = next(dataset.get_fragments())
indices = pa.array([1, 3])
assert dataset_reader.take(
fragment, indices) == dataset_reader.to_table(fragment).take(indices)
with pytest.raises(IndexError):
dataset_reader.take(fragment, pa.array([5]))
indices = pa.array([1, 7])
assert dataset_reader.take(
dataset, indices) == dataset_reader.to_table(dataset).take(indices)
with pytest.raises(IndexError):
dataset_reader.take(dataset, pa.array([10]))
def test_count_rows(dataset, dataset_reader):
fragment = next(dataset.get_fragments())
assert dataset_reader.count_rows(fragment) == 5
assert dataset_reader.count_rows(
fragment, filter=ds.field("i64") == 4) == 1
assert dataset_reader.count_rows(dataset) == 10
# Filter on partition key
assert dataset_reader.count_rows(
dataset, filter=ds.field("group") == 1) == 5
# Filter on data
assert dataset_reader.count_rows(dataset, filter=ds.field("i64") >= 3) == 4
assert dataset_reader.count_rows(dataset, filter=ds.field("i64") < 0) == 0
def test_abstract_classes():
classes = [
ds.FileFormat,
ds.Scanner,
ds.Partitioning,
]
for klass in classes:
with pytest.raises(TypeError):
klass()
def test_partitioning():
schema = pa.schema([
pa.field('i64', pa.int64()),
pa.field('f64', pa.float64())
])
for klass in [ds.DirectoryPartitioning, ds.HivePartitioning]:
partitioning = klass(schema)
assert isinstance(partitioning, ds.Partitioning)
partitioning = ds.DirectoryPartitioning(
pa.schema([
pa.field('group', pa.int64()),
pa.field('key', pa.float64())
])
)
assert partitioning.dictionaries is None
expr = partitioning.parse('/3/3.14')
assert isinstance(expr, ds.Expression)
expected = (ds.field('group') == 3) & (ds.field('key') == 3.14)
assert expr.equals(expected)
with pytest.raises(pa.ArrowInvalid):
partitioning.parse('/prefix/3/aaa')
expr = partitioning.parse('/3')
expected = ds.field('group') == 3
assert expr.equals(expected)
partitioning = ds.HivePartitioning(
pa.schema([
pa.field('alpha', pa.int64()),
pa.field('beta', pa.int64())
]),
null_fallback='xyz'
)
assert partitioning.dictionaries is None
expr = partitioning.parse('/alpha=0/beta=3')
expected = (
(ds.field('alpha') == ds.scalar(0)) &
(ds.field('beta') == ds.scalar(3))
)
assert expr.equals(expected)
expr = partitioning.parse('/alpha=xyz/beta=3')
expected = (
(ds.field('alpha').is_null() & (ds.field('beta') == ds.scalar(3)))
)
assert expr.equals(expected)
for shouldfail in ['/alpha=one/beta=2', '/alpha=one', '/beta=two']:
with pytest.raises(pa.ArrowInvalid):
partitioning.parse(shouldfail)
def test_expression_serialization():
a = ds.scalar(1)
b = ds.scalar(1.1)
c = ds.scalar(True)
d = ds.scalar("string")
e = ds.scalar(None)
f = ds.scalar({'a': 1})
g = ds.scalar(pa.scalar(1))
all_exprs = [a, b, c, d, e, f, g, a == b, a > b, a & b, a | b, ~c,
d.is_valid(), a.cast(pa.int32(), safe=False),
a.cast(pa.int32(), safe=False), a.isin([1, 2, 3]),
ds.field('i64') > 5, ds.field('i64') == 5,
ds.field('i64') == 7, ds.field('i64').is_null()]
for expr in all_exprs:
assert isinstance(expr, ds.Expression)
restored = pickle.loads(pickle.dumps(expr))
assert expr.equals(restored)
def test_expression_construction():
zero = ds.scalar(0)
one = ds.scalar(1)
true = ds.scalar(True)
false = ds.scalar(False)
string = ds.scalar("string")
field = ds.field("field")
zero | one == string
~true == false
for typ in ("bool", pa.bool_()):
field.cast(typ) == true
field.isin([1, 2])
with pytest.raises(TypeError):
field.isin(1)
with pytest.raises(pa.ArrowInvalid):
field != object()
def test_expression_boolean_operators():
# https://issues.apache.org/jira/browse/ARROW-11412
true = ds.scalar(True)
false = ds.scalar(False)
with pytest.raises(ValueError, match="cannot be evaluated to python True"):
true and false
with pytest.raises(ValueError, match="cannot be evaluated to python True"):
true or false
with pytest.raises(ValueError, match="cannot be evaluated to python True"):
bool(true)
with pytest.raises(ValueError, match="cannot be evaluated to python True"):
not true
def test_expression_arithmetic_operators():
dataset = ds.dataset(pa.table({'a': [1, 2, 3], 'b': [2, 2, 2]}))
a = ds.field("a")
b = ds.field("b")
result = dataset.to_table(columns={
"a+1": a + 1,
"b-a": b - a,
"a*2": a * 2,
"a/b": a.cast("float64") / b,
})
expected = pa.table({
"a+1": [2, 3, 4], "b-a": [1, 0, -1],
"a*2": [2, 4, 6], "a/b": [0.5, 1.0, 1.5],
})
assert result.equals(expected)
def test_partition_keys():
a, b, c = [ds.field(f) == f for f in 'abc']
assert ds._get_partition_keys(a) == {'a': 'a'}
assert ds._get_partition_keys(a & b & c) == {f: f for f in 'abc'}
nope = ds.field('d') >= 3
assert ds._get_partition_keys(nope) == {}
assert ds._get_partition_keys(a & nope) == {'a': 'a'}
null = ds.field('a').is_null()
assert ds._get_partition_keys(null) == {'a': None}
def test_parquet_read_options():
opts1 = ds.ParquetReadOptions()
opts2 = ds.ParquetReadOptions(dictionary_columns=['a', 'b'])
opts3 = ds.ParquetReadOptions(coerce_int96_timestamp_unit="ms")
assert opts1.dictionary_columns == set()
assert opts2.dictionary_columns == {'a', 'b'}
assert opts1.coerce_int96_timestamp_unit == "ns"
assert opts3.coerce_int96_timestamp_unit == "ms"
assert opts1 == opts1
assert opts1 != opts2
assert opts1 != opts3
def test_parquet_file_format_read_options():
pff1 = ds.ParquetFileFormat()
pff2 = ds.ParquetFileFormat(dictionary_columns={'a'})
pff3 = ds.ParquetFileFormat(coerce_int96_timestamp_unit="s")
assert pff1.read_options == ds.ParquetReadOptions()
assert pff2.read_options == ds.ParquetReadOptions(dictionary_columns=['a'])
assert pff3.read_options == ds.ParquetReadOptions(
coerce_int96_timestamp_unit="s")
def test_parquet_scan_options():
opts1 = ds.ParquetFragmentScanOptions()
opts2 = ds.ParquetFragmentScanOptions(buffer_size=4096)
opts3 = ds.ParquetFragmentScanOptions(
buffer_size=2**13, use_buffered_stream=True)
opts4 = ds.ParquetFragmentScanOptions(buffer_size=2**13, pre_buffer=True)
assert opts1.use_buffered_stream is False
assert opts1.buffer_size == 2**13
assert opts1.pre_buffer is False
assert opts2.use_buffered_stream is False
assert opts2.buffer_size == 2**12
assert opts2.pre_buffer is False
assert opts3.use_buffered_stream is True
assert opts3.buffer_size == 2**13
assert opts3.pre_buffer is False
assert opts4.use_buffered_stream is False
assert opts4.buffer_size == 2**13
assert opts4.pre_buffer is True
assert opts1 == opts1
assert opts1 != opts2
assert opts2 != opts3
assert opts3 != opts4
def test_file_format_pickling():
formats = [
ds.IpcFileFormat(),
ds.CsvFileFormat(),
ds.CsvFileFormat(pa.csv.ParseOptions(delimiter='\t',
ignore_empty_lines=True)),
ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
skip_rows=3, column_names=['foo'])),
ds.CsvFileFormat(read_options=pa.csv.ReadOptions(
skip_rows=3, block_size=2**20)),
ds.ParquetFileFormat(),
ds.ParquetFileFormat(dictionary_columns={'a'}),
ds.ParquetFileFormat(use_buffered_stream=True),
ds.ParquetFileFormat(
use_buffered_stream=True,
buffer_size=4096,
)
]
try:
formats.append(ds.OrcFileFormat())
except (ImportError, AttributeError):
# catch AttributeError for Python 3.6
pass
for file_format in formats:
assert pickle.loads(pickle.dumps(file_format)) == file_format
def test_fragment_scan_options_pickling():
options = [
ds.CsvFragmentScanOptions(),
ds.CsvFragmentScanOptions(
convert_options=pa.csv.ConvertOptions(strings_can_be_null=True)),
ds.CsvFragmentScanOptions(
read_options=pa.csv.ReadOptions(block_size=2**16)),
ds.ParquetFragmentScanOptions(buffer_size=4096),
ds.ParquetFragmentScanOptions(pre_buffer=True),
]
for option in options:
assert pickle.loads(pickle.dumps(option)) == option
@pytest.mark.parametrize('paths_or_selector', [
fs.FileSelector('subdir', recursive=True),
[
'subdir/1/xxx/file0.parquet',
'subdir/2/yyy/file1.parquet',
]
])
@pytest.mark.parametrize('pre_buffer', [False, True])
def test_filesystem_factory(mockfs, paths_or_selector, pre_buffer):
format = ds.ParquetFileFormat(
read_options=ds.ParquetReadOptions(dictionary_columns={"str"}),
pre_buffer=pre_buffer
)
options = ds.FileSystemFactoryOptions('subdir')
options.partitioning = ds.DirectoryPartitioning(
pa.schema([
pa.field('group', pa.int32()),
pa.field('key', pa.string())
])
)
assert options.partition_base_dir == 'subdir'
assert options.selector_ignore_prefixes == ['.', '_']
assert options.exclude_invalid_files is False
factory = ds.FileSystemDatasetFactory(
mockfs, paths_or_selector, format, options
)
inspected_schema = factory.inspect()
assert factory.inspect().equals(pa.schema([
pa.field('i64', pa.int64()),
pa.field('f64', pa.float64()),
pa.field('str', pa.dictionary(pa.int32(), pa.string())),
pa.field('const', pa.int64()),
pa.field('group', pa.int32()),
pa.field('key', pa.string()),
]), check_metadata=False)
assert isinstance(factory.inspect_schemas(), list)
assert isinstance(factory.finish(inspected_schema),
ds.FileSystemDataset)
assert factory.root_partition.equals(ds.scalar(True))
dataset = factory.finish()
assert isinstance(dataset, ds.FileSystemDataset)
scanner = dataset.scanner()
expected_i64 = pa.array([0, 1, 2, 3, 4], type=pa.int64())
expected_f64 = pa.array([0, 1, 2, 3, 4], type=pa.float64())
expected_str = pa.DictionaryArray.from_arrays(
pa.array([0, 1, 2, 3, 4], type=pa.int32()),
pa.array("0 1 2 3 4".split(), type=pa.string())
)
iterator = scanner.scan_batches()
for (batch, fragment), group, key in zip(iterator, [1, 2], ['xxx', 'yyy']):
expected_group = pa.array([group] * 5, type=pa.int32())
expected_key = pa.array([key] * 5, type=pa.string())
expected_const = pa.array([group - 1] * 5, type=pa.int64())
# Can't compare or really introspect expressions from Python
assert fragment.partition_expression is not None
assert batch.num_columns == 6
assert batch[0].equals(expected_i64)
assert batch[1].equals(expected_f64)
assert batch[2].equals(expected_str)
assert batch[3].equals(expected_const)
assert batch[4].equals(expected_group)
assert batch[5].equals(expected_key)
table = dataset.to_table()
assert isinstance(table, pa.Table)
assert len(table) == 10
assert table.num_columns == 6
def test_make_fragment(multisourcefs):
parquet_format = ds.ParquetFileFormat()
dataset = ds.dataset('/plain', filesystem=multisourcefs,
format=parquet_format)
for path in dataset.files:
fragment = parquet_format.make_fragment(path, multisourcefs)
assert fragment.row_groups == [0]
row_group_fragment = parquet_format.make_fragment(path, multisourcefs,
row_groups=[0])
for f in [fragment, row_group_fragment]:
assert isinstance(f, ds.ParquetFileFragment)
assert f.path == path
assert isinstance(f.filesystem, type(multisourcefs))
assert row_group_fragment.row_groups == [0]
def test_make_csv_fragment_from_buffer(dataset_reader):
content = textwrap.dedent("""
alpha,num,animal
a,12,dog
b,11,cat
c,10,rabbit
""")
buffer = pa.py_buffer(content.encode('utf-8'))
csv_format = ds.CsvFileFormat()
fragment = csv_format.make_fragment(buffer)
expected = pa.table([['a', 'b', 'c'],
[12, 11, 10],
['dog', 'cat', 'rabbit']],
names=['alpha', 'num', 'animal'])
assert dataset_reader.to_table(fragment).equals(expected)
pickled = pickle.loads(pickle.dumps(fragment))
assert dataset_reader.to_table(pickled).equals(fragment.to_table())
@pytest.mark.parquet
def test_make_parquet_fragment_from_buffer(dataset_reader):
import pyarrow.parquet as pq
arrays = [
pa.array(['a', 'b', 'c']),
pa.array([12, 11, 10]),
pa.array(['dog', 'cat', 'rabbit'])
]
dictionary_arrays = [
arrays[0].dictionary_encode(),
arrays[1],
arrays[2].dictionary_encode()
]
dictionary_format = ds.ParquetFileFormat(
read_options=ds.ParquetReadOptions(
dictionary_columns=['alpha', 'animal']
),
use_buffered_stream=True,
buffer_size=4096,
)
cases = [
(arrays, ds.ParquetFileFormat()),
(dictionary_arrays, dictionary_format)
]
for arrays, format_ in cases:
table = pa.table(arrays, names=['alpha', 'num', 'animal'])
out = pa.BufferOutputStream()
pq.write_table(table, out)
buffer = out.getvalue()
fragment = format_.make_fragment(buffer)
assert dataset_reader.to_table(fragment).equals(table)
pickled = pickle.loads(pickle.dumps(fragment))
assert dataset_reader.to_table(pickled).equals(table)
def _create_dataset_for_fragments(tempdir, chunk_size=None, filesystem=None):
import pyarrow.parquet as pq
table = pa.table(
[range(8), [1] * 8, ['a'] * 4 + ['b'] * 4],
names=['f1', 'f2', 'part']
)
path = str(tempdir / "test_parquet_dataset")
# write_to_dataset currently requires pandas
pq.write_to_dataset(table, path,
partition_cols=["part"], chunk_size=chunk_size)
dataset = ds.dataset(
path, format="parquet", partitioning="hive", filesystem=filesystem
)
return table, dataset
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments(tempdir, dataset_reader):
table, dataset = _create_dataset_for_fragments(tempdir)
# list fragments
fragments = list(dataset.get_fragments())
assert len(fragments) == 2
f = fragments[0]
physical_names = ['f1', 'f2']
# file's schema does not include partition column
assert f.physical_schema.names == physical_names
assert f.format.inspect(f.path, f.filesystem) == f.physical_schema
assert f.partition_expression.equals(ds.field('part') == 'a')
# By default, the partition column is not part of the schema.
result = dataset_reader.to_table(f)
assert result.column_names == physical_names
assert result.equals(table.remove_column(2).slice(0, 4))
# scanning fragment includes partition columns when given the proper
# schema.
result = dataset_reader.to_table(f, schema=dataset.schema)
assert result.column_names == ['f1', 'f2', 'part']
assert result.equals(table.slice(0, 4))
assert f.physical_schema == result.schema.remove(2)
# scanning fragments follow filter predicate
result = dataset_reader.to_table(
f, schema=dataset.schema, filter=ds.field('f1') < 2)
assert result.column_names == ['f1', 'f2', 'part']
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_implicit_cast(tempdir):
# ARROW-8693
import pyarrow.parquet as pq
table = pa.table([range(8), [1] * 4 + [2] * 4], names=['col', 'part'])
path = str(tempdir / "test_parquet_dataset")
pq.write_to_dataset(table, path, partition_cols=["part"])
part = ds.partitioning(pa.schema([('part', 'int8')]), flavor="hive")
dataset = ds.dataset(path, format="parquet", partitioning=part)
fragments = dataset.get_fragments(filter=ds.field("part") >= 2)
assert len(list(fragments)) == 1
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_reconstruct(tempdir, dataset_reader):
table, dataset = _create_dataset_for_fragments(tempdir)
def assert_yields_projected(fragment, row_slice,
columns=None, filter=None):
actual = fragment.to_table(
schema=table.schema, columns=columns, filter=filter)
column_names = columns if columns else table.column_names
assert actual.column_names == column_names
expected = table.slice(*row_slice).select(column_names)
assert actual.equals(expected)
fragment = list(dataset.get_fragments())[0]
parquet_format = fragment.format
# test pickle roundtrip
pickled_fragment = pickle.loads(pickle.dumps(fragment))
assert dataset_reader.to_table(
pickled_fragment) == dataset_reader.to_table(fragment)
# manually re-construct a fragment, with explicit schema
new_fragment = parquet_format.make_fragment(
fragment.path, fragment.filesystem,
partition_expression=fragment.partition_expression)
assert dataset_reader.to_table(new_fragment).equals(
dataset_reader.to_table(fragment))
assert_yields_projected(new_fragment, (0, 4))
# filter / column projection, inspected schema
new_fragment = parquet_format.make_fragment(
fragment.path, fragment.filesystem,
partition_expression=fragment.partition_expression)
assert_yields_projected(new_fragment, (0, 2), filter=ds.field('f1') < 2)
# filter requiring cast / column projection, inspected schema
new_fragment = parquet_format.make_fragment(
fragment.path, fragment.filesystem,
partition_expression=fragment.partition_expression)
assert_yields_projected(new_fragment, (0, 2),
columns=['f1'], filter=ds.field('f1') < 2.0)
# filter on the partition column
new_fragment = parquet_format.make_fragment(
fragment.path, fragment.filesystem,
partition_expression=fragment.partition_expression)
assert_yields_projected(new_fragment, (0, 4),
filter=ds.field('part') == 'a')
# Fragments don't contain the partition's columns if not provided to the
# `to_table(schema=...)` method.
pattern = (r'No match for FieldRef.Name\(part\) in ' +
fragment.physical_schema.to_string(False, False, False))
with pytest.raises(ValueError, match=pattern):
new_fragment = parquet_format.make_fragment(
fragment.path, fragment.filesystem,
partition_expression=fragment.partition_expression)
dataset_reader.to_table(new_fragment, filter=ds.field('part') == 'a')
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_parquet_row_groups(tempdir, dataset_reader):
table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2)
fragment = list(dataset.get_fragments())[0]
# list and scan row group fragments
row_group_fragments = list(fragment.split_by_row_group())
assert len(row_group_fragments) == fragment.num_row_groups == 2
result = dataset_reader.to_table(
row_group_fragments[0], schema=dataset.schema)
assert result.column_names == ['f1', 'f2', 'part']
assert len(result) == 2
assert result.equals(table.slice(0, 2))
assert row_group_fragments[0].row_groups is not None
assert row_group_fragments[0].num_row_groups == 1
assert row_group_fragments[0].row_groups[0].statistics == {
'f1': {'min': 0, 'max': 1},
'f2': {'min': 1, 'max': 1},
}
fragment = list(dataset.get_fragments(filter=ds.field('f1') < 1))[0]
row_group_fragments = list(fragment.split_by_row_group(ds.field('f1') < 1))
assert len(row_group_fragments) == 1
result = dataset_reader.to_table(
row_group_fragments[0], filter=ds.field('f1') < 1)
assert len(result) == 1
@pytest.mark.parquet
def test_fragments_parquet_num_row_groups(tempdir):
import pyarrow.parquet as pq
table = pa.table({'a': range(8)})
pq.write_table(table, tempdir / "test.parquet", row_group_size=2)
dataset = ds.dataset(tempdir / "test.parquet", format="parquet")
original_fragment = list(dataset.get_fragments())[0]
# create fragment with subset of row groups
fragment = original_fragment.format.make_fragment(
original_fragment.path, original_fragment.filesystem,
row_groups=[1, 3])
assert fragment.num_row_groups == 2
# ensure that parsing metadata preserves correct number of row groups
fragment.ensure_complete_metadata()
assert fragment.num_row_groups == 2
assert len(fragment.row_groups) == 2
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_parquet_row_groups_dictionary(tempdir, dataset_reader):
import pandas as pd
df = pd.DataFrame(dict(col1=['a', 'b'], col2=[1, 2]))
df['col1'] = df['col1'].astype("category")
import pyarrow.parquet as pq
pq.write_table(pa.table(df), tempdir / "test_filter_dictionary.parquet")
import pyarrow.dataset as ds
dataset = ds.dataset(tempdir / 'test_filter_dictionary.parquet')
result = dataset_reader.to_table(dataset, filter=ds.field("col1") == "a")
assert (df.iloc[0] == result.to_pandas()).all().all()
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_parquet_ensure_metadata(tempdir, open_logging_fs):
fs, assert_opens = open_logging_fs
_, dataset = _create_dataset_for_fragments(
tempdir, chunk_size=2, filesystem=fs
)
fragment = list(dataset.get_fragments())[0]
# with default discovery, no metadata loaded
with assert_opens([fragment.path]):
fragment.ensure_complete_metadata()
assert fragment.row_groups == [0, 1]
# second time -> use cached / no file IO
with assert_opens([]):
fragment.ensure_complete_metadata()
# recreate fragment with row group ids
new_fragment = fragment.format.make_fragment(
fragment.path, fragment.filesystem, row_groups=[0, 1]
)
assert new_fragment.row_groups == fragment.row_groups
# collect metadata
new_fragment.ensure_complete_metadata()
row_group = new_fragment.row_groups[0]
assert row_group.id == 0
assert row_group.num_rows == 2
assert row_group.statistics is not None
# pickling preserves row group ids
pickled_fragment = pickle.loads(pickle.dumps(new_fragment))
with assert_opens([fragment.path]):
assert pickled_fragment.row_groups == [0, 1]
row_group = pickled_fragment.row_groups[0]
assert row_group.id == 0
assert row_group.statistics is not None
def _create_dataset_all_types(tempdir, chunk_size=None):
import pyarrow.parquet as pq
table = pa.table(
[
pa.array([True, None, False], pa.bool_()),
pa.array([1, 10, 42], pa.int8()),
pa.array([1, 10, 42], pa.uint8()),
pa.array([1, 10, 42], pa.int16()),
pa.array([1, 10, 42], pa.uint16()),
pa.array([1, 10, 42], pa.int32()),
pa.array([1, 10, 42], pa.uint32()),
pa.array([1, 10, 42], pa.int64()),
pa.array([1, 10, 42], pa.uint64()),
pa.array([1.0, 10.0, 42.0], pa.float32()),
pa.array([1.0, 10.0, 42.0], pa.float64()),
pa.array(['a', None, 'z'], pa.utf8()),
pa.array(['a', None, 'z'], pa.binary()),
pa.array([1, 10, 42], pa.timestamp('s')),
pa.array([1, 10, 42], pa.timestamp('ms')),
pa.array([1, 10, 42], pa.timestamp('us')),
pa.array([1, 10, 42], pa.date32()),
pa.array([1, 10, 4200000000], pa.date64()),
pa.array([1, 10, 42], pa.time32('s')),
pa.array([1, 10, 42], pa.time64('us')),
],
names=[
'boolean',
'int8',
'uint8',
'int16',
'uint16',
'int32',
'uint32',
'int64',
'uint64',
'float',
'double',
'utf8',
'binary',
'ts[s]',
'ts[ms]',
'ts[us]',
'date32',
'date64',
'time32',
'time64',
]
)
path = str(tempdir / "test_parquet_dataset_all_types")
# write_to_dataset currently requires pandas
pq.write_to_dataset(table, path, chunk_size=chunk_size)
return table, ds.dataset(path, format="parquet", partitioning="hive")
@pytest.mark.pandas
@pytest.mark.parquet
def test_parquet_fragment_statistics(tempdir):
table, dataset = _create_dataset_all_types(tempdir)
fragment = list(dataset.get_fragments())[0]
import datetime
def dt_s(x): return datetime.datetime(1970, 1, 1, 0, 0, x)
def dt_ms(x): return datetime.datetime(1970, 1, 1, 0, 0, 0, x*1000)
def dt_us(x): return datetime.datetime(1970, 1, 1, 0, 0, 0, x)
date = datetime.date
time = datetime.time
# list and scan row group fragments
row_group_fragments = list(fragment.split_by_row_group())
assert row_group_fragments[0].row_groups is not None
row_group = row_group_fragments[0].row_groups[0]
assert row_group.num_rows == 3
assert row_group.total_byte_size > 1000
assert row_group.statistics == {
'boolean': {'min': False, 'max': True},
'int8': {'min': 1, 'max': 42},
'uint8': {'min': 1, 'max': 42},
'int16': {'min': 1, 'max': 42},
'uint16': {'min': 1, 'max': 42},
'int32': {'min': 1, 'max': 42},
'uint32': {'min': 1, 'max': 42},
'int64': {'min': 1, 'max': 42},
'uint64': {'min': 1, 'max': 42},
'float': {'min': 1.0, 'max': 42.0},
'double': {'min': 1.0, 'max': 42.0},
'utf8': {'min': 'a', 'max': 'z'},
'binary': {'min': b'a', 'max': b'z'},
'ts[s]': {'min': dt_s(1), 'max': dt_s(42)},
'ts[ms]': {'min': dt_ms(1), 'max': dt_ms(42)},
'ts[us]': {'min': dt_us(1), 'max': dt_us(42)},
'date32': {'min': date(1970, 1, 2), 'max': date(1970, 2, 12)},
'date64': {'min': date(1970, 1, 1), 'max': date(1970, 2, 18)},
'time32': {'min': time(0, 0, 1), 'max': time(0, 0, 42)},
'time64': {'min': time(0, 0, 0, 1), 'max': time(0, 0, 0, 42)},
}
@pytest.mark.parquet
def test_parquet_fragment_statistics_nulls(tempdir):
import pyarrow.parquet as pq
table = pa.table({'a': [0, 1, None, None], 'b': ['a', 'b', None, None]})
pq.write_table(table, tempdir / "test.parquet", row_group_size=2)
dataset = ds.dataset(tempdir / "test.parquet", format="parquet")
fragments = list(dataset.get_fragments())[0].split_by_row_group()
# second row group has all nulls -> no statistics
assert fragments[1].row_groups[0].statistics == {}
@pytest.mark.pandas
@pytest.mark.parquet
def test_parquet_empty_row_group_statistics(tempdir):
df = pd.DataFrame({"a": ["a", "b", "b"], "b": [4, 5, 6]})[:0]
df.to_parquet(tempdir / "test.parquet", engine="pyarrow")
dataset = ds.dataset(tempdir / "test.parquet", format="parquet")
fragments = list(dataset.get_fragments())[0].split_by_row_group()
# Only row group is empty
assert fragments[0].row_groups[0].statistics == {}
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_parquet_row_groups_predicate(tempdir):
table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2)
fragment = list(dataset.get_fragments())[0]
assert fragment.partition_expression.equals(ds.field('part') == 'a')
# predicate may reference a partition field not present in the
# physical_schema if an explicit schema is provided to split_by_row_group
# filter matches partition_expression: all row groups
row_group_fragments = list(
fragment.split_by_row_group(filter=ds.field('part') == 'a',
schema=dataset.schema))
assert len(row_group_fragments) == 2
# filter contradicts partition_expression: no row groups
row_group_fragments = list(
fragment.split_by_row_group(filter=ds.field('part') == 'b',
schema=dataset.schema))
assert len(row_group_fragments) == 0
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_parquet_row_groups_reconstruct(tempdir, dataset_reader):
table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=2)
fragment = list(dataset.get_fragments())[0]
parquet_format = fragment.format
row_group_fragments = list(fragment.split_by_row_group())
# test pickle roundtrip
pickled_fragment = pickle.loads(pickle.dumps(fragment))
assert dataset_reader.to_table(
pickled_fragment) == dataset_reader.to_table(fragment)
# manually re-construct row group fragments
new_fragment = parquet_format.make_fragment(
fragment.path, fragment.filesystem,
partition_expression=fragment.partition_expression,
row_groups=[0])
result = dataset_reader.to_table(new_fragment)
assert result.equals(dataset_reader.to_table(row_group_fragments[0]))
# manually re-construct a row group fragment with filter/column projection
new_fragment = parquet_format.make_fragment(
fragment.path, fragment.filesystem,
partition_expression=fragment.partition_expression,
row_groups={1})
result = dataset_reader.to_table(
new_fragment, schema=table.schema, columns=['f1', 'part'],
filter=ds.field('f1') < 3, )
assert result.column_names == ['f1', 'part']
assert len(result) == 1
# out of bounds row group index
new_fragment = parquet_format.make_fragment(
fragment.path, fragment.filesystem,
partition_expression=fragment.partition_expression,
row_groups={2})
with pytest.raises(IndexError, match="references row group 2"):
dataset_reader.to_table(new_fragment)
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_parquet_subset_ids(tempdir, open_logging_fs,
dataset_reader):
fs, assert_opens = open_logging_fs
table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=1,
filesystem=fs)
fragment = list(dataset.get_fragments())[0]
# select with row group ids
subfrag = fragment.subset(row_group_ids=[0, 3])
with assert_opens([]):
assert subfrag.num_row_groups == 2
assert subfrag.row_groups == [0, 3]
assert subfrag.row_groups[0].statistics is not None
# check correct scan result of subset
result = dataset_reader.to_table(subfrag)
assert result.to_pydict() == {"f1": [0, 3], "f2": [1, 1]}
# empty list of ids
subfrag = fragment.subset(row_group_ids=[])
assert subfrag.num_row_groups == 0
assert subfrag.row_groups == []
result = dataset_reader.to_table(subfrag, schema=dataset.schema)
assert result.num_rows == 0
assert result.equals(table[:0])
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_parquet_subset_filter(tempdir, open_logging_fs,
dataset_reader):
fs, assert_opens = open_logging_fs
table, dataset = _create_dataset_for_fragments(tempdir, chunk_size=1,
filesystem=fs)
fragment = list(dataset.get_fragments())[0]
# select with filter
subfrag = fragment.subset(ds.field("f1") >= 1)
with assert_opens([]):
assert subfrag.num_row_groups == 3
assert len(subfrag.row_groups) == 3
assert subfrag.row_groups[0].statistics is not None
# check correct scan result of subset
result = dataset_reader.to_table(subfrag)
assert result.to_pydict() == {"f1": [1, 2, 3], "f2": [1, 1, 1]}
# filter that results in empty selection
subfrag = fragment.subset(ds.field("f1") > 5)
assert subfrag.num_row_groups == 0
assert subfrag.row_groups == []
result = dataset_reader.to_table(subfrag, schema=dataset.schema)
assert result.num_rows == 0
assert result.equals(table[:0])
# passing schema to ensure filter on partition expression works
subfrag = fragment.subset(ds.field("part") == "a", schema=dataset.schema)
assert subfrag.num_row_groups == 4
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_parquet_subset_invalid(tempdir):
_, dataset = _create_dataset_for_fragments(tempdir, chunk_size=1)
fragment = list(dataset.get_fragments())[0]
# passing none or both of filter / row_group_ids
with pytest.raises(ValueError):
fragment.subset(ds.field("f1") >= 1, row_group_ids=[1, 2])
with pytest.raises(ValueError):
fragment.subset()
@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_repr(tempdir, dataset):
# partitioned parquet dataset
fragment = list(dataset.get_fragments())[0]
assert (
repr(fragment) ==
"<pyarrow.dataset.ParquetFileFragment path=subdir/1/xxx/file0.parquet "
"partition=[key=xxx, group=1]>"
)
# single-file parquet dataset (no partition information in repr)
table, path = _create_single_file(tempdir)
dataset = ds.dataset(path, format="parquet")
fragment = list(dataset.get_fragments())[0]
assert (
repr(fragment) ==
"<pyarrow.dataset.ParquetFileFragment path={}>".format(
dataset.filesystem.normalize_path(str(path)))
)
# non-parquet format
path = tempdir / "data.feather"
pa.feather.write_feather(table, path)
dataset = ds.dataset(path, format="feather")
fragment = list(dataset.get_fragments())[0]
assert (
repr(fragment) ==
"<pyarrow.dataset.FileFragment type=ipc path={}>".format(
dataset.filesystem.normalize_path(str(path)))
)
def test_partitioning_factory(mockfs):
paths_or_selector = fs.FileSelector('subdir', recursive=True)
format = ds.ParquetFileFormat()
options = ds.FileSystemFactoryOptions('subdir')
partitioning_factory = ds.DirectoryPartitioning.discover(['group', 'key'])
assert isinstance(partitioning_factory, ds.PartitioningFactory)
options.partitioning_factory = partitioning_factory
factory = ds.FileSystemDatasetFactory(
mockfs, paths_or_selector, format, options
)
inspected_schema = factory.inspect()
# i64/f64 from data, group/key from "/1/xxx" and "/2/yyy" paths
expected_schema = pa.schema([
("i64", pa.int64()),
("f64", pa.float64()),
("str", pa.string()),
("const", pa.int64()),
("group", pa.int32()),
("key", pa.string()),
])
assert inspected_schema.equals(expected_schema)
hive_partitioning_factory = ds.HivePartitioning.discover()
assert isinstance(hive_partitioning_factory, ds.PartitioningFactory)
@pytest.mark.parametrize('infer_dictionary', [False, True])
def test_partitioning_factory_dictionary(mockfs, infer_dictionary):
paths_or_selector = fs.FileSelector('subdir', recursive=True)
format = ds.ParquetFileFormat()
options = ds.FileSystemFactoryOptions('subdir')
options.partitioning_factory = ds.DirectoryPartitioning.discover(
['group', 'key'], infer_dictionary=infer_dictionary)
factory = ds.FileSystemDatasetFactory(
mockfs, paths_or_selector, format, options)
inferred_schema = factory.inspect()
if infer_dictionary:
expected_type = pa.dictionary(pa.int32(), pa.string())
assert inferred_schema.field('key').type == expected_type
table = factory.finish().to_table().combine_chunks()
actual = table.column('key').chunk(0)
expected = pa.array(['xxx'] * 5 + ['yyy'] * 5).dictionary_encode()
assert actual.equals(expected)
# ARROW-9345 ensure filtering on the partition field works
table = factory.finish().to_table(filter=ds.field('key') == 'xxx')
actual = table.column('key').chunk(0)
expected = expected.slice(0, 5)
assert actual.equals(expected)
else:
assert inferred_schema.field('key').type == pa.string()
def test_partitioning_factory_segment_encoding():
mockfs = fs._MockFileSystem()
format = ds.IpcFileFormat()
schema = pa.schema([("i64", pa.int64())])
table = pa.table([pa.array(range(10))], schema=schema)
partition_schema = pa.schema(
[("date", pa.timestamp("s")), ("string", pa.string())])
string_partition_schema = pa.schema(
[("date", pa.string()), ("string", pa.string())])
full_schema = pa.schema(list(schema) + list(partition_schema))
for directory in [
"directory/2021-05-04 00%3A00%3A00/%24",
"hive/date=2021-05-04 00%3A00%3A00/string=%24",
]:
mockfs.create_dir(directory)
with mockfs.open_output_stream(directory + "/0.feather") as sink:
with pa.ipc.new_file(sink, schema) as writer:
writer.write_table(table)
writer.close()
# Directory
selector = fs.FileSelector("directory", recursive=True)
options = ds.FileSystemFactoryOptions("directory")
options.partitioning_factory = ds.DirectoryPartitioning.discover(
schema=partition_schema)
factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
inferred_schema = factory.inspect()
assert inferred_schema == full_schema
actual = factory.finish().to_table(columns={
"date_int": ds.field("date").cast(pa.int64()),
})
assert actual[0][0].as_py() == 1620086400
options.partitioning_factory = ds.DirectoryPartitioning.discover(
["date", "string"], segment_encoding="none")
factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
fragments = list(factory.finish().get_fragments())
assert fragments[0].partition_expression.equals(
(ds.field("date") == "2021-05-04 00%3A00%3A00") &
(ds.field("string") == "%24"))
options.partitioning = ds.DirectoryPartitioning(
string_partition_schema, segment_encoding="none")
factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
fragments = list(factory.finish().get_fragments())
assert fragments[0].partition_expression.equals(
(ds.field("date") == "2021-05-04 00%3A00%3A00") &
(ds.field("string") == "%24"))
options.partitioning_factory = ds.DirectoryPartitioning.discover(
schema=partition_schema, segment_encoding="none")
factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
with pytest.raises(pa.ArrowInvalid,
match="Could not cast segments for partition field"):
inferred_schema = factory.inspect()
# Hive
selector = fs.FileSelector("hive", recursive=True)
options = ds.FileSystemFactoryOptions("hive")
options.partitioning_factory = ds.HivePartitioning.discover(
schema=partition_schema)
factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
inferred_schema = factory.inspect()
assert inferred_schema == full_schema
actual = factory.finish().to_table(columns={
"date_int": ds.field("date").cast(pa.int64()),
})
assert actual[0][0].as_py() == 1620086400
options.partitioning_factory = ds.HivePartitioning.discover(
segment_encoding="none")
factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
fragments = list(factory.finish().get_fragments())
assert fragments[0].partition_expression.equals(
(ds.field("date") == "2021-05-04 00%3A00%3A00") &
(ds.field("string") == "%24"))
options.partitioning = ds.HivePartitioning(
string_partition_schema, segment_encoding="none")
factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
fragments = list(factory.finish().get_fragments())
assert fragments[0].partition_expression.equals(
(ds.field("date") == "2021-05-04 00%3A00%3A00") &
(ds.field("string") == "%24"))
options.partitioning_factory = ds.HivePartitioning.discover(
schema=partition_schema, segment_encoding="none")
factory = ds.FileSystemDatasetFactory(mockfs, selector, format, options)
with pytest.raises(pa.ArrowInvalid,
match="Could not cast segments for partition field"):
inferred_schema = factory.inspect()
def test_dictionary_partitioning_outer_nulls_raises(tempdir):
table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']})
part = ds.partitioning(
pa.schema([pa.field('a', pa.string()), pa.field('b', pa.string())]))
with pytest.raises(pa.ArrowInvalid):
ds.write_dataset(table, tempdir, format='parquet', partitioning=part)
def _has_subdirs(basedir):
elements = os.listdir(basedir)
return any([os.path.isdir(os.path.join(basedir, el)) for el in elements])
def _do_list_all_dirs(basedir, path_so_far, result):
for f in os.listdir(basedir):
true_nested = os.path.join(basedir, f)
if os.path.isdir(true_nested):
norm_nested = posixpath.join(path_so_far, f)
if _has_subdirs(true_nested):
_do_list_all_dirs(true_nested, norm_nested, result)
else:
result.append(norm_nested)
def _list_all_dirs(basedir):
result = []
_do_list_all_dirs(basedir, '', result)
return result
def _check_dataset_directories(tempdir, expected_directories):
actual_directories = set(_list_all_dirs(tempdir))
assert actual_directories == set(expected_directories)
def test_dictionary_partitioning_inner_nulls(tempdir):
table = pa.table({'a': ['x', 'y', 'z'], 'b': ['x', 'y', None]})
part = ds.partitioning(
pa.schema([pa.field('a', pa.string()), pa.field('b', pa.string())]))
ds.write_dataset(table, tempdir, format='parquet', partitioning=part)
_check_dataset_directories(tempdir, ['x/x', 'y/y', 'z'])
def test_hive_partitioning_nulls(tempdir):
table = pa.table({'a': ['x', None, 'z'], 'b': ['x', 'y', None]})
part = ds.HivePartitioning(pa.schema(
[pa.field('a', pa.string()), pa.field('b', pa.string())]), None, 'xyz')
ds.write_dataset(table, tempdir, format='parquet', partitioning=part)
_check_dataset_directories(tempdir, ['a=x/b=x', 'a=xyz/b=y', 'a=z/b=xyz'])
def test_partitioning_function():
schema = pa.schema([("year", pa.int16()), ("month", pa.int8())])
names = ["year", "month"]
# default DirectoryPartitioning
part = ds.partitioning(schema)
assert isinstance(part, ds.DirectoryPartitioning)
part = ds.partitioning(schema, dictionaries="infer")
assert isinstance(part, ds.PartitioningFactory)
part = ds.partitioning(field_names=names)
assert isinstance(part, ds.PartitioningFactory)
# needs schema or list of names
with pytest.raises(ValueError):
ds.partitioning()
with pytest.raises(ValueError, match="Expected list"):
ds.partitioning(field_names=schema)
with pytest.raises(ValueError, match="Cannot specify both"):
ds.partitioning(schema, field_names=schema)
# Hive partitioning
part = ds.partitioning(schema, flavor="hive")
assert isinstance(part, ds.HivePartitioning)
part = ds.partitioning(schema, dictionaries="infer", flavor="hive")
assert isinstance(part, ds.PartitioningFactory)
part = ds.partitioning(flavor="hive")
assert isinstance(part, ds.PartitioningFactory)
# cannot pass list of names
with pytest.raises(ValueError):
ds.partitioning(names, flavor="hive")
with pytest.raises(ValueError, match="Cannot specify 'field_names'"):
ds.partitioning(field_names=names, flavor="hive")
# unsupported flavor
with pytest.raises(ValueError):
ds.partitioning(schema, flavor="unsupported")
def test_directory_partitioning_dictionary_key(mockfs):
# ARROW-8088 specifying partition key as dictionary type
schema = pa.schema([
pa.field('group', pa.dictionary(pa.int8(), pa.int32())),
pa.field('key', pa.dictionary(pa.int8(), pa.string()))
])
part = ds.DirectoryPartitioning.discover(schema=schema)
dataset = ds.dataset(
"subdir", format="parquet", filesystem=mockfs, partitioning=part
)
assert dataset.partitioning.schema == schema
table = dataset.to_table()
assert table.column('group').type.equals(schema.types[0])
assert table.column('group').to_pylist() == [1] * 5 + [2] * 5
assert table.column('key').type.equals(schema.types[1])
assert table.column('key').to_pylist() == ['xxx'] * 5 + ['yyy'] * 5
def test_hive_partitioning_dictionary_key(multisourcefs):
# ARROW-8088 specifying partition key as dictionary type
schema = pa.schema([
pa.field('year', pa.dictionary(pa.int8(), pa.int16())),
pa.field('month', pa.dictionary(pa.int8(), pa.int16()))
])
part = ds.HivePartitioning.discover(schema=schema)
dataset = ds.dataset(
"hive", format="parquet", filesystem=multisourcefs, partitioning=part
)
assert dataset.partitioning.schema == schema
table = dataset.to_table()
year_dictionary = list(range(2006, 2011))
month_dictionary = list(range(1, 13))
assert table.column('year').type.equals(schema.types[0])
for chunk in table.column('year').chunks:
actual = chunk.dictionary.to_pylist()
actual.sort()
assert actual == year_dictionary
assert table.column('month').type.equals(schema.types[1])
for chunk in table.column('month').chunks:
actual = chunk.dictionary.to_pylist()
actual.sort()
assert actual == month_dictionary
def _create_single_file(base_dir, table=None, row_group_size=None):
import pyarrow.parquet as pq
if table is None:
table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
path = base_dir / "test.parquet"
pq.write_table(table, path, row_group_size=row_group_size)
return table, path
def _create_directory_of_files(base_dir):
import pyarrow.parquet as pq
table1 = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
path1 = base_dir / "test1.parquet"
pq.write_table(table1, path1)
table2 = pa.table({'a': range(9, 18), 'b': [0.] * 4 + [1.] * 5})
path2 = base_dir / "test2.parquet"
pq.write_table(table2, path2)
return (table1, table2), (path1, path2)
def _check_dataset(dataset, table, dataset_reader):
# also test that pickle roundtrip keeps the functionality
for d in [dataset, pickle.loads(pickle.dumps(dataset))]:
assert dataset.schema.equals(table.schema)
assert dataset_reader.to_table(dataset).equals(table)
def _check_dataset_from_path(path, table, dataset_reader, **kwargs):
# pathlib object
assert isinstance(path, pathlib.Path)
# accept Path, str, List[Path], List[str]
for p in [path, str(path), [path], [str(path)]]:
dataset = ds.dataset(path, **kwargs)
assert isinstance(dataset, ds.FileSystemDataset)
_check_dataset(dataset, table, dataset_reader)
# relative string path
with change_cwd(path.parent):
dataset = ds.dataset(path.name, **kwargs)
assert isinstance(dataset, ds.FileSystemDataset)
_check_dataset(dataset, table, dataset_reader)
@pytest.mark.parquet
def test_open_dataset_single_file(tempdir, dataset_reader):
table, path = _create_single_file(tempdir)
_check_dataset_from_path(path, table, dataset_reader)
@pytest.mark.parquet
def test_deterministic_row_order(tempdir, dataset_reader):
# ARROW-8447 Ensure that dataset.to_table (and Scanner::ToTable) returns a
# deterministic row ordering. This is achieved by constructing a single
# parquet file with one row per RowGroup.
table, path = _create_single_file(tempdir, row_group_size=1)
_check_dataset_from_path(path, table, dataset_reader)
@pytest.mark.parquet
def test_open_dataset_directory(tempdir, dataset_reader):
tables, _ = _create_directory_of_files(tempdir)
table = pa.concat_tables(tables)
_check_dataset_from_path(tempdir, table, dataset_reader)
@pytest.mark.parquet
def test_open_dataset_list_of_files(tempdir, dataset_reader):
tables, (path1, path2) = _create_directory_of_files(tempdir)
table = pa.concat_tables(tables)
datasets = [
ds.dataset([path1, path2]),
ds.dataset([str(path1), str(path2)])
]
datasets += [
pickle.loads(pickle.dumps(d)) for d in datasets
]
for dataset in datasets:
assert dataset.schema.equals(table.schema)
result = dataset_reader.to_table(dataset)
assert result.equals(table)
@pytest.mark.parquet
def test_open_dataset_filesystem_fspath(tempdir):
# single file
table, path = _create_single_file(tempdir)
fspath = FSProtocolClass(path)
# filesystem inferred from path
dataset1 = ds.dataset(fspath)
assert dataset1.schema.equals(table.schema)
# filesystem specified
dataset2 = ds.dataset(fspath, filesystem=fs.LocalFileSystem())
assert dataset2.schema.equals(table.schema)
# passing different filesystem
with pytest.raises(TypeError):
ds.dataset(fspath, filesystem=fs._MockFileSystem())
def test_construct_from_single_file(tempdir, dataset_reader):
directory = tempdir / 'single-file'
directory.mkdir()
table, path = _create_single_file(directory)
relative_path = path.relative_to(directory)
# instantiate from a single file
d1 = ds.dataset(path)
# instantiate from a single file with a filesystem object
d2 = ds.dataset(path, filesystem=fs.LocalFileSystem())
# instantiate from a single file with prefixed filesystem URI
d3 = ds.dataset(str(relative_path), filesystem=_filesystem_uri(directory))
# pickle roundtrip
d4 = pickle.loads(pickle.dumps(d1))
assert dataset_reader.to_table(d1) == dataset_reader.to_table(
d2) == dataset_reader.to_table(d3) == dataset_reader.to_table(d4)
def test_construct_from_single_directory(tempdir, dataset_reader):
directory = tempdir / 'single-directory'
directory.mkdir()
tables, paths = _create_directory_of_files(directory)
d1 = ds.dataset(directory)
d2 = ds.dataset(directory, filesystem=fs.LocalFileSystem())
d3 = ds.dataset(directory.name, filesystem=_filesystem_uri(tempdir))
t1 = dataset_reader.to_table(d1)
t2 = dataset_reader.to_table(d2)
t3 = dataset_reader.to_table(d3)
assert t1 == t2 == t3
# test pickle roundtrip
for d in [d1, d2, d3]:
restored = pickle.loads(pickle.dumps(d))
assert dataset_reader.to_table(restored) == t1
def test_construct_from_list_of_files(tempdir, dataset_reader):
# instantiate from a list of files
directory = tempdir / 'list-of-files'
directory.mkdir()
tables, paths = _create_directory_of_files(directory)
relative_paths = [p.relative_to(tempdir) for p in paths]
with change_cwd(tempdir):
d1 = ds.dataset(relative_paths)
t1 = dataset_reader.to_table(d1)
assert len(t1) == sum(map(len, tables))
d2 = ds.dataset(relative_paths, filesystem=_filesystem_uri(tempdir))
t2 = dataset_reader.to_table(d2)
d3 = ds.dataset(paths)
t3 = dataset_reader.to_table(d3)
d4 = ds.dataset(paths, filesystem=fs.LocalFileSystem())
t4 = dataset_reader.to_table(d4)
assert t1 == t2 == t3 == t4
def test_construct_from_list_of_mixed_paths_fails(mockfs):
# isntantiate from a list of mixed paths
files = [
'subdir/1/xxx/file0.parquet',
'subdir/1/xxx/doesnt-exist.parquet',
]
with pytest.raises(FileNotFoundError, match='doesnt-exist'):
ds.dataset(files, filesystem=mockfs)
def test_construct_from_mixed_child_datasets(mockfs):
# isntantiate from a list of mixed paths
a = ds.dataset(['subdir/1/xxx/file0.parquet',
'subdir/2/yyy/file1.parquet'], filesystem=mockfs)
b = ds.dataset('subdir', filesystem=mockfs)
dataset = ds.dataset([a, b])
assert isinstance(dataset, ds.UnionDataset)
assert len(list(dataset.get_fragments())) == 4
table = dataset.to_table()
assert len(table) == 20
assert table.num_columns == 4
assert len(dataset.children) == 2
for child in dataset.children:
assert child.files == ['subdir/1/xxx/file0.parquet',
'subdir/2/yyy/file1.parquet']
def test_construct_empty_dataset():
empty = ds.dataset([])
table = empty.to_table()
assert table.num_rows == 0
assert table.num_columns == 0
def test_construct_dataset_with_invalid_schema():
empty = ds.dataset([], schema=pa.schema([
('a', pa.int64()),
('a', pa.string())
]))
with pytest.raises(ValueError, match='Multiple matches for .*a.* in '):
empty.to_table()
def test_construct_from_invalid_sources_raise(multisourcefs):
child1 = ds.FileSystemDatasetFactory(
multisourcefs,
fs.FileSelector('/plain'),
format=ds.ParquetFileFormat()
)
child2 = ds.FileSystemDatasetFactory(
multisourcefs,
fs.FileSelector('/schema'),
format=ds.ParquetFileFormat()
)
batch1 = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"])
batch2 = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["b"])
with pytest.raises(TypeError, match='Expected.*FileSystemDatasetFactory'):
ds.dataset([child1, child2])
expected = (
"Expected a list of path-like or dataset objects, or a list "
"of batches or tables. The given list contains the following "
"types: int"
)
with pytest.raises(TypeError, match=expected):
ds.dataset([1, 2, 3])
expected = (
"Expected a path-like, list of path-likes or a list of Datasets "
"instead of the given type: NoneType"
)
with pytest.raises(TypeError, match=expected):
ds.dataset(None)
expected = (
"Expected a path-like, list of path-likes or a list of Datasets "
"instead of the given type: generator"
)
with pytest.raises(TypeError, match=expected):
ds.dataset((batch1 for _ in range(3)))
expected = (
"Must provide schema to construct in-memory dataset from an empty list"
)
with pytest.raises(ValueError, match=expected):
ds.InMemoryDataset([])
expected = (
"Item has schema\nb: int64\nwhich does not match expected schema\n"
"a: int64"
)
with pytest.raises(TypeError, match=expected):
ds.dataset([batch1, batch2])
expected = (
"Expected a list of path-like or dataset objects, or a list of "
"batches or tables. The given list contains the following types:"
)
with pytest.raises(TypeError, match=expected):
ds.dataset([batch1, 0])
expected = (
"Expected a list of tables or batches. The given list contains a int"
)
with pytest.raises(TypeError, match=expected):
ds.InMemoryDataset([batch1, 0])
def test_construct_in_memory(dataset_reader):
batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"])
table = pa.Table.from_batches([batch])
assert ds.dataset([], schema=pa.schema([])).to_table() == pa.table([])
for source in (batch, table, [batch], [table]):
dataset = ds.dataset(source)
assert dataset_reader.to_table(dataset) == table
assert len(list(dataset.get_fragments())) == 1
assert next(dataset.get_fragments()).to_table() == table
assert pa.Table.from_batches(list(dataset.to_batches())) == table
@pytest.mark.parametrize('use_threads,use_async',
[(False, False), (False, True),
(True, False), (True, True)])
def test_scan_iterator(use_threads, use_async):
batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"])
table = pa.Table.from_batches([batch])
# When constructed from readers/iterators, should be one-shot
match = "OneShotFragment was already scanned"
for factory, schema in (
(lambda: pa.ipc.RecordBatchReader.from_batches(
batch.schema, [batch]), None),
(lambda: (batch for _ in range(1)), batch.schema),
):
# Scanning the fragment consumes the underlying iterator
scanner = ds.Scanner.from_batches(
factory(), schema=schema, use_threads=use_threads,
use_async=use_async)
assert scanner.to_table() == table
with pytest.raises(pa.ArrowInvalid, match=match):
scanner.to_table()
def _create_partitioned_dataset(basedir):
import pyarrow.parquet as pq
table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
path = basedir / "dataset-partitioned"
path.mkdir()
for i in range(3):
part = path / "part={}".format(i)
part.mkdir()
pq.write_table(table.slice(3*i, 3), part / "test.parquet")
full_table = table.append_column(
"part", pa.array(np.repeat([0, 1, 2], 3), type=pa.int32()))
return full_table, path
@pytest.mark.parquet
def test_open_dataset_partitioned_directory(tempdir, dataset_reader):
full_table, path = _create_partitioned_dataset(tempdir)
# no partitioning specified, just read all individual files
table = full_table.select(['a', 'b'])
_check_dataset_from_path(path, table, dataset_reader)
# specify partition scheme with discovery
dataset = ds.dataset(
str(path), partitioning=ds.partitioning(flavor="hive"))
assert dataset.schema.equals(full_table.schema)
# specify partition scheme with discovery and relative path
with change_cwd(tempdir):
dataset = ds.dataset("dataset-partitioned/",
partitioning=ds.partitioning(flavor="hive"))
assert dataset.schema.equals(full_table.schema)
# specify partition scheme with string short-cut
dataset = ds.dataset(str(path), partitioning="hive")
assert dataset.schema.equals(full_table.schema)
# specify partition scheme with explicit scheme
dataset = ds.dataset(
str(path),
partitioning=ds.partitioning(
pa.schema([("part", pa.int8())]), flavor="hive"))
expected_schema = table.schema.append(pa.field("part", pa.int8()))
assert dataset.schema.equals(expected_schema)
result = dataset.to_table()
expected = table.append_column(
"part", pa.array(np.repeat([0, 1, 2], 3), type=pa.int8()))
assert result.equals(expected)
@pytest.mark.parquet
def test_open_dataset_filesystem(tempdir):
# single file
table, path = _create_single_file(tempdir)
# filesystem inferred from path
dataset1 = ds.dataset(str(path))
assert dataset1.schema.equals(table.schema)
# filesystem specified
dataset2 = ds.dataset(str(path), filesystem=fs.LocalFileSystem())
assert dataset2.schema.equals(table.schema)
# local filesystem specified with relative path
with change_cwd(tempdir):
dataset3 = ds.dataset("test.parquet", filesystem=fs.LocalFileSystem())
assert dataset3.schema.equals(table.schema)
# passing different filesystem
with pytest.raises(FileNotFoundError):
ds.dataset(str(path), filesystem=fs._MockFileSystem())
@pytest.mark.parquet
def test_open_dataset_unsupported_format(tempdir):
_, path = _create_single_file(tempdir)
with pytest.raises(ValueError, match="format 'blabla' is not supported"):
ds.dataset([path], format="blabla")
@pytest.mark.parquet
def test_open_union_dataset(tempdir, dataset_reader):
_, path = _create_single_file(tempdir)
dataset = ds.dataset(path)
union = ds.dataset([dataset, dataset])
assert isinstance(union, ds.UnionDataset)
pickled = pickle.loads(pickle.dumps(union))
assert dataset_reader.to_table(pickled) == dataset_reader.to_table(union)
def test_open_union_dataset_with_additional_kwargs(multisourcefs):
child = ds.dataset('/plain', filesystem=multisourcefs, format='parquet')
with pytest.raises(ValueError, match="cannot pass any additional"):
ds.dataset([child], format="parquet")
def test_open_dataset_non_existing_file():
# ARROW-8213: Opening a dataset with a local incorrect path gives confusing
# error message
with pytest.raises(FileNotFoundError):
ds.dataset('i-am-not-existing.parquet', format='parquet')
with pytest.raises(pa.ArrowInvalid, match='cannot be relative'):
ds.dataset('file:i-am-not-existing.parquet', format='parquet')
@pytest.mark.parquet
@pytest.mark.parametrize('partitioning', ["directory", "hive"])
@pytest.mark.parametrize('null_fallback', ['xyz', None])
@pytest.mark.parametrize('infer_dictionary', [False, True])
@pytest.mark.parametrize('partition_keys', [
(["A", "B", "C"], [1, 2, 3]),
([1, 2, 3], ["A", "B", "C"]),
(["A", "B", "C"], ["D", "E", "F"]),
([1, 2, 3], [4, 5, 6]),
([1, None, 3], ["A", "B", "C"]),
([1, 2, 3], ["A", None, "C"]),
([None, 2, 3], [None, 2, 3]),
])
def test_partition_discovery(
tempdir, partitioning, null_fallback, infer_dictionary, partition_keys
):
# ARROW-9288 / ARROW-9476
import pyarrow.parquet as pq
table = pa.table({'a': range(9), 'b': [0.0] * 4 + [1.0] * 5})
has_null = None in partition_keys[0] or None in partition_keys[1]
if partitioning == "directory" and has_null:
# Directory partitioning can't handle the first part being null
return
if partitioning == "directory":
partitioning = ds.DirectoryPartitioning.discover(
["part1", "part2"], infer_dictionary=infer_dictionary)
fmt = "{0}/{1}"
null_value = None
else:
if null_fallback:
partitioning = ds.HivePartitioning.discover(
infer_dictionary=infer_dictionary, null_fallback=null_fallback
)
else:
partitioning = ds.HivePartitioning.discover(
infer_dictionary=infer_dictionary)
fmt = "part1={0}/part2={1}"
if null_fallback:
null_value = null_fallback
else:
null_value = "__HIVE_DEFAULT_PARTITION__"
basepath = tempdir / "dataset"
basepath.mkdir()
part_keys1, part_keys2 = partition_keys
for part1 in part_keys1:
for part2 in part_keys2:
path = basepath / \
fmt.format(part1 or null_value, part2 or null_value)
path.mkdir(parents=True)
pq.write_table(table, path / "test.parquet")
dataset = ds.dataset(str(basepath), partitioning=partitioning)
def expected_type(key):
if infer_dictionary:
value_type = pa.string() if isinstance(key, str) else pa.int32()
return pa.dictionary(pa.int32(), value_type)
else:
return pa.string() if isinstance(key, str) else pa.int32()
expected_schema = table.schema.append(
pa.field("part1", expected_type(part_keys1[0]))
).append(
pa.field("part2", expected_type(part_keys2[0]))
)
assert dataset.schema.equals(expected_schema)
@pytest.mark.pandas
def test_dataset_partitioned_dictionary_type_reconstruct(tempdir):
# https://issues.apache.org/jira/browse/ARROW-11400
table = pa.table({'part': np.repeat(['A', 'B'], 5), 'col': range(10)})
part = ds.partitioning(table.select(['part']).schema, flavor="hive")
ds.write_dataset(table, tempdir, partitioning=part, format="feather")
dataset = ds.dataset(
tempdir, format="feather",
partitioning=ds.HivePartitioning.discover(infer_dictionary=True)
)
expected = pa.table(
{'col': table['col'], 'part': table['part'].dictionary_encode()}
)
assert dataset.to_table().equals(expected)
fragment = list(dataset.get_fragments())[0]
assert fragment.to_table(schema=dataset.schema).equals(expected[:5])
part_expr = fragment.partition_expression
restored = pickle.loads(pickle.dumps(dataset))
assert restored.to_table().equals(expected)
restored = pickle.loads(pickle.dumps(fragment))
assert restored.to_table(schema=dataset.schema).equals(expected[:5])
# to_pandas call triggers computation of the actual dictionary values
assert restored.to_table(schema=dataset.schema).to_pandas().equals(
expected[:5].to_pandas()
)
assert restored.partition_expression.equals(part_expr)
@pytest.fixture
def s3_example_simple(s3_server):
from pyarrow.fs import FileSystem
import pyarrow.parquet as pq
host, port, access_key, secret_key = s3_server['connection']
uri = (
"s3://{}:{}@mybucket/data.parquet?scheme=http&endpoint_override={}:{}"
.format(access_key, secret_key, host, port)
)
fs, path = FileSystem.from_uri(uri)
fs.create_dir("mybucket")
table = pa.table({'a': [1, 2, 3]})
with fs.open_output_stream("mybucket/data.parquet") as out:
pq.write_table(table, out)
return table, path, fs, uri, host, port, access_key, secret_key
@pytest.mark.parquet
@pytest.mark.s3
def test_open_dataset_from_uri_s3(s3_example_simple, dataset_reader):
# open dataset from non-localfs string path
table, path, fs, uri, _, _, _, _ = s3_example_simple
# full string URI
dataset = ds.dataset(uri, format="parquet")
assert dataset_reader.to_table(dataset).equals(table)
# passing filesystem object
dataset = ds.dataset(path, format="parquet", filesystem=fs)
assert dataset_reader.to_table(dataset).equals(table)
@pytest.mark.parquet
@pytest.mark.s3 # still needed to create the data
def test_open_dataset_from_uri_s3_fsspec(s3_example_simple):
table, path, _, _, host, port, access_key, secret_key = s3_example_simple
s3fs = pytest.importorskip("s3fs")
from pyarrow.fs import PyFileSystem, FSSpecHandler
fs = s3fs.S3FileSystem(
key=access_key,
secret=secret_key,
client_kwargs={
'endpoint_url': 'http://{}:{}'.format(host, port)
}
)
# passing as fsspec filesystem
dataset = ds.dataset(path, format="parquet", filesystem=fs)
assert dataset.to_table().equals(table)
# directly passing the fsspec-handler
fs = PyFileSystem(FSSpecHandler(fs))
dataset = ds.dataset(path, format="parquet", filesystem=fs)
assert dataset.to_table().equals(table)
@pytest.mark.parquet
@pytest.mark.s3
def test_open_dataset_from_s3_with_filesystem_uri(s3_server):
from pyarrow.fs import FileSystem
import pyarrow.parquet as pq
host, port, access_key, secret_key = s3_server['connection']
bucket = 'theirbucket'
path = 'nested/folder/data.parquet'
uri = "s3://{}:{}@{}/{}?scheme=http&endpoint_override={}:{}".format(
access_key, secret_key, bucket, path, host, port
)
fs, path = FileSystem.from_uri(uri)
assert path == 'theirbucket/nested/folder/data.parquet'
fs.create_dir(bucket)
table = pa.table({'a': [1, 2, 3]})
with fs.open_output_stream(path) as out:
pq.write_table(table, out)
# full string URI
dataset = ds.dataset(uri, format="parquet")
assert dataset.to_table().equals(table)
# passing filesystem as an uri
template = (
"s3://{}:{}@{{}}?scheme=http&endpoint_override={}:{}".format(
access_key, secret_key, host, port
)
)
cases = [
('theirbucket/nested/folder/', '/data.parquet'),
('theirbucket/nested/folder', 'data.parquet'),
('theirbucket/nested/', 'folder/data.parquet'),
('theirbucket/nested', 'folder/data.parquet'),
('theirbucket', '/nested/folder/data.parquet'),
('theirbucket', 'nested/folder/data.parquet'),
]
for prefix, path in cases:
uri = template.format(prefix)
dataset = ds.dataset(path, filesystem=uri, format="parquet")
assert dataset.to_table().equals(table)
with pytest.raises(pa.ArrowInvalid, match='Missing bucket name'):
uri = template.format('/')
ds.dataset('/theirbucket/nested/folder/data.parquet', filesystem=uri)
error = (
"The path component of the filesystem URI must point to a directory "
"but it has a type: `{}`. The path component is `{}` and the given "
"filesystem URI is `{}`"
)
path = 'theirbucket/doesnt/exist'
uri = template.format(path)
with pytest.raises(ValueError) as exc:
ds.dataset('data.parquet', filesystem=uri)
assert str(exc.value) == error.format('NotFound', path, uri)
path = 'theirbucket/nested/folder/data.parquet'
uri = template.format(path)
with pytest.raises(ValueError) as exc:
ds.dataset('data.parquet', filesystem=uri)
assert str(exc.value) == error.format('File', path, uri)
@pytest.mark.parquet
def test_open_dataset_from_fsspec(tempdir):
table, path = _create_single_file(tempdir)
fsspec = pytest.importorskip("fsspec")
localfs = fsspec.filesystem("file")
dataset = ds.dataset(path, filesystem=localfs)
assert dataset.schema.equals(table.schema)
@pytest.mark.pandas
def test_filter_timestamp(tempdir, dataset_reader):
# ARROW-11379
path = tempdir / "test_partition_timestamps"
table = pa.table({
"dates": ['2012-01-01', '2012-01-02'] * 5,
"id": range(10)})
# write dataset partitioned on dates (as strings)
part = ds.partitioning(table.select(['dates']).schema, flavor="hive")
ds.write_dataset(table, path, partitioning=part, format="feather")
# read dataset partitioned on dates (as timestamps)
part = ds.partitioning(pa.schema([("dates", pa.timestamp("s"))]),
flavor="hive")
dataset = ds.dataset(path, format="feather", partitioning=part)
condition = ds.field("dates") > pd.Timestamp("2012-01-01")
table = dataset_reader.to_table(dataset, filter=condition)
assert table.column('id').to_pylist() == [1, 3, 5, 7, 9]
import datetime
condition = ds.field("dates") > datetime.datetime(2012, 1, 1)
table = dataset_reader.to_table(dataset, filter=condition)
assert table.column('id').to_pylist() == [1, 3, 5, 7, 9]
@pytest.mark.parquet
def test_filter_implicit_cast(tempdir, dataset_reader):
# ARROW-7652
table = pa.table({'a': pa.array([0, 1, 2, 3, 4, 5], type=pa.int8())})
_, path = _create_single_file(tempdir, table)
dataset = ds.dataset(str(path))
filter_ = ds.field('a') > 2
assert len(dataset_reader.to_table(dataset, filter=filter_)) == 3
def test_dataset_union(multisourcefs):
child = ds.FileSystemDatasetFactory(
multisourcefs, fs.FileSelector('/plain'),
format=ds.ParquetFileFormat()
)
factory = ds.UnionDatasetFactory([child])
# TODO(bkietz) reintroduce factory.children property
assert len(factory.inspect_schemas()) == 1
assert all(isinstance(s, pa.Schema) for s in factory.inspect_schemas())
assert factory.inspect_schemas()[0].equals(child.inspect())
assert factory.inspect().equals(child.inspect())
assert isinstance(factory.finish(), ds.Dataset)
def test_union_dataset_from_other_datasets(tempdir, multisourcefs):
child1 = ds.dataset('/plain', filesystem=multisourcefs, format='parquet')
child2 = ds.dataset('/schema', filesystem=multisourcefs, format='parquet',
partitioning=['week', 'color'])
child3 = ds.dataset('/hive', filesystem=multisourcefs, format='parquet',
partitioning='hive')
assert child1.schema != child2.schema != child3.schema
assembled = ds.dataset([child1, child2, child3])
assert isinstance(assembled, ds.UnionDataset)
msg = 'cannot pass any additional arguments'
with pytest.raises(ValueError, match=msg):
ds.dataset([child1, child2], filesystem=multisourcefs)
expected_schema = pa.schema([
('date', pa.date32()),
('index', pa.int64()),
('value', pa.float64()),
('color', pa.string()),
('week', pa.int32()),
('year', pa.int32()),
('month', pa.int32()),
])
assert assembled.schema.equals(expected_schema)
assert assembled.to_table().schema.equals(expected_schema)
assembled = ds.dataset([child1, child3])
expected_schema = pa.schema([
('date', pa.date32()),
('index', pa.int64()),
('value', pa.float64()),
('color', pa.string()),
('year', pa.int32()),
('month', pa.int32()),
])
assert assembled.schema.equals(expected_schema)
assert assembled.to_table().schema.equals(expected_schema)
expected_schema = pa.schema([
('month', pa.int32()),
('color', pa.string()),
('date', pa.date32()),
])
assembled = ds.dataset([child1, child3], schema=expected_schema)
assert assembled.to_table().schema.equals(expected_schema)
expected_schema = pa.schema([
('month', pa.int32()),
('color', pa.string()),
('unknown', pa.string()) # fill with nulls
])
assembled = ds.dataset([child1, child3], schema=expected_schema)
assert assembled.to_table().schema.equals(expected_schema)
# incompatible schemas, date and index columns have conflicting types
table = pa.table([range(9), [0.] * 4 + [1.] * 5, 'abcdefghj'],
names=['date', 'value', 'index'])
_, path = _create_single_file(tempdir, table=table)
child4 = ds.dataset(path)
with pytest.raises(pa.ArrowInvalid, match='Unable to merge'):
ds.dataset([child1, child4])
def test_dataset_from_a_list_of_local_directories_raises(multisourcefs):
msg = 'points to a directory, but only file paths are supported'
with pytest.raises(IsADirectoryError, match=msg):
ds.dataset(['/plain', '/schema', '/hive'], filesystem=multisourcefs)
def test_union_dataset_filesystem_datasets(multisourcefs):
# without partitioning
dataset = ds.dataset([
ds.dataset('/plain', filesystem=multisourcefs),
ds.dataset('/schema', filesystem=multisourcefs),
ds.dataset('/hive', filesystem=multisourcefs),
])
expected_schema = pa.schema([
('date', pa.date32()),
('index', pa.int64()),
('value', pa.float64()),
('color', pa.string()),
])
assert dataset.schema.equals(expected_schema)
# with hive partitioning for two hive sources
dataset = ds.dataset([
ds.dataset('/plain', filesystem=multisourcefs),
ds.dataset('/schema', filesystem=multisourcefs),
ds.dataset('/hive', filesystem=multisourcefs, partitioning='hive')
])
expected_schema = pa.schema([
('date', pa.date32()),
('index', pa.int64()),
('value', pa.float64()),
('color', pa.string()),
('year', pa.int32()),
('month', pa.int32()),
])
assert dataset.schema.equals(expected_schema)
@pytest.mark.parquet
def test_specified_schema(tempdir, dataset_reader):
import pyarrow.parquet as pq
table = pa.table({'a': [1, 2, 3], 'b': [.1, .2, .3]})
pq.write_table(table, tempdir / "data.parquet")
def _check_dataset(schema, expected, expected_schema=None):
dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema)
if expected_schema is not None:
assert dataset.schema.equals(expected_schema)
else:
assert dataset.schema.equals(schema)
result = dataset_reader.to_table(dataset)
assert result.equals(expected)
# no schema specified
schema = None
expected = table
_check_dataset(schema, expected, expected_schema=table.schema)
# identical schema specified
schema = table.schema
expected = table
_check_dataset(schema, expected)
# Specifying schema with change column order
schema = pa.schema([('b', 'float64'), ('a', 'int64')])
expected = pa.table([[.1, .2, .3], [1, 2, 3]], names=['b', 'a'])
_check_dataset(schema, expected)
# Specifying schema with missing column
schema = pa.schema([('a', 'int64')])
expected = pa.table([[1, 2, 3]], names=['a'])
_check_dataset(schema, expected)
# Specifying schema with additional column
schema = pa.schema([('a', 'int64'), ('c', 'int32')])
expected = pa.table([[1, 2, 3],
pa.array([None, None, None], type='int32')],
names=['a', 'c'])
_check_dataset(schema, expected)
# Specifying with differing field types
schema = pa.schema([('a', 'int32'), ('b', 'float64')])
dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema)
expected = pa.table([table['a'].cast('int32'),
table['b']],
names=['a', 'b'])
_check_dataset(schema, expected)
# Specifying with incompatible schema
schema = pa.schema([('a', pa.list_(pa.int32())), ('b', 'float64')])
dataset = ds.dataset(str(tempdir / "data.parquet"), schema=schema)
assert dataset.schema.equals(schema)
with pytest.raises(NotImplementedError,
match='Unsupported cast from int64 to list'):
dataset_reader.to_table(dataset)
@pytest.mark.parquet
def test_incompatible_schema_hang(tempdir, dataset_reader):
# ARROW-13480: deadlock when reading past an errored fragment
import pyarrow.parquet as pq
fn = tempdir / "data.parquet"
table = pa.table({'a': [1, 2, 3]})
pq.write_table(table, fn)
schema = pa.schema([('a', pa.null())])
dataset = ds.dataset([str(fn)] * 100, schema=schema)
assert dataset.schema.equals(schema)
scanner = dataset_reader.scanner(dataset)
reader = scanner.to_reader()
with pytest.raises(NotImplementedError,
match='Unsupported cast from int64 to null'):
reader.read_all()
def test_ipc_format(tempdir, dataset_reader):
table = pa.table({'a': pa.array([1, 2, 3], type="int8"),
'b': pa.array([.1, .2, .3], type="float64")})
path = str(tempdir / 'test.arrow')
with pa.output_stream(path) as sink:
writer = pa.RecordBatchFileWriter(sink, table.schema)
writer.write_batch(table.to_batches()[0])
writer.close()
dataset = ds.dataset(path, format=ds.IpcFileFormat())
result = dataset_reader.to_table(dataset)
assert result.equals(table)
for format_str in ["ipc", "arrow"]:
dataset = ds.dataset(path, format=format_str)
result = dataset_reader.to_table(dataset)
assert result.equals(table)
@pytest.mark.orc
def test_orc_format(tempdir, dataset_reader):
from pyarrow import orc
table = pa.table({'a': pa.array([1, 2, 3], type="int8"),
'b': pa.array([.1, .2, .3], type="float64")})
path = str(tempdir / 'test.orc')
orc.write_table(table, path)
dataset = ds.dataset(path, format=ds.OrcFileFormat())
result = dataset_reader.to_table(dataset)
result.validate(full=True)
assert result.equals(table)
dataset = ds.dataset(path, format="orc")
result = dataset_reader.to_table(dataset)
result.validate(full=True)
assert result.equals(table)
result = dataset_reader.to_table(dataset, columns=["b"])
result.validate(full=True)
assert result.equals(table.select(["b"]))
result = dataset_reader.to_table(
dataset, columns={"b2": ds.field("b") * 2}
)
result.validate(full=True)
assert result.equals(
pa.table({'b2': pa.array([.2, .4, .6], type="float64")})
)
assert dataset_reader.count_rows(dataset) == 3
assert dataset_reader.count_rows(dataset, filter=ds.field("a") > 2) == 1
@pytest.mark.orc
def test_orc_scan_options(tempdir, dataset_reader):
from pyarrow import orc
table = pa.table({'a': pa.array([1, 2, 3], type="int8"),
'b': pa.array([.1, .2, .3], type="float64")})
path = str(tempdir / 'test.orc')
orc.write_table(table, path)
dataset = ds.dataset(path, format="orc")
result = list(dataset_reader.to_batches(dataset))
assert len(result) == 1
assert result[0].num_rows == 3
assert result[0].equals(table.to_batches()[0])
# TODO batch_size is not yet supported (ARROW-14153)
# result = list(dataset_reader.to_batches(dataset, batch_size=2))
# assert len(result) == 2
# assert result[0].num_rows == 2
# assert result[0].equals(table.slice(0, 2).to_batches()[0])
# assert result[1].num_rows == 1
# assert result[1].equals(table.slice(2, 1).to_batches()[0])
def test_orc_format_not_supported():
try:
from pyarrow.dataset import OrcFileFormat # noqa
except (ImportError, AttributeError):
# catch AttributeError for Python 3.6
# ORC is not available, test error message
with pytest.raises(
ValueError, match="not built with support for the ORC file"
):
ds.dataset(".", format="orc")
@pytest.mark.pandas
def test_csv_format(tempdir, dataset_reader):
table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
'b': pa.array([.1, .2, .3], type="float64")})
path = str(tempdir / 'test.csv')
table.to_pandas().to_csv(path, index=False)
dataset = ds.dataset(path, format=ds.CsvFileFormat())
result = dataset_reader.to_table(dataset)
assert result.equals(table)
dataset = ds.dataset(path, format='csv')
result = dataset_reader.to_table(dataset)
assert result.equals(table)
@pytest.mark.pandas
@pytest.mark.parametrize("compression", [
"bz2",
"gzip",
"lz4",
"zstd",
])
def test_csv_format_compressed(tempdir, compression, dataset_reader):
if not pyarrow.Codec.is_available(compression):
pytest.skip("{} support is not built".format(compression))
table = pa.table({'a': pa.array([1, 2, 3], type="int64"),
'b': pa.array([.1, .2, .3], type="float64")})
filesystem = fs.LocalFileSystem()
suffix = compression if compression != 'gzip' else 'gz'
path = str(tempdir / f'test.csv.{suffix}')
with filesystem.open_output_stream(path, compression=compression) as sink:
# https://github.com/pandas-dev/pandas/issues/23854
# With CI version of Pandas (anything < 1.2), Pandas tries to write
# str to the sink
csv_str = table.to_pandas().to_csv(index=False)
sink.write(csv_str.encode('utf-8'))
dataset = ds.dataset(path, format=ds.CsvFileFormat())
result = dataset_reader.to_table(dataset)
assert result.equals(table)
def test_csv_format_options(tempdir, dataset_reader):
path = str(tempdir / 'test.csv')
with open(path, 'w') as sink:
sink.write('skipped\ncol0\nfoo\nbar\n')
dataset = ds.dataset(path, format='csv')
result = dataset_reader.to_table(dataset)
assert result.equals(
pa.table({'skipped': pa.array(['col0', 'foo', 'bar'])}))
dataset = ds.dataset(path, format=ds.CsvFileFormat(
read_options=pa.csv.ReadOptions(skip_rows=1)))
result = dataset_reader.to_table(dataset)
assert result.equals(pa.table({'col0': pa.array(['foo', 'bar'])}))
dataset = ds.dataset(path, format=ds.CsvFileFormat(
read_options=pa.csv.ReadOptions(column_names=['foo'])))
result = dataset_reader.to_table(dataset)
assert result.equals(
pa.table({'foo': pa.array(['skipped', 'col0', 'foo', 'bar'])}))
def test_csv_fragment_options(tempdir, dataset_reader):
path = str(tempdir / 'test.csv')
with open(path, 'w') as sink:
sink.write('col0\nfoo\nspam\nMYNULL\n')
dataset = ds.dataset(path, format='csv')
convert_options = pyarrow.csv.ConvertOptions(null_values=['MYNULL'],
strings_can_be_null=True)
options = ds.CsvFragmentScanOptions(
convert_options=convert_options,
read_options=pa.csv.ReadOptions(block_size=2**16))
result = dataset_reader.to_table(dataset, fragment_scan_options=options)
assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])}))
csv_format = ds.CsvFileFormat(convert_options=convert_options)
dataset = ds.dataset(path, format=csv_format)
result = dataset_reader.to_table(dataset)
assert result.equals(pa.table({'col0': pa.array(['foo', 'spam', None])}))
options = ds.CsvFragmentScanOptions()
result = dataset_reader.to_table(dataset, fragment_scan_options=options)
assert result.equals(
pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])}))
def test_feather_format(tempdir, dataset_reader):
from pyarrow.feather import write_feather
table = pa.table({'a': pa.array([1, 2, 3], type="int8"),
'b': pa.array([.1, .2, .3], type="float64")})
basedir = tempdir / "feather_dataset"
basedir.mkdir()
write_feather(table, str(basedir / "data.feather"))
dataset = ds.dataset(basedir, format=ds.IpcFileFormat())
result = dataset_reader.to_table(dataset)
assert result.equals(table)
dataset = ds.dataset(basedir, format="feather")
result = dataset_reader.to_table(dataset)
assert result.equals(table)
# ARROW-8641 - column selection order
result = dataset_reader.to_table(dataset, columns=["b", "a"])
assert result.column_names == ["b", "a"]
result = dataset_reader.to_table(dataset, columns=["a", "a"])
assert result.column_names == ["a", "a"]
# error with Feather v1 files
write_feather(table, str(basedir / "data1.feather"), version=1)
with pytest.raises(ValueError):
dataset_reader.to_table(ds.dataset(basedir, format="feather"))
def _create_parquet_dataset_simple(root_path):
"""
Creates a simple (flat files, no nested partitioning) Parquet dataset
"""
import pyarrow.parquet as pq
metadata_collector = []
for i in range(4):
table = pa.table({'f1': [i] * 10, 'f2': np.random.randn(10)})
pq.write_to_dataset(
table, str(root_path), metadata_collector=metadata_collector
)
metadata_path = str(root_path / '_metadata')
# write _metadata file
pq.write_metadata(
table.schema, metadata_path,
metadata_collector=metadata_collector
)
return metadata_path, table
@pytest.mark.parquet
@pytest.mark.pandas # write_to_dataset currently requires pandas
def test_parquet_dataset_factory(tempdir):
root_path = tempdir / "test_parquet_dataset"
metadata_path, table = _create_parquet_dataset_simple(root_path)
dataset = ds.parquet_dataset(metadata_path)
assert dataset.schema.equals(table.schema)
assert len(dataset.files) == 4
result = dataset.to_table()
assert result.num_rows == 40
@pytest.mark.parquet
@pytest.mark.pandas # write_to_dataset currently requires pandas
@pytest.mark.parametrize('use_legacy_dataset', [False, True])
def test_parquet_dataset_factory_roundtrip(tempdir, use_legacy_dataset):
# Simple test to ensure we can roundtrip dataset to
# _metadata/common_metadata and back. A more complex test
# using partitioning will have to wait for ARROW-13269. The
# above test (test_parquet_dataset_factory) will not work
# when legacy is False as there is no "append" equivalent in
# the new dataset until ARROW-12358
import pyarrow.parquet as pq
root_path = tempdir / "test_parquet_dataset"
table = pa.table({'f1': [0] * 10, 'f2': np.random.randn(10)})
metadata_collector = []
pq.write_to_dataset(
table, str(root_path), metadata_collector=metadata_collector,
use_legacy_dataset=use_legacy_dataset
)
metadata_path = str(root_path / '_metadata')
# write _metadata file
pq.write_metadata(
table.schema, metadata_path,
metadata_collector=metadata_collector
)
dataset = ds.parquet_dataset(metadata_path)
assert dataset.schema.equals(table.schema)
result = dataset.to_table()
assert result.num_rows == 10
def test_parquet_dataset_factory_order(tempdir):
# The order of the fragments in the dataset should match the order of the
# row groups in the _metadata file.
import pyarrow.parquet as pq
metadatas = []
# Create a dataset where f1 is incrementing from 0 to 100 spread across
# 10 files. Put the row groups in the correct order in _metadata
for i in range(10):
table = pa.table(
{'f1': list(range(i*10, (i+1)*10))})
table_path = tempdir / f'{i}.parquet'
pq.write_table(table, table_path, metadata_collector=metadatas)
metadatas[-1].set_file_path(f'{i}.parquet')
metadata_path = str(tempdir / '_metadata')
pq.write_metadata(table.schema, metadata_path, metadatas)
dataset = ds.parquet_dataset(metadata_path)
# Ensure the table contains values from 0-100 in the right order
scanned_table = dataset.to_table()
scanned_col = scanned_table.column('f1').to_pylist()
assert scanned_col == list(range(0, 100))
@pytest.mark.parquet
@pytest.mark.pandas
def test_parquet_dataset_factory_invalid(tempdir):
root_path = tempdir / "test_parquet_dataset_invalid"
metadata_path, table = _create_parquet_dataset_simple(root_path)
# remove one of the files
list(root_path.glob("*.parquet"))[0].unlink()
dataset = ds.parquet_dataset(metadata_path)
assert dataset.schema.equals(table.schema)
assert len(dataset.files) == 4
with pytest.raises(FileNotFoundError):
dataset.to_table()
def _create_metadata_file(root_path):
# create _metadata file from existing parquet dataset
import pyarrow.parquet as pq
parquet_paths = list(sorted(root_path.rglob("*.parquet")))
schema = pq.ParquetFile(parquet_paths[0]).schema.to_arrow_schema()
metadata_collector = []
for path in parquet_paths:
metadata = pq.ParquetFile(path).metadata
metadata.set_file_path(str(path.relative_to(root_path)))
metadata_collector.append(metadata)
metadata_path = root_path / "_metadata"
pq.write_metadata(
schema, metadata_path, metadata_collector=metadata_collector
)
return metadata_path
def _create_parquet_dataset_partitioned(root_path):
import pyarrow.parquet as pq
table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))],
names=["f1", "f2", "part"]
)
table = table.replace_schema_metadata({"key": "value"})
pq.write_to_dataset(table, str(root_path), partition_cols=['part'])
return _create_metadata_file(root_path), table
@pytest.mark.parquet
@pytest.mark.pandas
def test_parquet_dataset_factory_partitioned(tempdir):
root_path = tempdir / "test_parquet_dataset_factory_partitioned"
metadata_path, table = _create_parquet_dataset_partitioned(root_path)
partitioning = ds.partitioning(flavor="hive")
dataset = ds.parquet_dataset(metadata_path, partitioning=partitioning)
assert dataset.schema.equals(table.schema)
assert len(dataset.files) == 2
result = dataset.to_table()
assert result.num_rows == 20
# the partitioned dataset does not preserve order
result = result.to_pandas().sort_values("f1").reset_index(drop=True)
expected = table.to_pandas()
pd.testing.assert_frame_equal(result, expected)
@pytest.mark.parquet
@pytest.mark.pandas
def test_parquet_dataset_factory_metadata(tempdir):
# ensure ParquetDatasetFactory preserves metadata (ARROW-9363)
root_path = tempdir / "test_parquet_dataset_factory_metadata"
metadata_path, table = _create_parquet_dataset_partitioned(root_path)
dataset = ds.parquet_dataset(metadata_path, partitioning="hive")
assert dataset.schema.equals(table.schema)
assert b"key" in dataset.schema.metadata
fragments = list(dataset.get_fragments())
assert b"key" in fragments[0].physical_schema.metadata
@pytest.mark.parquet
@pytest.mark.pandas
def test_parquet_dataset_lazy_filtering(tempdir, open_logging_fs):
fs, assert_opens = open_logging_fs
# Test to ensure that no IO happens when filtering a dataset
# created with ParquetDatasetFactory from a _metadata file
root_path = tempdir / "test_parquet_dataset_lazy_filtering"
metadata_path, _ = _create_parquet_dataset_simple(root_path)
# creating the dataset should only open the metadata file
with assert_opens([metadata_path]):
dataset = ds.parquet_dataset(
metadata_path,
partitioning=ds.partitioning(flavor="hive"),
filesystem=fs)
# materializing fragments should not open any file
with assert_opens([]):
fragments = list(dataset.get_fragments())
# filtering fragments should not open any file
with assert_opens([]):
list(dataset.get_fragments(ds.field("f1") > 15))
# splitting by row group should still not open any file
with assert_opens([]):
fragments[0].split_by_row_group(ds.field("f1") > 15)
# ensuring metadata of splitted fragment should also not open any file
with assert_opens([]):
rg_fragments = fragments[0].split_by_row_group()
rg_fragments[0].ensure_complete_metadata()
# FIXME(bkietz) on Windows this results in FileNotFoundErrors.
# but actually scanning does open files
# with assert_opens([f.path for f in fragments]):
# dataset.to_table()
@pytest.mark.parquet
@pytest.mark.pandas
def test_dataset_schema_metadata(tempdir, dataset_reader):
# ARROW-8802
df = pd.DataFrame({'a': [1, 2, 3]})
path = tempdir / "test.parquet"
df.to_parquet(path)
dataset = ds.dataset(path)
schema = dataset_reader.to_table(dataset).schema
projected_schema = dataset_reader.to_table(dataset, columns=["a"]).schema
# ensure the pandas metadata is included in the schema
assert b"pandas" in schema.metadata
# ensure it is still there in a projected schema (with column selection)
assert schema.equals(projected_schema, check_metadata=True)
@pytest.mark.parquet
def test_filter_mismatching_schema(tempdir, dataset_reader):
# ARROW-9146
import pyarrow.parquet as pq
table = pa.table({"col": pa.array([1, 2, 3, 4], type='int32')})
pq.write_table(table, str(tempdir / "data.parquet"))
# specifying explicit schema, but that mismatches the schema of the data
schema = pa.schema([("col", pa.int64())])
dataset = ds.dataset(
tempdir / "data.parquet", format="parquet", schema=schema)
# filtering on a column with such type mismatch should implicitly
# cast the column
filtered = dataset_reader.to_table(dataset, filter=ds.field("col") > 2)
assert filtered["col"].equals(table["col"].cast('int64').slice(2))
fragment = list(dataset.get_fragments())[0]
filtered = dataset_reader.to_table(
fragment, filter=ds.field("col") > 2, schema=schema)
assert filtered["col"].equals(table["col"].cast('int64').slice(2))
@pytest.mark.parquet
@pytest.mark.pandas
def test_dataset_project_only_partition_columns(tempdir, dataset_reader):
# ARROW-8729
import pyarrow.parquet as pq
table = pa.table({'part': 'a a b b'.split(), 'col': list(range(4))})
path = str(tempdir / 'test_dataset')
pq.write_to_dataset(table, path, partition_cols=['part'])
dataset = ds.dataset(path, partitioning='hive')
all_cols = dataset_reader.to_table(dataset)
part_only = dataset_reader.to_table(dataset, columns=['part'])
assert all_cols.column('part').equals(part_only.column('part'))
@pytest.mark.parquet
@pytest.mark.pandas
def test_dataset_project_null_column(tempdir, dataset_reader):
import pandas as pd
df = pd.DataFrame({"col": np.array([None, None, None], dtype='object')})
f = tempdir / "test_dataset_project_null_column.parquet"
df.to_parquet(f, engine="pyarrow")
dataset = ds.dataset(f, format="parquet",
schema=pa.schema([("col", pa.int64())]))
expected = pa.table({'col': pa.array([None, None, None], pa.int64())})
assert dataset_reader.to_table(dataset).equals(expected)
def test_dataset_project_columns(tempdir, dataset_reader):
# basic column re-projection with expressions
from pyarrow import feather
table = pa.table({"A": [1, 2, 3], "B": [1., 2., 3.], "C": ["a", "b", "c"]})
feather.write_feather(table, tempdir / "data.feather")
dataset = ds.dataset(tempdir / "data.feather", format="feather")
result = dataset_reader.to_table(dataset, columns={
'A_renamed': ds.field('A'),
'B_as_int': ds.field('B').cast("int32", safe=False),
'C_is_a': ds.field('C') == 'a'
})
expected = pa.table({
"A_renamed": [1, 2, 3],
"B_as_int": pa.array([1, 2, 3], type="int32"),
"C_is_a": [True, False, False],
})
assert result.equals(expected)
# raise proper error when not passing an expression
with pytest.raises(TypeError, match="Expected an Expression"):
dataset_reader.to_table(dataset, columns={"A": "A"})
@pytest.mark.pandas
@pytest.mark.parquet
def test_dataset_preserved_partitioning(tempdir):
# ARROW-8655
# through discovery, but without partitioning
_, path = _create_single_file(tempdir)
dataset = ds.dataset(path)
assert dataset.partitioning is None
# through discovery, with hive partitioning but not specified
full_table, path = _create_partitioned_dataset(tempdir)
dataset = ds.dataset(path)
assert dataset.partitioning is None
# through discovery, with hive partitioning (from a partitioning factory)
dataset = ds.dataset(path, partitioning="hive")
part = dataset.partitioning
assert part is not None
assert isinstance(part, ds.HivePartitioning)
assert part.schema == pa.schema([("part", pa.int32())])
assert len(part.dictionaries) == 1
assert part.dictionaries[0] == pa.array([0, 1, 2], pa.int32())
# through discovery, with hive partitioning (from a partitioning object)
part = ds.partitioning(pa.schema([("part", pa.int32())]), flavor="hive")
assert isinstance(part, ds.HivePartitioning) # not a factory
assert part.dictionaries is None
dataset = ds.dataset(path, partitioning=part)
part = dataset.partitioning
assert isinstance(part, ds.HivePartitioning)
assert part.schema == pa.schema([("part", pa.int32())])
# TODO is this expected?
assert part.dictionaries is None
# through manual creation -> not available
dataset = ds.dataset(path, partitioning="hive")
dataset2 = ds.FileSystemDataset(
list(dataset.get_fragments()), schema=dataset.schema,
format=dataset.format, filesystem=dataset.filesystem
)
assert dataset2.partitioning is None
# through discovery with ParquetDatasetFactory
root_path = tempdir / "data-partitioned-metadata"
metadata_path, _ = _create_parquet_dataset_partitioned(root_path)
dataset = ds.parquet_dataset(metadata_path, partitioning="hive")
part = dataset.partitioning
assert part is not None
assert isinstance(part, ds.HivePartitioning)
assert part.schema == pa.schema([("part", pa.string())])
assert len(part.dictionaries) == 1
# will be fixed by ARROW-13153 (order is not preserved at the moment)
# assert part.dictionaries[0] == pa.array(["a", "b"], pa.string())
assert set(part.dictionaries[0].to_pylist()) == {"a", "b"}
@pytest.mark.parquet
@pytest.mark.pandas
def test_write_to_dataset_given_null_just_works(tempdir):
import pyarrow.parquet as pq
schema = pa.schema([
pa.field('col', pa.int64()),
pa.field('part', pa.dictionary(pa.int32(), pa.string()))
])
table = pa.table({'part': [None, None, 'a', 'a'],
'col': list(range(4))}, schema=schema)
path = str(tempdir / 'test_dataset')
pq.write_to_dataset(table, path, partition_cols=[
'part'], use_legacy_dataset=False)
actual_table = pq.read_table(tempdir / 'test_dataset')
# column.equals can handle the difference in chunking but not the fact
# that `part` will have different dictionaries for the two chunks
assert actual_table.column('part').to_pylist(
) == table.column('part').to_pylist()
assert actual_table.column('col').equals(table.column('col'))
@pytest.mark.parquet
@pytest.mark.pandas
def test_legacy_write_to_dataset_drops_null(tempdir):
import pyarrow.parquet as pq
schema = pa.schema([
pa.field('col', pa.int64()),
pa.field('part', pa.dictionary(pa.int32(), pa.string()))
])
table = pa.table({'part': ['a', 'a', None, None],
'col': list(range(4))}, schema=schema)
expected = pa.table(
{'part': ['a', 'a'], 'col': list(range(2))}, schema=schema)
path = str(tempdir / 'test_dataset')
pq.write_to_dataset(table, path, partition_cols=[
'part'], use_legacy_dataset=True)
actual = pq.read_table(tempdir / 'test_dataset')
assert actual == expected
def _sort_table(tab, sort_col):
import pyarrow.compute as pc
sorted_indices = pc.sort_indices(
tab, options=pc.SortOptions([(sort_col, 'ascending')]))
return pc.take(tab, sorted_indices)
def _check_dataset_roundtrip(dataset, base_dir, expected_files, sort_col,
base_dir_path=None, partitioning=None):
base_dir_path = base_dir_path or base_dir
ds.write_dataset(dataset, base_dir, format="feather",
partitioning=partitioning, use_threads=False)
# check that all files are present
file_paths = list(base_dir_path.rglob("*"))
assert set(file_paths) == set(expected_files)
# check that reading back in as dataset gives the same result
dataset2 = ds.dataset(
base_dir_path, format="feather", partitioning=partitioning)
assert _sort_table(dataset2.to_table(), sort_col).equals(
_sort_table(dataset.to_table(), sort_col))
@pytest.mark.parquet
def test_write_dataset(tempdir):
# manually create a written dataset and read as dataset object
directory = tempdir / 'single-file'
directory.mkdir()
_ = _create_single_file(directory)
dataset = ds.dataset(directory)
# full string path
target = tempdir / 'single-file-target'
expected_files = [target / "part-0.feather"]
_check_dataset_roundtrip(dataset, str(target), expected_files, 'a', target)
# pathlib path object
target = tempdir / 'single-file-target2'
expected_files = [target / "part-0.feather"]
_check_dataset_roundtrip(dataset, target, expected_files, 'a', target)
# TODO
# # relative path
# target = tempdir / 'single-file-target3'
# expected_files = [target / "part-0.ipc"]
# _check_dataset_roundtrip(
# dataset, './single-file-target3', expected_files, target)
# Directory of files
directory = tempdir / 'single-directory'
directory.mkdir()
_ = _create_directory_of_files(directory)
dataset = ds.dataset(directory)
target = tempdir / 'single-directory-target'
expected_files = [target / "part-0.feather"]
_check_dataset_roundtrip(dataset, str(target), expected_files, 'a', target)
@pytest.mark.parquet
@pytest.mark.pandas
def test_write_dataset_partitioned(tempdir):
directory = tempdir / "partitioned"
_ = _create_parquet_dataset_partitioned(directory)
partitioning = ds.partitioning(flavor="hive")
dataset = ds.dataset(directory, partitioning=partitioning)
# hive partitioning
target = tempdir / 'partitioned-hive-target'
expected_paths = [
target / "part=a", target / "part=a" / "part-0.feather",
target / "part=b", target / "part=b" / "part-0.feather"
]
partitioning_schema = ds.partitioning(
pa.schema([("part", pa.string())]), flavor="hive")
_check_dataset_roundtrip(
dataset, str(target), expected_paths, 'f1', target,
partitioning=partitioning_schema)
# directory partitioning
target = tempdir / 'partitioned-dir-target'
expected_paths = [
target / "a", target / "a" / "part-0.feather",
target / "b", target / "b" / "part-0.feather"
]
partitioning_schema = ds.partitioning(
pa.schema([("part", pa.string())]))
_check_dataset_roundtrip(
dataset, str(target), expected_paths, 'f1', target,
partitioning=partitioning_schema)
def test_write_dataset_with_field_names(tempdir):
table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']})
ds.write_dataset(table, tempdir, format='parquet',
partitioning=["b"])
load_back = ds.dataset(tempdir, partitioning=["b"])
files = load_back.files
partitioning_dirs = {
str(pathlib.Path(f).relative_to(tempdir).parent) for f in files
}
assert partitioning_dirs == {"x", "y", "z"}
load_back_table = load_back.to_table()
assert load_back_table.equals(table)
def test_write_dataset_with_field_names_hive(tempdir):
table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z']})
ds.write_dataset(table, tempdir, format='parquet',
partitioning=["b"], partitioning_flavor="hive")
load_back = ds.dataset(tempdir, partitioning="hive")
files = load_back.files
partitioning_dirs = {
str(pathlib.Path(f).relative_to(tempdir).parent) for f in files
}
assert partitioning_dirs == {"b=x", "b=y", "b=z"}
load_back_table = load_back.to_table()
assert load_back_table.equals(table)
def test_write_dataset_with_scanner(tempdir):
table = pa.table({'a': ['x', 'y', None], 'b': ['x', 'y', 'z'],
'c': [1, 2, 3]})
ds.write_dataset(table, tempdir, format='parquet',
partitioning=["b"])
dataset = ds.dataset(tempdir, partitioning=["b"])
with tempfile.TemporaryDirectory() as tempdir2:
ds.write_dataset(dataset.scanner(columns=["b", "c"], use_async=True),
tempdir2, format='parquet', partitioning=["b"])
load_back = ds.dataset(tempdir2, partitioning=["b"])
load_back_table = load_back.to_table()
assert dict(load_back_table.to_pydict()
) == table.drop(["a"]).to_pydict()
def test_write_dataset_with_backpressure(tempdir):
consumer_gate = threading.Event()
# A filesystem that blocks all writes so that we can build
# up backpressure. The writes are released at the end of
# the test.
class GatingFs(ProxyHandler):
def open_output_stream(self, path, metadata):
# Block until the end of the test
consumer_gate.wait()
return self._fs.open_output_stream(path, metadata=metadata)
gating_fs = fs.PyFileSystem(GatingFs(fs.LocalFileSystem()))
schema = pa.schema([pa.field('data', pa.int32())])
# By default, the dataset writer will queue up 64Mi rows so
# with batches of 1M it should only fit ~67 batches
batch = pa.record_batch([pa.array(list(range(1_000_000)))], schema=schema)
batches_read = 0
min_backpressure = 67
end = 200
def counting_generator():
nonlocal batches_read
while batches_read < end:
time.sleep(0.01)
batches_read += 1
yield batch
scanner = ds.Scanner.from_batches(
counting_generator(), schema=schema, use_threads=True,
use_async=True)
write_thread = threading.Thread(
target=lambda: ds.write_dataset(
scanner, str(tempdir), format='parquet', filesystem=gating_fs))
write_thread.start()
try:
start = time.time()
def duration():
return time.time() - start
# This test is timing dependent. There is no signal from the C++
# when backpressure has been hit. We don't know exactly when
# backpressure will be hit because it may take some time for the
# signal to get from the sink to the scanner.
#
# The test may emit false positives on slow systems. It could
# theoretically emit a false negative if the scanner managed to read
# and emit all 200 batches before the backpressure signal had a chance
# to propagate but the 0.01s delay in the generator should make that
# scenario unlikely.
last_value = 0
backpressure_probably_hit = False
while duration() < 10:
if batches_read > min_backpressure:
if batches_read == last_value:
backpressure_probably_hit = True
break
last_value = batches_read
time.sleep(0.5)
assert backpressure_probably_hit
finally:
consumer_gate.set()
write_thread.join()
assert batches_read == end
def test_write_dataset_with_dataset(tempdir):
table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
ds.write_dataset(table, tempdir, format='parquet',
partitioning=["b"])
dataset = ds.dataset(tempdir, partitioning=["b"])
with tempfile.TemporaryDirectory() as tempdir2:
ds.write_dataset(dataset, tempdir2,
format='parquet', partitioning=["b"])
load_back = ds.dataset(tempdir2, partitioning=["b"])
load_back_table = load_back.to_table()
assert dict(load_back_table.to_pydict()) == table.to_pydict()
@pytest.mark.pandas
def test_write_dataset_existing_data(tempdir):
directory = tempdir / 'ds'
table = pa.table({'b': ['x', 'y', 'z'], 'c': [1, 2, 3]})
partitioning = ds.partitioning(schema=pa.schema(
[pa.field('c', pa.int64())]), flavor='hive')
def compare_tables_ignoring_order(t1, t2):
df1 = t1.to_pandas().sort_values('b').reset_index(drop=True)
df2 = t2.to_pandas().sort_values('b').reset_index(drop=True)
assert df1.equals(df2)
# First write is ok
ds.write_dataset(table, directory, partitioning=partitioning, format='ipc')
table = pa.table({'b': ['a', 'b', 'c'], 'c': [2, 3, 4]})
# Second write should fail
with pytest.raises(pa.ArrowInvalid):
ds.write_dataset(table, directory,
partitioning=partitioning, format='ipc')
extra_table = pa.table({'b': ['e']})
extra_file = directory / 'c=2' / 'foo.arrow'
pyarrow.feather.write_feather(extra_table, extra_file)
# Should be ok and overwrite with overwrite behavior
ds.write_dataset(table, directory, partitioning=partitioning,
format='ipc',
existing_data_behavior='overwrite_or_ignore')
overwritten = pa.table(
{'b': ['e', 'x', 'a', 'b', 'c'], 'c': [2, 1, 2, 3, 4]})
readback = ds.dataset(tempdir, format='ipc',
partitioning=partitioning).to_table()
compare_tables_ignoring_order(readback, overwritten)
assert extra_file.exists()
# Should be ok and delete matching with delete_matching
ds.write_dataset(table, directory, partitioning=partitioning,
format='ipc', existing_data_behavior='delete_matching')
overwritten = pa.table({'b': ['x', 'a', 'b', 'c'], 'c': [1, 2, 3, 4]})
readback = ds.dataset(tempdir, format='ipc',
partitioning=partitioning).to_table()
compare_tables_ignoring_order(readback, overwritten)
assert not extra_file.exists()
@pytest.mark.parquet
@pytest.mark.pandas
def test_write_dataset_partitioned_dict(tempdir):
directory = tempdir / "partitioned"
_ = _create_parquet_dataset_partitioned(directory)
# directory partitioning, dictionary partition columns
dataset = ds.dataset(
directory,
partitioning=ds.HivePartitioning.discover(infer_dictionary=True))
target = tempdir / 'partitioned-dir-target'
expected_paths = [
target / "a", target / "a" / "part-0.feather",
target / "b", target / "b" / "part-0.feather"
]
partitioning = ds.partitioning(pa.schema([
dataset.schema.field('part')]),
dictionaries={'part': pa.array(['a', 'b'])})
# NB: dictionaries required here since we use partitioning to parse
# directories in _check_dataset_roundtrip (not currently required for
# the formatting step)
_check_dataset_roundtrip(
dataset, str(target), expected_paths, 'f1', target,
partitioning=partitioning)
@pytest.mark.parquet
@pytest.mark.pandas
def test_write_dataset_use_threads(tempdir):
directory = tempdir / "partitioned"
_ = _create_parquet_dataset_partitioned(directory)
dataset = ds.dataset(directory, partitioning="hive")
partitioning = ds.partitioning(
pa.schema([("part", pa.string())]), flavor="hive")
target1 = tempdir / 'partitioned1'
paths_written = []
def file_visitor(written_file):
paths_written.append(written_file.path)
ds.write_dataset(
dataset, target1, format="feather", partitioning=partitioning,
use_threads=True, file_visitor=file_visitor
)
expected_paths = {
target1 / 'part=a' / 'part-0.feather',
target1 / 'part=b' / 'part-0.feather'
}
paths_written_set = set(map(pathlib.Path, paths_written))
assert paths_written_set == expected_paths
target2 = tempdir / 'partitioned2'
ds.write_dataset(
dataset, target2, format="feather", partitioning=partitioning,
use_threads=False
)
# check that reading in gives same result
result1 = ds.dataset(target1, format="feather", partitioning=partitioning)
result2 = ds.dataset(target2, format="feather", partitioning=partitioning)
assert result1.to_table().equals(result2.to_table())
def test_write_table(tempdir):
table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))
], names=["f1", "f2", "part"])
base_dir = tempdir / 'single'
ds.write_dataset(table, base_dir,
basename_template='dat_{i}.arrow', format="feather")
# check that all files are present
file_paths = list(base_dir.rglob("*"))
expected_paths = [base_dir / "dat_0.arrow"]
assert set(file_paths) == set(expected_paths)
# check Table roundtrip
result = ds.dataset(base_dir, format="ipc").to_table()
assert result.equals(table)
# with partitioning
base_dir = tempdir / 'partitioned'
expected_paths = [
base_dir / "part=a", base_dir / "part=a" / "dat_0.arrow",
base_dir / "part=b", base_dir / "part=b" / "dat_0.arrow"
]
visited_paths = []
def file_visitor(written_file):
visited_paths.append(written_file.path)
partitioning = ds.partitioning(
pa.schema([("part", pa.string())]), flavor="hive")
ds.write_dataset(table, base_dir, format="feather",
basename_template='dat_{i}.arrow',
partitioning=partitioning, file_visitor=file_visitor)
file_paths = list(base_dir.rglob("*"))
assert set(file_paths) == set(expected_paths)
result = ds.dataset(base_dir, format="ipc", partitioning=partitioning)
assert result.to_table().equals(table)
assert len(visited_paths) == 2
for visited_path in visited_paths:
assert pathlib.Path(visited_path) in expected_paths
def test_write_table_multiple_fragments(tempdir):
table = pa.table([
pa.array(range(10)), pa.array(np.random.randn(10)),
pa.array(np.repeat(['a', 'b'], 5))
], names=["f1", "f2", "part"])
table = pa.concat_tables([table]*2)
# Table with multiple batches written as single Fragment by default
base_dir = tempdir / 'single'
ds.write_dataset(table, base_dir, format="feather")
assert set(base_dir.rglob("*")) == set([base_dir / "part-0.feather"])
assert ds.dataset(base_dir, format="ipc").to_table().equals(table)
# Same for single-element list of Table
base_dir = tempdir / 'single-list'
ds.write_dataset([table], base_dir, format="feather")
assert set(base_dir.rglob("*")) == set([base_dir / "part-0.feather"])
assert ds.dataset(base_dir, format="ipc").to_table().equals(table)
# Provide list of batches to write multiple fragments
base_dir = tempdir / 'multiple'
ds.write_dataset(table.to_batches(), base_dir, format="feather")
assert set(base_dir.rglob("*")) == set(
[base_dir / "part-0.feather"])
assert ds.dataset(base_dir, format="ipc").to_table().equals(table)
# Provide list of tables to write multiple fragments
base_dir = tempdir / 'multiple-table'
ds.write_dataset([table, table], base_dir, format="feather")
assert set(base_dir.rglob("*")) == set(
[base_dir / "part-0.feather"])
assert ds.dataset(base_dir, format="ipc").to_table().equals(
pa.concat_tables([table]*2)
)
def test_write_iterable(tempdir):
table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))
], names=["f1", "f2", "part"])
base_dir = tempdir / 'inmemory_iterable'
ds.write_dataset((batch for batch in table.to_batches()), base_dir,
schema=table.schema,
basename_template='dat_{i}.arrow', format="feather")
result = ds.dataset(base_dir, format="ipc").to_table()
assert result.equals(table)
base_dir = tempdir / 'inmemory_reader'
reader = pa.ipc.RecordBatchReader.from_batches(table.schema,
table.to_batches())
ds.write_dataset(reader, base_dir,
basename_template='dat_{i}.arrow', format="feather")
result = ds.dataset(base_dir, format="ipc").to_table()
assert result.equals(table)
def test_write_scanner(tempdir, dataset_reader):
if not dataset_reader.use_async:
pytest.skip(
('ARROW-13338: Write dataset with scanner does not'
' support synchronous scan'))
table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))
], names=["f1", "f2", "part"])
dataset = ds.dataset(table)
base_dir = tempdir / 'dataset_from_scanner'
ds.write_dataset(dataset_reader.scanner(
dataset), base_dir, format="feather")
result = dataset_reader.to_table(ds.dataset(base_dir, format="ipc"))
assert result.equals(table)
# scanner with different projected_schema
base_dir = tempdir / 'dataset_from_scanner2'
ds.write_dataset(dataset_reader.scanner(dataset, columns=["f1"]),
base_dir, format="feather")
result = dataset_reader.to_table(ds.dataset(base_dir, format="ipc"))
assert result.equals(table.select(["f1"]))
# schema not allowed when writing a scanner
with pytest.raises(ValueError, match="Cannot specify a schema"):
ds.write_dataset(dataset_reader.scanner(dataset), base_dir,
schema=table.schema, format="feather")
def test_write_table_partitioned_dict(tempdir):
# ensure writing table partitioned on a dictionary column works without
# specifying the dictionary values explicitly
table = pa.table([
pa.array(range(20)),
pa.array(np.repeat(['a', 'b'], 10)).dictionary_encode(),
], names=['col', 'part'])
partitioning = ds.partitioning(table.select(["part"]).schema)
base_dir = tempdir / "dataset"
ds.write_dataset(
table, base_dir, format="feather", partitioning=partitioning
)
# check roundtrip
partitioning_read = ds.DirectoryPartitioning.discover(
["part"], infer_dictionary=True)
result = ds.dataset(
base_dir, format="ipc", partitioning=partitioning_read
).to_table()
assert result.equals(table)
@pytest.mark.parquet
def test_write_dataset_parquet(tempdir):
import pyarrow.parquet as pq
table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))
], names=["f1", "f2", "part"])
# using default "parquet" format string
base_dir = tempdir / 'parquet_dataset'
ds.write_dataset(table, base_dir, format="parquet")
# check that all files are present
file_paths = list(base_dir.rglob("*"))
expected_paths = [base_dir / "part-0.parquet"]
assert set(file_paths) == set(expected_paths)
# check Table roundtrip
result = ds.dataset(base_dir, format="parquet").to_table()
assert result.equals(table)
# using custom options
for version in ["1.0", "2.4", "2.6"]:
format = ds.ParquetFileFormat()
opts = format.make_write_options(version=version)
base_dir = tempdir / 'parquet_dataset_version{0}'.format(version)
ds.write_dataset(table, base_dir, format=format, file_options=opts)
meta = pq.read_metadata(base_dir / "part-0.parquet")
expected_version = "1.0" if version == "1.0" else "2.6"
assert meta.format_version == expected_version
def test_write_dataset_csv(tempdir):
table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))
], names=["f1", "f2", "chr1"])
base_dir = tempdir / 'csv_dataset'
ds.write_dataset(table, base_dir, format="csv")
# check that all files are present
file_paths = list(base_dir.rglob("*"))
expected_paths = [base_dir / "part-0.csv"]
assert set(file_paths) == set(expected_paths)
# check Table roundtrip
result = ds.dataset(base_dir, format="csv").to_table()
assert result.equals(table)
# using custom options
format = ds.CsvFileFormat(read_options=pyarrow.csv.ReadOptions(
column_names=table.schema.names))
opts = format.make_write_options(include_header=False)
base_dir = tempdir / 'csv_dataset_noheader'
ds.write_dataset(table, base_dir, format=format, file_options=opts)
result = ds.dataset(base_dir, format=format).to_table()
assert result.equals(table)
@pytest.mark.parquet
def test_write_dataset_parquet_file_visitor(tempdir):
table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))
], names=["f1", "f2", "part"])
visitor_called = False
def file_visitor(written_file):
nonlocal visitor_called
if (written_file.metadata is not None and
written_file.metadata.num_columns == 3):
visitor_called = True
base_dir = tempdir / 'parquet_dataset'
ds.write_dataset(table, base_dir, format="parquet",
file_visitor=file_visitor)
assert visitor_called
def test_partition_dataset_parquet_file_visitor(tempdir):
f1_vals = [item for chunk in range(4) for item in [chunk] * 10]
f2_vals = [item*10 for chunk in range(4) for item in [chunk] * 10]
table = pa.table({'f1': f1_vals, 'f2': f2_vals,
'part': np.repeat(['a', 'b'], 20)})
root_path = tempdir / 'partitioned'
partitioning = ds.partitioning(
pa.schema([("part", pa.string())]), flavor="hive")
paths_written = []
sample_metadata = None
def file_visitor(written_file):
nonlocal sample_metadata
if written_file.metadata:
sample_metadata = written_file.metadata
paths_written.append(written_file.path)
ds.write_dataset(
table, root_path, format="parquet", partitioning=partitioning,
use_threads=True, file_visitor=file_visitor
)
expected_paths = {
root_path / 'part=a' / 'part-0.parquet',
root_path / 'part=b' / 'part-0.parquet'
}
paths_written_set = set(map(pathlib.Path, paths_written))
assert paths_written_set == expected_paths
assert sample_metadata is not None
assert sample_metadata.num_columns == 2
@pytest.mark.parquet
@pytest.mark.pandas
def test_write_dataset_arrow_schema_metadata(tempdir):
# ensure we serialize ARROW schema in the parquet metadata, to have a
# correct roundtrip (e.g. preserve non-UTC timezone)
import pyarrow.parquet as pq
table = pa.table({"a": [pd.Timestamp("2012-01-01", tz="Europe/Brussels")]})
assert table["a"].type.tz == "Europe/Brussels"
ds.write_dataset(table, tempdir, format="parquet")
result = pq.read_table(tempdir / "part-0.parquet")
assert result["a"].type.tz == "Europe/Brussels"
def test_write_dataset_schema_metadata(tempdir):
# ensure that schema metadata gets written
from pyarrow import feather
table = pa.table({'a': [1, 2, 3]})
table = table.replace_schema_metadata({b'key': b'value'})
ds.write_dataset(table, tempdir, format="feather")
schema = feather.read_table(tempdir / "part-0.feather").schema
assert schema.metadata == {b'key': b'value'}
@pytest.mark.parquet
def test_write_dataset_schema_metadata_parquet(tempdir):
# ensure that schema metadata gets written
import pyarrow.parquet as pq
table = pa.table({'a': [1, 2, 3]})
table = table.replace_schema_metadata({b'key': b'value'})
ds.write_dataset(table, tempdir, format="parquet")
schema = pq.read_table(tempdir / "part-0.parquet").schema
assert schema.metadata == {b'key': b'value'}
@pytest.mark.parquet
@pytest.mark.s3
def test_write_dataset_s3(s3_example_simple):
# write dataset with s3 filesystem
_, _, fs, _, host, port, access_key, secret_key = s3_example_simple
uri_template = (
"s3://{}:{}@{{}}?scheme=http&endpoint_override={}:{}".format(
access_key, secret_key, host, port)
)
table = pa.table([
pa.array(range(20)), pa.array(np.random.randn(20)),
pa.array(np.repeat(['a', 'b'], 10))],
names=["f1", "f2", "part"]
)
part = ds.partitioning(pa.schema([("part", pa.string())]), flavor="hive")
# writing with filesystem object
ds.write_dataset(
table, "mybucket/dataset", filesystem=fs, format="feather",
partitioning=part
)
# check rountrip
result = ds.dataset(
"mybucket/dataset", filesystem=fs, format="ipc", partitioning="hive"
).to_table()
assert result.equals(table)
# writing with URI
uri = uri_template.format("mybucket/dataset2")
ds.write_dataset(table, uri, format="feather", partitioning=part)
# check rountrip
result = ds.dataset(
"mybucket/dataset2", filesystem=fs, format="ipc", partitioning="hive"
).to_table()
assert result.equals(table)
# writing with path + URI as filesystem
uri = uri_template.format("mybucket")
ds.write_dataset(
table, "dataset3", filesystem=uri, format="feather", partitioning=part
)
# check rountrip
result = ds.dataset(
"mybucket/dataset3", filesystem=fs, format="ipc", partitioning="hive"
).to_table()
assert result.equals(table)
@pytest.mark.parquet
def test_dataset_null_to_dictionary_cast(tempdir, dataset_reader):
# ARROW-12420
import pyarrow.parquet as pq
table = pa.table({"a": [None, None]})
pq.write_table(table, tempdir / "test.parquet")
schema = pa.schema([
pa.field("a", pa.dictionary(pa.int32(), pa.string()))
])
fsds = ds.FileSystemDataset.from_paths(
paths=[tempdir / "test.parquet"],
schema=schema,
format=ds.ParquetFileFormat(),
filesystem=fs.LocalFileSystem(),
)
table = dataset_reader.to_table(fsds)
assert table.schema == schema