| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import json |
| |
| from collections import OrderedDict |
| |
| try: |
| import pandas as pd |
| except ImportError: |
| # The pure-Python based API works without a pandas installation |
| pass |
| else: |
| import pyarrow.pandas_compat as pdcompat |
| |
| |
cdef class ChunkedArray:
    """
    Array backed via one or more memory chunks.

    Warning
    -------
    Do not call this class's constructor directly.
    """

    def __cinit__(self):
        self.chunked_array = NULL

    cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array):
        # Keep both the owning shared_ptr and a raw pointer for cheap access
        self.sp_chunked_array = chunked_array
        self.chunked_array = chunked_array.get()

    property type:

        def __get__(self):
            """
            Type of the values in this ChunkedArray (pyarrow.DataType)
            """
            return pyarrow_wrap_data_type(self.sp_chunked_array.get().type())

    cdef int _check_nullptr(self) except -1:
        # Guard against use of an uninitialized (default-constructed) wrapper
        if self.chunked_array == NULL:
            raise ReferenceError(
                "{} object references a NULL pointer. Not initialized.".format(
                    type(self).__name__
                )
            )
        return 0

    def length(self):
        """
        Total number of values, summed over all chunks

        Returns
        -------
        int
        """
        self._check_nullptr()
        return self.chunked_array.length()

    def __len__(self):
        return self.length()

    @property
    def null_count(self):
        """
        Number of null entries

        Returns
        -------
        int
        """
        self._check_nullptr()
        return self.chunked_array.null_count()

    def __getitem__(self, key):
        """
        Select a value by absolute position, or a sub-array by slice

        Parameters
        ----------
        key : integer or slice

        Returns
        -------
        value : scalar value for integer key, ChunkedArray for slice
        """
        cdef int64_t item
        cdef int i
        self._check_nullptr()
        if isinstance(key, slice):
            return _normalize_slice(self, key)
        elif isinstance(key, six.integer_types):
            item = key
            if item >= self.chunked_array.length() or item < 0:
                # BUG FIX: the exception was previously *returned* instead of
                # raised, so out-of-bounds access handed the caller an
                # IndexError instance as the result value
                raise IndexError("ChunkedArray selection out of bounds")
            # Walk the chunks, translating the absolute index into a
            # chunk-relative one
            for i in range(self.num_chunks):
                if item < self.chunked_array.chunk(i).get().length():
                    return self.chunk(i)[item]
                else:
                    item -= self.chunked_array.chunk(i).get().length()
        else:
            raise TypeError("key must either be a slice or integer")

    def slice(self, offset=0, length=None):
        """
        Compute zero-copy slice of this ChunkedArray

        Parameters
        ----------
        offset : int, default 0
            Offset from start of array to slice
        length : int, default None
            Length of slice (default is until end of batch starting from
            offset)

        Returns
        -------
        sliced : ChunkedArray
        """
        cdef shared_ptr[CChunkedArray] result

        # Consistent with the other methods: fail loudly on an
        # uninitialized wrapper rather than dereferencing NULL
        self._check_nullptr()

        if offset < 0:
            raise IndexError('Offset must be non-negative')

        if length is None:
            result = self.chunked_array.Slice(offset)
        else:
            result = self.chunked_array.Slice(offset, length)

        return pyarrow_wrap_chunked_array(result)

    @property
    def num_chunks(self):
        """
        Number of underlying chunks

        Returns
        -------
        int
        """
        self._check_nullptr()
        return self.chunked_array.num_chunks()

    def chunk(self, i):
        """
        Select a chunk by its index

        Parameters
        ----------
        i : int

        Returns
        -------
        pyarrow.Array
        """
        self._check_nullptr()

        if i >= self.num_chunks or i < 0:
            raise IndexError('Chunk index out of range.')

        return pyarrow_wrap_array(self.chunked_array.chunk(i))

    property chunks:

        def __get__(self):
            """
            All chunks as a list of pyarrow.Array
            """
            cdef int i
            chunks = []
            for i in range(self.num_chunks):
                chunks.append(self.chunk(i))
            return chunks

    def iterchunks(self):
        """
        Iterate over the chunks in order as pyarrow.Array objects
        """
        for i in range(self.num_chunks):
            yield self.chunk(i)

    def to_pylist(self):
        """
        Convert to a list of native Python objects.
        """
        result = []
        for i in range(self.num_chunks):
            result += self.chunk(i).to_pylist()
        return result
