| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <limits> |
| #include <optional> |
| #include <tuple> |
| |
| #include <gtest/gtest-param-test.h> |
| #include <gtest/gtest.h> |
| #include <nanoarrow/nanoarrow.hpp> |
| |
| #include "postgres_copy_test_common.h" |
| #include "postgresql/copy/writer.h" |
| #include "postgresql/database.h" |
| #include "validation/adbc_validation_util.h" |
| |
| using adbc_validation::IsOkStatus; |
| |
| namespace adbcpq { |
| |
| class PostgresCopyStreamWriteTester { |
| public: |
| ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array, |
| const PostgresTypeResolver& type_resolver, |
| struct ArrowError* error = nullptr) { |
| NANOARROW_RETURN_NOT_OK(writer_.Init(schema)); |
| NANOARROW_RETURN_NOT_OK(writer_.InitFieldWriters(type_resolver, error)); |
| NANOARROW_RETURN_NOT_OK(writer_.SetArray(array)); |
| return NANOARROW_OK; |
| } |
| |
| ArrowErrorCode WriteAll(struct ArrowError* error) { |
| NANOARROW_RETURN_NOT_OK(writer_.WriteHeader(error)); |
| |
| int result; |
| do { |
| result = writer_.WriteRecord(error); |
| } while (result == NANOARROW_OK); |
| |
| return result; |
| } |
| |
| ArrowErrorCode WriteArray(struct ArrowArray* array, struct ArrowError* error) { |
| writer_.SetArray(array); |
| int result; |
| do { |
| result = writer_.WriteRecord(error); |
| } while (result == NANOARROW_OK); |
| |
| return result; |
| } |
| |
| const struct ArrowBuffer& WriteBuffer() const { return writer_.WriteBuffer(); } |
| |
| void Rewind() { writer_.Rewind(); } |
| |
| private: |
| PostgresCopyStreamWriter writer_; |
| }; |
| |
| static AdbcStatusCode SetupDatabase(struct AdbcDatabase* database, |
| struct AdbcError* error) { |
| const char* uri = std::getenv("ADBC_POSTGRESQL_TEST_URI"); |
| if (!uri) { |
| ADD_FAILURE() << "Must provide env var ADBC_POSTGRESQL_TEST_URI"; |
| return ADBC_STATUS_INVALID_ARGUMENT; |
| } |
| return AdbcDatabaseSetOption(database, "uri", uri, error); |
| } |
| |
| class PostgresCopyTest : public ::testing::Test { |
| public: |
| void SetUp() override { |
| ASSERT_THAT(AdbcDatabaseNew(&database_, &error_), IsOkStatus(&error_)); |
| ASSERT_THAT(SetupDatabase(&database_, &error_), IsOkStatus(&error_)); |
| ASSERT_THAT(AdbcDatabaseInit(&database_, &error_), IsOkStatus(&error_)); |
| |
| const auto pg_db = |
| *reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database_.private_data); |
| type_resolver_ = pg_db->type_resolver(); |
| } |
| void TearDown() override { |
| if (database_.private_data) { |
| ASSERT_THAT(AdbcDatabaseRelease(&database_, &error_), IsOkStatus(&error_)); |
| } |
| |
| if (error_.release) error_.release(&error_); |
| } |
| |
| protected: |
| struct AdbcError error_ = {}; |
| struct AdbcDatabase database_ = {}; |
| std::shared_ptr<PostgresTypeResolver> type_resolver_; |
| }; |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteBoolean) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| adbc_validation::Handle<struct ArrowBuffer> buffer; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_BOOL}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<bool>(&schema.value, &array.value, &na_error, |
| {true, false, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyBoolean) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyBoolean[i]); |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteInt8) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT8}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<int8_t>(&schema.value, &array.value, &na_error, |
| {-123, -1, 1, 123, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopySmallInt) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopySmallInt[i]); |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteInt16) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT16}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<int16_t>(&schema.value, &array.value, &na_error, |
| {-123, -1, 1, 123, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopySmallInt) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopySmallInt[i]); |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteInt32) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT32}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<int32_t>(&schema.value, &array.value, &na_error, |
| {-123, -1, 1, 123, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyInteger) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyInteger[i]); |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteInt64) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT64}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<int64_t>(&schema.value, &array.value, &na_error, |
| {-123, -1, 1, 123, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyBigInt) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyBigInt[i]); |
| } |
| } |
| |
| // COPY (SELECT CAST("col" AS SMALLINT) AS "col" FROM ( VALUES (0), (255), |
| // (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary); |
| static const uint8_t kTestPgCopyUInt8[] = { |
| 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, |
| 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, |
| 0x00, 0xff, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteUInt8) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_UINT8}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<uint8_t>( |
| &schema.value, &array.value, &na_error, |
| {0, (std::numeric_limits<uint8_t>::max)(), std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyUInt8) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyUInt8[i]); |
| } |
| } |
| |
| // COPY (SELECT CAST("col" AS INTEGER) AS "col" FROM ( VALUES (0), (65535), |
| // (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary); |
| static const uint8_t kTestPgCopyUInt16[] = { |
| 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, |
| 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, |
| 0x00, 0xff, 0xff, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteUInt16) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_UINT16}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<uint16_t>( |
| &schema.value, &array.value, &na_error, |
| {0, (std::numeric_limits<uint16_t>::max)(), std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyUInt16) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyUInt16[i]); |
| } |
| } |
| |
| // COPY (SELECT CAST("col" AS BIGINT) AS "col" FROM ( VALUES (0), (2^32-1), |
| // (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary); |
| static const uint8_t kTestPgCopyUInt32[] = { |
| 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, |
| 0x00, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteUInt32) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_UINT32}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<uint32_t>( |
| &schema.value, &array.value, &na_error, |
| {0, (std::numeric_limits<uint32_t>::max)(), std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyUInt32) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyUInt32[i]); |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteReal) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_FLOAT}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<float>(&schema.value, &array.value, &na_error, |
| {-123.456, -1, 1, 123.456, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyReal) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyReal[i]) << " mismatch at index: " << i; |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteDoublePrecision) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_DOUBLE}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<double>(&schema.value, &array.value, &na_error, |
| {-123.456, -1, 1, 123.456, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyDoublePrecision) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyDoublePrecision[i]); |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteDate) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_DATE32}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<int32_t>(&schema.value, &array.value, &na_error, |
| {-25567, 47482, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyDate) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyDate[i]); |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteTime) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| |
| const enum ArrowTimeUnit unit = NANOARROW_TIME_UNIT_MICRO; |
| const auto values = |
| std::vector<std::optional<int64_t>>{0, 86399000000, 49376123456, std::nullopt}; |
| |
| ArrowSchemaInit(&schema.value); |
| ArrowSchemaSetTypeStruct(&schema.value, 1); |
| ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIME64, unit, nullptr); |
| ArrowSchemaSetName(schema->children[0], "col"); |
| ASSERT_EQ( |
| adbc_validation::MakeBatch<int64_t>(&schema.value, &array.value, &na_error, values), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyTime) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyTime[i]); |
| } |
| } |
| |
| // This buffer is similar to the read variant above but removes special values |
| // nan, ±inf as they are not supported via the Arrow Decimal types |
| // COPY (SELECT CAST(col AS NUMERIC) AS col FROM ( VALUES (NULL), (-123.456), |
| // ('0.00001234'), (1.0000), (123.456), (1000000)) AS drvd(col)) |
| // TO STDOUT WITH (FORMAT binary); |
| static uint8_t kTestPgCopyNumericWrite[] = { |
| 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x00, |
| 0x00, 0x00, 0x0c, 0x00, 0x02, 0x00, 0x00, 0x40, 0x00, 0x00, 0x03, 0x00, 0x7b, 0x11, |
| 0xd0, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x01, 0xff, 0xfe, 0x00, 0x00, 0x00, |
| 0x08, 0x04, 0xd2, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x01, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x02, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x03, 0x00, 0x7b, 0x11, 0xd0, 0x00, 0x01, 0x00, 0x00, 0x00, |
| 0x0a, 0x00, 0x01, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64, 0xff, 0xff}; |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteNumeric) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| constexpr enum ArrowType type = NANOARROW_TYPE_DECIMAL128; |
| constexpr int32_t size = 128; |
| constexpr int32_t precision = 38; |
| constexpr int32_t scale = 8; |
| |
| struct ArrowDecimal decimal1; |
| struct ArrowDecimal decimal2; |
| struct ArrowDecimal decimal3; |
| struct ArrowDecimal decimal4; |
| struct ArrowDecimal decimal5; |
| |
| ArrowDecimalInit(&decimal1, size, 19, 8); |
| ArrowDecimalSetInt(&decimal1, -12345600000); |
| ArrowDecimalInit(&decimal2, size, 19, 8); |
| ArrowDecimalSetInt(&decimal2, 1234); |
| ArrowDecimalInit(&decimal3, size, 19, 8); |
| ArrowDecimalSetInt(&decimal3, 100000000); |
| ArrowDecimalInit(&decimal4, size, 19, 8); |
| ArrowDecimalSetInt(&decimal4, 12345600000); |
| ArrowDecimalInit(&decimal5, size, 19, 8); |
| ArrowDecimalSetInt(&decimal5, 100000000000000); |
| |
| const std::vector<std::optional<ArrowDecimal*>> values = { |
| std::nullopt, &decimal1, &decimal2, &decimal3, &decimal4, &decimal5}; |
| |
| ArrowSchemaInit(&schema.value); |
| ASSERT_EQ(ArrowSchemaSetTypeStruct(&schema.value, 1), 0); |
| ASSERT_EQ(ArrowSchemaSetTypeDecimal(schema.value.children[0], type, precision, scale), |
| 0); |
| ASSERT_EQ(ArrowSchemaSetName(schema.value.children[0], "col"), 0); |
| ASSERT_EQ(adbc_validation::MakeBatch<ArrowDecimal*>(&schema.value, &array.value, |
| &na_error, values), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyNumericWrite) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyNumericWrite[i]) << " at position " << i; |
| } |
| } |
| |
| using TimestampTestParamType = |
| std::tuple<enum ArrowTimeUnit, const char*, std::vector<std::optional<int64_t>>>; |
| |
| class PostgresCopyWriteTimestampTest |
| : public testing::TestWithParam<TimestampTestParamType> { |
| void SetUp() override { |
| ASSERT_THAT(AdbcDatabaseNew(&database_, &error_), IsOkStatus(&error_)); |
| ASSERT_THAT(SetupDatabase(&database_, &error_), IsOkStatus(&error_)); |
| ASSERT_THAT(AdbcDatabaseInit(&database_, &error_), IsOkStatus(&error_)); |
| |
| const auto pg_db = |
| *reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database_.private_data); |
| type_resolver_ = pg_db->type_resolver(); |
| } |
| void TearDown() override { |
| if (database_.private_data) { |
| ASSERT_THAT(AdbcDatabaseRelease(&database_, &error_), IsOkStatus(&error_)); |
| } |
| |
| if (error_.release) error_.release(&error_); |
| } |
| |
| protected: |
| struct AdbcError error_ = {}; |
| struct AdbcDatabase database_ = {}; |
| std::shared_ptr<PostgresTypeResolver> type_resolver_; |
| }; |
| |
| TEST_P(PostgresCopyWriteTimestampTest, WritesProperBufferValues) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| |
| TimestampTestParamType parameters = GetParam(); |
| enum ArrowTimeUnit unit = std::get<0>(parameters); |
| const char* timezone = std::get<1>(parameters); |
| |
| const std::vector<std::optional<int64_t>> values = std::get<2>(parameters); |
| |
| ArrowSchemaInit(&schema.value); |
| ArrowSchemaSetTypeStruct(&schema.value, 1); |
| ArrowSchemaSetTypeDateTime(schema->children[0], NANOARROW_TYPE_TIMESTAMP, unit, |
| timezone); |
| ArrowSchemaSetName(schema->children[0], "col"); |
| ASSERT_EQ( |
| adbc_validation::MakeBatch<int64_t>(&schema.value, &array.value, &na_error, values), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyTimestamp) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyTimestamp[i]); |
| } |
| } |
| |
| static const std::vector<TimestampTestParamType> ts_values{ |
| {NANOARROW_TIME_UNIT_SECOND, nullptr, {-2208943504, 4102490096, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_MILLI, nullptr, {-2208943504000, 4102490096000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_MICRO, |
| nullptr, |
| {-2208943504000000, 4102490096000000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_NANO, |
| nullptr, |
| {-2208943504000000000, 4102490096000000000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_SECOND, "UTC", {-2208943504, 4102490096, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_MILLI, "UTC", {-2208943504000, 4102490096000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_MICRO, |
| "UTC", |
| {-2208943504000000, 4102490096000000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_NANO, |
| "UTC", |
| {-2208943504000000000, 4102490096000000000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_SECOND, |
| "America/New_York", |
| {-2208943504, 4102490096, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_MILLI, |
| "America/New_York", |
| {-2208943504000, 4102490096000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_MICRO, |
| "America/New_York", |
| {-2208943504000000, 4102490096000000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_NANO, |
| "America/New_York", |
| {-2208943504000000000, 4102490096000000000, std::nullopt}}, |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(PostgresCopyWriteTimestamp, PostgresCopyWriteTimestampTest, |
| testing::ValuesIn(ts_values)); |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteInterval) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| const enum ArrowType type = NANOARROW_TYPE_INTERVAL_MONTH_DAY_NANO; |
| // values are days, months, ns |
| struct ArrowInterval neg_interval; |
| struct ArrowInterval pos_interval; |
| |
| ArrowIntervalInit(&neg_interval, type); |
| ArrowIntervalInit(&pos_interval, type); |
| |
| neg_interval.months = -1; |
| neg_interval.days = -2; |
| neg_interval.ns = -4000000000; |
| |
| pos_interval.months = 1; |
| pos_interval.days = 2; |
| pos_interval.ns = 4000000000; |
| |
| const std::vector<std::optional<ArrowInterval*>> values = {&neg_interval, &pos_interval, |
| std::nullopt}; |
| |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", type}}), ADBC_STATUS_OK); |
| |
| ASSERT_EQ(adbc_validation::MakeBatch<ArrowInterval*>(&schema.value, &array.value, |
| &na_error, values), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyInterval) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyInterval[i]); |
| } |
| } |
| |
| // Writing a DURATION from NANOARROW produces INTERVAL in postgres without day/month |
| // COPY (SELECT CAST(col AS INTERVAL) FROM ( VALUES ('-4 seconds'), |
| // ('4 seconds'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY); |
| static uint8_t kTestPgCopyDuration[] = { |
| 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, |
| 0x10, 0xff, 0xff, 0xff, 0xff, 0xff, 0xc2, 0xf7, 0x00, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x10, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x3d, 0x09, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; |
| using DurationTestParamType = |
| std::tuple<enum ArrowTimeUnit, std::vector<std::optional<int64_t>>>; |
| |
| class PostgresCopyWriteDurationTest |
| : public testing::TestWithParam<DurationTestParamType> { |
| void SetUp() override { |
| ASSERT_THAT(AdbcDatabaseNew(&database_, &error_), IsOkStatus(&error_)); |
| ASSERT_THAT(SetupDatabase(&database_, &error_), IsOkStatus(&error_)); |
| ASSERT_THAT(AdbcDatabaseInit(&database_, &error_), IsOkStatus(&error_)); |
| |
| const auto pg_db = |
| *reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database_.private_data); |
| type_resolver_ = pg_db->type_resolver(); |
| } |
| void TearDown() override { |
| if (database_.private_data) { |
| ASSERT_THAT(AdbcDatabaseRelease(&database_, &error_), IsOkStatus(&error_)); |
| } |
| |
| if (error_.release) error_.release(&error_); |
| } |
| |
| protected: |
| struct AdbcError error_ = {}; |
| struct AdbcDatabase database_ = {}; |
| std::shared_ptr<PostgresTypeResolver> type_resolver_; |
| }; |
| |
| TEST_P(PostgresCopyWriteDurationTest, WritesProperBufferValues) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| const enum ArrowType type = NANOARROW_TYPE_DURATION; |
| |
| DurationTestParamType parameters = GetParam(); |
| enum ArrowTimeUnit unit = std::get<0>(parameters); |
| const std::vector<std::optional<int64_t>> values = std::get<1>(parameters); |
| |
| ArrowSchemaInit(&schema.value); |
| ArrowSchemaSetTypeStruct(&schema.value, 1); |
| ArrowSchemaSetTypeDateTime(schema->children[0], type, unit, nullptr); |
| ArrowSchemaSetName(schema->children[0], "col"); |
| ASSERT_EQ( |
| adbc_validation::MakeBatch<int64_t>(&schema.value, &array.value, &na_error, values), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyDuration) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyDuration[i]); |
| } |
| } |
| |
| static const std::vector<DurationTestParamType> duration_params{ |
| {NANOARROW_TIME_UNIT_SECOND, {-4, 4, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_MILLI, {-4000, 4000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_MICRO, {-4000000, 4000000, std::nullopt}}, |
| {NANOARROW_TIME_UNIT_NANO, {-4000000000, 4000000000, std::nullopt}}, |
| }; |
| |
| INSTANTIATE_TEST_SUITE_P(PostgresCopyWriteDuration, PostgresCopyWriteDurationTest, |
| testing::ValuesIn(duration_params)); |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteString) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_STRING}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<std::string>( |
| &schema.value, &array.value, &na_error, {"abc", "1234", std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyText) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyText[i]); |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteLargeString) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ( |
| adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_LARGE_STRING}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<std::string>( |
| &schema.value, &array.value, &na_error, {"abc", "1234", std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyText) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyText[i]); |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteBinary) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_BINARY}}), |
| ADBC_STATUS_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<std::vector<std::byte>>( |
| &schema.value, &array.value, &na_error, |
| {std::vector<std::byte>{}, |
| std::vector<std::byte>{std::byte{0x00}, std::byte{0x01}}, |
| std::vector<std::byte>{std::byte{0x01}, std::byte{0x02}, std::byte{0x03}, |
| std::byte{0x04}}, |
| std::vector<std::byte>{std::byte{0xfe}, std::byte{0xff}}, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyBinary) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyBinary[i]) << "failure at index " << i; |
| } |
| } |
| |
| class PostgresCopyListTest : public testing::TestWithParam<enum ArrowType> { |
| public: |
| void SetUp() override { |
| ASSERT_THAT(AdbcDatabaseNew(&database_, &error_), IsOkStatus(&error_)); |
| ASSERT_THAT(SetupDatabase(&database_, &error_), IsOkStatus(&error_)); |
| ASSERT_THAT(AdbcDatabaseInit(&database_, &error_), IsOkStatus(&error_)); |
| |
| const auto pg_db = |
| *reinterpret_cast<std::shared_ptr<PostgresDatabase>*>(database_.private_data); |
| type_resolver_ = pg_db->type_resolver(); |
| } |
| void TearDown() override { |
| if (database_.private_data) { |
| ASSERT_THAT(AdbcDatabaseRelease(&database_, &error_), IsOkStatus(&error_)); |
| } |
| |
| if (error_.release) error_.release(&error_); |
| } |
| |
| protected: |
| struct AdbcError error_ = {}; |
| struct AdbcDatabase database_ = {}; |
| std::shared_ptr<PostgresTypeResolver> type_resolver_; |
| }; |
| |
| // COPY (SELECT CAST("col" AS SMALLINT ARRAY) AS "col" FROM ( VALUES ('{-123, -1}'), |
| // ('{0, 1, 123}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary); |
| static const uint8_t kTestPgCopySmallIntegerArray[] = { |
| 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, |
| 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x15, 0x00, 0x00, 0x00, 0x02, 0x00, |
| 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0xff, 0x85, 0x00, 0x00, 0x00, 0x02, 0xff, |
| 0xff, 0x00, 0x01, 0x00, 0x00, 0x00, 0x26, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x15, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, |
| 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0x00, 0x00, 0x00, |
| 0x02, 0x00, 0x7b, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; |
| |
| TEST_P(PostgresCopyListTest, PostgresCopyWriteListSmallInt) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| |
| ASSERT_EQ(adbc_validation::MakeSchema( |
| &schema.value, {adbc_validation::SchemaField::Nested( |
| "col", GetParam(), {{"item", NANOARROW_TYPE_INT16}})}), |
| ADBC_STATUS_OK); |
| |
| ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int16_t>>( |
| &schema.value, &array.value, &na_error, |
| {std::vector<int16_t>{-123, -1}, std::vector<int16_t>{0, 1, 123}, |
| std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopySmallIntegerArray) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopySmallIntegerArray[i]) << "failure at index " << i; |
| } |
| } |
| |
| TEST_P(PostgresCopyListTest, PostgresCopyWriteListInteger) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| |
| ASSERT_EQ(adbc_validation::MakeSchema( |
| &schema.value, {adbc_validation::SchemaField::Nested( |
| "col", GetParam(), {{"item", NANOARROW_TYPE_INT32}})}), |
| ADBC_STATUS_OK); |
| |
| ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int32_t>>( |
| &schema.value, &array.value, &na_error, |
| {std::vector<int32_t>{-123, -1}, std::vector<int32_t>{0, 1, 123}, |
| std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyIntegerArray) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyIntegerArray[i]) << "failure at index " << i; |
| } |
| } |
| |
| // COPY (SELECT CAST("col" AS BIGINT ARRAY) AS "col" FROM ( VALUES ('{-123, -1}'), ('{0, |
| // 1, 123}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT binary); |
| static const uint8_t kTestPgCopyBigIntegerArray[] = { |
| 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x2c, 0x00, 0x00, 0x00, |
| 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x02, 0x00, |
| 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, |
| 0x85, 0x00, 0x00, 0x00, 0x08, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, |
| 0x01, 0x00, 0x00, 0x00, 0x38, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, |
| 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x7b, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; |
| |
| TEST_P(PostgresCopyListTest, PostgresCopyWriteListBigInt) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| |
| ASSERT_EQ(adbc_validation::MakeSchema( |
| &schema.value, {adbc_validation::SchemaField::Nested( |
| "col", GetParam(), {{"item", NANOARROW_TYPE_INT64}})}), |
| ADBC_STATUS_OK); |
| |
| ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int64_t>>( |
| &schema.value, &array.value, &na_error, |
| {std::vector<int64_t>{-123, -1}, std::vector<int64_t>{0, 1, 123}, |
| std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyBigIntegerArray) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyBigIntegerArray[i]) << "failure at index " << i; |
| } |
| } |
| |
| // COPY (SELECT CAST("col" AS TEXT ARRAY) AS "col" FROM ( VALUES ('{"foo", "bar"}'), |
| // ('{"baz", "qux", "quux"}'), (NULL)) AS drvd("col")) TO '/tmp/pgout.data' WITH (FORMAT |
| // binary); |
| static const uint8_t kTestPgCopyTextArray[] = { |
| 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x22, 0x00, |
| 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x00, 0x00, |
| 0x00, 0x02, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x66, 0x6f, 0x6f, |
| 0x00, 0x00, 0x00, 0x03, 0x62, 0x61, 0x72, 0x00, 0x01, 0x00, 0x00, 0x00, 0x2a, |
| 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x19, 0x00, |
| 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x03, 0x62, 0x61, |
| 0x7a, 0x00, 0x00, 0x00, 0x03, 0x71, 0x75, 0x78, 0x00, 0x00, 0x00, 0x04, 0x71, |
| 0x75, 0x75, 0x78, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; |
| |
| TEST_P(PostgresCopyListTest, PostgresCopyWriteListVarchar) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| |
| ASSERT_EQ( |
| adbc_validation::MakeSchema( |
| &schema.value, {adbc_validation::SchemaField::Nested( |
| "col", GetParam(), {{"item", NANOARROW_TYPE_STRING}})}), |
| ADBC_STATUS_OK); |
| |
| ASSERT_EQ(adbc_validation::MakeBatch<std::vector<std::string>>( |
| &schema.value, &array.value, &na_error, |
| {std::vector<std::string>{"foo", "bar"}, |
| std::vector<std::string>{"baz", "qux", "quux"}, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyTextArray) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyTextArray[i]) << "failure at index " << i; |
| } |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(ArrowListTypes, PostgresCopyListTest, |
| testing::Values(NANOARROW_TYPE_LIST, NANOARROW_TYPE_LARGE_LIST)); |
| |
| // COPY (SELECT CAST("col" AS INTEGER ARRAY) AS "col" FROM ( VALUES ('{1, 2}'), |
| // ('{-1, -2}'), (NULL)) AS drvd("col")) TO STDOUT WITH (FORMAT BINARY); |
| static const uint8_t kTestPgCopyFixedSizeIntegerArray[] = { |
| 0x50, 0x47, 0x43, 0x4f, 0x50, 0x59, 0x0a, 0xff, 0x0d, 0x0a, 0x00, 0x00, 0x00, 0x00, |
| 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, |
| 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00, 0x02, 0x00, |
| 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, |
| 0x04, 0x00, 0x00, 0x00, 0x02, 0x00, 0x01, 0x00, 0x00, 0x00, 0x24, 0x00, 0x00, 0x00, |
| 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x17, 0x00, 0x00, 0x00, 0x02, 0x00, |
| 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, |
| 0x04, 0xff, 0xff, 0xff, 0xfe, 0x00, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}; |
| TEST_F(PostgresCopyTest, PostgresCopyWriteFixedSizeListInteger) { |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| |
| ASSERT_EQ(ArrowSchemaInitFromType(&schema.value, NANOARROW_TYPE_STRUCT), NANOARROW_OK); |
| ASSERT_EQ(ArrowSchemaAllocateChildren(&schema.value, 1), NANOARROW_OK); |
| |
| ArrowSchemaInit(schema->children[0]); |
| ASSERT_EQ( |
| ArrowSchemaSetTypeFixedSize(schema->children[0], NANOARROW_TYPE_FIXED_SIZE_LIST, 2), |
| NANOARROW_OK); |
| ASSERT_EQ(ArrowSchemaSetName(schema->children[0], "col"), NANOARROW_OK); |
| ASSERT_EQ(ArrowSchemaSetType(schema->children[0]->children[0], NANOARROW_TYPE_INT32), |
| NANOARROW_OK); |
| |
| ASSERT_EQ(adbc_validation::MakeBatch<std::vector<int32_t>>( |
| &schema.value, &array.value, &na_error, |
| {std::vector<int32_t>{1, 2}, std::vector<int32_t>{-1, -2}, std::nullopt}), |
| ADBC_STATUS_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| const struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| constexpr size_t buf_size = sizeof(kTestPgCopyFixedSizeIntegerArray) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyFixedSizeIntegerArray[i]) |
| << "failure at index " << i; |
| } |
| } |
| |
| TEST_F(PostgresCopyTest, PostgresCopyWriteMultiBatch) { |
| // Regression test for https://github.com/apache/arrow-adbc/issues/1310 |
| adbc_validation::Handle<struct ArrowSchema> schema; |
| adbc_validation::Handle<struct ArrowArray> array; |
| struct ArrowError na_error; |
| ASSERT_EQ(adbc_validation::MakeSchema(&schema.value, {{"col", NANOARROW_TYPE_INT32}}), |
| NANOARROW_OK); |
| ASSERT_EQ(adbc_validation::MakeBatch<int32_t>(&schema.value, &array.value, &na_error, |
| {-123, -1, 1, 123, std::nullopt}), |
| NANOARROW_OK); |
| |
| PostgresCopyStreamWriteTester tester; |
| ASSERT_EQ(tester.Init(&schema.value, &array.value, *type_resolver_), NANOARROW_OK); |
| ASSERT_EQ(tester.WriteAll(nullptr), ENODATA); |
| |
| struct ArrowBuffer buf = tester.WriteBuffer(); |
| // The last 2 bytes of a message can be transmitted via PQputCopyData |
| // so no need to test those bytes from the Writer |
| size_t buf_size = sizeof(kTestPgCopyInteger) - 2; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyInteger[i]); |
| } |
| |
| tester.Rewind(); |
| ASSERT_EQ(tester.WriteArray(&array.value, nullptr), ENODATA); |
| |
| buf = tester.WriteBuffer(); |
| // Ignore the header and footer |
| buf_size = sizeof(kTestPgCopyInteger) - 21; |
| ASSERT_EQ(buf.size_bytes, buf_size); |
| for (size_t i = 0; i < buf_size; i++) { |
| ASSERT_EQ(buf.data[i], kTestPgCopyInteger[i + 19]); |
| } |
| } |
| |
| } // namespace adbcpq |