blob: cac2b73ca3e42f53c30847e9b87f3f6bd76753dc [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <iostream>
#include <limits>
#include <memory>
#include <kudu/client/client.h>
#include <kudu/client/write_op.h>
#include "gutil/stl_util.h"
#include "util/kudu-status-util.h"
using kudu::Slice;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduError;
using kudu::client::KuduInsert;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::sp::shared_ptr;
using std::cerr;
using std::endl;
namespace impala {
// Utility program to facilitate complex column insertion into a Kudu table.
// This program is specifically tailored to insert into a Kudu table with the
// following columns:
// (
// id TINYINT PRIMARY KEY,
// array_INT ARRAY<INT>,
// array_TIMESTAMP ARRAY<TIMESTAMP>,
// array_VARCHAR ARRAY<VARCHAR(1)>,
// array_DECIMAL ARRAY<DECIMAL(18,18)>,
// array_DOUBLE ARRAY<DOUBLE>,
// array_BINARY ARRAY<BINARY>,
// array_BOOLEAN ARRAY<BOOLEAN>
// )
//
// The destination table must be empty before this program runs.
// Address of the Kudu master that RunKuduArrayInsert() connects to.
// Same as in tests/conftest.py
constexpr const char* KUDU_MASTER_DEFAULT_ADDR = "localhost:7051";
// Name of the destination table. main() assigns it from argv[1] before calling
// RunKuduArrayInsert(), which passes it to OpenTable().
const char* KUDU_TEST_TABLE_NAME;
// Source values for the 'array_int' column: the two int32_t extremes with -1 in between.
const vector<int32_t> INT32_ARRAY = {
std::numeric_limits<int32_t>::lowest(), -1, std::numeric_limits<int32_t>::max()};
// Source values for the 'array_timestamp' column, written with SetArrayUnixTimeMicros().
const vector<int64_t> TIMESTAMP_ARRAY = {
-17987443200000000, // See MIN_DATE_AS_UNIX_TIME in be/src/runtime/timestamp-test.cc
-1L,
253402300799999999, // See MAX_DATE_AS_UNIX_TIME in be/src/runtime/timestamp-test.cc
};
// To test multi-byte characters.
// Used as the source values for both the 'array_varchar' and 'array_binary' columns.
const vector<Slice> UTF8_ARRAY = {u8"Σ", u8"π", u8"λ"};
// Source values for the 'array_decimal' column, written with SetArrayUnscaledDecimal().
const vector<int64_t> DECIMAL18_ARRAY = {
-999'999'999'999'999'999, // 18 digits
-1L,
999'999'999'999'999'999, // 18 digits
};
// Source values for the 'array_double' column: negative infinity, a negated quiet NaN
// and positive infinity.
// See StringParser::StringToFloatInternal() for how the special values are generated.
const vector<double> DOUBLE_ARRAY = {
-std::numeric_limits<double>::infinity(),
-std::numeric_limits<double>::quiet_NaN(),
std::numeric_limits<double>::infinity()
};
// Source values for the 'array_boolean' column.
const vector<bool> BOOL_ARRAY = {true, false, true};
// Primary-key value of the next row to insert. KuduInsertNulls() and KuduInsertArrays()
// write it to the 'id' column with SetInt8() and increment it after a successful Apply().
// 'id' starts from 0, same as Python's range().
int id = 0;
// Applies to 'session' one insert into 'table' whose key is the current value of the
// global 'id' and whose seven array columns are all set to NULL.
// On success the global 'id' is incremented and OK is returned. Otherwise the first
// non-OK status is returned and 'id' is left unchanged.
kudu::Status KuduInsertNulls(
    const shared_ptr<KuduSession>& session, const shared_ptr<KuduTable>& table) {
  // Array columns of the destination table, in the order they are set to NULL.
  static constexpr const char* kArrayColumns[] = {"array_int", "array_timestamp",
      "array_varchar", "array_decimal", "array_double", "array_binary", "array_boolean"};

  std::unique_ptr<KuduInsert> null_row(table->NewInsert());
  auto* cells = null_row->mutable_row();
  KUDU_RETURN_NOT_OK(cells->SetInt8("id", id));
  for (const char* column : kArrayColumns) {
    KUDU_RETURN_NOT_OK(cells->SetNull(column));
  }
  // The raw pointer is handed to Apply(), so the unique_ptr gives up ownership here.
  KUDU_RETURN_NOT_OK(session->Apply(null_row.release()));
  ++id;
  return kudu::Status::OK();
}
// Builds a vector with exactly non_null.size() elements by cycling through 'array' from
// its first element, wrapping around to the start whenever its end is reached.
// Only the length of 'non_null' is used; its values are not read.
// 'array' must not be empty unless 'non_null' is empty as well.
template <typename T>
vector<T> repeat(const vector<T>& array, const vector<bool>& non_null) {
  const size_t wanted = non_null.size();
  vector<T> cycled;
  cycled.reserve(wanted);
  size_t src = 0;
  while (cycled.size() < wanted) {
    cycled.push_back(array[src]);
    // Wrap back to the first source element after the last one.
    if (++src == array.size()) src = 0;
  }
  return cycled;
}
// Applies to 'session' one insert into 'table' whose key is the current value of the
// global 'id' and whose array columns each hold non_null.size() elements.
// The element values come from cycling through the matching constant array (see
// repeat()); 'array_varchar' and 'array_binary' both draw from UTF8_ARRAY.
// 'non_null' is forwarded unchanged as the last argument of every SetArray*() call; the
// callers use 'false' to mark an element that should be NULL.
// On success the global 'id' is incremented and OK is returned. Otherwise the first
// non-OK status is returned and 'id' is left unchanged.
kudu::Status KuduInsertArrays(const shared_ptr<KuduSession>& session,
    const shared_ptr<KuduTable>& table, const vector<bool>& non_null) {
  std::unique_ptr<KuduInsert> array_row(table->NewInsert());
  auto* cells = array_row->mutable_row();

  KUDU_RETURN_NOT_OK(cells->SetInt8("id", id));
  KUDU_RETURN_NOT_OK(
      cells->SetArrayInt32("array_int", repeat(INT32_ARRAY, non_null), non_null));
  KUDU_RETURN_NOT_OK(cells->SetArrayUnixTimeMicros(
      "array_timestamp", repeat(TIMESTAMP_ARRAY, non_null), non_null));
  KUDU_RETURN_NOT_OK(
      cells->SetArrayVarchar("array_varchar", repeat(UTF8_ARRAY, non_null), non_null));
  KUDU_RETURN_NOT_OK(cells->SetArrayUnscaledDecimal(
      "array_decimal", repeat(DECIMAL18_ARRAY, non_null), non_null));
  KUDU_RETURN_NOT_OK(
      cells->SetArrayDouble("array_double", repeat(DOUBLE_ARRAY, non_null), non_null));
  KUDU_RETURN_NOT_OK(
      cells->SetArrayBinary("array_binary", repeat(UTF8_ARRAY, non_null), non_null));
  KUDU_RETURN_NOT_OK(
      cells->SetArrayBool("array_boolean", repeat(BOOL_ARRAY, non_null), non_null));

  // The raw pointer is handed to Apply(), so the unique_ptr gives up ownership here.
  KUDU_RETURN_NOT_OK(session->Apply(array_row.release()));
  ++id;
  return kudu::Status::OK();
}
// Connects to the Kudu master at KUDU_MASTER_DEFAULT_ADDR, opens the table named by
// KUDU_TEST_TABLE_NAME, applies one row per array shape under test and then flushes the
// session once.
// Returns OK if every step succeeds. Otherwise returns the status of the first step that
// failed; when that step is Flush(), the session's pending errors are printed to stderr
// first, followed by a note if the session reports that some errors were dropped.
kudu::Status RunKuduArrayInsert() {
  shared_ptr<KuduClient> client;
  // Connect to the cluster.
  KUDU_RETURN_NOT_OK(KuduClientBuilder()
                         .add_master_server_addr(KUDU_MASTER_DEFAULT_ADDR)
                         .Build(&client));
  shared_ptr<KuduTable> table;
  KUDU_RETURN_NOT_OK(client->OpenTable(KUDU_TEST_TABLE_NAME, &table));
  shared_ptr<KuduSession> session = client->NewSession();
  // All rows are applied first and flushed together by the explicit Flush() below.
  KUDU_RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));

  // The array slot is NULL.
  KUDU_RETURN_NOT_OK(KuduInsertNulls(session, table));
  // The array is not empty and no element is NULL.
  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, true, true}));
  // The array is empty.
  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {}));
  // Array element at the start is NULL.
  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {false, true, true}));
  // Array element at the middle is NULL.
  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, false, true}));
  // Array element at the end is NULL.
  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, true, false}));
  // The array is longer than those in the previous rows.
  KUDU_RETURN_NOT_OK(KuduInsertArrays(session, table, {true, true, true, true, true}));

  kudu::Status status = session->Flush();
  if (status.ok()) return status;

  // Flush() failed: collect and print the individual errors held by the session.
  // NOTE(review): 'drop' is expected to delete the KuduError objects when it goes out of
  // scope (ElementDeleter from gutil/stl_util.h) -- confirm against that header.
  vector<KuduError*> errors;
  ElementDeleter drop(&errors);
  // Initialized so the check below never reads an indeterminate value.
  bool overflowed = false;
  session->GetPendingErrors(&errors, &overflowed);
  for (const KuduError* error : errors) {
    cerr << "Error: " << error->status().ToString() << endl;
  }
  // Make it visible when the list printed above is incomplete.
  if (overflowed) {
    cerr << "Error: the session's error buffer overflowed; some errors were dropped"
         << endl;
  }
  return status;
}
} // namespace impala
// Entry point. Takes exactly one argument: the name of the destination Kudu table.
// Example usage:
//   kudu-array-inserter impala::functional_kudu.kudu_array
// Returns 0 on success, or 1 if the arguments are wrong or the insert fails.
int main(int argc, char** argv) {
  // Checked explicitly instead of with assert(): assert() expands to nothing when NDEBUG
  // is defined, in which case a missing argument would leave argv[1] null and pass a null
  // table name on to RunKuduArrayInsert().
  if (argc != 2) {
    cerr << "Usage: " << (argc > 0 ? argv[0] : "kudu-array-inserter")
         << " <kudu_table_name>" << endl;
    return 1;
  }
  impala::KUDU_TEST_TABLE_NAME = argv[1];
  kudu::Status status = impala::RunKuduArrayInsert();
  if (!status.ok()) {
    cerr << "Error: " << status.ToString() << endl;
    return 1;
  }
  return 0;
}