| |
| |
def chunked_array(arrays, type=None):
    """
    Construct chunked array from list of array-like objects

    Parameters
    ----------
    arrays : list of Array or values coercible to arrays
    type : DataType or string coercible to DataType
        If passed, every Array chunk must already have this type; non-Array
        values are converted to it

    Returns
    -------
    ChunkedArray

    Raises
    ------
    TypeError
        If an Array chunk's type does not match a passed ``type``
    """
    cdef:
        Array arr
        vector[shared_ptr[CArray]] c_arrays
        shared_ptr[CChunkedArray] sp_chunked_array

    for x in arrays:
        if isinstance(x, Array):
            arr = x
            # Explicit validation instead of `assert`: asserts are stripped
            # under python -O and produce an uninformative failure message
            if type is not None and x.type != type:
                raise TypeError(
                    'Array chunk type {0} does not match passed type {1}'
                    .format(x.type, type))
        else:
            arr = array(x, type=type)

        c_arrays.push_back(arr.sp_array)

    sp_chunked_array.reset(new CChunkedArray(c_arrays))
    return pyarrow_wrap_chunked_array(sp_chunked_array)
| |
| |
def column(object field_or_name, arr):
    """
    Create Column object from field/string and array-like data

    Parameters
    ----------
    field_or_name : string or Field
    arr : Array, list of Arrays, or ChunkedArray

    Returns
    -------
    column : Column
    """
    cdef:
        Field boxed_field
        Array _arr
        ChunkedArray _carr
        shared_ptr[CColumn] sp_column

    # First coerce the data argument into Arrow form: a list becomes a
    # ChunkedArray, anything else array-like becomes an Array
    if not isinstance(arr, (Array, ChunkedArray)):
        arr = chunked_array(arr) if isinstance(arr, list) else array(arr)

    # Resolve the field: validate a passed Field against the data's type,
    # or construct one from a name plus the inferred type
    if not isinstance(field_or_name, Field):
        boxed_field = field(field_or_name, arr.type)
    else:
        boxed_field = field_or_name
        if boxed_field.type != arr.type:
            raise ValueError('Passed field type does not match array')

    if isinstance(arr, ChunkedArray):
        _carr = arr
        sp_column.reset(new CColumn(boxed_field.sp_field,
                                    _carr.sp_chunked_array))
    elif isinstance(arr, Array):
        _arr = arr
        sp_column.reset(new CColumn(boxed_field.sp_field, _arr.sp_array))
    else:
        raise ValueError("Unsupported type for column(...): {}"
                         .format(type(arr)))

    return pyarrow_wrap_column(sp_column)
