# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from collections import defaultdict
import os
import inspect
import json
import re
import six
from six.moves.urllib.parse import urlparse
# pathlib might not be available in Python 2
try:
import pathlib
_has_pathlib = True
except ImportError:
_has_pathlib = False
import numpy as np
from pyarrow.filesystem import FileSystem, LocalFileSystem, S3FSWrapper
from pyarrow._parquet import (ParquetReader, FileMetaData, # noqa
RowGroupMetaData, ParquetSchema)
import pyarrow._parquet as _parquet # noqa
import pyarrow.lib as lib
import pyarrow as pa
# ----------------------------------------------------------------------
# Reading a single Parquet file
class ParquetFile(object):
"""
Reader interface for a single Parquet file
Parameters
----------
source : str, pyarrow.NativeFile, or file-like object
Readable source. For passing bytes or buffer-like file containing a
        Parquet file, use pyarrow.BufferReader
metadata : ParquetFileMetadata, default None
Use existing metadata object, rather than reading from file.
common_metadata : ParquetFileMetadata, default None
        Will be used in reads for pandas schema metadata if not found in the
        main file's metadata; it has no other uses at the moment
"""
def __init__(self, source, metadata=None, common_metadata=None):
self.reader = ParquetReader()
source = _ensure_file(source)
self.reader.open(source, metadata=metadata)
self.common_metadata = common_metadata
self._nested_paths_by_prefix = self._build_nested_paths()
def _build_nested_paths(self):
paths = self.reader.column_paths
result = defaultdict(list)
def _visit_piece(i, key, rest):
result[key].append(i)
if len(rest) > 0:
nested_key = '.'.join((key, rest[0]))
_visit_piece(i, nested_key, rest[1:])
for i, path in enumerate(paths):
_visit_piece(i, path[0], path[1:])
return result
@property
def metadata(self):
return self.reader.metadata
@property
def schema(self):
return self.metadata.schema
@property
def num_row_groups(self):
return self.reader.num_row_groups
def read_row_group(self, i, columns=None, nthreads=1,
use_pandas_metadata=False):
"""
Read a single row group from a Parquet file
Parameters
----------
columns: list
If not None, only these columns will be read from the row group. A
column name may be a prefix of a nested field, e.g. 'a' will select
'a.b', 'a.c', and 'a.d.e'
nthreads : int, default 1
Number of columns to read in parallel. If > 1, requires that the
underlying file source is threadsafe
use_pandas_metadata : boolean, default False
If True and file has custom pandas schema metadata, ensure that
index columns are also loaded
Returns
-------
pyarrow.table.Table
Content of the row group as a table (of columns)
"""
column_indices = self._get_column_indices(
columns, use_pandas_metadata=use_pandas_metadata)
return self.reader.read_row_group(i, column_indices=column_indices,
nthreads=nthreads)
def read(self, columns=None, nthreads=1, use_pandas_metadata=False):
"""
Read a Table from Parquet format
Parameters
----------
columns: list
If not None, only these columns will be read from the file. A
column name may be a prefix of a nested field, e.g. 'a' will select
'a.b', 'a.c', and 'a.d.e'
nthreads : int, default 1
Number of columns to read in parallel. If > 1, requires that the
underlying file source is threadsafe
use_pandas_metadata : boolean, default False
If True and file has custom pandas schema metadata, ensure that
index columns are also loaded
Returns
-------
pyarrow.table.Table
Content of the file as a table (of columns)
"""
column_indices = self._get_column_indices(
columns, use_pandas_metadata=use_pandas_metadata)
return self.reader.read_all(column_indices=column_indices,
nthreads=nthreads)
def scan_contents(self, columns=None, batch_size=65536):
"""
        Read contents of the file for the indicated columns and batch size,
        using a single thread. Returns the number of rows in the file. This
        function is used for benchmarking
Parameters
----------
columns : list of integers, default None
If None, scan all columns
batch_size : int, default 64K
Number of rows to read at a time internally
Returns
-------
num_rows : number of rows in file
"""
column_indices = self._get_column_indices(columns)
return self.reader.scan_contents(column_indices,
batch_size=batch_size)
def _get_column_indices(self, column_names, use_pandas_metadata=False):
if column_names is None:
return None
indices = []
for name in column_names:
if name in self._nested_paths_by_prefix:
indices.extend(self._nested_paths_by_prefix[name])
if use_pandas_metadata:
file_keyvalues = self.metadata.metadata
common_keyvalues = (self.common_metadata.metadata
if self.common_metadata is not None
else None)
if file_keyvalues and b'pandas' in file_keyvalues:
index_columns = _get_pandas_index_columns(file_keyvalues)
elif common_keyvalues and b'pandas' in common_keyvalues:
index_columns = _get_pandas_index_columns(common_keyvalues)
else:
index_columns = []
if indices is not None and index_columns:
indices += map(self.reader.column_name_idx, index_columns)
return indices
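# A minimal usage sketch for ParquetFile, kept as a comment so it is not
# executed at import time; the file name 'example.parquet' is hypothetical:
#
#     pf = ParquetFile('example.parquet')
#     print(pf.metadata.num_rows, pf.num_row_groups)
#     table = pf.read(columns=['a'], use_pandas_metadata=True)
#     first_row_group = pf.read_row_group(0)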
_SPARK_DISALLOWED_CHARS = re.compile('[ ,;{}()\n\t=]')
def _sanitized_spark_field_name(name):
return _SPARK_DISALLOWED_CHARS.sub('_', name)
def _sanitize_schema(schema, flavor):
if 'spark' in flavor:
sanitized_fields = []
schema_changed = False
for field in schema:
name = field.name
sanitized_name = _sanitized_spark_field_name(name)
if sanitized_name != name:
schema_changed = True
sanitized_field = pa.field(sanitized_name, field.type,
field.nullable, field.metadata)
sanitized_fields.append(sanitized_field)
else:
sanitized_fields.append(field)
new_schema = pa.schema(sanitized_fields, metadata=schema.metadata)
return new_schema, schema_changed
else:
return schema, False
def _sanitize_table(table, new_schema, flavor):
# TODO: This will not handle prohibited characters in nested field names
if 'spark' in flavor:
column_data = [table[i].data for i in range(table.num_columns)]
return pa.Table.from_arrays(column_data, schema=new_schema)
else:
return table
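# A minimal sketch of the Spark field-name sanitization above, kept as a
# comment; the field name 'my col' is hypothetical:
#
#     schema = pa.schema([pa.field('my col', pa.int64())])
#     new_schema, changed = _sanitize_schema(schema, 'spark')
#     # The disallowed space becomes an underscore: the field is now
#     # 'my_col', and changed is True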
_parquet_writer_arg_docs = """version : {"1.0", "2.0"}, default "1.0"
The Parquet format version, defaults to 1.0
use_dictionary : bool or list
Specify if we should use dictionary encoding in general or only for
some columns.
use_deprecated_int96_timestamps : boolean, default None
Write nanosecond resolution timestamps to INT96 Parquet
format. Defaults to False unless enabled by flavor argument
coerce_timestamps : string, default None
    Cast timestamps to a particular resolution.
Valid values: {None, 'ms', 'us'}
compression : str or dict
Specify the compression codec, either on a general basis or per-column.
flavor : {'spark'}, default None
    Sanitize schema or set other compatibility options for the target system"""
class ParquetWriter(object):
__doc__ = """
Class for incrementally building a Parquet file for Arrow tables
Parameters
----------
where : path or file-like object
schema : arrow Schema
{0}
""".format(_parquet_writer_arg_docs)
def __init__(self, where, schema, flavor=None,
version='1.0',
use_dictionary=True,
compression='snappy',
use_deprecated_int96_timestamps=None, **options):
if use_deprecated_int96_timestamps is None:
# Use int96 timestamps for Spark
if flavor is not None and 'spark' in flavor:
use_deprecated_int96_timestamps = True
else:
use_deprecated_int96_timestamps = False
self.flavor = flavor
if flavor is not None:
schema, self.schema_changed = _sanitize_schema(schema, flavor)
else:
self.schema_changed = False
self.schema = schema
self.where = where
        # If we open a file using an implied filesystem, keep the handle so
        # the file can be closed when the writer is closed
self.file_handle = None
if is_path(where):
fs = _get_fs_from_path(where)
sink = self.file_handle = fs.open(where, 'wb')
else:
sink = where
self.writer = _parquet.ParquetWriter(
sink, schema,
version=version,
compression=compression,
use_dictionary=use_dictionary,
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
**options)
self.is_open = True
def __del__(self):
if getattr(self, 'is_open', False):
self.close()
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.close()
# return false since we want to propagate exceptions
return False
def write_table(self, table, row_group_size=None):
if self.schema_changed:
table = _sanitize_table(table, self.schema, self.flavor)
assert self.is_open
self.writer.write_table(table, row_group_size=row_group_size)
def close(self):
if self.is_open:
self.writer.close()
self.is_open = False
if self.file_handle is not None:
self.file_handle.close()
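# A minimal usage sketch for ParquetWriter, kept as a comment; the table
# contents and output path are hypothetical:
#
#     table = pa.Table.from_arrays([pa.array([1, 2, 3])], ['x'])
#     with ParquetWriter('example_output.parquet', table.schema) as writer:
#         writer.write_table(table)
#     # The context manager closes the writer (and any file it opened)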
def _get_pandas_index_columns(keyvalues):
return (json.loads(keyvalues[b'pandas'].decode('utf8'))
['index_columns'])
# ----------------------------------------------------------------------
# Metadata container providing instructions about reading a single Parquet
# file, possibly part of a partitioned dataset
class ParquetDatasetPiece(object):
"""
    A single chunk of a potentially larger Parquet dataset to read. The
    arguments indicate whether to read a single row group or all row
    groups, and whether to add partition keys to the resulting pyarrow.Table
Parameters
----------
path : str
Path to file in the file system where this piece is located
partition_keys : list of tuples
[(column name, ordinal index)]
row_group : int, default None
Row group to load. By default, reads all row groups
"""
def __init__(self, path, row_group=None, partition_keys=None):
self.path = path
self.row_group = row_group
self.partition_keys = partition_keys or []
def __eq__(self, other):
if not isinstance(other, ParquetDatasetPiece):
return False
return (self.path == other.path and
self.row_group == other.row_group and
self.partition_keys == other.partition_keys)
def __ne__(self, other):
return not (self == other)
def __repr__(self):
return ('{0}({1!r}, row_group={2!r}, partition_keys={3!r})'
.format(type(self).__name__, self.path,
self.row_group,
self.partition_keys))
def __str__(self):
result = ''
if len(self.partition_keys) > 0:
partition_str = ', '.join('{0}={1}'.format(name, index)
for name, index in self.partition_keys)
result += 'partition[{0}] '.format(partition_str)
result += self.path
if self.row_group is not None:
result += ' | row_group={0}'.format(self.row_group)
return result
def get_metadata(self, open_file_func=None):
"""
Given a function that can create an open ParquetFile object, return the
file's metadata
"""
return self._open(open_file_func).metadata
def _open(self, open_file_func=None):
"""
Returns instance of ParquetFile
"""
reader = open_file_func(self.path)
if not isinstance(reader, ParquetFile):
reader = ParquetFile(reader)
return reader
def read(self, columns=None, nthreads=1, partitions=None,
open_file_func=None, file=None, use_pandas_metadata=False):
"""
Read this piece as a pyarrow.Table
Parameters
----------
columns : list of column names, default None
nthreads : int, default 1
For multithreaded file reads
partitions : ParquetPartitions, default None
open_file_func : function, default None
A function that knows how to construct a ParquetFile object given
the file path in this piece
file : file-like object
            Passed to ParquetFile
Returns
-------
table : pyarrow.Table
"""
if open_file_func is not None:
reader = self._open(open_file_func)
elif file is not None:
reader = ParquetFile(file)
else:
# try to read the local path
reader = ParquetFile(self.path)
options = dict(columns=columns,
nthreads=nthreads,
use_pandas_metadata=use_pandas_metadata)
if self.row_group is not None:
table = reader.read_row_group(self.row_group, **options)
else:
table = reader.read(**options)
if len(self.partition_keys) > 0:
if partitions is None:
raise ValueError('Must pass partition sets')
# Here, the index is the categorical code of the partition where
# this piece is located. Suppose we had
#
# /foo=a/0.parq
# /foo=b/0.parq
# /foo=c/0.parq
#
# Then we assign a=0, b=1, c=2. And the resulting Table pieces will
# have a DictionaryArray column named foo having the constant index
# value as indicated. The distinct categories of the partition have
# been computed in the ParquetManifest
for i, (name, index) in enumerate(self.partition_keys):
# The partition code is the same for all values in this piece
indices = np.array([index], dtype='i4').repeat(len(table))
                # This is the set of all partition values, computed as part
                # of the manifest, so ['a', 'b', 'c'] as in our example above.
dictionary = partitions.levels[i].dictionary
arr = lib.DictionaryArray.from_arrays(indices, dictionary)
col = lib.Column.from_array(name, arr)
table = table.append_column(col)
return table
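# A minimal sketch of reading a single, unpartitioned dataset piece, kept as
# a comment; the path is hypothetical:
#
#     piece = ParquetDatasetPiece('dataset_root/part-0.parquet')
#     table = piece.read(columns=['a'], nthreads=2)
#     # Pieces with partition_keys additionally need the partitions=
#     # argument (a ParquetPartitions built by ParquetManifest)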
class PartitionSet(object):
"""A data structure for cataloguing the observed Parquet partitions at a
particular level. So if we have
/foo=a/bar=0
/foo=a/bar=1
/foo=a/bar=2
/foo=b/bar=0
/foo=b/bar=1
/foo=b/bar=2
Then we have two partition sets, one for foo, another for bar. As we visit
levels of the partition hierarchy, a PartitionSet tracks the distinct
values and assigns categorical codes to use when reading the pieces
"""
def __init__(self, name, keys=None):
self.name = name
self.keys = keys or []
self.key_indices = {k: i for i, k in enumerate(self.keys)}
self._dictionary = None
def get_index(self, key):
"""
Get the index of the partition value if it is known, otherwise assign
one
"""
if key in self.key_indices:
return self.key_indices[key]
else:
index = len(self.key_indices)
self.keys.append(key)
self.key_indices[key] = index
return index
@property
def dictionary(self):
if self._dictionary is not None:
return self._dictionary
if len(self.keys) == 0:
raise ValueError('No known partition keys')
# Only integer and string partition types are supported right now
try:
integer_keys = [int(x) for x in self.keys]
dictionary = lib.array(integer_keys)
except ValueError:
dictionary = lib.array(self.keys)
self._dictionary = dictionary
return dictionary
@property
def is_sorted(self):
return list(self.keys) == sorted(self.keys)
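# A minimal sketch of how PartitionSet assigns categorical codes, kept as a
# comment; the name and keys are hypothetical:
#
#     part_set = PartitionSet('foo')
#     part_set.get_index('a')  # -> 0 (new key)
#     part_set.get_index('b')  # -> 1 (new key)
#     part_set.get_index('a')  # -> 0 (already known)
#     part_set.dictionary      # Arrow array of ['a', 'b']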
class ParquetPartitions(object):
def __init__(self):
self.levels = []
self.partition_names = set()
def __len__(self):
return len(self.levels)
def __getitem__(self, i):
return self.levels[i]
def get_index(self, level, name, key):
"""
Record a partition value at a particular level, returning the distinct
code for that value at that level. Example:
partitions.get_index(1, 'foo', 'a') returns 0
partitions.get_index(1, 'foo', 'b') returns 1
partitions.get_index(1, 'foo', 'c') returns 2
partitions.get_index(1, 'foo', 'a') returns 0
Parameters
----------
level : int
The nesting level of the partition we are observing
name : string
The partition name
key : string or int
The partition value
"""
if level == len(self.levels):
if name in self.partition_names:
raise ValueError('{0} was the name of the partition in '
'another level'.format(name))
part_set = PartitionSet(name)
self.levels.append(part_set)
self.partition_names.add(name)
return self.levels[level].get_index(key)
def is_path(x):
return (isinstance(x, six.string_types)
or (_has_pathlib and isinstance(x, pathlib.Path)))
class ParquetManifest(object):
"""
"""
def __init__(self, dirpath, filesystem=None, pathsep='/',
partition_scheme='hive'):
self.filesystem = filesystem or _get_fs_from_path(dirpath)
self.pathsep = pathsep
self.dirpath = dirpath
self.partition_scheme = partition_scheme
self.partitions = ParquetPartitions()
self.pieces = []
self.common_metadata_path = None
self.metadata_path = None
self._visit_level(0, self.dirpath, [])
if self.common_metadata_path is None:
# _common_metadata is a subset of _metadata
self.common_metadata_path = self.metadata_path
def _visit_level(self, level, base_path, part_keys):
fs = self.filesystem
_, directories, files = next(fs.walk(base_path))
filtered_files = []
for path in files:
full_path = self.pathsep.join((base_path, path))
if path.endswith('_common_metadata'):
self.common_metadata_path = full_path
elif path.endswith('_metadata'):
self.metadata_path = full_path
elif self._should_silently_exclude(path):
continue
else:
filtered_files.append(full_path)
# ARROW-1079: Filter out "private" directories starting with underscore
filtered_directories = [self.pathsep.join((base_path, x))
for x in directories
if not _is_private_directory(x)]
filtered_files.sort()
filtered_directories.sort()
if len(filtered_files) > 0 and len(filtered_directories) > 0:
raise ValueError('Found files in an intermediate '
'directory: {0}'.format(base_path))
elif len(filtered_directories) > 0:
self._visit_directories(level, filtered_directories, part_keys)
else:
self._push_pieces(filtered_files, part_keys)
def _should_silently_exclude(self, file_name):
return (file_name.endswith('.crc') or
file_name in EXCLUDED_PARQUET_PATHS)
def _visit_directories(self, level, directories, part_keys):
for path in directories:
head, tail = _path_split(path, self.pathsep)
            name, key = self._parse_partition(tail)
index = self.partitions.get_index(level, name, key)
dir_part_keys = part_keys + [(name, index)]
self._visit_level(level + 1, path, dir_part_keys)
def _parse_partition(self, dirname):
if self.partition_scheme == 'hive':
return _parse_hive_partition(dirname)
else:
            raise NotImplementedError('partition scheme: {0}'
                                      .format(self.partition_scheme))
def _push_pieces(self, files, part_keys):
self.pieces.extend([
ParquetDatasetPiece(path, partition_keys=part_keys)
for path in files
])
def _parse_hive_partition(value):
if '=' not in value:
raise ValueError('Directory name did not appear to be a '
'partition: {0}'.format(value))
return value.split('=', 1)
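# A small sketch of the Hive partition parsing above, kept as a comment:
#
#     _parse_hive_partition('year=2017')   # -> ['year', '2017']
#     _parse_hive_partition('key=a=b')     # -> ['key', 'a=b'] (first '=' only)
#     _parse_hive_partition('not_a_part')  # raises ValueError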
def _is_private_directory(x):
_, tail = os.path.split(x)
return tail.startswith('_') and '=' not in tail
def _path_split(path, sep):
i = path.rfind(sep) + 1
head, tail = path[:i], path[i:]
head = head.rstrip(sep)
return head, tail
EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
class ParquetDataset(object):
"""
Encapsulates details of reading a complete Parquet dataset possibly
consisting of multiple files and partitions in subdirectories
Parameters
----------
path_or_paths : str or List[str]
A directory name, single file name, or list of file names
filesystem : FileSystem, default None
If nothing passed, paths assumed to be found in the local on-disk
filesystem
metadata : pyarrow.parquet.FileMetaData
Use metadata obtained elsewhere to validate file schemas
schema : pyarrow.parquet.Schema
Use schema obtained elsewhere to validate file schemas. Alternative to
metadata parameter
split_row_groups : boolean, default False
Divide files into pieces for each row group in the file
validate_schema : boolean, default True
Check that individual file schemas are all the same / compatible
"""
def __init__(self, path_or_paths, filesystem=None, schema=None,
metadata=None, split_row_groups=False, validate_schema=True):
if filesystem is None:
a_path = path_or_paths
if isinstance(a_path, list):
a_path = a_path[0]
self.fs = _get_fs_from_path(a_path)
else:
self.fs = _ensure_filesystem(filesystem)
self.paths = path_or_paths
(self.pieces, self.partitions,
self.common_metadata_path) = _make_manifest(path_or_paths, self.fs)
if self.common_metadata_path is not None:
with self.fs.open(self.common_metadata_path) as f:
self.common_metadata = ParquetFile(f).metadata
else:
self.common_metadata = None
self.metadata = metadata
self.schema = schema
self.split_row_groups = split_row_groups
if split_row_groups:
raise NotImplementedError("split_row_groups not yet implemented")
if validate_schema:
self.validate_schemas()
def validate_schemas(self):
open_file = self._get_open_file_func()
if self.metadata is None and self.schema is None:
if self.common_metadata_path is not None:
self.schema = open_file(self.common_metadata_path).schema
else:
self.schema = self.pieces[0].get_metadata(open_file).schema
elif self.schema is None:
self.schema = self.metadata.schema
# Verify schemas are all compatible
dataset_schema = self.schema.to_arrow_schema()
# Exclude the partition columns from the schema, they are provided
# by the path, not the DatasetPiece
if self.partitions is not None:
for partition_name in self.partitions.partition_names:
if dataset_schema.get_field_index(partition_name) != -1:
field_idx = dataset_schema.get_field_index(partition_name)
dataset_schema = dataset_schema.remove(field_idx)
for piece in self.pieces:
file_metadata = piece.get_metadata(open_file)
file_schema = file_metadata.schema.to_arrow_schema()
if not dataset_schema.equals(file_schema):
raise ValueError('Schema in {0!s} was different. \n'
'{1!s}\n\nvs\n\n{2!s}'
.format(piece, file_schema,
dataset_schema))
def read(self, columns=None, nthreads=1, use_pandas_metadata=False):
"""
Read multiple Parquet files as a single pyarrow.Table
Parameters
----------
columns : List[str]
Names of columns to read from the file
nthreads : int, default 1
Number of columns to read in parallel. Requires that the underlying
file source is threadsafe
use_pandas_metadata : bool, default False
Passed through to each dataset piece
Returns
-------
pyarrow.Table
Content of the file as a table (of columns)
"""
open_file = self._get_open_file_func()
tables = []
for piece in self.pieces:
table = piece.read(columns=columns, nthreads=nthreads,
partitions=self.partitions,
open_file_func=open_file,
use_pandas_metadata=use_pandas_metadata)
tables.append(table)
all_data = lib.concat_tables(tables)
if use_pandas_metadata:
# We need to ensure that this metadata is set in the Table's schema
# so that Table.to_pandas will construct pandas.DataFrame with the
# right index
common_metadata = self._get_common_pandas_metadata()
current_metadata = all_data.schema.metadata or {}
if common_metadata and b'pandas' not in current_metadata:
all_data = all_data.replace_schema_metadata({
b'pandas': common_metadata})
return all_data
def read_pandas(self, **kwargs):
"""
Read dataset including pandas metadata, if any. Other arguments passed
through to ParquetDataset.read, see docstring for further details
Returns
-------
pyarrow.Table
Content of the file as a table (of columns)
"""
return self.read(use_pandas_metadata=True, **kwargs)
def _get_common_pandas_metadata(self):
if self.common_metadata is None:
return None
keyvalues = self.common_metadata.metadata
return keyvalues.get(b'pandas', None)
def _get_open_file_func(self):
if self.fs is None or isinstance(self.fs, LocalFileSystem):
def open_file(path, meta=None):
return ParquetFile(path, metadata=meta,
common_metadata=self.common_metadata)
else:
def open_file(path, meta=None):
return ParquetFile(self.fs.open(path, mode='rb'),
metadata=meta,
common_metadata=self.common_metadata)
return open_file
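# A minimal usage sketch for ParquetDataset, kept as a comment; the directory
# name is hypothetical:
#
#     dataset = ParquetDataset('my_dataset_dir')
#     table = dataset.read(columns=['a'], nthreads=4)
#     df = dataset.read_pandas().to_pandas()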
def _ensure_filesystem(fs):
fs_type = type(fs)
# If the arrow filesystem was subclassed, assume it supports the full
# interface and return it
if not issubclass(fs_type, FileSystem):
for mro in inspect.getmro(fs_type):
            if mro.__name__ == 'S3FileSystem':
                return S3FSWrapper(fs)
            # In case it's a simple LocalFileSystem (e.g. dask), use the
            # native arrow FileSystem
            elif mro.__name__ == 'LocalFileSystem':
                return LocalFileSystem.get_instance()
raise IOError('Unrecognized filesystem: {0}'.format(fs_type))
else:
return fs
def _make_manifest(path_or_paths, fs, pathsep='/'):
partitions = None
common_metadata_path = None
if len(path_or_paths) == 1:
# Dask passes a directory as a list of length 1
path_or_paths = path_or_paths[0]
if is_path(path_or_paths) and fs.isdir(path_or_paths):
manifest = ParquetManifest(path_or_paths, filesystem=fs,
pathsep=fs.pathsep)
common_metadata_path = manifest.common_metadata_path
pieces = manifest.pieces
partitions = manifest.partitions
else:
if not isinstance(path_or_paths, list):
path_or_paths = [path_or_paths]
# List of paths
if len(path_or_paths) == 0:
raise ValueError('Must pass at least one file path')
pieces = []
for path in path_or_paths:
if not fs.isfile(path):
raise IOError('Passed non-file path: {0}'
.format(path))
piece = ParquetDatasetPiece(path)
pieces.append(piece)
return pieces, partitions, common_metadata_path
_read_table_docstring = """
{0}
Parameters
----------
source: str, pyarrow.NativeFile, or file-like object
    If a string is passed, it can be a single file name or directory name.
    For file-like objects, only a single file is read. Use
    pyarrow.BufferReader to read a file contained in a bytes or buffer-like
    object
columns: list
If not None, only these columns will be read from the file. A column
name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
'a.c', and 'a.d.e'
nthreads : int, default 1
Number of columns to read in parallel. Requires that the underlying
file source is threadsafe
metadata : FileMetaData
If separately computed
{1}
Returns
-------
{2}
"""
def read_table(source, columns=None, nthreads=1, metadata=None,
use_pandas_metadata=False):
if is_path(source):
fs = _get_fs_from_path(source)
if fs.isdir(source):
return fs.read_parquet(source, columns=columns, metadata=metadata)
pf = ParquetFile(source, metadata=metadata)
return pf.read(columns=columns, nthreads=nthreads,
use_pandas_metadata=use_pandas_metadata)
read_table.__doc__ = _read_table_docstring.format(
'Read a Table from Parquet format',
"""use_pandas_metadata : boolean, default False
If True and file has custom pandas schema metadata, ensure that
index columns are also loaded""",
"""pyarrow.Table
Content of the file as a table (of columns)""")
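# A minimal usage sketch for read_table, kept as a comment; the file name is
# hypothetical:
#
#     table = read_table('example.parquet', columns=['a', 'b'], nthreads=2)
#     # Passing a directory path instead reads the whole dataset through the
#     # filesystem's read_parquet helper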
def read_pandas(source, columns=None, nthreads=1, metadata=None):
return read_table(source, columns=columns, nthreads=nthreads,
metadata=metadata, use_pandas_metadata=True)
read_pandas.__doc__ = _read_table_docstring.format(
'Read a Table from Parquet format, also reading DataFrame\n'
'index values if known in the file metadata',
'',
"""pyarrow.Table
Content of the file as a Table of Columns, including DataFrame
indexes as columns""")
def write_table(table, where, row_group_size=None, version='1.0',
use_dictionary=True, compression='snappy',
use_deprecated_int96_timestamps=None,
coerce_timestamps=None,
flavor=None, **kwargs):
row_group_size = kwargs.pop('chunk_size', row_group_size)
use_int96 = use_deprecated_int96_timestamps
try:
with ParquetWriter(
where, table.schema,
version=version,
flavor=flavor,
use_dictionary=use_dictionary,
coerce_timestamps=coerce_timestamps,
compression=compression,
use_deprecated_int96_timestamps=use_int96,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
if is_path(where):
try:
os.remove(where)
except os.error:
pass
raise
write_table.__doc__ = """
Write a Table to Parquet format
Parameters
----------
table : pyarrow.Table
where: string or pyarrow.NativeFile
{0}
""".format(_parquet_writer_arg_docs)
def _mkdir_if_not_exists(fs, path):
if fs._isfilestore() and not fs.exists(path):
try:
fs.mkdir(path)
except OSError:
assert fs.exists(path)
def write_to_dataset(table, root_path, partition_cols=None,
filesystem=None, preserve_index=True, **kwargs):
"""
Wrapper around parquet.write_table for writing a Table to
Parquet format by partitions.
    For each combination of partition columns and values,
    subdirectories are created in the following manner:
    root_dir/
      group1=value1
        group2=value1
          <uuid>.parquet
        group2=value2
          <uuid>.parquet
      group1=valueN
        group2=value1
          <uuid>.parquet
        group2=valueN
          <uuid>.parquet
Parameters
----------
table : pyarrow.Table
root_path : string,
The root directory of the dataset
filesystem : FileSystem, default None
If nothing passed, paths assumed to be found in the local on-disk
filesystem
partition_cols : list,
Column names by which to partition the dataset
Columns are partitioned in the order they are given
preserve_index : bool,
Parameter for instantiating Table; preserve pandas index or not.
**kwargs : dict, kwargs for write_table function.
"""
from pyarrow import (
Table,
compat
)
if filesystem is None:
fs = _get_fs_from_path(root_path)
else:
fs = _ensure_filesystem(filesystem)
_mkdir_if_not_exists(fs, root_path)
if partition_cols is not None and len(partition_cols) > 0:
df = table.to_pandas()
partition_keys = [df[col] for col in partition_cols]
data_df = df.drop(partition_cols, axis='columns')
data_cols = df.columns.drop(partition_cols)
if len(data_cols) == 0:
raise ValueError("No data left to save outside partition columns")
for keys, subgroup in data_df.groupby(partition_keys):
if not isinstance(keys, tuple):
keys = (keys,)
subdir = "/".join(
["{colname}={value}".format(colname=name, value=val)
for name, val in zip(partition_cols, keys)])
subtable = Table.from_pandas(subgroup,
preserve_index=preserve_index)
prefix = "/".join([root_path, subdir])
_mkdir_if_not_exists(fs, prefix)
outfile = compat.guid() + ".parquet"
full_path = "/".join([prefix, outfile])
with fs.open(full_path, 'wb') as f:
write_table(subtable, f, **kwargs)
else:
outfile = compat.guid() + ".parquet"
full_path = "/".join([root_path, outfile])
with fs.open(full_path, 'wb') as f:
write_table(table, f, **kwargs)
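# A minimal usage sketch for write_to_dataset, kept as a comment; the table,
# root path, and partition column are hypothetical (pandas is required for
# the groupby step):
#
#     table = pa.Table.from_arrays(
#         [pa.array([1, 2, 3]), pa.array(['a', 'a', 'b'])], ['val', 'part'])
#     write_to_dataset(table, 'my_dataset_root', partition_cols=['part'])
#     # Produces my_dataset_root/part=a/<uuid>.parquet and
#     #          my_dataset_root/part=b/<uuid>.parquet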
def write_metadata(schema, where, version='1.0',
use_deprecated_int96_timestamps=False,
coerce_timestamps=None):
"""
Write metadata-only Parquet file from schema
Parameters
----------
schema : pyarrow.Schema
where: string or pyarrow.NativeFile
version : {"1.0", "2.0"}, default "1.0"
The Parquet format version, defaults to 1.0
use_deprecated_int96_timestamps : boolean, default False
Write nanosecond resolution timestamps to INT96 Parquet format
    coerce_timestamps : string, default None
        Cast timestamps to a particular resolution.
        Valid values: {None, 'ms', 'us'}
    """
writer = ParquetWriter(
where, schema, version=version,
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
coerce_timestamps=coerce_timestamps)
writer.close()
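# A minimal usage sketch for write_metadata, kept as a comment; the schema
# and output path are hypothetical:
#
#     schema = pa.schema([pa.field('a', pa.int64())])
#     write_metadata(schema, 'dataset_root/_common_metadata')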
def read_metadata(where):
"""
Read FileMetadata from footer of a single Parquet file
Parameters
----------
where : string (filepath) or file-like object
Returns
-------
metadata : FileMetadata
"""
return ParquetFile(where).metadata
def read_schema(where):
"""
Read effective Arrow schema from Parquet file metadata
Parameters
----------
where : string (filepath) or file-like object
Returns
-------
schema : pyarrow.Schema
"""
return ParquetFile(where).schema.to_arrow_schema()
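# A minimal usage sketch for the metadata helpers above, kept as a comment;
# the file name is hypothetical:
#
#     meta = read_metadata('example.parquet')    # FileMetaData
#     print(meta.num_rows, meta.num_row_groups)
#     arrow_schema = read_schema('example.parquet')  # pyarrow.Schema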
def _ensure_file(source):
if is_path(source):
fs = _get_fs_from_path(source)
try:
return fs.open(source)
except IOError as e:
raise lib.ArrowIOError("failed to open file {}, {}"
.format(source, e))
elif not hasattr(source, 'seek'):
raise ValueError('Source does not appear file-like')
else:
return source
def _get_fs_from_path(path):
"""
    Return a filesystem from a path, which could be an HDFS URI
"""
# input can be hdfs URI such as hdfs://host:port/myfile.parquet
if _has_pathlib and isinstance(path, pathlib.Path):
path = str(path)
parsed_uri = urlparse(path)
if parsed_uri.scheme == 'hdfs':
netloc_split = parsed_uri.netloc.split(':')
host = netloc_split[0]
if host == '':
host = 'default'
port = 0
        if len(netloc_split) == 2 and netloc_split[1].isdigit():
port = int(netloc_split[1])
fs = pa.hdfs.connect(host=host, port=port)
else:
fs = LocalFileSystem.get_instance()
return fs