# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
cdef class ChunkedArray(_PandasConvertible):
"""
    Array backed by one or more memory chunks.
Warning
-------
Do not call this class's constructor directly.
"""
def __cinit__(self):
self.chunked_array = NULL
def __init__(self):
raise TypeError("Do not call ChunkedArray's constructor directly, use "
"`chunked_array` function instead.")
cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array):
self.sp_chunked_array = chunked_array
self.chunked_array = chunked_array.get()
def __reduce__(self):
return chunked_array, (self.chunks, self.type)
@property
def type(self):
return pyarrow_wrap_data_type(self.sp_chunked_array.get().type())
def length(self):
return self.chunked_array.length()
def __len__(self):
return self.length()
def __repr__(self):
type_format = object.__repr__(self)
return '{0}\n{1}'.format(type_format, str(self))
def format(self, int indent=0, int window=10):
cdef:
c_string result
with nogil:
check_status(
PrettyPrint(
deref(self.chunked_array),
PrettyPrintOptions(indent, window),
&result
)
)
return frombytes(result)
def __str__(self):
return self.format()
@property
def null_count(self):
"""
        Number of null entries
Returns
-------
int
"""
return self.chunked_array.null_count()
def __iter__(self):
for chunk in self.iterchunks():
for item in chunk:
yield item
def __getitem__(self, key):
if isinstance(key, slice):
return _normalize_slice(self, key)
elif isinstance(key, six.integer_types):
return self.getitem(key)
else:
raise TypeError("key must either be a slice or integer")
cdef getitem(self, int64_t i):
cdef int j
index = _normalize_index(i, self.chunked_array.length())
for j in range(self.num_chunks):
if index < self.chunked_array.chunk(j).get().length():
return self.chunk(j)[index]
else:
index -= self.chunked_array.chunk(j).get().length()
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, ChunkedArray other):
"""
Return whether the contents of two chunked arrays are equal
Parameters
----------
other : pyarrow.ChunkedArray
Returns
-------
are_equal : boolean
"""
cdef:
CChunkedArray* this_arr = self.chunked_array
CChunkedArray* other_arr = other.chunked_array
c_bool result
if other is None:
return False
with nogil:
result = this_arr.Equals(deref(other_arr))
return result
def _to_pandas(self, options, **kwargs):
cdef:
PyObject* out
PandasOptions c_options = options
with nogil:
check_status(libarrow.ConvertChunkedArrayToPandas(
c_options,
self.sp_chunked_array,
self, &out))
return wrap_array_output(out)
def __array__(self, dtype=None):
if dtype is None:
return self.to_pandas()
return self.to_pandas().astype(dtype)
def dictionary_encode(self):
"""
Compute dictionary-encoded representation of array
Returns
-------
pyarrow.ChunkedArray
Same chunking as the input, all chunks share a common dictionary.
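        Examples
        --------
        A minimal sketch of the expected behavior (illustrative only):
        >>> import pyarrow as pa
        >>> ca = pa.chunked_array([["a", "b", "a"]])
        >>> encoded = ca.dictionary_encode()
        >>> encoded.num_chunks
        1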
"""
cdef CDatum out
with nogil:
check_status(
DictionaryEncode(_context(), CDatum(self.sp_chunked_array),
&out))
return wrap_datum(out)
def unique(self):
"""
Compute distinct elements in array
Returns
-------
pyarrow.Array
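        Examples
        --------
        A minimal sketch; it assumes distinct values are returned in order
        of first appearance:
        >>> import pyarrow as pa
        >>> ca = pa.chunked_array([[1, 2], [2, 3]])
        >>> ca.unique().to_pylist()
        [1, 2, 3]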
"""
cdef shared_ptr[CArray] result
with nogil:
check_status(
Unique(_context(), CDatum(self.sp_chunked_array), &result))
return pyarrow_wrap_array(result)
def slice(self, offset=0, length=None):
"""
Compute zero-copy slice of this ChunkedArray
Parameters
----------
offset : int, default 0
Offset from start of array to slice
length : int, default None
            Length of slice (default is until end of the array starting
            from offset)
Returns
-------
sliced : ChunkedArray
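        Examples
        --------
        A minimal sketch; the slice may span chunk boundaries:
        >>> import pyarrow as pa
        >>> ca = pa.chunked_array([[1, 2, 3], [4, 5]])
        >>> ca.slice(1, 3).to_pylist()
        [2, 3, 4]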
"""
cdef shared_ptr[CChunkedArray] result
if offset < 0:
raise IndexError('Offset must be non-negative')
if length is None:
result = self.chunked_array.Slice(offset)
else:
result = self.chunked_array.Slice(offset, length)
return pyarrow_wrap_chunked_array(result)
@property
def num_chunks(self):
"""
Number of underlying chunks
Returns
-------
int
"""
return self.chunked_array.num_chunks()
def chunk(self, i):
"""
Select a chunk by its index
Parameters
----------
i : int
Returns
-------
pyarrow.Array
"""
if i >= self.num_chunks or i < 0:
raise IndexError('Chunk index out of range.')
return pyarrow_wrap_array(self.chunked_array.chunk(i))
@property
def chunks(self):
return list(self.iterchunks())
def iterchunks(self):
for i in range(self.num_chunks):
yield self.chunk(i)
def to_pylist(self):
"""
Convert to a list of native Python objects.
"""
result = []
for i in range(self.num_chunks):
result += self.chunk(i).to_pylist()
return result
def chunked_array(arrays, type=None):
"""
Construct chunked array from list of array-like objects
Parameters
----------
arrays : list of Array or values coercible to arrays
        Must all be the same data type. Can be empty only if a type is
        also passed
type : DataType or string coercible to DataType
Returns
-------
ChunkedArray
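    Examples
    --------
    A minimal sketch of typical usage; each input list becomes one chunk:
    >>> import pyarrow as pa
    >>> ca = pa.chunked_array([[1, 2], [3, 4]])
    >>> ca.num_chunks
    2
    >>> ca.to_pylist()
    [1, 2, 3, 4]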
"""
cdef:
Array arr
vector[shared_ptr[CArray]] c_arrays
shared_ptr[CChunkedArray] sp_chunked_array
shared_ptr[CDataType] sp_data_type
for x in arrays:
if isinstance(x, Array):
arr = x
if type is not None:
assert x.type == type
else:
arr = array(x, type=type)
c_arrays.push_back(arr.sp_array)
if type:
sp_data_type = pyarrow_unwrap_data_type(type)
sp_chunked_array.reset(new CChunkedArray(c_arrays, sp_data_type))
else:
if c_arrays.size() == 0:
raise ValueError("Cannot construct a chunked array with neither "
"arrays nor type")
sp_chunked_array.reset(new CChunkedArray(c_arrays))
with nogil:
check_status(sp_chunked_array.get().Validate())
return pyarrow_wrap_chunked_array(sp_chunked_array)
def column(object field_or_name, arr):
"""
Create Column object from field/string and array-like data
Parameters
----------
field_or_name : string or Field
arr : Array, list of Arrays, or ChunkedArray
Returns
-------
column : Column
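    Examples
    --------
    Illustrative usage with a plain name and an Array:
    >>> import pyarrow as pa
    >>> col = pa.column('x', pa.array([1, 2, 3]))
    >>> col.name
    'x'
    >>> col.to_pylist()
    [1, 2, 3]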
"""
cdef:
Field boxed_field
Array _arr
ChunkedArray _carr
shared_ptr[CColumn] sp_column
if isinstance(arr, list):
arr = chunked_array(arr)
elif not isinstance(arr, (Array, ChunkedArray)):
arr = array(arr)
if isinstance(field_or_name, Field):
boxed_field = field_or_name
if arr.type != boxed_field.type:
raise ValueError('Passed field type does not match array')
else:
boxed_field = field(field_or_name, arr.type)
if isinstance(arr, Array):
_arr = arr
sp_column.reset(new CColumn(boxed_field.sp_field, _arr.sp_array))
elif isinstance(arr, ChunkedArray):
_carr = arr
sp_column.reset(new CColumn(boxed_field.sp_field,
_carr.sp_chunked_array))
else:
raise ValueError("Unsupported type for column(...): {}"
.format(type(arr)))
return pyarrow_wrap_column(sp_column)
cdef class Column(_PandasConvertible):
"""
Named vector of elements of equal type.
Warning
-------
Do not call this class's constructor directly.
"""
def __cinit__(self):
self.column = NULL
def __init__(self):
raise TypeError("Do not call Column's constructor directly, use one "
"of the `Column.from_*` functions instead.")
cdef void init(self, const shared_ptr[CColumn]& column):
self.sp_column = column
self.column = column.get()
def __reduce__(self):
return column, (self.field, self.data)
def __repr__(self):
from pyarrow.compat import StringIO
result = StringIO()
result.write('<Column name={0!r} type={1!r}>'
.format(self.name, self.type))
result.write('\n{}'.format(str(self.data)))
return result.getvalue()
def __getitem__(self, key):
return self.data[key]
@staticmethod
def from_array(*args):
return column(*args)
def cast(self, object target_type, bint safe=True):
"""
Cast column values to another data type
Parameters
----------
target_type : DataType
Type to cast to
safe : boolean, default True
Check for overflows or other unsafe conversions
Returns
-------
casted : Column
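        Examples
        --------
        A minimal sketch (``str`` of an Arrow float64 type is 'double'):
        >>> import pyarrow as pa
        >>> col = pa.column('x', pa.array([1, 2, 3]))
        >>> str(col.cast(pa.float64()).type)
        'double'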
"""
cdef:
CCastOptions options = CCastOptions(safe)
DataType type = ensure_type(target_type)
shared_ptr[CArray] result
CDatum out
with nogil:
check_status(Cast(_context(), CDatum(self.column.data()),
type.sp_type, options, &out))
casted_data = pyarrow_wrap_chunked_array(out.chunked_array())
return column(self.name, casted_data)
def dictionary_encode(self):
"""
Compute dictionary-encoded representation of array
Returns
-------
pyarrow.Column
Same chunking as the input, all chunks share a common dictionary.
"""
ca = self.data.dictionary_encode()
return column(self.name, ca)
def unique(self):
"""
Compute distinct elements in array
Returns
-------
pyarrow.Array
"""
return self.data.unique()
def flatten(self, MemoryPool memory_pool=None):
"""
Flatten this Column. If it has a struct type, the column is
flattened into one column per struct field.
Parameters
----------
memory_pool : MemoryPool, default None
For memory allocations, if required, otherwise use default pool
Returns
-------
result : List[Column]
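        Examples
        --------
        A sketch; it assumes flattened columns are named
        ``<parent>.<child>``:
        >>> import pyarrow as pa
        >>> t = pa.struct([pa.field('x', pa.int64()), pa.field('y', pa.bool_())])
        >>> col = pa.column('a', pa.array([{'x': 1, 'y': True}], type=t))
        >>> [c.name for c in col.flatten()]
        ['a.x', 'a.y']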
"""
cdef:
vector[shared_ptr[CColumn]] flattened
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
with nogil:
check_status(self.column.Flatten(pool, &flattened))
return [pyarrow_wrap_column(col) for col in flattened]
def _to_pandas(self, options, **kwargs):
values = self.data._to_pandas(options)
result = pandas_api.make_series(values, name=self.name)
if isinstance(self.type, TimestampType):
tz = self.type.tz
if tz is not None:
tz = string_to_tzinfo(tz)
result = (result.dt.tz_localize('utc')
.dt.tz_convert(tz))
return result
def __array__(self, dtype=None):
return self.data.__array__(dtype=dtype)
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, Column other):
"""
Check if contents of two columns are equal
Parameters
----------
other : pyarrow.Column
Returns
-------
are_equal : boolean
"""
cdef:
CColumn* this_col = self.column
CColumn* other_col = other.column
c_bool result
if other is None:
return False
with nogil:
result = this_col.Equals(deref(other_col))
return result
def to_pylist(self):
"""
Convert to a list of native Python objects.
"""
return self.data.to_pylist()
def __len__(self):
return self.length()
def length(self):
return self.column.length()
@property
def field(self):
return pyarrow_wrap_field(self.column.field())
@property
def shape(self):
"""
        Dimensions of this column
Returns
-------
(int,)
"""
return (self.length(),)
@property
def null_count(self):
"""
        Number of null entries
Returns
-------
int
"""
return self.column.null_count()
@property
def name(self):
"""
Label of the column
Returns
-------
str
"""
return bytes(self.column.name()).decode('utf8')
@property
def type(self):
"""
Type information for this column
Returns
-------
pyarrow.DataType
"""
return pyarrow_wrap_data_type(self.column.type())
@property
def data(self):
"""
The underlying data
Returns
-------
pyarrow.ChunkedArray
"""
return pyarrow_wrap_chunked_array(self.column.data())
cdef _schema_from_arrays(arrays, names, metadata, shared_ptr[CSchema]* schema):
cdef:
Py_ssize_t K = len(arrays)
c_string c_name
CColumn* c_column
shared_ptr[CDataType] c_type
shared_ptr[CKeyValueMetadata] c_meta
vector[shared_ptr[CField]] c_fields
if metadata is not None:
if not isinstance(metadata, dict):
raise TypeError('Metadata must be an instance of dict')
c_meta = pyarrow_unwrap_metadata(metadata)
if K == 0:
schema.reset(new CSchema(c_fields, c_meta))
return
c_fields.resize(K)
if isinstance(arrays[0], Column):
for i in range(K):
c_column = (<Column>arrays[i]).column
c_fields[i] = c_column.field()
else:
if names is None:
raise ValueError('Must pass names when constructing '
'from Array objects')
if len(names) != K:
raise ValueError('Length of names ({}) does not match '
'length of arrays ({})'.format(len(names), K))
for i in range(K):
val = arrays[i]
if isinstance(val, (Array, ChunkedArray)):
c_type = (<DataType> val.type).sp_type
else:
raise TypeError(type(val))
if names[i] is None:
c_name = tobytes(u'None')
else:
c_name = tobytes(names[i])
c_fields[i].reset(new CField(c_name, c_type, True))
schema.reset(new CSchema(c_fields, c_meta))
cdef class RecordBatch(_PandasConvertible):
"""
Batch of rows of columns of equal length
Warning
-------
Do not call this class's constructor directly, use one of the
``RecordBatch.from_*`` functions instead.
"""
def __cinit__(self):
self.batch = NULL
self._schema = None
def __init__(self):
raise TypeError("Do not call RecordBatch's constructor directly, use "
"one of the `RecordBatch.from_*` functions instead.")
cdef void init(self, const shared_ptr[CRecordBatch]& batch):
self.sp_batch = batch
self.batch = batch.get()
def __reduce__(self):
return _reconstruct_record_batch, (self.columns, self.schema)
def __len__(self):
return self.batch.num_rows()
def replace_schema_metadata(self, metadata=None):
"""
        EXPERIMENTAL: Create shallow copy of record batch by replacing schema
        key-value metadata with the indicated new metadata (which may be
        None, in which case any existing metadata is deleted).
Parameters
----------
metadata : dict, default None
Returns
-------
shallow_copy : RecordBatch
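        Examples
        --------
        A minimal sketch; metadata keys and values are stored as bytes:
        >>> import pyarrow as pa
        >>> batch = pa.RecordBatch.from_arrays([pa.array([1, 2])], ['x'])
        >>> batch2 = batch.replace_schema_metadata({'origin': 'test'})
        >>> batch2.schema.metadata[b'origin']
        b'test'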
"""
cdef:
shared_ptr[CKeyValueMetadata] c_meta
shared_ptr[CRecordBatch] c_batch
if metadata is not None:
if not isinstance(metadata, dict):
raise TypeError('Metadata must be an instance of dict')
c_meta = pyarrow_unwrap_metadata(metadata)
with nogil:
c_batch = self.batch.ReplaceSchemaMetadata(c_meta)
return pyarrow_wrap_batch(c_batch)
@property
def num_columns(self):
"""
Number of columns
Returns
-------
int
"""
return self.batch.num_columns()
@property
def num_rows(self):
"""
Number of rows
Due to the definition of a RecordBatch, all columns have the same
number of rows.
Returns
-------
int
"""
return len(self)
@property
def schema(self):
"""
Schema of the RecordBatch and its columns
Returns
-------
pyarrow.Schema
"""
if self._schema is None:
self._schema = pyarrow_wrap_schema(self.batch.schema())
return self._schema
@property
def columns(self):
"""
List of all columns in numerical order
Returns
-------
list of pa.Column
"""
return [self.column(i) for i in range(self.num_columns)]
def column(self, i):
"""
        Select single column from record batch
        Parameters
        ----------
        i : int
            Index of the column to select (negative indices count from
            the end)
        Returns
        -------
        column : pyarrow.Array
"""
if not -self.num_columns <= i < self.num_columns:
raise IndexError(
'Record batch column index {:d} is out of range'.format(i)
)
        if i < 0:
            i += self.num_columns
        return pyarrow_wrap_array(self.batch.column(i))
def __getitem__(self, key):
if isinstance(key, slice):
return _normalize_slice(self, key)
else:
return self.column(_normalize_index(key, self.num_columns))
def serialize(self, memory_pool=None):
"""
Write RecordBatch to Buffer as encapsulated IPC message
Parameters
----------
memory_pool : MemoryPool, default None
Uses default memory pool if not specified
Returns
-------
serialized : Buffer
"""
cdef:
shared_ptr[CBuffer] buffer
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
with nogil:
check_status(SerializeRecordBatch(deref(self.batch),
pool, &buffer))
return pyarrow_wrap_buffer(buffer)
def slice(self, offset=0, length=None):
"""
Compute zero-copy slice of this RecordBatch
Parameters
----------
offset : int, default 0
Offset from start of array to slice
length : int, default None
Length of slice (default is until end of batch starting from
offset)
Returns
-------
sliced : RecordBatch
"""
cdef shared_ptr[CRecordBatch] result
if offset < 0:
raise IndexError('Offset must be non-negative')
if length is None:
result = self.batch.Slice(offset)
else:
result = self.batch.Slice(offset, length)
return pyarrow_wrap_batch(result)
def equals(self, RecordBatch other):
cdef:
CRecordBatch* this_batch = self.batch
CRecordBatch* other_batch = other.batch
c_bool result
with nogil:
result = this_batch.Equals(deref(other_batch))
return result
def to_pydict(self):
"""
Convert the RecordBatch to a dict or OrderedDict.
Returns
-------
dict
"""
entries = []
for i in range(self.batch.num_columns()):
name = bytes(self.batch.column_name(i)).decode('utf8')
column = self[i].to_pylist()
entries.append((name, column))
return ordered_dict(entries)
def _to_pandas(self, options, **kwargs):
return Table.from_batches([self])._to_pandas(options, **kwargs)
@classmethod
def from_pandas(cls, df, Schema schema=None, preserve_index=None,
nthreads=None, columns=None):
"""
Convert pandas.DataFrame to an Arrow RecordBatch
Parameters
----------
df: pandas.DataFrame
schema: pyarrow.Schema, optional
The expected schema of the RecordBatch. This can be used to
indicate the type of columns if we cannot infer it automatically.
preserve_index : bool, optional
Whether to store the index as an additional column in the resulting
``RecordBatch``. The default of None will store the index as a
column, except for RangeIndex which is stored as metadata only. Use
``preserve_index=True`` to force it to be stored as a column.
nthreads : int, default None (may use up to system CPU count threads)
If greater than 1, convert columns to Arrow in parallel using
indicated number of threads
columns : list, optional
            List of columns to be converted. If None, use all columns.
Returns
-------
pyarrow.RecordBatch
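        Examples
        --------
        Illustrative usage (the RangeIndex is kept as metadata by default):
        >>> import pandas as pd
        >>> import pyarrow as pa
        >>> df = pd.DataFrame({'int': [1, 2], 'str': ['a', 'b']})
        >>> batch = pa.RecordBatch.from_pandas(df)
        >>> batch.num_rows, batch.num_columns
        (2, 2)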
"""
from pyarrow.pandas_compat import dataframe_to_arrays
arrays, schema = dataframe_to_arrays(
df, schema, preserve_index, nthreads=nthreads, columns=columns
)
return cls.from_arrays(arrays, schema)
@staticmethod
def from_arrays(list arrays, names, metadata=None):
"""
Construct a RecordBatch from multiple pyarrow.Arrays
Parameters
----------
arrays: list of pyarrow.Array
column-wise data vectors
        names: pyarrow.Schema or list of str
            schema or list of labels for the columns
        metadata : dict or Mapping, default None
            Optional metadata for the schema (if inferred); ignored when a
            Schema is passed as ``names``
Returns
-------
pyarrow.RecordBatch
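        Examples
        --------
        A minimal sketch with two equal-length arrays:
        >>> import pyarrow as pa
        >>> batch = pa.RecordBatch.from_arrays(
        ...     [pa.array([1, 2]), pa.array(['a', 'b'])], ['c0', 'c1'])
        >>> batch.num_rows, batch.num_columns
        (2, 2)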
"""
cdef:
Array arr
c_string c_name
shared_ptr[CSchema] c_schema
vector[shared_ptr[CArray]] c_arrays
int64_t num_rows
int64_t i
int64_t number_of_arrays = len(arrays)
if len(arrays) > 0:
num_rows = len(arrays[0])
else:
num_rows = 0
if isinstance(names, Schema):
c_schema = (<Schema> names).sp_schema
else:
_schema_from_arrays(arrays, names, metadata, &c_schema)
c_arrays.reserve(len(arrays))
for arr in arrays:
if len(arr) != num_rows:
raise ValueError('Arrays were not all the same length: '
'{0} vs {1}'.format(len(arr), num_rows))
c_arrays.push_back(arr.sp_array)
return pyarrow_wrap_batch(
CRecordBatch.Make(c_schema, num_rows, c_arrays))
def _reconstruct_record_batch(columns, schema):
"""
Internal: reconstruct RecordBatch from pickled components.
"""
return RecordBatch.from_arrays(columns, schema)
def table_to_blocks(PandasOptions options, Table table,
MemoryPool memory_pool, categories):
cdef:
PyObject* result_obj
shared_ptr[CTable] c_table = table.sp_table
CMemoryPool* pool
unordered_set[c_string] categorical_columns
if categories is not None:
categorical_columns = {tobytes(cat) for cat in categories}
pool = maybe_unbox_memory_pool(memory_pool)
with nogil:
check_status(
libarrow.ConvertTableToPandas(
options, categorical_columns, c_table, pool, &result_obj)
)
return PyObject_to_object(result_obj)
cdef class Table(_PandasConvertible):
"""
A collection of top-level named, equal length Arrow arrays.
Warning
-------
Do not call this class's constructor directly, use one of the ``from_*``
methods instead.
"""
def __cinit__(self):
self.table = NULL
def __init__(self):
raise TypeError("Do not call Table's constructor directly, use one of "
"the `Table.from_*` functions instead.")
def __repr__(self):
return 'pyarrow.{}\n{}'.format(type(self).__name__, str(self.schema))
cdef void init(self, const shared_ptr[CTable]& table):
self.sp_table = table
self.table = table.get()
def _validate(self):
"""
Validate table consistency.
"""
with nogil:
check_status(self.table.Validate())
def __reduce__(self):
# Reduce the columns as ChunkedArrays to avoid serializing schema
# data twice
columns = [col.data for col in self.columns]
return _reconstruct_table, (columns, self.schema)
def replace_schema_metadata(self, metadata=None):
"""
        EXPERIMENTAL: Create shallow copy of table by replacing schema
        key-value metadata with the indicated new metadata (which may be
        None, in which case any existing metadata is deleted).
Parameters
----------
metadata : dict, default None
Returns
-------
shallow_copy : Table
"""
cdef:
shared_ptr[CKeyValueMetadata] c_meta
shared_ptr[CTable] c_table
if metadata is not None:
if not isinstance(metadata, dict):
raise TypeError('Metadata must be an instance of dict')
c_meta = pyarrow_unwrap_metadata(metadata)
with nogil:
c_table = self.table.ReplaceSchemaMetadata(c_meta)
return pyarrow_wrap_table(c_table)
def flatten(self, MemoryPool memory_pool=None):
"""
Flatten this Table. Each column with a struct type is flattened
into one column per struct field. Other columns are left unchanged.
Parameters
----------
memory_pool : MemoryPool, default None
For memory allocations, if required, otherwise use default pool
Returns
-------
result : Table
"""
cdef:
shared_ptr[CTable] flattened
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
with nogil:
check_status(self.table.Flatten(pool, &flattened))
return pyarrow_wrap_table(flattened)
def combine_chunks(self, MemoryPool memory_pool=None):
"""
Make a new table by combining the chunks this table has.
All the underlying chunks in the ChunkedArray of each column are
concatenated into zero or one chunk.
Parameters
----------
memory_pool : MemoryPool, default None
For memory allocations, if required, otherwise use default pool
Returns
-------
result : Table
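        Examples
        --------
        A minimal sketch; two batches yield two chunks per column, which
        are combined into one:
        >>> import pyarrow as pa
        >>> batch = pa.RecordBatch.from_arrays([pa.array([1, 2])], ['x'])
        >>> t = pa.Table.from_batches([batch, batch])
        >>> t.column(0).data.num_chunks
        2
        >>> t.combine_chunks().column(0).data.num_chunks
        1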
"""
cdef:
shared_ptr[CTable] combined
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
with nogil:
check_status(self.table.CombineChunks(pool, &combined))
return pyarrow_wrap_table(combined)
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, Table other):
"""
Check if contents of two tables are equal
Parameters
----------
other : pyarrow.Table
Returns
-------
are_equal : boolean
"""
cdef:
CTable* this_table = self.table
CTable* other_table = other.table
c_bool result
if other is None:
return False
with nogil:
result = this_table.Equals(deref(other_table))
return result
def cast(self, Schema target_schema, bint safe=True):
"""
Cast table values to another schema
Parameters
----------
target_schema : Schema
            Schema to cast to; the names and order of fields must match
safe : boolean, default True
Check for overflows or other unsafe conversions
Returns
-------
casted : Table
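        Examples
        --------
        A minimal sketch; the target schema must repeat the field names:
        >>> import pyarrow as pa
        >>> t = pa.Table.from_arrays([pa.array([1, 2])], names=['x'])
        >>> target = pa.schema([pa.field('x', pa.float64())])
        >>> [str(col.type) for col in t.cast(target).itercolumns()]
        ['double']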
"""
cdef:
Column column, casted
Field field
list newcols = []
if self.schema.names != target_schema.names:
raise ValueError("Target schema's field names are not matching "
"the table's field names: {!r}, {!r}"
.format(self.schema.names, target_schema.names))
for column, field in zip(self.itercolumns(), target_schema):
casted = column.cast(field.type, safe=safe)
newcols.append(casted)
return Table.from_arrays(newcols, schema=target_schema)
@classmethod
def from_pandas(cls, df, Schema schema=None, preserve_index=None,
nthreads=None, columns=None, bint safe=True):
"""
Convert pandas.DataFrame to an Arrow Table.
The column types in the resulting Arrow Table are inferred from the
dtypes of the pandas.Series in the DataFrame. In the case of non-object
Series, the NumPy dtype is translated to its Arrow equivalent. In the
case of `object`, we need to guess the datatype by looking at the
Python objects in this Series.
Be aware that Series of the `object` dtype don't carry enough
information to always lead to a meaningful Arrow type. In the case that
we cannot infer a type, e.g. because the DataFrame is of length 0 or
the Series only contains None/nan objects, the type is set to
null. This behavior can be avoided by constructing an explicit schema
and passing it to this function.
Parameters
----------
df : pandas.DataFrame
schema : pyarrow.Schema, optional
The expected schema of the Arrow Table. This can be used to
indicate the type of columns if we cannot infer it automatically.
preserve_index : bool, optional
Whether to store the index as an additional column in the resulting
``Table``. The default of None will store the index as a column,
except for RangeIndex which is stored as metadata only. Use
``preserve_index=True`` to force it to be stored as a column.
nthreads : int, default None (may use up to system CPU count threads)
If greater than 1, convert columns to Arrow in parallel using
indicated number of threads
columns : list, optional
            List of columns to be converted. If None, use all columns.
safe : boolean, default True
Check for overflows or other unsafe conversions
Returns
-------
pyarrow.Table
Examples
--------
>>> import pandas as pd
>>> import pyarrow as pa
>>> df = pd.DataFrame({
... 'int': [1, 2],
... 'str': ['a', 'b']
... })
>>> pa.Table.from_pandas(df)
<pyarrow.lib.Table object at 0x7f05d1fb1b40>
"""
from pyarrow.pandas_compat import dataframe_to_arrays
arrays, schema = dataframe_to_arrays(
df,
schema=schema,
preserve_index=preserve_index,
nthreads=nthreads,
columns=columns,
safe=safe
)
return cls.from_arrays(arrays, schema=schema)
@staticmethod
def from_arrays(arrays, names=None, schema=None, metadata=None):
"""
Construct a Table from Arrow arrays or columns
Parameters
----------
arrays : list of pyarrow.Array or pyarrow.Column
Equal-length arrays that should form the table.
names : list of str, optional
            Names for the table columns. If Columns are passed, the names
            will be inferred. If Arrays are passed, this argument is required
schema : Schema, default None
If not passed, will be inferred from the arrays
metadata : dict or Mapping, default None
Optional metadata for the schema (if inferred).
Returns
-------
pyarrow.Table
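        Examples
        --------
        A minimal sketch with names passed explicitly:
        >>> import pyarrow as pa
        >>> t = pa.Table.from_arrays([pa.array([1, 2]), pa.array([3.0, 4.0])],
        ...                          names=['ints', 'floats'])
        >>> t.column_names
        ['ints', 'floats']
        >>> t.num_rows
        2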
"""
cdef:
vector[shared_ptr[CColumn]] columns
Schema cy_schema
shared_ptr[CSchema] c_schema
int i, K = <int> len(arrays)
if schema is None:
_schema_from_arrays(arrays, names, metadata, &c_schema)
        else:
if names is not None:
raise ValueError('Cannot pass both schema and names')
if metadata is not None:
raise ValueError('Cannot pass both schema and metadata')
cy_schema = schema
if len(schema) != len(arrays):
raise ValueError('Schema and number of arrays unequal')
c_schema = cy_schema.sp_schema
columns.reserve(K)
for i in range(K):
if isinstance(arrays[i], Array):
columns.push_back(
make_shared[CColumn](
c_schema.get().field(i),
(<Array> arrays[i]).sp_array
)
)
elif isinstance(arrays[i], ChunkedArray):
columns.push_back(
make_shared[CColumn](
c_schema.get().field(i),
(<ChunkedArray> arrays[i]).sp_chunked_array
)
)
elif isinstance(arrays[i], Column):
# Make sure schema field and column are consistent
columns.push_back(
make_shared[CColumn](
c_schema.get().field(i),
(<Column> arrays[i]).sp_column.get().data()
)
)
else:
raise TypeError(type(arrays[i]))
return pyarrow_wrap_table(CTable.Make(c_schema, columns))
@staticmethod
def from_pydict(mapping, schema=None, metadata=None):
"""
        Construct a Table from a mapping of strings to Arrays or Python lists
Parameters
----------
mapping : dict or Mapping
A mapping of strings to Arrays or Python lists.
schema : Schema, default None
If not passed, will be inferred from the Mapping values
metadata : dict or Mapping, default None
Optional metadata for the schema (if inferred).
Returns
-------
pyarrow.Table
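        Examples
        --------
        A minimal sketch; keys become column names:
        >>> import pyarrow as pa
        >>> t = pa.Table.from_pydict({'ints': [1, 2], 'strs': ['a', 'b']})
        >>> t.column_names
        ['ints', 'strs']
        >>> t.num_rows
        2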
"""
names = []
arrays = []
for k, v in mapping.items():
names.append(k)
if not isinstance(v, (Array, ChunkedArray)):
v = array(v)
arrays.append(v)
if schema is None:
return Table.from_arrays(arrays, names, metadata=metadata)
else:
# Will raise if metadata is not None
return Table.from_arrays(arrays, schema=schema, metadata=metadata)
@staticmethod
def from_batches(batches, Schema schema=None):
"""
Construct a Table from a sequence or iterator of Arrow RecordBatches
Parameters
----------
batches : sequence or iterator of RecordBatch
Sequence of RecordBatch to be converted, all schemas must be equal
schema : Schema, default None
If not passed, will be inferred from the first RecordBatch
Returns
-------
table : Table
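        Examples
        --------
        A minimal sketch; both batches must share one schema:
        >>> import pyarrow as pa
        >>> batch = pa.RecordBatch.from_arrays([pa.array([1, 2])], ['x'])
        >>> pa.Table.from_batches([batch, batch]).num_rows
        4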
"""
cdef:
vector[shared_ptr[CRecordBatch]] c_batches
shared_ptr[CTable] c_table
shared_ptr[CSchema] c_schema
RecordBatch batch
for batch in batches:
c_batches.push_back(batch.sp_batch)
if schema is None:
if c_batches.size() == 0:
raise ValueError('Must pass schema, or at least '
'one RecordBatch')
c_schema = c_batches[0].get().schema()
else:
c_schema = schema.sp_schema
with nogil:
check_status(CTable.FromRecordBatches(c_schema, c_batches,
&c_table))
return pyarrow_wrap_table(c_table)
def to_batches(self, chunksize=None):
"""
        Convert Table to list of (contiguous) RecordBatch objects, with an
        optional maximum chunk size
Parameters
----------
chunksize : int, default None
Maximum size for RecordBatch chunks. Individual chunks may be
smaller depending on the chunk layout of individual columns
Returns
-------
batches : list of RecordBatch
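        Examples
        --------
        A minimal sketch; a single 4-row chunk is split into 2-row batches:
        >>> import pyarrow as pa
        >>> t = pa.Table.from_arrays([pa.array([1, 2, 3, 4])], names=['x'])
        >>> [b.num_rows for b in t.to_batches(chunksize=2)]
        [2, 2]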
"""
cdef:
unique_ptr[TableBatchReader] reader
int64_t c_chunksize
list result = []
shared_ptr[CRecordBatch] batch
reader.reset(new TableBatchReader(deref(self.table)))
if chunksize is not None:
c_chunksize = chunksize
reader.get().set_chunksize(c_chunksize)
while True:
with nogil:
check_status(reader.get().ReadNext(&batch))
if batch.get() == NULL:
break
result.append(pyarrow_wrap_batch(batch))
return result
def _to_pandas(self, options, categories=None, ignore_metadata=False):
from pyarrow.pandas_compat import table_to_blockmanager
mgr = table_to_blockmanager(
options, self, categories,
ignore_metadata=ignore_metadata)
return pandas_api.data_frame(mgr)
def to_pydict(self):
"""
Convert the Table to a dict or OrderedDict.
Returns
-------
dict
"""
cdef:
size_t i
size_t num_columns = self.table.num_columns()
list entries = []
Column column
for i in range(num_columns):
column = self.column(i)
entries.append((column.name, column.to_pylist()))
return ordered_dict(entries)
@property
def schema(self):
"""
Schema of the table and its columns
Returns
-------
pyarrow.Schema
"""
return pyarrow_wrap_schema(self.table.schema())
def column(self, i):
"""
Select a column by its column name, or numeric index.
Parameters
----------
i : int or string
Returns
-------
pyarrow.Column
"""
if isinstance(i, six.string_types):
field_index = self.schema.get_field_index(i)
if field_index < 0:
raise KeyError("Column {} does not exist in table".format(i))
else:
return self._column(field_index)
elif isinstance(i, six.integer_types):
return self._column(i)
else:
raise TypeError("Index must either be string or integer")
def _column(self, int i):
"""
Select a column by its numeric index.
Parameters
----------
i : int
Returns
-------
pyarrow.Column
"""
cdef:
int num_columns = self.num_columns
int index
if not -num_columns <= i < num_columns:
raise IndexError(
'Table column index {:d} is out of range'.format(i)
)
index = i if i >= 0 else num_columns + i
assert index >= 0
return pyarrow_wrap_column(self.table.column(index))
def __getitem__(self, key):
cdef int index = <int> _normalize_index(key, self.num_columns)
return self.column(index)
def itercolumns(self):
"""
Iterator over all columns in their numerical order
"""
for i in range(self.num_columns):
yield self.column(i)
@property
def columns(self):
"""
List of all columns in numerical order
Returns
-------
list of pa.Column
"""
return [self._column(i) for i in range(self.num_columns)]
@property
def num_columns(self):
"""
Number of columns in this table
Returns
-------
int
"""
return self.table.num_columns()
@property
def num_rows(self):
"""
Number of rows in this table.
Due to the definition of a table, all columns have the same number of
rows.
Returns
-------
int
"""
return self.table.num_rows()
def __len__(self):
return self.num_rows
@property
def shape(self):
"""
Dimensions of the table: (#rows, #columns)
Returns
-------
(int, int)
"""
return (self.num_rows, self.num_columns)
def add_column(self, int i, Column column):
"""
Add column to Table at position. Returns new table
"""
cdef shared_ptr[CTable] c_table
with nogil:
check_status(self.table.AddColumn(i, column.sp_column, &c_table))
return pyarrow_wrap_table(c_table)
def append_column(self, Column column):
"""
Append column at end of columns. Returns new table
"""
return self.add_column(self.num_columns, column)
def remove_column(self, int i):
"""
Create new Table with the indicated column removed
"""
cdef shared_ptr[CTable] c_table
with nogil:
check_status(self.table.RemoveColumn(i, &c_table))
return pyarrow_wrap_table(c_table)
def set_column(self, int i, Column column):
"""
Replace column in Table at position. Returns new table
"""
cdef shared_ptr[CTable] c_table
with nogil:
check_status(self.table.SetColumn(i, column.sp_column, &c_table))
return pyarrow_wrap_table(c_table)
@property
def column_names(self):
"""
Names of the table's columns
"""
names = self.table.ColumnNames()
return [frombytes(name) for name in names]
def rename_columns(self, names):
"""
Create new table with columns renamed to provided names
"""
cdef:
shared_ptr[CTable] c_table
vector[c_string] c_names
for name in names:
c_names.push_back(tobytes(name))
with nogil:
check_status(self.table.RenameColumns(c_names, &c_table))
return pyarrow_wrap_table(c_table)
def drop(self, columns):
"""
Drop one or more columns and return a new table.
        Parameters
        ----------
        columns : list of str
            Field names of the columns to drop
        Returns
        -------
        Table
            New table without the dropped columns
"""
indices = []
for col in columns:
idx = self.schema.get_field_index(col)
if idx == -1:
raise KeyError("Column {!r} not found".format(col))
indices.append(idx)
indices.sort()
indices.reverse()
table = self
for idx in indices:
table = table.remove_column(idx)
return table
def _reconstruct_table(arrays, schema):
"""
Internal: reconstruct pa.Table from pickled components.
"""
return Table.from_arrays(arrays, schema=schema)
def table(data, schema=None):
"""
    Create a pyarrow.Table from a Python object (a table-like object such
    as a DataFrame or dictionary).
Parameters
----------
data : pandas.DataFrame, dict
A DataFrame or a mapping of strings to Arrays or Python lists.
schema : Schema, default None
The expected schema of the Arrow Table. If not passed, will be
inferred from the data.
Returns
-------
Table
See Also
--------
Table.from_pandas, Table.from_pydict
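    Examples
    --------
    A minimal sketch using a dictionary of column data:
    >>> import pyarrow as pa
    >>> t = pa.table({'ints': [1, 2], 'strs': ['a', 'b']})
    >>> t.num_rows, t.num_columns
    (2, 2)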
"""
if isinstance(data, dict):
return Table.from_pydict(data, schema=schema)
elif isinstance(data, _pandas_api.pd.DataFrame):
return Table.from_pandas(data, schema=schema)
else:
        raise TypeError("Expected pandas DataFrame or Python dictionary")
def concat_tables(tables):
"""
    Perform zero-copy concatenation of pyarrow.Table objects. Raises an
    exception if the schemas of the tables are not all the same
    Parameters
    ----------
    tables : iterable of pyarrow.Table objects
        Tables to concatenate; all must have the same schema
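    Examples
    --------
    A minimal sketch; both inputs share the same schema:
    >>> import pyarrow as pa
    >>> t = pa.table({'x': [1, 2]})
    >>> pa.concat_tables([t, t]).num_rows
    4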
"""
cdef:
vector[shared_ptr[CTable]] c_tables
shared_ptr[CTable] c_result
Table table
for table in tables:
c_tables.push_back(table.sp_table)
with nogil:
check_status(ConcatenateTables(c_tables, &c_result))
return pyarrow_wrap_table(c_result)