| |
| |
cdef class Column:
    """
    Named vector of elements of equal type.

    Warning
    -------
    Do not call this class's constructor directly.
    """

    def __cinit__(self):
        self.column = NULL

    cdef void init(self, const shared_ptr[CColumn]& column):
        # Keep both the owning shared_ptr and a raw pointer for cheap access
        self.sp_column = column
        self.column = column.get()

    def __repr__(self):
        # Default object repr followed by one line per underlying chunk
        from pyarrow.compat import StringIO
        result = StringIO()
        result.write(object.__repr__(self))
        data = self.data
        for i, chunk in enumerate(data.chunks):
            result.write('\nchunk {0}: {1}'.format(i, repr(chunk)))

        return result.getvalue()

    @staticmethod
    def from_array(*args):
        """
        Create a Column; alias for the module-level column() factory
        """
        return column(*args)

    def cast(self, object target_type, safe=True):
        """
        Cast column values to another data type

        Parameters
        ----------
        target_type : DataType
            Type to cast to
        safe : boolean, default True
            Check for overflows or other unsafe conversions

        Returns
        -------
        casted : Column
        """
        cdef:
            CCastOptions options
            shared_ptr[CArray] result
            DataType type
            CDatum out

        type = _ensure_type(target_type)

        # Unsafe mode permits lossy conversions (overflow, time truncation)
        options.allow_int_overflow = not safe
        options.allow_time_truncate = not safe

        with nogil:
            check_status(Cast(_context(), CDatum(self.column.data()),
                              type.sp_type, options, &out))

        casted_data = pyarrow_wrap_chunked_array(out.chunked_array())
        return column(self.name, casted_data)

    def to_pandas(self,
                  c_bool strings_to_categorical=False,
                  c_bool zero_copy_only=False,
                  c_bool integer_object_nulls=False):
        """
        Convert the arrow::Column to a pandas.Series

        Parameters
        ----------
        strings_to_categorical : boolean, default False
            Encode string (UTF8) and binary types to pandas.Categorical
        zero_copy_only : boolean, default False
            Raise an ArrowException if this function call would require
            copying the underlying data
        integer_object_nulls : boolean, default False
            Cast integers with nulls to objects

        Returns
        -------
        pandas.Series
        """
        cdef:
            PyObject* out
            PandasOptions options

        options = PandasOptions(
            strings_to_categorical=strings_to_categorical,
            zero_copy_only=zero_copy_only,
            integer_object_nulls=integer_object_nulls)

        with nogil:
            check_status(libarrow.ConvertColumnToPandas(options,
                                                        self.sp_column,
                                                        self, &out))

        values = wrap_array_output(out)
        result = pd.Series(values, name=self.name)

        # Timezone-aware timestamps come back UTC-naive; localize then
        # convert to the column's stored timezone
        if isinstance(self.type, TimestampType):
            if self.type.tz is not None:
                result = (result.dt.tz_localize('utc')
                          .dt.tz_convert(self.type.tz))

        return result

    def equals(self, Column other):
        """
        Check if contents of two columns are equal

        Parameters
        ----------
        other : pyarrow.Column

        Returns
        -------
        are_equal : boolean
        """
        cdef:
            CColumn* my_col = self.column
            CColumn* other_col = other.column
            c_bool result

        self._check_nullptr()
        other._check_nullptr()

        with nogil:
            result = my_col.Equals(deref(other_col))

        return result

    def to_pylist(self):
        """
        Convert to a list of native Python objects.
        """
        return self.data.to_pylist()

    cdef int _check_nullptr(self) except -1:
        # Guard against use of an uninitialized (default-constructed) wrapper
        if self.column == NULL:
            raise ReferenceError(
                "{} object references a NULL pointer. Not initialized.".format(
                    type(self).__name__
                )
            )
        return 0

    def __len__(self):
        return self.length()

    def length(self):
        """
        Number of values in this column

        Returns
        -------
        int
        """
        self._check_nullptr()
        return self.column.length()

    @property
    def field(self):
        """
        The Field (name and type) describing this column
        """
        return pyarrow_wrap_field(self.column.field())

    @property
    def shape(self):
        """
        Dimensions of this column

        Returns
        -------
        (int,)
        """
        self._check_nullptr()
        return (self.length(),)

    @property
    def null_count(self):
        """
        Number of null entries

        Returns
        -------
        int
        """
        self._check_nullptr()
        return self.column.null_count()

    @property
    def name(self):
        """
        Label of the column

        Returns
        -------
        str
        """
        return bytes(self.column.name()).decode('utf8')

    @property
    def type(self):
        """
        Type information for this column

        Returns
        -------
        pyarrow.DataType
        """
        return pyarrow_wrap_data_type(self.column.type())

    @property
    def data(self):
        """
        The underlying data

        Returns
        -------
        pyarrow.ChunkedArray
        """
        cdef ChunkedArray chunked_array = ChunkedArray()
        chunked_array.init(self.column.data())
        return chunked_array
| |
| |
cdef shared_ptr[const CKeyValueMetadata] unbox_metadata(dict metadata):
    # Convert a Python dict of key-value pairs into the C++ KeyValueMetadata
    # wrapper; None maps to a null shared_ptr (no metadata)
    if metadata is None:
        return <shared_ptr[const CKeyValueMetadata]> nullptr
    cdef:
        unordered_map[c_string, c_string] unordered_metadata = metadata
    return (<shared_ptr[const CKeyValueMetadata]>
            make_shared[CKeyValueMetadata](unordered_metadata))
