| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import ctypes |
| import hypothesis as h |
| import hypothesis.strategies as st |
| |
| import pytest |
| try: |
| import numpy as np |
| except ImportError: |
| np = None |
| import pyarrow as pa |
| import pyarrow.tests.strategies as past |
| |
| |
# Strategy over the Arrow data types fed to test_dtypes: signed and unsigned
# integers, floats, bool, string and large_string. The union is built inside
# a lambda handed to st.deferred rather than directly at import time.
all_types = st.deferred(
    lambda: st.one_of(
        past.signed_integer_types,
        past.unsigned_integer_types,
        past.floating_types,
        past.bool_type,
        past.string_type,
        past.large_string_type,
    )
)
| |
| |
# datetime is tested in test_extra.py
# dictionary is tested in test_categorical()
@pytest.mark.numpy
@h.settings(suppress_health_check=(h.HealthCheck.too_slow,))
@h.given(past.arrays(all_types, size=3))
def test_dtypes(arr):
    """Check null_count, size and offset of a one-column interchange frame.

    ``arr`` is a hypothesis-generated array of length 3 whose type is drawn
    from ``all_types``.
    """
    interchange_df = pa.table([arr], names=["a"]).__dataframe__()
    column = interchange_df.get_column(0)

    n_nulls = column.null_count
    assert n_nulls == arr.null_count
    # The count must be a plain Python int, not some other numeric type.
    assert isinstance(n_nulls, int)
    assert column.size() == 3
    assert column.offset == 0
| |
| |
@pytest.mark.numpy
@pytest.mark.parametrize(
    "uint, uint_bw",
    [
        (pa.uint8(), 8),
        (pa.uint16(), 16),
        (pa.uint32(), 32)
    ]
)
@pytest.mark.parametrize(
    "int, int_bw", [
        (pa.int8(), 8),
        (pa.int16(), 16),
        (pa.int32(), 32),
        (pa.int64(), 64)
    ]
)
@pytest.mark.parametrize(
    "float, float_bw, np_float_str", [
        (pa.float16(), 16, "float16"),
        (pa.float32(), 32, "float32"),
        (pa.float64(), 64, "float64")
    ]
)
@pytest.mark.parametrize("unit", ['s', 'ms', 'us', 'ns'])
@pytest.mark.parametrize("tz", ['', 'America/New_York', '+07:30', '-04:30'])
@pytest.mark.parametrize("use_batch", [False, True])
def test_mixed_dtypes(uint, uint_bw, int, int_bw,
                      float, float_bw, np_float_str, unit, tz,
                      use_batch):
    """Check dtype kind and bit width for a frame mixing several column types.

    The frame holds an unsigned int, a signed int, a float, a bool, a string
    and a timestamp column, three rows each. With ``use_batch`` the table is
    replaced by its first record batch before ``__dataframe__()`` is called.
    """
    from datetime import datetime as dt

    values = [1, 2, 3]
    timestamps = [dt(2007, 7, day) for day in (13, 14, 15)]
    float_values = np.array(values, dtype=np.dtype(np_float_str))

    table = pa.table(
        {
            "a": pa.array(values, type=uint),
            "b": pa.array(values, type=int),
            "c": pa.array(float_values, type=float),
            "d": [True, False, True],
            "e": ["a", "", "c"],
            "f": pa.array(timestamps, type=pa.timestamp(unit, tz=tz))
        }
    )
    if use_batch:
        table = table.to_batches()[0]
    df = table.__dataframe__()

    # 0 = DtypeKind.INT, 1 = DtypeKind.UINT, 2 = DtypeKind.FLOAT,
    # 20 = DtypeKind.BOOL, 21 = DtypeKind.STRING, 22 = DtypeKind.DATETIME
    # see DtypeKind class in column.py
    expected_kinds = {"a": 1, "b": 0, "c": 2, "d": 20, "e": 21, "f": 22}
    for name, kind in expected_kinds.items():
        col = df.get_column_by_name(name)

        assert col.null_count == 0
        assert col.size() == 3
        assert col.offset == 0
        assert col.dtype[0] == kind

    # The second dtype entry is the bit width of the numeric columns.
    expected_bitwidths = {"a": uint_bw, "b": int_bw, "c": float_bw}
    for name, bitwidth in expected_bitwidths.items():
        assert df.get_column_by_name(name).dtype[1] == bitwidth
