| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import warnings |
| |
| |
| cdef class ChunkedArray(_PandasConvertible): |
| """ |
    An array-like composed from a (possibly empty) collection of
    pyarrow.Array objects.
| |
| Warnings |
| -------- |
    Do not call this class's constructor directly, use the
    ``pyarrow.chunked_array`` function instead.
| """ |
| |
| def __cinit__(self): |
| self.chunked_array = NULL |
| |
| def __init__(self): |
| raise TypeError("Do not call ChunkedArray's constructor directly, use " |
| "`chunked_array` function instead.") |
| |
| cdef void init(self, const shared_ptr[CChunkedArray]& chunked_array): |
| self.sp_chunked_array = chunked_array |
| self.chunked_array = chunked_array.get() |
| |
| def __reduce__(self): |
| return chunked_array, (self.chunks, self.type) |
| |
| @property |
| def data(self): |
        warnings.warn("Calling .data on ChunkedArray is provided for "
                      "compatibility after Column was removed, simply drop "
                      "this attribute", FutureWarning)
| return self |
| |
| @property |
| def type(self): |
| return pyarrow_wrap_data_type(self.sp_chunked_array.get().type()) |
| |
| def length(self): |
| return self.chunked_array.length() |
| |
| def __len__(self): |
| return self.length() |
| |
| def __repr__(self): |
| type_format = object.__repr__(self) |
| return '{0}\n{1}'.format(type_format, str(self)) |
| |
| def to_string(self, int indent=0, int window=10): |
| """ |
| Render a "pretty-printed" string representation of the ChunkedArray |
| """ |
| cdef: |
| c_string result |
| |
| with nogil: |
| check_status( |
| PrettyPrint( |
| deref(self.chunked_array), |
| PrettyPrintOptions(indent, window), |
| &result |
| ) |
| ) |
| |
| return frombytes(result, safe=True) |
| |
| def format(self, **kwargs): |
        warnings.warn('ChunkedArray.format is deprecated, '
                      'use ChunkedArray.to_string', FutureWarning)
| return self.to_string(**kwargs) |
| |
| def __str__(self): |
| return self.to_string() |
| |
| def validate(self, *, full=False): |
| """ |
| Perform validation checks. An exception is raised if validation fails. |
| |
| By default only cheap validation checks are run. Pass `full=True` |
| for thorough validation checks (potentially O(n)). |
| |
| Parameters |
| ---------- |
        full : bool, default False
| If True, run expensive checks, otherwise cheap checks only. |
| |
| Raises |
| ------ |
| ArrowInvalid |
| """ |
| if full: |
| with nogil: |
| check_status(self.sp_chunked_array.get().ValidateFull()) |
| else: |
| with nogil: |
| check_status(self.sp_chunked_array.get().Validate()) |
| |
| @property |
| def null_count(self): |
| """ |
| Number of null entries |
| |
| Returns |
| ------- |
| int |
| """ |
| return self.chunked_array.null_count() |
| |
| @property |
| def nbytes(self): |
| """ |
| Total number of bytes consumed by the elements of the chunked array. |
| """ |
| size = 0 |
| for chunk in self.iterchunks(): |
| size += chunk.nbytes |
| return size |
| |
| def __sizeof__(self): |
| return super(ChunkedArray, self).__sizeof__() + self.nbytes |
| |
| def __iter__(self): |
| for chunk in self.iterchunks(): |
| for item in chunk: |
| yield item |
| |
| def __getitem__(self, key): |
| """ |
| Slice or return value at given index |
| |
| Parameters |
| ---------- |
| key : integer or slice |
| Slices with step not equal to 1 (or None) will produce a copy |
| rather than a zero-copy view |
| |
| Returns |
| ------- |
| value : Scalar (index) or ChunkedArray (slice) |
| """ |
| if isinstance(key, slice): |
| return _normalize_slice(self, key) |
| |
| return self.getitem(_normalize_index(key, self.chunked_array.length())) |
| |
| cdef getitem(self, int64_t index): |
| cdef int j |
| |
| for j in range(self.num_chunks): |
| if index < self.chunked_array.chunk(j).get().length(): |
| return self.chunk(j)[index] |
| else: |
| index -= self.chunked_array.chunk(j).get().length() |
| |
| def is_null(self): |
| """ |
| Return BooleanArray indicating the null values. |
| """ |
| return _pc().is_null(self) |
| |
| def is_valid(self): |
| """ |
| Return BooleanArray indicating the non-null values. |
| """ |
| return _pc().is_valid(self) |
| |
| def __eq__(self, other): |
| try: |
| return self.equals(other) |
| except TypeError: |
| return NotImplemented |
| |
| def fill_null(self, fill_value): |
| """ |
| See pyarrow.compute.fill_null docstring for usage. |
| """ |
| return _pc().fill_null(self, fill_value) |
| |
| def equals(self, ChunkedArray other): |
| """ |
| Return whether the contents of two chunked arrays are equal. |
| |
| Parameters |
| ---------- |
| other : pyarrow.ChunkedArray |
| Chunked array to compare against. |
| |
| Returns |
| ------- |
| are_equal : bool |
| """ |
| if other is None: |
| return False |
| |
| cdef: |
| CChunkedArray* this_arr = self.chunked_array |
| CChunkedArray* other_arr = other.chunked_array |
| c_bool result |
| |
| with nogil: |
| result = this_arr.Equals(deref(other_arr)) |
| |
| return result |
| |
| def _to_pandas(self, options, **kwargs): |
| return _array_like_to_pandas(self, options) |
| |
| def to_numpy(self): |
| """ |
| Return a NumPy copy of this array (experimental). |
| |
| Returns |
| ------- |
| array : numpy.ndarray |
| """ |
| cdef: |
| PyObject* out |
| PandasOptions c_options |
| object values |
| |
| if self.type.id == _Type_EXTENSION: |
| storage_array = chunked_array( |
| [chunk.storage for chunk in self.iterchunks()], |
| type=self.type.storage_type |
| ) |
| return storage_array.to_numpy() |
| |
| with nogil: |
| check_status( |
| ConvertChunkedArrayToPandas( |
| c_options, |
| self.sp_chunked_array, |
| self, |
| &out |
| ) |
| ) |
| |
        # wrap_array_output uses pandas to convert to Categorical; here we
        # always convert to a plain numpy array
| values = PyObject_to_object(out) |
| |
| if isinstance(values, dict): |
| values = np.take(values['dictionary'], values['indices']) |
| |
| return values |
| |
| def __array__(self, dtype=None): |
| values = self.to_numpy() |
| if dtype is None: |
| return values |
| return values.astype(dtype) |
| |
| def cast(self, object target_type, safe=True): |
| """ |
| Cast array values to another data type |
| |
| See pyarrow.compute.cast for usage |
| """ |
| return _pc().cast(self, target_type, safe=safe) |
| |
| def dictionary_encode(self, null_encoding='mask'): |
| """ |
        Compute dictionary-encoded representation of array.

        Parameters
        ----------
        null_encoding : str, default 'mask'
            How to handle nulls: 'mask' keeps them as nulls in the indices,
            'encode' represents them as a value in the dictionary.

        Returns
        -------
        pyarrow.ChunkedArray
            Same chunking as the input; all chunks share a common dictionary.
| """ |
| options = _pc().DictionaryEncodeOptions(null_encoding) |
| return _pc().call_function('dictionary_encode', [self], options) |
| |
| def flatten(self, MemoryPool memory_pool=None): |
| """ |
| Flatten this ChunkedArray. If it has a struct type, the column is |
| flattened into one array per struct field. |
| |
| Parameters |
| ---------- |
| memory_pool : MemoryPool, default None |
| For memory allocations, if required, otherwise use default pool |
| |
| Returns |
| ------- |
| result : List[ChunkedArray] |
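
        Examples
        --------
        A sketch with a single struct-typed chunk; each struct field
        becomes its own ChunkedArray:

        >>> import pyarrow as pa
        >>> c = pa.chunked_array([[{'x': 1, 'y': 'a'}, {'x': 2, 'y': 'b'}]])
        >>> [col.type for col in c.flatten()]
        [DataType(int64), DataType(string)]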
| """ |
| cdef: |
| vector[shared_ptr[CChunkedArray]] flattened |
| CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) |
| |
| with nogil: |
| flattened = GetResultValue(self.chunked_array.Flatten(pool)) |
| |
| return [pyarrow_wrap_chunked_array(col) for col in flattened] |
| |
| def combine_chunks(self, MemoryPool memory_pool=None): |
| """ |
| Flatten this ChunkedArray into a single non-chunked array. |
| |
| Parameters |
| ---------- |
| memory_pool : MemoryPool, default None |
| For memory allocations, if required, otherwise use default pool |
| |
| Returns |
| ------- |
| result : Array |
| """ |
| return concat_arrays(self.chunks) |
| |
| def unique(self): |
| """ |
| Compute distinct elements in array |
| |
| Returns |
| ------- |
| pyarrow.Array |
| """ |
| return _pc().call_function('unique', [self]) |
| |
| def value_counts(self): |
| """ |
| Compute counts of unique elements in array. |
| |
        Returns
        -------
        StructArray
            An array of ``<input type "Values", int64 "Counts">`` structs.
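
        Examples
        --------
        A small sketch of the output shape (the struct fields are named
        ``values`` and ``counts``):

        >>> import pyarrow as pa
        >>> c = pa.chunked_array([["a", "b", "a"]])
        >>> c.value_counts().to_pylist()
        [{'values': 'a', 'counts': 2}, {'values': 'b', 'counts': 1}]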
| """ |
| return _pc().call_function('value_counts', [self]) |
| |
| def slice(self, offset=0, length=None): |
| """ |
| Compute zero-copy slice of this ChunkedArray |
| |
| Parameters |
| ---------- |
| offset : int, default 0 |
| Offset from start of array to slice |
| length : int, default None |
            Length of slice (default is until end of the array starting from
            offset)
| |
| Returns |
| ------- |
| sliced : ChunkedArray |
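
        Examples
        --------
        A minimal example slicing across a chunk boundary:

        >>> import pyarrow as pa
        >>> c = pa.chunked_array([[1, 2], [3, 4, 5]])
        >>> c.slice(1, 3).to_pylist()
        [2, 3, 4]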
| """ |
| cdef shared_ptr[CChunkedArray] result |
| |
| if offset < 0: |
| raise IndexError('Offset must be non-negative') |
| |
| offset = min(len(self), offset) |
| if length is None: |
| result = self.chunked_array.Slice(offset) |
| else: |
| result = self.chunked_array.Slice(offset, length) |
| |
| return pyarrow_wrap_chunked_array(result) |
| |
| def filter(self, mask, object null_selection_behavior="drop"): |
| """ |
| Select values from a chunked array. See pyarrow.compute.filter for full |
| usage. |
| """ |
| return _pc().filter(self, mask, null_selection_behavior) |
| |
| def take(self, object indices): |
| """ |
| Select values from a chunked array. See pyarrow.compute.take for full |
| usage. |
| """ |
| return _pc().take(self, indices) |
| |
| def unify_dictionaries(self, MemoryPool memory_pool=None): |
| """ |
| Unify dictionaries across all chunks. |
| |
| This method returns an equivalent chunked array, but where all |
| chunks share the same dictionary values. Dictionary indices are |
| transposed accordingly. |
| |
| If there are no dictionaries in the chunked array, it is returned |
| unchanged. |
| |
| Parameters |
| ---------- |
| memory_pool : MemoryPool, default None |
| For memory allocations, if required, otherwise use default pool |
| |
| Returns |
| ------- |
| result : ChunkedArray |
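
        Examples
        --------
        A sketch with two dictionary-encoded chunks; ``"b"`` occurs in both,
        so the unified dictionary holds three values:

        >>> import pyarrow as pa
        >>> c = pa.chunked_array([
        ...     pa.array(["a", "b"]).dictionary_encode(),
        ...     pa.array(["b", "c"]).dictionary_encode(),
        ... ])
        >>> unified = c.unify_dictionaries()
        >>> len(unified.chunk(0).dictionary)
        3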
| """ |
| cdef: |
| CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) |
| shared_ptr[CChunkedArray] c_result |
| |
| with nogil: |
| c_result = GetResultValue(CDictionaryUnifier.UnifyChunkedArray( |
| self.sp_chunked_array, pool)) |
| |
| return pyarrow_wrap_chunked_array(c_result) |
| |
| @property |
| def num_chunks(self): |
| """ |
| Number of underlying chunks |
| |
| Returns |
| ------- |
| int |
| """ |
| return self.chunked_array.num_chunks() |
| |
| def chunk(self, i): |
| """ |
| Select a chunk by its index |
| |
| Parameters |
| ---------- |
| i : int |
| |
| Returns |
| ------- |
| pyarrow.Array |
| """ |
| if i >= self.num_chunks or i < 0: |
| raise IndexError('Chunk index out of range.') |
| |
| return pyarrow_wrap_array(self.chunked_array.chunk(i)) |
| |
    @property
    def chunks(self):
        """
        List of the chunks that make up this ChunkedArray, in order.

        Returns
        -------
        list of pyarrow.Array
        """
        return list(self.iterchunks())

    def iterchunks(self):
        """
        Iterate over the chunks in their numerical order.

        Yields
        ------
        pyarrow.Array
        """
        for i in range(self.num_chunks):
            yield self.chunk(i)
| |
| def to_pylist(self): |
| """ |
| Convert to a list of native Python objects. |
| """ |
| result = [] |
| for i in range(self.num_chunks): |
| result += self.chunk(i).to_pylist() |
| return result |
| |
| |
| def chunked_array(arrays, type=None): |
| """ |
| Construct chunked array from list of array-like objects |
| |
| Parameters |
| ---------- |
| arrays : Array, list of Array, or values coercible to arrays |
| Must all be the same data type. Can be empty only if type also passed. |
| type : DataType or string coercible to DataType |
| |
| Returns |
| ------- |
| ChunkedArray |
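
    Examples
    --------
    A minimal example; each inner list becomes one chunk:

    >>> import pyarrow as pa
    >>> c = pa.chunked_array([[1, 2], [3, 4]])
    >>> c.num_chunks
    2
    >>> c.to_pylist()
    [1, 2, 3, 4]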
| """ |
| cdef: |
| Array arr |
| vector[shared_ptr[CArray]] c_arrays |
| shared_ptr[CChunkedArray] sp_chunked_array |
| |
| type = ensure_type(type, allow_none=True) |
| |
| if isinstance(arrays, Array): |
| arrays = [arrays] |
| |
| for x in arrays: |
| arr = x if isinstance(x, Array) else array(x, type=type) |
| |
        if type is None:
            # Allow more flexible chunked array construction by coercing
            # subsequent arrays to the type inferred from the first array;
            # this also spares the inference overhead after the first chunk
            type = arr.type
| else: |
| if arr.type != type: |
| raise TypeError( |
| "All array chunks must have type {}".format(type) |
| ) |
| |
| c_arrays.push_back(arr.sp_array) |
| |
| if c_arrays.size() == 0 and type is None: |
| raise ValueError("When passing an empty collection of arrays " |
| "you must also pass the data type") |
| |
| sp_chunked_array.reset( |
| new CChunkedArray(c_arrays, pyarrow_unwrap_data_type(type)) |
| ) |
| with nogil: |
| check_status(sp_chunked_array.get().Validate()) |
| |
| return pyarrow_wrap_chunked_array(sp_chunked_array) |
| |
| |
| cdef _schema_from_arrays(arrays, names, metadata, shared_ptr[CSchema]* schema): |
| cdef: |
| Py_ssize_t K = len(arrays) |
| c_string c_name |
| shared_ptr[CDataType] c_type |
| shared_ptr[const CKeyValueMetadata] c_meta |
| vector[shared_ptr[CField]] c_fields |
| |
| if metadata is not None: |
| c_meta = KeyValueMetadata(metadata).unwrap() |
| |
| if K == 0: |
| schema.reset(new CSchema(c_fields, c_meta)) |
| return arrays |
| |
| c_fields.resize(K) |
| |
| if names is None: |
| raise ValueError('Must pass names or schema when constructing ' |
| 'Table or RecordBatch.') |
| |
| if len(names) != K: |
| raise ValueError('Length of names ({}) does not match ' |
| 'length of arrays ({})'.format(len(names), K)) |
| |
| converted_arrays = [] |
| for i in range(K): |
| val = arrays[i] |
| if not isinstance(val, (Array, ChunkedArray)): |
| val = array(val) |
| |
| c_type = (<DataType> val.type).sp_type |
| |
| if names[i] is None: |
| c_name = b'None' |
| else: |
| c_name = tobytes(names[i]) |
| c_fields[i].reset(new CField(c_name, c_type, True)) |
| converted_arrays.append(val) |
| |
| schema.reset(new CSchema(c_fields, c_meta)) |
| return converted_arrays |
| |
| |
| cdef _sanitize_arrays(arrays, names, schema, metadata, |
| shared_ptr[CSchema]* c_schema): |
| cdef Schema cy_schema |
| if schema is None: |
| converted_arrays = _schema_from_arrays(arrays, names, metadata, |
| c_schema) |
| else: |
| if names is not None: |
| raise ValueError('Cannot pass both schema and names') |
| if metadata is not None: |
| raise ValueError('Cannot pass both schema and metadata') |
| cy_schema = schema |
| |
| if len(schema) != len(arrays): |
| raise ValueError('Schema and number of arrays unequal') |
| |
| c_schema[0] = cy_schema.sp_schema |
| converted_arrays = [] |
| for i, item in enumerate(arrays): |
| item = asarray(item, type=schema[i].type) |
| converted_arrays.append(item) |
| return converted_arrays |
| |
| |
| cdef class RecordBatch(_PandasConvertible): |
| """ |
    A batch of rows, represented as a collection of equal-length columns.
| |
| Warnings |
| -------- |
| Do not call this class's constructor directly, use one of the |
| ``RecordBatch.from_*`` functions instead. |
| """ |
| |
| def __cinit__(self): |
| self.batch = NULL |
| self._schema = None |
| |
| def __init__(self): |
| raise TypeError("Do not call RecordBatch's constructor directly, use " |
| "one of the `RecordBatch.from_*` functions instead.") |
| |
| cdef void init(self, const shared_ptr[CRecordBatch]& batch): |
| self.sp_batch = batch |
| self.batch = batch.get() |
| |
| def __reduce__(self): |
| return _reconstruct_record_batch, (self.columns, self.schema) |
| |
| def __len__(self): |
| return self.batch.num_rows() |
| |
| def __eq__(self, other): |
| try: |
| return self.equals(other) |
| except TypeError: |
| return NotImplemented |
| |
    def to_string(self, show_metadata=False):
        """
        Return human-readable string representation of RecordBatch.

        Parameters
        ----------
        show_metadata : bool, default False
            Display Field-level and Schema-level KeyValueMetadata.

        Returns
        -------
        str
        """
        # Use less verbose schema output.
| schema_as_string = self.schema.to_string( |
| show_field_metadata=show_metadata, |
| show_schema_metadata=show_metadata |
| ) |
| return 'pyarrow.{}\n{}'.format(type(self).__name__, schema_as_string) |
| |
| def __repr__(self): |
| return self.to_string() |
| |
| def validate(self, *, full=False): |
| """ |
| Perform validation checks. An exception is raised if validation fails. |
| |
| By default only cheap validation checks are run. Pass `full=True` |
| for thorough validation checks (potentially O(n)). |
| |
| Parameters |
| ---------- |
        full : bool, default False
| If True, run expensive checks, otherwise cheap checks only. |
| |
| Raises |
| ------ |
| ArrowInvalid |
| """ |
| if full: |
| with nogil: |
| check_status(self.batch.ValidateFull()) |
| else: |
| with nogil: |
| check_status(self.batch.Validate()) |
| |
| def replace_schema_metadata(self, metadata=None): |
| """ |
        EXPERIMENTAL: Create shallow copy of record batch by replacing schema
        key-value metadata with the indicated new metadata (which may be
        None), which deletes any existing metadata.
| |
| Parameters |
| ---------- |
| metadata : dict, default None |
| |
| Returns |
| ------- |
| shallow_copy : RecordBatch |
| """ |
| cdef: |
| shared_ptr[const CKeyValueMetadata] c_meta |
| shared_ptr[CRecordBatch] c_batch |
| |
| metadata = ensure_metadata(metadata, allow_none=True) |
| c_meta = pyarrow_unwrap_metadata(metadata) |
| with nogil: |
| c_batch = self.batch.ReplaceSchemaMetadata(c_meta) |
| |
| return pyarrow_wrap_batch(c_batch) |
| |
| @property |
| def num_columns(self): |
| """ |
| Number of columns |
| |
| Returns |
| ------- |
| int |
| """ |
| return self.batch.num_columns() |
| |
| @property |
| def num_rows(self): |
| """ |
| Number of rows |
| |
| Due to the definition of a RecordBatch, all columns have the same |
| number of rows. |
| |
| Returns |
| ------- |
| int |
| """ |
| return len(self) |
| |
| @property |
| def schema(self): |
| """ |
| Schema of the RecordBatch and its columns |
| |
| Returns |
| ------- |
| pyarrow.Schema |
| """ |
| if self._schema is None: |
| self._schema = pyarrow_wrap_schema(self.batch.schema()) |
| |
| return self._schema |
| |
| def field(self, i): |
| """ |
| Select a schema field by its column name or numeric index |
| |
| Parameters |
| ---------- |
| i : int or string |
| The index or name of the field to retrieve |
| |
| Returns |
| ------- |
| pyarrow.Field |
| """ |
| return self.schema.field(i) |
| |
| @property |
| def columns(self): |
| """ |
| List of all columns in numerical order |
| |
| Returns |
| ------- |
        list of pyarrow.Array
| """ |
| return [self.column(i) for i in range(self.num_columns)] |
| |
| def _ensure_integer_index(self, i): |
| """ |
| Ensure integer index (convert string column name to integer if needed). |
| """ |
| if isinstance(i, (bytes, str)): |
| field_indices = self.schema.get_all_field_indices(i) |
| |
| if len(field_indices) == 0: |
| raise KeyError( |
| "Field \"{}\" does not exist in record batch schema" |
| .format(i)) |
| elif len(field_indices) > 1: |
| raise KeyError( |
| "Field \"{}\" exists {} times in record batch schema" |
| .format(i, len(field_indices))) |
| else: |
| return field_indices[0] |
| elif isinstance(i, int): |
| return i |
| else: |
| raise TypeError("Index must either be string or integer") |
| |
| def column(self, i): |
| """ |
| Select single column from record batch |
| |
| Parameters |
| ---------- |
| i : int or string |
| The index or name of the column to retrieve. |
| |
| Returns |
| ------- |
| column : pyarrow.Array |
| """ |
| return self._column(self._ensure_integer_index(i)) |
| |
| def _column(self, int i): |
| """ |
| Select single column from record batch by its numeric index. |
| |
| Parameters |
| ---------- |
| i : int |
| The index of the column to retrieve. |
| |
| Returns |
| ------- |
| column : pyarrow.Array |
| """ |
| cdef int index = <int> _normalize_index(i, self.num_columns) |
| cdef Array result = pyarrow_wrap_array(self.batch.column(index)) |
| result._name = self.schema[index].name |
| return result |
| |
| @property |
| def nbytes(self): |
| """ |
| Total number of bytes consumed by the elements of the record batch. |
| """ |
| size = 0 |
| for i in range(self.num_columns): |
| size += self.column(i).nbytes |
| return size |
| |
| def __sizeof__(self): |
| return super(RecordBatch, self).__sizeof__() + self.nbytes |
| |
| def __getitem__(self, key): |
| """ |
| Slice or return column at given index or column name |
| |
| Parameters |
| ---------- |
| key : integer, str, or slice |
| Slices with step not equal to 1 (or None) will produce a copy |
| rather than a zero-copy view |
| |
| Returns |
| ------- |
| value : Array (index/column) or RecordBatch (slice) |
| """ |
| if isinstance(key, slice): |
| return _normalize_slice(self, key) |
| else: |
| return self.column(key) |
| |
| def serialize(self, memory_pool=None): |
| """ |
| Write RecordBatch to Buffer as encapsulated IPC message. |
| |
| Parameters |
| ---------- |
| memory_pool : MemoryPool, default None |
| Uses default memory pool if not specified |
| |
| Returns |
| ------- |
| serialized : Buffer |
| """ |
| cdef shared_ptr[CBuffer] buffer |
| cdef CIpcWriteOptions options = CIpcWriteOptions.Defaults() |
| options.memory_pool = maybe_unbox_memory_pool(memory_pool) |
| |
| with nogil: |
| buffer = GetResultValue( |
| SerializeRecordBatch(deref(self.batch), options)) |
| return pyarrow_wrap_buffer(buffer) |
| |
| def slice(self, offset=0, length=None): |
| """ |
| Compute zero-copy slice of this RecordBatch |
| |
| Parameters |
| ---------- |
| offset : int, default 0 |
| Offset from start of record batch to slice |
| length : int, default None |
| Length of slice (default is until end of batch starting from |
| offset) |
| |
| Returns |
| ------- |
| sliced : RecordBatch |
| """ |
| cdef shared_ptr[CRecordBatch] result |
| |
| if offset < 0: |
| raise IndexError('Offset must be non-negative') |
| |
| offset = min(len(self), offset) |
| if length is None: |
| result = self.batch.Slice(offset) |
| else: |
| result = self.batch.Slice(offset, length) |
| |
| return pyarrow_wrap_batch(result) |
| |
| def filter(self, Array mask, object null_selection_behavior="drop"): |
| """ |
        Select records from a RecordBatch. See pyarrow.compute.filter for
        full usage.
| """ |
| return _pc().filter(self, mask, null_selection_behavior) |
| |
| def equals(self, object other, bint check_metadata=False): |
| """ |
| Check if contents of two record batches are equal. |
| |
| Parameters |
| ---------- |
| other : pyarrow.RecordBatch |
| RecordBatch to compare against. |
| check_metadata : bool, default False |
| Whether schema metadata equality should be checked as well. |
| |
| Returns |
| ------- |
| are_equal : bool |
| """ |
| cdef: |
| CRecordBatch* this_batch = self.batch |
| shared_ptr[CRecordBatch] other_batch = pyarrow_unwrap_batch(other) |
| c_bool result |
| |
| if not other_batch: |
| return False |
| |
| with nogil: |
| result = this_batch.Equals(deref(other_batch), check_metadata) |
| |
| return result |
| |
| def take(self, object indices): |
| """ |
        Select records from a RecordBatch. See pyarrow.compute.take for full
        usage.
| """ |
| return _pc().take(self, indices) |
| |
| def to_pydict(self): |
| """ |
| Convert the RecordBatch to a dict or OrderedDict. |
| |
| Returns |
| ------- |
| dict |
| """ |
| entries = [] |
| for i in range(self.batch.num_columns()): |
| name = bytes(self.batch.column_name(i)).decode('utf8') |
| column = self[i].to_pylist() |
| entries.append((name, column)) |
| return ordered_dict(entries) |
| |
| def _to_pandas(self, options, **kwargs): |
| return Table.from_batches([self])._to_pandas(options, **kwargs) |
| |
| @classmethod |
| def from_pandas(cls, df, Schema schema=None, preserve_index=None, |
| nthreads=None, columns=None): |
| """ |
| Convert pandas.DataFrame to an Arrow RecordBatch |
| |
| Parameters |
| ---------- |
        df : pandas.DataFrame
        schema : pyarrow.Schema, optional
| The expected schema of the RecordBatch. This can be used to |
| indicate the type of columns if we cannot infer it automatically. |
| If passed, the output will have exactly this schema. Columns |
| specified in the schema that are not found in the DataFrame columns |
| or its index will raise an error. Additional columns or index |
| levels in the DataFrame which are not specified in the schema will |
| be ignored. |
| preserve_index : bool, optional |
| Whether to store the index as an additional column in the resulting |
| ``RecordBatch``. The default of None will store the index as a |
| column, except for RangeIndex which is stored as metadata only. Use |
| ``preserve_index=True`` to force it to be stored as a column. |
| nthreads : int, default None (may use up to system CPU count threads) |
| If greater than 1, convert columns to Arrow in parallel using |
| indicated number of threads |
| columns : list, optional |
| List of column to be converted. If None, use all columns. |
| |
| Returns |
| ------- |
| pyarrow.RecordBatch |
| """ |
| from pyarrow.pandas_compat import dataframe_to_arrays |
| arrays, schema = dataframe_to_arrays( |
| df, schema, preserve_index, nthreads=nthreads, columns=columns |
| ) |
| return cls.from_arrays(arrays, schema=schema) |
| |
| @staticmethod |
| def from_arrays(list arrays, names=None, schema=None, metadata=None): |
| """ |
| Construct a RecordBatch from multiple pyarrow.Arrays |
| |
| Parameters |
| ---------- |
        arrays : list of pyarrow.Array
| One for each field in RecordBatch |
| names : list of str, optional |
| Names for the batch fields. If not passed, schema must be passed |
| schema : Schema, default None |
| Schema for the created batch. If not passed, names must be passed |
| metadata : dict or Mapping, default None |
| Optional metadata for the schema (if inferred). |
| |
| Returns |
| ------- |
| pyarrow.RecordBatch |
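
        Examples
        --------
        A minimal batch built from two equal-length arrays:

        >>> import pyarrow as pa
        >>> batch = pa.RecordBatch.from_arrays(
        ...     [pa.array([1, 2, 3]), pa.array(['a', 'b', 'c'])],
        ...     names=['f0', 'f1'])
        >>> batch.num_rows
        3
        >>> batch.num_columns
        2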
| """ |
| cdef: |
| Array arr |
| shared_ptr[CSchema] c_schema |
| vector[shared_ptr[CArray]] c_arrays |
| int64_t num_rows |
| |
| if len(arrays) > 0: |
| num_rows = len(arrays[0]) |
| else: |
| num_rows = 0 |
| |
        if isinstance(names, Schema):
            warnings.warn("Schema passed to names= option, please "
                          "pass schema= explicitly. "
                          "Will raise exception in future", FutureWarning)
            schema = names
            names = None
| |
| converted_arrays = _sanitize_arrays(arrays, names, schema, metadata, |
| &c_schema) |
| |
| c_arrays.reserve(len(arrays)) |
| for arr in converted_arrays: |
| if len(arr) != num_rows: |
| raise ValueError('Arrays were not all the same length: ' |
| '{0} vs {1}'.format(len(arr), num_rows)) |
| c_arrays.push_back(arr.sp_array) |
| |
| result = pyarrow_wrap_batch(CRecordBatch.Make(c_schema, num_rows, |
| c_arrays)) |
| result.validate() |
| return result |
| |
| @staticmethod |
| def from_struct_array(StructArray struct_array): |
| """ |
| Construct a RecordBatch from a StructArray. |
| |
| Each field in the StructArray will become a column in the resulting |
| ``RecordBatch``. |
| |
| Parameters |
| ---------- |
| struct_array : StructArray |
| Array to construct the record batch from. |
| |
| Returns |
| ------- |
| pyarrow.RecordBatch |
| """ |
| cdef: |
| shared_ptr[CRecordBatch] c_record_batch |
| with nogil: |
| c_record_batch = GetResultValue( |
| CRecordBatch.FromStructArray(struct_array.sp_array)) |
| return pyarrow_wrap_batch(c_record_batch) |
| |
| def _export_to_c(self, uintptr_t out_ptr, uintptr_t out_schema_ptr=0): |
| """ |
| Export to a C ArrowArray struct, given its pointer. |
| |
| If a C ArrowSchema struct pointer is also given, the record batch |
| schema is exported to it at the same time. |
| |
| Parameters |
| ---------- |
        out_ptr : int
            The raw pointer to a C ArrowArray struct.
        out_schema_ptr : int, optional
            The raw pointer to a C ArrowSchema struct.
| |
| Be careful: if you don't pass the ArrowArray struct to a consumer, |
| array memory will leak. This is a low-level function intended for |
| expert users. |
| """ |
| with nogil: |
| check_status(ExportRecordBatch(deref(self.sp_batch), |
| <ArrowArray*> out_ptr, |
| <ArrowSchema*> out_schema_ptr)) |
| |
| @staticmethod |
| def _import_from_c(uintptr_t in_ptr, schema): |
| """ |
| Import RecordBatch from a C ArrowArray struct, given its pointer |
| and the imported schema. |
| |
| Parameters |
| ---------- |
        in_ptr : int
            The raw pointer to a C ArrowArray struct.
        schema : Schema or int
            Either a Schema object, or the raw pointer to a C ArrowSchema
            struct.
| |
| This is a low-level function intended for expert users. |
| """ |
| cdef: |
| shared_ptr[CRecordBatch] c_batch |
| |
| c_schema = pyarrow_unwrap_schema(schema) |
| if c_schema == nullptr: |
| # Not a Schema object, perhaps a raw ArrowSchema pointer |
| schema_ptr = <uintptr_t> schema |
| with nogil: |
| c_batch = GetResultValue(ImportRecordBatch( |
| <ArrowArray*> in_ptr, <ArrowSchema*> schema_ptr)) |
| else: |
| with nogil: |
| c_batch = GetResultValue(ImportRecordBatch( |
| <ArrowArray*> in_ptr, c_schema)) |
| return pyarrow_wrap_batch(c_batch) |
| |
| |
| def _reconstruct_record_batch(columns, schema): |
| """ |
| Internal: reconstruct RecordBatch from pickled components. |
| """ |
| return RecordBatch.from_arrays(columns, schema=schema) |
| |
| |
| def table_to_blocks(options, Table table, categories, extension_columns): |
| cdef: |
| PyObject* result_obj |
| shared_ptr[CTable] c_table |
| CMemoryPool* pool |
| PandasOptions c_options = _convert_pandas_options(options) |
| |
| if categories is not None: |
| c_options.categorical_columns = {tobytes(cat) for cat in categories} |
| if extension_columns is not None: |
| c_options.extension_columns = {tobytes(col) |
| for col in extension_columns} |
| |
    # ARROW-3789(wesm): Convert date/timestamp types to datetime64[ns]
| c_options.coerce_temporal_nanoseconds = True |
| |
| if c_options.self_destruct: |
| # Move the shared_ptr, table is now unsafe to use further |
| c_table = move(table.sp_table) |
| table.table = NULL |
| else: |
| c_table = table.sp_table |
| |
| with nogil: |
| check_status( |
| libarrow.ConvertTableToPandas(c_options, move(c_table), |
| &result_obj) |
| ) |
| |
| return PyObject_to_object(result_obj) |
| |
| |
| cdef class Table(_PandasConvertible): |
| """ |
    A collection of top-level named, equal-length Arrow arrays.

    Warnings
    --------
    Do not call this class's constructor directly, use one of the ``from_*``
    methods instead.
| """ |
| |
| def __cinit__(self): |
| self.table = NULL |
| |
| def __init__(self): |
| raise TypeError("Do not call Table's constructor directly, use one of " |
| "the `Table.from_*` functions instead.") |
| |
| def to_string(self, show_metadata=False): |
| """ |
| Return human-readable string representation of Table. |
| |
| Parameters |
| ---------- |
        show_metadata : bool, default False
| Display Field-level and Schema-level KeyValueMetadata. |
| |
| Returns |
| ------- |
| str |
| """ |
| # Use less verbose schema output. |
| schema_as_string = self.schema.to_string( |
| show_field_metadata=show_metadata, |
| show_schema_metadata=show_metadata |
| ) |
| return 'pyarrow.{}\n{}'.format(type(self).__name__, schema_as_string) |
| |
| def __repr__(self): |
| if self.table == NULL: |
| raise ValueError("Table's internal pointer is NULL, do not use " |
| "any methods or attributes on this object") |
| return self.to_string() |
| |
| cdef void init(self, const shared_ptr[CTable]& table): |
| self.sp_table = table |
| self.table = table.get() |
| |
| def validate(self, *, full=False): |
| """ |
| Perform validation checks. An exception is raised if validation fails. |
| |
| By default only cheap validation checks are run. Pass `full=True` |
| for thorough validation checks (potentially O(n)). |
| |
| Parameters |
| ---------- |
        full : bool, default False
| If True, run expensive checks, otherwise cheap checks only. |
| |
| Raises |
| ------ |
| ArrowInvalid |
| """ |
| if full: |
| with nogil: |
| check_status(self.table.ValidateFull()) |
| else: |
| with nogil: |
| check_status(self.table.Validate()) |
| |
    def __reduce__(self):
        # Reduce the columns as ChunkedArrays to avoid serializing schema
        # data twice
        return _reconstruct_table, (self.columns, self.schema)
| |
| def __getitem__(self, key): |
| """ |
| Slice or return column at given index or column name. |
| |
| Parameters |
| ---------- |
| key : integer, str, or slice |
| Slices with step not equal to 1 (or None) will produce a copy |
| rather than a zero-copy view. |
| |
| Returns |
| ------- |
| ChunkedArray (index/column) or Table (slice) |
| """ |
| if isinstance(key, slice): |
| return _normalize_slice(self, key) |
| else: |
| return self.column(key) |
| |
| def slice(self, offset=0, length=None): |
| """ |
| Compute zero-copy slice of this Table. |
| |
| Parameters |
| ---------- |
| offset : int, default 0 |
| Offset from start of table to slice. |
| length : int, default None |
| Length of slice (default is until end of table starting from |
| offset). |
| |
| Returns |
| ------- |
| Table |
| """ |
| cdef shared_ptr[CTable] result |
| |
| if offset < 0: |
| raise IndexError('Offset must be non-negative') |
| |
| offset = min(len(self), offset) |
| if length is None: |
| result = self.table.Slice(offset) |
| else: |
| result = self.table.Slice(offset, length) |
| |
| return pyarrow_wrap_table(result) |
| |
| def filter(self, mask, object null_selection_behavior="drop"): |
| """ |
| Select records from a Table. See :func:`pyarrow.compute.filter` for |
| full usage. |
| """ |
| return _pc().filter(self, mask, null_selection_behavior) |
| |
| def take(self, object indices): |
| """ |
| Select records from an Table. See :func:`pyarrow.compute.take` for full |
| usage. |
| """ |
| return _pc().take(self, indices) |
| |
| def select(self, object columns): |
| """ |
| Select columns of the Table. |
| |
| Returns a new Table with the specified columns, and metadata |
| preserved. |
| |
| Parameters |
| ---------- |
| columns : list-like |
| The column names or integer indices to select. |
| |
| Returns |
| ------- |
| Table |
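
        Examples
        --------
        Columns may be referenced by name or position; the result keeps the
        requested order:

        >>> import pyarrow as pa
        >>> t = pa.table({'a': [1, 2], 'b': ['x', 'y']})
        >>> t.select(['b', 'a']).column_names
        ['b', 'a']
        >>> t.select([0]).column_names
        ['a']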
| """ |
| cdef: |
| shared_ptr[CTable] c_table |
| vector[int] c_indices |
| |
| for idx in columns: |
| idx = self._ensure_integer_index(idx) |
| idx = _normalize_index(idx, self.num_columns) |
| c_indices.push_back(<int> idx) |
| |
| with nogil: |
| c_table = GetResultValue(self.table.SelectColumns(move(c_indices))) |
| |
| return pyarrow_wrap_table(c_table) |
| |
| def replace_schema_metadata(self, metadata=None): |
| """ |
| EXPERIMENTAL: Create shallow copy of table by replacing schema |
| key-value metadata with the indicated new metadata (which may be None), |
| which deletes any existing metadata. |
| |
| Parameters |
| ---------- |
| metadata : dict, default None |
| |
| Returns |
| ------- |
| Table |
| """ |
| cdef: |
| shared_ptr[const CKeyValueMetadata] c_meta |
| shared_ptr[CTable] c_table |
| |
| metadata = ensure_metadata(metadata, allow_none=True) |
| c_meta = pyarrow_unwrap_metadata(metadata) |
| with nogil: |
| c_table = self.table.ReplaceSchemaMetadata(c_meta) |
| |
| return pyarrow_wrap_table(c_table) |
| |
| def flatten(self, MemoryPool memory_pool=None): |
| """ |
| Flatten this Table. |
| |
| Each column with a struct type is flattened |
| into one column per struct field. Other columns are left unchanged. |
| |
| Parameters |
| ---------- |
| memory_pool : MemoryPool, default None |
| For memory allocations, if required, otherwise use default pool |
| |
| Returns |
| ------- |
| Table |
| """ |
| cdef: |
| shared_ptr[CTable] flattened |
| CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) |
| |
| with nogil: |
| flattened = GetResultValue(self.table.Flatten(pool)) |
| |
| return pyarrow_wrap_table(flattened) |
| |
| def combine_chunks(self, MemoryPool memory_pool=None): |
| """ |
        Make a new table by combining the chunks of this table.
| |
| All the underlying chunks in the ChunkedArray of each column are |
| concatenated into zero or one chunk. |
| |
| Parameters |
| ---------- |
| memory_pool : MemoryPool, default None |
| For memory allocations, if required, otherwise use default pool. |
| |
| Returns |
| ------- |
| Table |
| """ |
| cdef: |
| shared_ptr[CTable] combined |
| CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) |
| |
| with nogil: |
| combined = GetResultValue(self.table.CombineChunks(pool)) |
| |
| return pyarrow_wrap_table(combined) |
| |
| def unify_dictionaries(self, MemoryPool memory_pool=None): |
| """ |
| Unify dictionaries across all chunks. |
| |
| This method returns an equivalent table, but where all chunks of |
| each column share the same dictionary values. Dictionary indices |
| are transposed accordingly. |
| |
| Columns without dictionaries are returned unchanged. |
| |
| Parameters |
| ---------- |
| memory_pool : MemoryPool, default None |
| For memory allocations, if required, otherwise use default pool |
| |
| Returns |
| ------- |
| Table |
| """ |
| cdef: |
| CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) |
| shared_ptr[CTable] c_result |
| |
| with nogil: |
| c_result = GetResultValue(CDictionaryUnifier.UnifyTable( |
| deref(self.table), pool)) |
| |
| return pyarrow_wrap_table(c_result) |
| |
| def __eq__(self, other): |
| try: |
| return self.equals(other) |
| except TypeError: |
| return NotImplemented |
| |
| def equals(self, Table other, bint check_metadata=False): |
| """ |
| Check if contents of two tables are equal. |
| |
| Parameters |
| ---------- |
| other : pyarrow.Table |
| Table to compare against. |
| check_metadata : bool, default False |
| Whether schema metadata equality should be checked as well. |
| |
| Returns |
| ------- |
| bool |
| """ |
| if other is None: |
| return False |
| |
| cdef: |
| CTable* this_table = self.table |
| CTable* other_table = other.table |
| c_bool result |
| |
| with nogil: |
| result = this_table.Equals(deref(other_table), check_metadata) |
| |
| return result |
| |
| def cast(self, Schema target_schema, bint safe=True): |
| """ |
| Cast table values to another schema. |
| |
| Parameters |
| ---------- |
| target_schema : Schema |
| Schema to cast to, the names and order of fields must match. |
| safe : bool, default True |
| Check for overflows or other unsafe conversions. |
| |
| Returns |
| ------- |
| Table |
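
        Examples
        --------
        A sketch casting a single int64 column to int32 (the target field
        names and order must match the current schema):

        >>> import pyarrow as pa
        >>> t = pa.table({'a': [1, 2]})
        >>> t.cast(pa.schema([('a', pa.int32())])).schema
        a: int32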
| """ |
| cdef: |
| ChunkedArray column, casted |
| Field field |
| list newcols = [] |
| |
        if self.schema.names != target_schema.names:
            raise ValueError("Target schema's field names do not match "
                             "the table's field names: {!r}, {!r}"
                             .format(self.schema.names, target_schema.names))
| |
| for column, field in zip(self.itercolumns(), target_schema): |
| casted = column.cast(field.type, safe=safe) |
| newcols.append(casted) |
| |
| return Table.from_arrays(newcols, schema=target_schema) |
| |
| @classmethod |
| def from_pandas(cls, df, Schema schema=None, preserve_index=None, |
| nthreads=None, columns=None, bint safe=True): |
| """ |
| Convert pandas.DataFrame to an Arrow Table. |
| |
| The column types in the resulting Arrow Table are inferred from the |
| dtypes of the pandas.Series in the DataFrame. In the case of non-object |
| Series, the NumPy dtype is translated to its Arrow equivalent. In the |
| case of `object`, we need to guess the datatype by looking at the |
| Python objects in this Series. |
| |
| Be aware that Series of the `object` dtype don't carry enough |
| information to always lead to a meaningful Arrow type. In the case that |
| we cannot infer a type, e.g. because the DataFrame is of length 0 or |
| the Series only contains None/nan objects, the type is set to |
| null. This behavior can be avoided by constructing an explicit schema |
| and passing it to this function. |
| |
| Parameters |
| ---------- |
| df : pandas.DataFrame |
| schema : pyarrow.Schema, optional |
| The expected schema of the Arrow Table. This can be used to |
| indicate the type of columns if we cannot infer it automatically. |
| If passed, the output will have exactly this schema. Columns |
| specified in the schema that are not found in the DataFrame columns |
| or its index will raise an error. Additional columns or index |
| levels in the DataFrame which are not specified in the schema will |
| be ignored. |
| preserve_index : bool, optional |
| Whether to store the index as an additional column in the resulting |
| ``Table``. The default of None will store the index as a column, |
| except for RangeIndex which is stored as metadata only. Use |
| ``preserve_index=True`` to force it to be stored as a column. |
| nthreads : int, default None (may use up to system CPU count threads) |
| If greater than 1, convert columns to Arrow in parallel using |
| indicated number of threads. |
| columns : list, optional |
| List of column to be converted. If None, use all columns. |
| safe : bool, default True |
| Check for overflows or other unsafe conversions. |
| |
| Returns |
| ------- |
| Table |
| |
| Examples |
| -------- |
| |
| >>> import pandas as pd |
| >>> import pyarrow as pa |
| >>> df = pd.DataFrame({ |
| ... 'int': [1, 2], |
| ... 'str': ['a', 'b'] |
| ... }) |
| >>> pa.Table.from_pandas(df) |
        pyarrow.Table
        int: int64
        str: string
| """ |
| from pyarrow.pandas_compat import dataframe_to_arrays |
| arrays, schema = dataframe_to_arrays( |
| df, |
| schema=schema, |
| preserve_index=preserve_index, |
| nthreads=nthreads, |
| columns=columns, |
| safe=safe |
| ) |
| return cls.from_arrays(arrays, schema=schema) |
| |
| @staticmethod |
| def from_arrays(arrays, names=None, schema=None, metadata=None): |
| """ |
| Construct a Table from Arrow arrays. |
| |
| Parameters |
| ---------- |
| arrays : list of pyarrow.Array or pyarrow.ChunkedArray |
| Equal-length arrays that should form the table. |
| names : list of str, optional |
| Names for the table columns. If not passed, schema must be passed. |
| schema : Schema, default None |
| Schema for the created table. If not passed, names must be passed. |
| metadata : dict or Mapping, default None |
| Optional metadata for the schema (if inferred). |
| |
| Returns |
| ------- |
| Table |
| """ |
| cdef: |
| vector[shared_ptr[CChunkedArray]] columns |
| shared_ptr[CSchema] c_schema |
| int i, K = <int> len(arrays) |
| |
| converted_arrays = _sanitize_arrays(arrays, names, schema, metadata, |
| &c_schema) |
| |
| columns.reserve(K) |
| for item in converted_arrays: |
| if isinstance(item, Array): |
| columns.push_back( |
| make_shared[CChunkedArray]( |
| (<Array> item).sp_array |
| ) |
| ) |
| elif isinstance(item, ChunkedArray): |
| columns.push_back((<ChunkedArray> item).sp_chunked_array) |
| else: |
| raise TypeError(type(item)) |
| |
| result = pyarrow_wrap_table(CTable.Make(c_schema, columns)) |
| result.validate() |
| return result |
| |
| @staticmethod |
| def from_pydict(mapping, schema=None, metadata=None): |
| """ |
        Construct a Table from a mapping of strings to Arrays or Python
        lists.
| |
| Parameters |
| ---------- |
| mapping : dict or Mapping |
| A mapping of strings to Arrays or Python lists. |
| schema : Schema, default None |
| If not passed, will be inferred from the Mapping values. |
| metadata : dict or Mapping, default None |
| Optional metadata for the schema (if inferred). |
| |
| Returns |
| ------- |
| Table |
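
        Examples
        --------
        A minimal example; column types are inferred when no schema is
        passed:

        >>> import pyarrow as pa
        >>> t = pa.Table.from_pydict({'a': [1, 2], 'b': ['x', 'y']})
        >>> t.num_rows
        2
        >>> t.column_names
        ['a', 'b']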
| """ |
| arrays = [] |
| if schema is None: |
| names = [] |
| for k, v in mapping.items(): |
| names.append(k) |
| arrays.append(asarray(v)) |
| return Table.from_arrays(arrays, names, metadata=metadata) |
| elif isinstance(schema, Schema): |
| for field in schema: |
| try: |
| v = mapping[field.name] |
| except KeyError: |
| try: |
| v = mapping[tobytes(field.name)] |
| except KeyError: |
| present = mapping.keys() |
| missing = [n for n in schema.names if n not in present] |
| raise KeyError( |
| "The passed mapping doesn't contain the " |
| "following field(s) of the schema: {}". |
| format(', '.join(missing)) |
| ) |
| arrays.append(asarray(v, type=field.type)) |
| # Will raise if metadata is not None |
| return Table.from_arrays(arrays, schema=schema, metadata=metadata) |
| else: |
| raise TypeError('Schema must be an instance of pyarrow.Schema') |
| |
| @staticmethod |
| def from_batches(batches, Schema schema=None): |
| """ |
| Construct a Table from a sequence or iterator of Arrow RecordBatches. |
| |
| Parameters |
| ---------- |
| batches : sequence or iterator of RecordBatch |
| Sequence of RecordBatch to be converted, all schemas must be equal. |
| schema : Schema, default None |
| If not passed, will be inferred from the first RecordBatch. |
| |
| Returns |
| ------- |
| Table |
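
        Examples
        --------
        Combining two batches that share a schema:

        >>> import pyarrow as pa
        >>> batch = pa.record_batch([pa.array([1, 2])], names=['a'])
        >>> pa.Table.from_batches([batch, batch]).num_rows
        4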
| """ |
| cdef: |
| vector[shared_ptr[CRecordBatch]] c_batches |
| shared_ptr[CTable] c_table |
| shared_ptr[CSchema] c_schema |
| RecordBatch batch |
| |
| for batch in batches: |
| c_batches.push_back(batch.sp_batch) |
| |
| if schema is None: |
| if c_batches.size() == 0: |
| raise ValueError('Must pass schema, or at least ' |
| 'one RecordBatch') |
| c_schema = c_batches[0].get().schema() |
| else: |
| c_schema = schema.sp_schema |
| |
| with nogil: |
| c_table = GetResultValue( |
| CTable.FromRecordBatches(c_schema, move(c_batches))) |
| |
| return pyarrow_wrap_table(c_table) |
| |
| def to_batches(self, max_chunksize=None, **kwargs): |
| """ |
| Convert Table to list of (contiguous) RecordBatch objects. |
| |
| Parameters |
| ---------- |
| max_chunksize : int, default None |
| Maximum size for RecordBatch chunks. Individual chunks may be |
| smaller depending on the chunk layout of individual columns. |
| |
| Returns |
| ------- |
| list of RecordBatch |
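
        Examples
        --------
        A table whose columns have two chunks yields one batch per
        contiguous chunk, or smaller batches when ``max_chunksize`` is
        given:

        >>> import pyarrow as pa
        >>> batch = pa.record_batch([pa.array([1, 2])], names=['a'])
        >>> t = pa.Table.from_batches([batch, batch])
        >>> len(t.to_batches())
        2
        >>> len(t.to_batches(max_chunksize=1))
        4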
| """ |
| cdef: |
| unique_ptr[TableBatchReader] reader |
| int64_t c_max_chunksize |
| list result = [] |
| shared_ptr[CRecordBatch] batch |
| |
| reader.reset(new TableBatchReader(deref(self.table))) |
| |
| if 'chunksize' in kwargs: |
| max_chunksize = kwargs['chunksize'] |
| msg = ('The parameter chunksize is deprecated for ' |
| 'pyarrow.Table.to_batches as of 0.15, please use ' |
| 'the parameter max_chunksize instead') |
| warnings.warn(msg, FutureWarning) |
| |
| if max_chunksize is not None: |
| c_max_chunksize = max_chunksize |
| reader.get().set_chunksize(c_max_chunksize) |
| |
| while True: |
| with nogil: |
| check_status(reader.get().ReadNext(&batch)) |
| |
| if batch.get() == NULL: |
| break |
| |
| result.append(pyarrow_wrap_batch(batch)) |
| |
| return result |
| |
| def _to_pandas(self, options, categories=None, ignore_metadata=False, |
| types_mapper=None): |
| from pyarrow.pandas_compat import table_to_blockmanager |
| mgr = table_to_blockmanager( |
| options, self, categories, |
| ignore_metadata=ignore_metadata, |
| types_mapper=types_mapper) |
| return pandas_api.data_frame(mgr) |
| |
| def to_pydict(self): |
| """ |
| Convert the Table to a dict or OrderedDict. |
| |
| Returns |
| ------- |
| dict |
| """ |
| cdef: |
| size_t i |
| size_t num_columns = self.table.num_columns() |
| list entries = [] |
| ChunkedArray column |
| |
| for i in range(num_columns): |
| column = self.column(i) |
| entries.append((self.field(i).name, column.to_pylist())) |
| |
| return ordered_dict(entries) |
| |
| @property |
| def schema(self): |
| """ |
| Schema of the table and its columns. |
| |
| Returns |
| ------- |
| Schema |
| """ |
| return pyarrow_wrap_schema(self.table.schema()) |
| |
| def field(self, i): |
| """ |
| Select a schema field by its column name or numeric index. |
| |
| Parameters |
| ---------- |
| i : int or string |
| The index or name of the field to retrieve. |
| |
| Returns |
| ------- |
| Field |
| """ |
| return self.schema.field(i) |
| |
| def _ensure_integer_index(self, i): |
| """ |
| Ensure integer index (convert string column name to integer if needed). |
| """ |
| if isinstance(i, (bytes, str)): |
| field_indices = self.schema.get_all_field_indices(i) |
| |
| if len(field_indices) == 0: |
| raise KeyError("Field \"{}\" does not exist in table schema" |
| .format(i)) |
| elif len(field_indices) > 1: |
| raise KeyError("Field \"{}\" exists {} times in table schema" |
| .format(i, len(field_indices))) |
| else: |
| return field_indices[0] |
| elif isinstance(i, int): |
| return i |
| else: |
| raise TypeError("Index must either be string or integer") |
| |
| def column(self, i): |
| """ |
| Select a column by its column name, or numeric index. |
| |
| Parameters |
| ---------- |
| i : int or string |
| The index or name of the column to retrieve. |
| |
| Returns |
| ------- |
| ChunkedArray |
| """ |
| return self._column(self._ensure_integer_index(i)) |
| |
| def _column(self, int i): |
| """ |
| Select a column by its numeric index. |
| |
| Parameters |
| ---------- |
| i : int |
| The index of the column to retrieve. |
| |
| Returns |
| ------- |
| ChunkedArray |
| """ |
| cdef int index = <int> _normalize_index(i, self.num_columns) |
| cdef ChunkedArray result = pyarrow_wrap_chunked_array( |
| self.table.column(index)) |
| result._name = self.schema[index].name |
| return result |
| |
| def itercolumns(self): |
| """ |
| Iterator over all columns in their numerical order. |
| |
| Yields |
| ------ |
| ChunkedArray |
| """ |
| for i in range(self.num_columns): |
| yield self._column(i) |
| |
| @property |
| def columns(self): |
| """ |
| List of all columns in numerical order. |
| |
| Returns |
| ------- |
| list of ChunkedArray |
| """ |
| return [self._column(i) for i in range(self.num_columns)] |
| |
| @property |
| def num_columns(self): |
| """ |
| Number of columns in this table. |
| |
| Returns |
| ------- |
| int |
| """ |
| return self.table.num_columns() |
| |
| @property |
| def num_rows(self): |
| """ |
| Number of rows in this table. |
| |
| Due to the definition of a table, all columns have the same number of |
| rows. |
| |
| Returns |
| ------- |
| int |
| """ |
| return self.table.num_rows() |
| |
| def __len__(self): |
| return self.num_rows |
| |
| @property |
| def shape(self): |
| """ |
| Dimensions of the table: (#rows, #columns). |
| |
| Returns |
| ------- |
| (int, int) |
| Number of rows and number of columns. |
| """ |
| return (self.num_rows, self.num_columns) |
| |
| @property |
| def nbytes(self): |
| """ |
| Total number of bytes consumed by the elements of the table. |
| |
| Returns |
| ------- |
| int |
| """ |
| size = 0 |
| for column in self.itercolumns(): |
| size += column.nbytes |
| return size |
| |
| def __sizeof__(self): |
| return super(Table, self).__sizeof__() + self.nbytes |
| |
| def add_column(self, int i, field_, column): |
| """ |
| Add column to Table at position. |
| |
        A new table is returned with the column added; the original table
        object is left unchanged.
| |
| Parameters |
| ---------- |
| i : int |
| Index to place the column at. |
| field_ : str or Field |
| If a string is passed then the type is deduced from the column |
| data. |
| column : Array, list of Array, or values coercible to arrays |
| Column data. |
| |
| Returns |
| ------- |
| Table |
| New table with the passed column added. |
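
        Examples
        --------
        A sketch inserting a new column at position 0; the string name
        takes its type from the column data:

        >>> import pyarrow as pa
        >>> t = pa.table({'b': [2, 3]})
        >>> t.add_column(0, 'a', [[1, 2]]).column_names
        ['a', 'b']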
| """ |
| cdef: |
| shared_ptr[CTable] c_table |
| Field c_field |
| ChunkedArray c_arr |
| |
| if isinstance(column, ChunkedArray): |
| c_arr = column |
| else: |
| c_arr = chunked_array(column) |
| |
| if isinstance(field_, Field): |
| c_field = field_ |
| else: |
| c_field = field(field_, c_arr.type) |
| |
| with nogil: |
| c_table = GetResultValue(self.table.AddColumn( |
| i, c_field.sp_field, c_arr.sp_chunked_array)) |
| |
| return pyarrow_wrap_table(c_table) |
| |
| def append_column(self, field_, column): |
| """ |
        Append a column at the end of the table's columns.
| |
| Parameters |
| ---------- |
| field_ : str or Field |
| If a string is passed then the type is deduced from the column |
| data. |
| column : Array, list of Array, or values coercible to arrays |
| Column data. |
| |
| Returns |
| ------- |
| Table |
| New table with the passed column added. |
| """ |
| return self.add_column(self.num_columns, field_, column) |
| |
| def remove_column(self, int i): |
| """ |
| Create new Table with the indicated column removed. |
| |
| Parameters |
| ---------- |
| i : int |
| Index of column to remove. |
| |
| Returns |
| ------- |
| Table |
| New table without the column. |
| """ |
| cdef shared_ptr[CTable] c_table |
| |
| with nogil: |
| c_table = GetResultValue(self.table.RemoveColumn(i)) |
| |
| return pyarrow_wrap_table(c_table) |
| |
| def set_column(self, int i, field_, column): |
| """ |
| Replace column in Table at position. |
| |
| Parameters |
| ---------- |
| i : int |
| Index to place the column at. |
| field_ : str or Field |
| If a string is passed then the type is deduced from the column |
| data. |
| column : Array, list of Array, or values coercible to arrays |
| Column data. |
| |
| Returns |
| ------- |
| Table |
| New table with the passed column set. |
| """ |
| cdef: |
| shared_ptr[CTable] c_table |
| Field c_field |
| ChunkedArray c_arr |
| |
| if isinstance(column, ChunkedArray): |
| c_arr = column |
| else: |
| c_arr = chunked_array(column) |
| |
| if isinstance(field_, Field): |
| c_field = field_ |
| else: |
| c_field = field(field_, c_arr.type) |
| |
| with nogil: |
| c_table = GetResultValue(self.table.SetColumn( |
| i, c_field.sp_field, c_arr.sp_chunked_array)) |
| |
| return pyarrow_wrap_table(c_table) |
| |
| @property |
| def column_names(self): |
| """ |
| Names of the table's columns. |
| |
| Returns |
| ------- |
| list of str |
| """ |
| names = self.table.ColumnNames() |
| return [frombytes(name) for name in names] |
| |
| def rename_columns(self, names): |
| """ |
| Create new table with columns renamed to provided names. |
| |
| Parameters |
| ---------- |
| names : list of str |
| List of new column names. |
| |
| Returns |
| ------- |
| Table |
| """ |
| cdef: |
| shared_ptr[CTable] c_table |
| vector[c_string] c_names |
| |
| for name in names: |
| c_names.push_back(tobytes(name)) |
| |
| with nogil: |
| c_table = GetResultValue(self.table.RenameColumns(move(c_names))) |
| |
| return pyarrow_wrap_table(c_table) |
| |
| def drop(self, columns): |
| """ |
| Drop one or more columns and return a new table. |
| |
| Parameters |
| ---------- |
| columns : list of str |
| List of field names referencing existing columns. |
| |
| Raises |
| ------ |
| KeyError |
            If any of the passed column names do not exist.
| |
| Returns |
| ------- |
| Table |
| New table without the columns. |
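
        Examples
        --------
        Dropping one of two columns:

        >>> import pyarrow as pa
        >>> t = pa.table({'a': [1, 2], 'b': ['x', 'y']})
        >>> t.drop(['b']).column_names
        ['a']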
| """ |
| indices = [] |
| for col in columns: |
| idx = self.schema.get_field_index(col) |
| if idx == -1: |
| raise KeyError("Column {!r} not found".format(col)) |
| indices.append(idx) |
| |
| indices.sort() |
| indices.reverse() |
| |
| table = self |
| for idx in indices: |
| table = table.remove_column(idx) |
| |
| return table |
| |
| |
| def _reconstruct_table(arrays, schema): |
| """ |
| Internal: reconstruct pa.Table from pickled components. |
| """ |
| return Table.from_arrays(arrays, schema=schema) |
| |
| |
| def record_batch(data, names=None, schema=None, metadata=None): |
| """ |
| Create a pyarrow.RecordBatch from another Python data structure or sequence |
| of arrays. |
| |
| Parameters |
| ---------- |
| data : pandas.DataFrame, list |
| A DataFrame or list of arrays or chunked arrays. |
| names : list, default None |
| Column names if list of arrays passed as data. Mutually exclusive with |
| 'schema' argument. |
| schema : Schema, default None |
| The expected schema of the RecordBatch. If not passed, will be inferred |
| from the data. Mutually exclusive with 'names' argument. |
| metadata : dict or Mapping, default None |
| Optional metadata for the schema (if schema not passed). |
| |
| Returns |
| ------- |
| RecordBatch |
| |
| See Also |
| -------- |
| RecordBatch.from_arrays, RecordBatch.from_pandas, table |
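
    Examples
    --------
    A minimal example using a list of arrays with explicit names:

    >>> import pyarrow as pa
    >>> batch = pa.record_batch([pa.array([1, 2])], names=['a'])
    >>> batch.num_rows
    2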
| """ |
| # accept schema as first argument for backwards compatibility / usability |
| if isinstance(names, Schema) and schema is None: |
| schema = names |
| names = None |
| |
| if isinstance(data, (list, tuple)): |
| return RecordBatch.from_arrays(data, names=names, schema=schema, |
| metadata=metadata) |
| elif _pandas_api.is_data_frame(data): |
| return RecordBatch.from_pandas(data, schema=schema) |
| else: |
| raise TypeError("Expected pandas DataFrame or list of arrays") |
| |
| |
| def table(data, names=None, schema=None, metadata=None, nthreads=None): |
| """ |
| Create a pyarrow.Table from a Python data structure or sequence of arrays. |
| |
| Parameters |
| ---------- |
| data : pandas.DataFrame, dict, list |
| A DataFrame, mapping of strings to Arrays or Python lists, or list of |
| arrays or chunked arrays. |
| names : list, default None |
| Column names if list of arrays passed as data. Mutually exclusive with |
| 'schema' argument. |
| schema : Schema, default None |
| The expected schema of the Arrow Table. If not passed, will be inferred |
| from the data. Mutually exclusive with 'names' argument. |
| If passed, the output will have exactly this schema (raising an error |
| when columns are not found in the data and ignoring additional data not |
| specified in the schema, when data is a dict or DataFrame). |
| metadata : dict or Mapping, default None |
| Optional metadata for the schema (if schema not passed). |
| nthreads : int, default None (may use up to system CPU count threads) |
| For pandas.DataFrame inputs: if greater than 1, convert columns to |
| Arrow in parallel using indicated number of threads. |
| |
| Returns |
| ------- |
| Table |
| |
| See Also |
| -------- |
| Table.from_arrays, Table.from_pandas, Table.from_pydict |
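
    Examples
    --------
    Constructing a Table from a Python mapping:

    >>> import pyarrow as pa
    >>> t = pa.table({'a': [1, 2], 'b': ['x', 'y']})
    >>> t.column_names
    ['a', 'b']
    >>> t.num_rows
    2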
| """ |
| # accept schema as first argument for backwards compatibility / usability |
| if isinstance(names, Schema) and schema is None: |
| schema = names |
| names = None |
| |
| if isinstance(data, (list, tuple)): |
| return Table.from_arrays(data, names=names, schema=schema, |
| metadata=metadata) |
| elif isinstance(data, dict): |
| if names is not None: |
| raise ValueError( |
| "The 'names' argument is not valid when passing a dictionary") |
| return Table.from_pydict(data, schema=schema, metadata=metadata) |
| elif _pandas_api.is_data_frame(data): |
| if names is not None or metadata is not None: |
| raise ValueError( |
| "The 'names' and 'metadata' arguments are not valid when " |
| "passing a pandas DataFrame") |
| return Table.from_pandas(data, schema=schema, nthreads=nthreads) |
| else: |
| raise TypeError( |
| "Expected pandas DataFrame, python dictionary or list of arrays") |
| |
| |
| def concat_tables(tables, c_bool promote=False, MemoryPool memory_pool=None): |
| """ |
| Concatenate pyarrow.Table objects. |
| |
| If promote==False, a zero-copy concatenation will be performed. The schemas |
| of all the Tables must be the same (except the metadata), otherwise an |
| exception will be raised. The result Table will share the metadata with the |
| first table. |
| |
    If promote==True, any null type arrays will be cast to the type of other
| arrays in the column of the same name. If a table is missing a particular |
| field, null values of the appropriate type will be generated to take the |
| place of the missing field. The new schema will share the metadata with the |
| first table. Each field in the new schema will share the metadata with the |
| first table which has the field defined. Note that type promotions may |
| involve additional allocations on the given ``memory_pool``. |
| |
| Parameters |
| ---------- |
| tables : iterable of pyarrow.Table objects |
| Pyarrow tables to concatenate into a single Table. |
    promote : bool, default False
| If True, concatenate tables with null-filling and null type promotion. |
    memory_pool : MemoryPool, default None
        For memory allocations, if required, otherwise use default pool.

    Returns
    -------
    Table
        The concatenated table.
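
    Examples
    --------
    With ``promote=False`` (the default) the schemas must match exactly:

    >>> import pyarrow as pa
    >>> t = pa.table({'a': [1, 2]})
    >>> pa.concat_tables([t, t]).num_rows
    4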
| """ |
| cdef: |
| vector[shared_ptr[CTable]] c_tables |
| shared_ptr[CTable] c_result_table |
| CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) |
| Table table |
| CConcatenateTablesOptions options = ( |
| CConcatenateTablesOptions.Defaults()) |
| |
| for table in tables: |
| c_tables.push_back(table.sp_table) |
| |
| with nogil: |
| options.unify_schemas = promote |
| c_result_table = GetResultValue( |
| ConcatenateTables(c_tables, options, pool)) |
| |
| return pyarrow_wrap_table(c_result_table) |