| |
| |
cdef _schema_from_arrays(arrays, names, dict metadata,
                         shared_ptr[CSchema]* schema):
    # Build a CSchema (written through the `schema` out-parameter) from a
    # homogeneous list of Columns, or from Arrays/ChunkedArrays plus names.
    # All fields are created as nullable.
    cdef:
        Column col
        c_string c_name
        vector[shared_ptr[CField]] fields
        shared_ptr[CDataType] type_
        Py_ssize_t K = len(arrays)

    # Empty input: produce an empty schema carrying only the metadata
    if K == 0:
        schema.reset(new CSchema(fields, unbox_metadata(metadata)))
        return

    fields.resize(K)

    if isinstance(arrays[0], Column):
        # Columns carry their own names and types; `names` is ignored here
        for i in range(K):
            col = arrays[i]
            type_ = col.sp_column.get().type()
            c_name = tobytes(col.name)
            fields[i].reset(new CField(c_name, type_, True))
    else:
        if names is None:
            raise ValueError('Must pass names when constructing '
                             'from Array objects')
        if len(names) != K:
            raise ValueError("Length of names ({}) does not match "
                             "length of arrays ({})".format(len(names), K))
        for i in range(K):
            val = arrays[i]
            if isinstance(val, (Array, ChunkedArray)):
                type_ = (<DataType> val.type).sp_type
            else:
                raise TypeError(type(val))

            # A None name is stored as the literal string 'None'
            if names[i] is None:
                c_name = tobytes(u'None')
            else:
                c_name = tobytes(names[i])
            fields[i].reset(new CField(c_name, type_, True))

    schema.reset(new CSchema(fields, unbox_metadata(metadata)))