| |
| |
def test_na_float():
    """A float column with one None entry reports null_count 1 as an int."""
    interchange_df = pa.table({"a": [1.0, None, 2.0]}).__dataframe__()
    n_nulls = interchange_df.get_column_by_name("a").null_count
    assert n_nulls == 1
    assert isinstance(n_nulls, int)
| |
| |
def test_noncategorical():
    """Reading describe_categorical on an integer column raises TypeError."""
    int_column = (
        pa.table({"a": [1, 2, 3]})
        .__dataframe__()
        .get_column_by_name("a")
    )
    with pytest.raises(TypeError, match=".*categorical.*"):
        # The attribute access itself is expected to raise.
        _ = int_column.describe_categorical
| |
| |
@pytest.mark.parametrize("use_batch", [False, True])
def test_categorical(use_batch):
    """Check the flag types returned by describe_categorical.

    A dictionary-encoded string column (including one null) is exposed
    through ``__dataframe__()``, and the ``is_ordered`` and ``is_dictionary``
    entries of its categorical description must be plain bools. With
    ``use_batch`` the table is replaced by its first record batch.
    """
    # pyarrow is already imported at module level as ``pa``; no local
    # re-import is needed here.
    weekdays = ["Mon", "Tue", "Mon", "Wed", "Mon", "Thu", "Fri", "Sat", None]
    table = pa.table(
        {"weekday": pa.array(weekdays).dictionary_encode()}
    )
    if use_batch:
        table = table.to_batches()[0]

    col = table.__dataframe__().get_column_by_name("weekday")
    categorical = col.describe_categorical
    assert isinstance(categorical["is_ordered"], bool)
    assert isinstance(categorical["is_dictionary"], bool)
| |
| |
@pytest.mark.parametrize("use_batch", [False, True])
def test_dataframe(use_batch):
    """Check frame-level counts, names and column selection.

    The input has two columns of two chunks each (six rows in total). With
    ``use_batch`` the chunks are combined and the first record batch is used.
    """
    legs = pa.chunked_array([[2, 2, 4], [4, 5, 100]])
    animals = pa.chunked_array([["Flamingo", "Parrot", "Cow"],
                                ["Horse", "Brittle stars", "Centipede"]])
    table = pa.table([legs, animals], names=['n_legs', 'animals'])
    if use_batch:
        table = table.combine_chunks().to_batches()[0]
    df = table.__dataframe__()

    assert df.num_columns() == 2
    assert df.num_rows() == 6
    # One chunk for the record-batch case, two for the chunked table.
    expected_chunks = 1 if use_batch else 2
    assert df.num_chunks() == expected_chunks
    assert list(df.column_names()) == ['n_legs', 'animals']

    # Selecting by position and by name must yield the same column names.
    names_by_position = list(df.select_columns((1,)).column_names())
    names_by_name = list(
        df.select_columns_by_name(("animals",)).column_names()
    )
    assert names_by_position == names_by_name
| |
| |
@pytest.mark.parametrize("use_batch", [False, True])
@pytest.mark.parametrize(["size", "n_chunks"], [(10, 3), (12, 3), (12, 5)])
def test_df_get_chunks(use_batch, size, n_chunks):
    """Frame-level get_chunks(n) yields n chunks covering all ``size`` rows."""
    data = pa.table({"x": list(range(size))})
    if use_batch:
        data = data.to_batches()[0]

    pieces = list(data.__dataframe__().get_chunks(n_chunks))

    assert len(pieces) == n_chunks
    total_rows = sum(piece.num_rows() for piece in pieces)
    assert total_rows == size
| |
| |
@pytest.mark.parametrize("use_batch", [False, True])
@pytest.mark.parametrize(["size", "n_chunks"], [(10, 3), (12, 3), (12, 5)])
def test_column_get_chunks(use_batch, size, n_chunks):
    """Column-level get_chunks(n) yields n chunks covering ``size`` values."""
    data = pa.table({"x": list(range(size))})
    if use_batch:
        data = data.to_batches()[0]

    first_column = data.__dataframe__().get_column(0)
    pieces = list(first_column.get_chunks(n_chunks))

    assert len(pieces) == n_chunks
    total_size = sum(piece.size() for piece in pieces)
    assert total_size == size