| |
| |
cdef class RecordBatch:
    """
    Batch of rows of columns of equal length

    Warning
    -------
    Do not call this class's constructor directly, use one of the ``from_*``
    methods instead.
    """

    def __cinit__(self):
        self.batch = NULL
        self._schema = None

    cdef void init(self, const shared_ptr[CRecordBatch]& batch):
        # Keep both the owning shared_ptr and a raw pointer for cheap access
        self.sp_batch = batch
        self.batch = batch.get()

    cdef int _check_nullptr(self) except -1:
        # Guard against use of an uninitialized (default-constructed) wrapper
        if self.batch == NULL:
            raise ReferenceError(
                "{} object references a NULL pointer. Not initialized.".format(
                    type(self).__name__
                )
            )
        return 0

    def __len__(self):
        self._check_nullptr()
        return self.batch.num_rows()

    def replace_schema_metadata(self, dict metadata=None):
        """
        EXPERIMENTAL: Create shallow copy of record batch by replacing schema
        key-value metadata with the indicated new metadata (which may be None,
        which deletes any existing metadata)

        Parameters
        ----------
        metadata : dict, default None

        Returns
        -------
        shallow_copy : RecordBatch
        """
        cdef shared_ptr[CKeyValueMetadata] c_meta
        if metadata is not None:
            convert_metadata(metadata, &c_meta)

        cdef shared_ptr[CRecordBatch] new_batch
        with nogil:
            new_batch = self.batch.ReplaceSchemaMetadata(c_meta)

        return pyarrow_wrap_batch(new_batch)

    @property
    def num_columns(self):
        """
        Number of columns

        Returns
        -------
        int
        """
        self._check_nullptr()
        return self.batch.num_columns()

    @property
    def num_rows(self):
        """
        Number of rows

        Due to the definition of a RecordBatch, all columns have the same
        number of rows.

        Returns
        -------
        int
        """
        return len(self)

    @property
    def schema(self):
        """
        Schema of the RecordBatch and its columns

        Returns
        -------
        pyarrow.Schema
        """
        cdef Schema schema
        self._check_nullptr()
        # Lazily built and cached on first access
        if self._schema is None:
            schema = Schema()
            schema.init_schema(self.batch.schema())
            self._schema = schema

        return self._schema

    def column(self, i):
        """
        Select single column from record batch

        Parameters
        ----------
        i : int
            Column index; negative indices count from the end

        Returns
        -------
        column : pyarrow.Array
        """
        if not -self.num_columns <= i < self.num_columns:
            raise IndexError(
                'Record batch column index {:d} is out of range'.format(i)
            )
        return pyarrow_wrap_array(self.batch.column(i))

    def __getitem__(self, key):
        # Integer key selects a single column; slice returns a sub-batch
        cdef:
            Py_ssize_t start, stop
        if isinstance(key, slice):
            return _normalize_slice(self, key)
        else:
            return self.column(key)

    def serialize(self, memory_pool=None):
        """
        Write RecordBatch to Buffer as encapsulated IPC message

        Parameters
        ----------
        memory_pool : MemoryPool, default None
            Uses default memory pool if not specified

        Returns
        -------
        serialized : Buffer
        """
        cdef:
            shared_ptr[CBuffer] buffer
            CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)

        with nogil:
            check_status(SerializeRecordBatch(deref(self.batch),
                                              pool, &buffer))
        return pyarrow_wrap_buffer(buffer)

    def slice(self, offset=0, length=None):
        """
        Compute zero-copy slice of this RecordBatch

        Parameters
        ----------
        offset : int, default 0
            Offset from start of array to slice
        length : int, default None
            Length of slice (default is until end of batch starting from
            offset)

        Returns
        -------
        sliced : RecordBatch
        """
        cdef shared_ptr[CRecordBatch] result

        if offset < 0:
            raise IndexError('Offset must be non-negative')

        if length is None:
            result = self.batch.Slice(offset)
        else:
            result = self.batch.Slice(offset, length)

        return pyarrow_wrap_batch(result)

    def equals(self, RecordBatch other):
        """
        Check if contents of two record batches are equal

        Parameters
        ----------
        other : pyarrow.RecordBatch

        Returns
        -------
        are_equal : boolean
        """
        cdef:
            CRecordBatch* my_batch = self.batch
            CRecordBatch* other_batch = other.batch
            c_bool result

        self._check_nullptr()
        other._check_nullptr()

        with nogil:
            result = my_batch.Equals(deref(other_batch))

        return result

    def to_pydict(self):
        """
        Convert the arrow::RecordBatch to an OrderedDict

        Returns
        -------
        OrderedDict
        """
        entries = []
        for i in range(self.batch.num_columns()):
            name = bytes(self.batch.column_name(i)).decode('utf8')
            column = self[i].to_pylist()
            entries.append((name, column))
        return OrderedDict(entries)

    def to_pandas(self, nthreads=None):
        """
        Convert the arrow::RecordBatch to a pandas DataFrame

        Parameters
        ----------
        nthreads : int, default None
            Number of conversion threads; forwarded to Table.to_pandas

        Returns
        -------
        pandas.DataFrame
        """
        # Delegates to Table: wrap this single batch, then convert
        return Table.from_batches([self]).to_pandas(nthreads=nthreads)

    @classmethod
    def from_pandas(cls, df, Schema schema=None, bint preserve_index=True,
                    nthreads=None):
        """
        Convert pandas.DataFrame to an Arrow RecordBatch

        Parameters
        ----------
        df: pandas.DataFrame
        schema: pyarrow.Schema, optional
            The expected schema of the RecordBatch. This can be used to
            indicate the type of columns if we cannot infer it automatically.
        preserve_index : bool, optional
            Whether to store the index as an additional column in the resulting
            ``RecordBatch``.
        nthreads : int, default None (may use up to system CPU count threads)
            If greater than 1, convert columns to Arrow in parallel using
            indicated number of threads

        Returns
        -------
        pyarrow.RecordBatch
        """
        names, arrays, metadata = pdcompat.dataframe_to_arrays(
            df, schema, preserve_index, nthreads=nthreads
        )
        return cls.from_arrays(arrays, names, metadata)

    @staticmethod
    def from_arrays(list arrays, list names, dict metadata=None):
        """
        Construct a RecordBatch from multiple pyarrow.Arrays

        Parameters
        ----------
        arrays: list of pyarrow.Array
            column-wise data vectors
        names: list of str
            Labels for the columns
        metadata : dict, default None
            Key-value metadata to attach to the batch's schema

        Returns
        -------
        pyarrow.RecordBatch
        """
        cdef:
            Array arr
            c_string c_name
            shared_ptr[CSchema] schema
            vector[shared_ptr[CArray]] c_arrays
            int64_t num_rows
            int64_t i
            int64_t number_of_arrays = len(arrays)

        # Row count is taken from the first array; an empty list yields an
        # empty zero-row batch
        if len(arrays) > 0:
            num_rows = len(arrays[0])
        else:
            num_rows = 0
        _schema_from_arrays(arrays, names, metadata, &schema)

        c_arrays.reserve(len(arrays))
        for arr in arrays:
            c_arrays.push_back(arr.sp_array)

        return pyarrow_wrap_batch(
            CRecordBatch.Make(schema, num_rows, c_arrays))
| |
| |
def table_to_blocks(PandasOptions options, Table table, int nthreads,
                    MemoryPool memory_pool, categories):
    # Convert a Table to pandas block-manager-compatible data via the C++
    # ConvertTableToPandas routine; `categories` lists column names to be
    # produced as pandas.Categorical
    cdef:
        PyObject* result_obj
        shared_ptr[CTable] c_table = table.sp_table
        CMemoryPool* pool
        unordered_set[c_string] categorical_columns

    if categories is not None:
        categorical_columns = {tobytes(cat) for cat in categories}

    pool = maybe_unbox_memory_pool(memory_pool)
    with nogil:
        check_status(
            libarrow.ConvertTableToPandas(
                options, categorical_columns, c_table, nthreads, pool,
                &result_obj
            )
        )

    # Take ownership of the new reference produced by the C++ conversion
    return PyObject_to_object(result_obj)
| |
| |
cdef class Table:
    """
    A collection of top-level named, equal length Arrow arrays.

    Warning
    -------
    Do not call this class's constructor directly, use one of the ``from_*``
    methods instead.
    """

    def __cinit__(self):
        self.table = NULL

    def __repr__(self):
        # Class name followed by the schema's string rendering
        return 'pyarrow.{}\n{}'.format(type(self).__name__, str(self.schema))

    cdef void init(self, const shared_ptr[CTable]& table):
        # Keep both the owning shared_ptr and a raw pointer for cheap access
        self.sp_table = table
        self.table = table.get()

    cdef int _check_nullptr(self) except -1:
        # Guard against use of an uninitialized (default-constructed) wrapper
        if self.table == nullptr:
            raise ReferenceError(
                "Table object references a NULL pointer. Not initialized."
            )
        return 0

    def replace_schema_metadata(self, dict metadata=None):
        """
        EXPERIMENTAL: Create shallow copy of table by replacing schema
        key-value metadata with the indicated new metadata (which may be None,
        which deletes any existing metadata)

        Parameters
        ----------
        metadata : dict, default None

        Returns
        -------
        shallow_copy : Table
        """
        cdef shared_ptr[CKeyValueMetadata] c_meta
        if metadata is not None:
            convert_metadata(metadata, &c_meta)

        cdef shared_ptr[CTable] new_table
        with nogil:
            new_table = self.table.ReplaceSchemaMetadata(c_meta)

        return pyarrow_wrap_table(new_table)

    def equals(self, Table other):
        """
        Check if contents of two tables are equal

        Parameters
        ----------
        other : pyarrow.Table

        Returns
        -------
        are_equal : boolean
        """
        cdef:
            CTable* my_table = self.table
            CTable* other_table = other.table
            c_bool result

        self._check_nullptr()
        other._check_nullptr()

        with nogil:
            result = my_table.Equals(deref(other_table))

        return result

    @classmethod
    def from_pandas(cls, df, Schema schema=None, bint preserve_index=True,
                    nthreads=None):
        """
        Convert pandas.DataFrame to an Arrow Table

        Parameters
        ----------
        df : pandas.DataFrame
        schema : pyarrow.Schema, optional
            The expected schema of the Arrow Table. This can be used to
            indicate the type of columns if we cannot infer it automatically.
        preserve_index : bool, optional
            Whether to store the index as an additional column in the resulting
            ``Table``.
        nthreads : int, default None (may use up to system CPU count threads)
            If greater than 1, convert columns to Arrow in parallel using
            indicated number of threads

        Returns
        -------
        pyarrow.Table

        Examples
        --------

        >>> import pandas as pd
        >>> import pyarrow as pa
        >>> df = pd.DataFrame({
            ...     'int': [1, 2],
            ...     'str': ['a', 'b']
            ... })
        >>> pa.Table.from_pandas(df)
        <pyarrow.lib.Table object at 0x7f05d1fb1b40>
        """
        names, arrays, metadata = pdcompat.dataframe_to_arrays(
            df,
            schema=schema,
            preserve_index=preserve_index,
            nthreads=nthreads
        )
        return cls.from_arrays(arrays, names=names, metadata=metadata)

    @staticmethod
    def from_arrays(arrays, names=None, schema=None, dict metadata=None):
        """
        Construct a Table from Arrow arrays or columns

        Parameters
        ----------
        arrays: list of pyarrow.Array or pyarrow.Column
            Equal-length arrays that should form the table.
        names: list of str, optional
            Names for the table columns. If Columns passed, will be
            inferred. If Arrays passed, this argument is required
        schema : Schema, default None
            If not passed, will be inferred from the arrays; mutually
            exclusive with ``names``
        metadata : dict, default None
            Key-value metadata for the schema (ignored when ``schema``
            is passed)

        Returns
        -------
        pyarrow.Table

        """
        cdef:
            vector[shared_ptr[CColumn]] columns
            Schema cy_schema
            shared_ptr[CSchema] c_schema
            shared_ptr[CTable] table
            int i, K = <int> len(arrays)

        if schema is None:
            _schema_from_arrays(arrays, names, metadata, &c_schema)
        elif schema is not None:
            if names is not None:
                # NOTE(review): message likely meant 'schema and names' —
                # confirm before relying on the wording
                raise ValueError('Cannot pass schema and arrays')
            cy_schema = schema

            if len(schema) != len(arrays):
                raise ValueError('Schema and number of arrays unequal')

            c_schema = cy_schema.sp_schema

        columns.reserve(K)

        for i in range(K):
            # NOTE(review): a bare `list` element matches this branch but is
            # then cast to <Array>; presumably lists are coerced upstream —
            # confirm callers never pass raw lists here
            if isinstance(arrays[i], (Array, list)):
                columns.push_back(
                    make_shared[CColumn](
                        c_schema.get().field(i),
                        (<Array> arrays[i]).sp_array
                    )
                )
            elif isinstance(arrays[i], ChunkedArray):
                columns.push_back(
                    make_shared[CColumn](
                        c_schema.get().field(i),
                        (<ChunkedArray> arrays[i]).sp_chunked_array
                    )
                )
            elif isinstance(arrays[i], Column):
                # Make sure schema field and column are consistent
                columns.push_back(
                    make_shared[CColumn](
                        c_schema.get().field(i),
                        (<Column> arrays[i]).sp_column.get().data()
                    )
                )
            else:
                raise ValueError(type(arrays[i]))

        return pyarrow_wrap_table(CTable.Make(c_schema, columns))

    @staticmethod
    def from_batches(batches, Schema schema=None):
        """
        Construct a Table from a list of Arrow RecordBatches

        Parameters
        ----------
        batches: list of RecordBatch
            RecordBatch list to be converted, all schemas must be equal
        schema : Schema, default None
            If not passed, will be inferred from the first RecordBatch

        Returns
        -------
        table : Table
        """
        cdef:
            vector[shared_ptr[CRecordBatch]] c_batches
            shared_ptr[CTable] c_table
            shared_ptr[CSchema] c_schema
            RecordBatch batch

        for batch in batches:
            c_batches.push_back(batch.sp_batch)

        if schema is None:
            if len(batches) == 0:
                raise ValueError('Must pass schema, or at least '
                                 'one RecordBatch')
            c_schema = c_batches[0].get().schema()
        else:
            c_schema = schema.sp_schema

        with nogil:
            check_status(CTable.FromRecordBatches(c_schema, c_batches,
                                                  &c_table))

        return pyarrow_wrap_table(c_table)

    def to_batches(self, chunksize=None):
        """
        Convert Table to list of (contiguous) RecordBatch objects, with optimal
        maximum chunk size

        Parameters
        ----------
        chunksize : int, default None
            Maximum size for RecordBatch chunks. Individual chunks may be
            smaller depending on the chunk layout of individual columns

        Returns
        -------
        batches : list of RecordBatch
        """
        cdef:
            unique_ptr[TableBatchReader] reader
            int64_t c_chunksize
            list result = []
            shared_ptr[CRecordBatch] batch

        reader.reset(new TableBatchReader(deref(self.table)))

        if chunksize is not None:
            c_chunksize = chunksize
            reader.get().set_chunksize(c_chunksize)

        # Drain the reader; a NULL batch signals end of stream
        while True:
            with nogil:
                check_status(reader.get().ReadNext(&batch))

            if batch.get() == NULL:
                break

            result.append(pyarrow_wrap_batch(batch))

        return result

    def to_pandas(self, nthreads=None, strings_to_categorical=False,
                  memory_pool=None, zero_copy_only=False, categories=None,
                  integer_object_nulls=False):
        """
        Convert the arrow::Table to a pandas DataFrame

        Parameters
        ----------
        nthreads : int, default max(1, multiprocessing.cpu_count() / 2)
            For the default, we divide the CPU count by 2 because most modern
            computers have hyperthreading turned on, so doubling the CPU count
            beyond the number of physical cores does not help
        strings_to_categorical : boolean, default False
            Encode string (UTF8) and binary types to pandas.Categorical
        memory_pool: MemoryPool, optional
            Specific memory pool to use to allocate casted columns
        zero_copy_only : boolean, default False
            Raise an ArrowException if this function call would require copying
            the underlying data
        categories: list, default empty
            List of columns that should be returned as pandas.Categorical
        integer_object_nulls : boolean, default False
            Cast integers with nulls to objects

        Returns
        -------
        pandas.DataFrame
        """
        cdef:
            PandasOptions options

        options = PandasOptions(
            strings_to_categorical=strings_to_categorical,
            zero_copy_only=zero_copy_only,
            integer_object_nulls=integer_object_nulls)
        self._check_nullptr()
        if nthreads is None:
            nthreads = cpu_count()
        mgr = pdcompat.table_to_blockmanager(options, self, memory_pool,
                                             nthreads, categories)
        return pd.DataFrame(mgr)

    def to_pydict(self):
        """
        Convert the arrow::Table to an OrderedDict

        Returns
        -------
        OrderedDict
        """
        cdef:
            size_t i
            size_t num_columns = self.table.num_columns()
            list entries = []
            Column column

        self._check_nullptr()
        for i in range(num_columns):
            column = self.column(i)
            entries.append((column.name, column.to_pylist()))

        return OrderedDict(entries)

    @property
    def schema(self):
        """
        Schema of the table and its columns

        Returns
        -------
        pyarrow.Schema
        """
        self._check_nullptr()
        return pyarrow_wrap_schema(self.table.schema())

    def column(self, int i):
        """
        Select a column by its numeric index.

        Parameters
        ----------
        i : int
            Column index; negative indices count from the end

        Returns
        -------
        pyarrow.Column
        """
        cdef:
            Column column = Column()
            int num_columns = self.num_columns
            int index

        self._check_nullptr()
        if not -num_columns <= i < num_columns:
            raise IndexError(
                'Table column index {:d} is out of range'.format(i)
            )

        # Normalize negative indices to their positive equivalent
        index = i if i >= 0 else num_columns + i
        assert index >= 0

        column.init(self.table.column(index))
        return column

    def __getitem__(self, int64_t i):
        return self.column(i)

    def itercolumns(self):
        """
        Iterator over all columns in their numerical order
        """
        for i in range(self.num_columns):
            yield self.column(i)

    @property
    def num_columns(self):
        """
        Number of columns in this table

        Returns
        -------
        int
        """
        self._check_nullptr()
        return self.table.num_columns()

    @property
    def num_rows(self):
        """
        Number of rows in this table.

        Due to the definition of a table, all columns have the same number of
        rows.

        Returns
        -------
        int
        """
        self._check_nullptr()
        return self.table.num_rows()

    def __len__(self):
        return self.num_rows

    @property
    def shape(self):
        """
        Dimensions of the table: (#rows, #columns)

        Returns
        -------
        (int, int)
        """
        return (self.num_rows, self.num_columns)

    def add_column(self, int i, Column column):
        """
        Add column to Table at position. Returns new table
        """
        cdef shared_ptr[CTable] c_table
        self._check_nullptr()

        with nogil:
            check_status(self.table.AddColumn(i, column.sp_column, &c_table))

        return pyarrow_wrap_table(c_table)

    def append_column(self, Column column):
        """
        Append column at end of columns. Returns new table
        """
        return self.add_column(self.num_columns, column)

    def remove_column(self, int i):
        """
        Create new Table with the indicated column removed
        """
        cdef shared_ptr[CTable] c_table
        self._check_nullptr()

        with nogil:
            check_status(self.table.RemoveColumn(i, &c_table))

        return pyarrow_wrap_table(c_table)
| |
| |
def concat_tables(tables):
    """
    Perform zero-copy concatenation of pyarrow.Table objects. Raises exception
    if all of the Table schemas are not the same

    Parameters
    ----------
    tables : iterable of pyarrow.Table objects

    Returns
    -------
    concatenated : Table
    """
    cdef:
        vector[shared_ptr[CTable]] c_tables
        shared_ptr[CTable] c_result
        Table table

    for table in tables:
        c_tables.push_back(table.sp_table)

    with nogil:
        check_status(ConcatenateTables(c_tables, &c_result))

    return pyarrow_wrap_table(c_result)