| |
| |
@pytest.mark.pandas
@pytest.mark.parametrize(
    "uint", [pa.uint8(), pa.uint16(), pa.uint32()]
)
@pytest.mark.parametrize(
    "int", [pa.int8(), pa.int16(), pa.int32(), pa.int64()]
)
@pytest.mark.parametrize(
    "float, np_float_str", [
        (pa.float16(), "float16"),
        (pa.float32(), "float32"),
        (pa.float64(), "float64")
    ]
)
@pytest.mark.parametrize("use_batch", [False, True])
def test_get_columns(uint, int, float, np_float_str, use_batch):
    """Check size, chunk count and dtype kind of every column of a frame.

    The frame has an unsigned int, a signed int and a float column of five
    values each. With ``use_batch`` the chunks are combined and the first
    record batch is used.
    """
    # NOTE(review): this test carries the ``pandas`` mark, but the body only
    # uses numpy and pyarrow -- confirm whether ``numpy`` is the intended mark.
    chunked_values = [[1, 2, 3], [4, 5]]
    float_values = np.array([1, 2, 3, 4, 5], dtype=np.dtype(np_float_str))
    table = pa.table(
        {
            "a": pa.chunked_array(chunked_values, type=uint),
            "b": pa.chunked_array(chunked_values, type=int),
            "c": pa.array(float_values, type=float)
        }
    )
    if use_batch:
        table = table.combine_chunks().to_batches()[0]
    df = table.__dataframe__()

    for col in df.get_columns():
        assert col.size() == 5
        assert col.num_chunks() == 1

    # 0 = DtypeKind.INT, 1 = DtypeKind.UINT, 2 = DtypeKind.FLOAT,
    # see DtypeKind class in column.py
    expected_kinds = (1, 0, 2)  # UINT, INT, FLOAT, in column order
    for position, kind in enumerate(expected_kinds):
        assert df.get_column(position).dtype[0] == kind
| |
| |
@pytest.mark.parametrize(
    "int", [pa.int8(), pa.int16(), pa.int32(), pa.int64()]
)
@pytest.mark.parametrize("use_batch", [False, True])
def test_buffer(int, use_batch):
    """Check the data buffer of a signed-integer column.

    Verifies that the buffer is non-empty, has a non-null pointer and an INT
    dtype kind. When the buffer reports device 1, the values are also read
    back directly from memory with ctypes and compared with the input.
    """
    expected_values = [0, 1, -1]
    table = pa.table({"a": pa.array(expected_values, type=int)})
    if use_batch:
        table = table.to_batches()[0]
    buffers = table.__dataframe__().get_column(0).get_buffers()

    data_buffer, data_dtype = buffers["data"]

    assert data_buffer.bufsize > 0
    assert data_buffer.ptr != 0
    device, _ = data_buffer.__dlpack_device__()

    # 0 = DtypeKind.INT
    # see DtypeKind class in column.py
    assert data_dtype[0] == 0

    # CPU-only as we're going to directly read memory here
    if device != 1:
        return

    bitwidth = data_dtype[1]
    ctype_by_bitwidth = {
        8: ctypes.c_int8,
        16: ctypes.c_int16,
        32: ctypes.c_int32,
        64: ctypes.c_int64,
    }
    ctype = ctype_by_bitwidth[bitwidth]
    item_size = bitwidth // 8  # bytes per element

    for idx, truth in enumerate(expected_values):
        address = data_buffer.ptr + idx * item_size
        val = ctype.from_address(address).value
        assert val == truth, f"Buffer at index {idx} mismatch"
| |
| |
@pytest.mark.parametrize(
    "indices_type, bitwidth, f_string", [
        (pa.int8(), 8, "c"),
        (pa.int16(), 16, "s"),
        (pa.int32(), 32, "i"),
        (pa.int64(), 64, "l")
    ]
)
def test_categorical_dtype(indices_type, bitwidth, f_string):
    """Check the dtype tuple of a dictionary column for each index type.

    The dtype kind must be CATEGORICAL, and the bit width and format string
    must match the values parametrized for the dictionary's index type.
    """
    dictionary_type = pa.dictionary(indices_type, pa.string())
    encoded = pa.array(["a", "b", None, "d"], dictionary_type)
    interchange_df = pa.table({'a': encoded}).__dataframe__()

    kind, width, fmt = interchange_df.get_column(0).dtype[:3]
    assert kind == 23  # <DtypeKind.CATEGORICAL: 23>
    assert width == bitwidth
    assert fmt == f_string