blob: 5d17ac2489672217975a4cf548549807af2a6b84 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <ctime>
#include <iostream>
#include <sstream>
#include "kudu/client/callbacks.h"
#include "kudu/client/client.h"
#include "kudu/client/row_result.h"
#include "kudu/client/stubs.h"
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/util/monotime.h"
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduError;
using kudu::client::KuduInsert;
using kudu::client::KuduPredicate;
using kudu::client::KuduScanBatch;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduSession;
using kudu::client::KuduStatusFunctionCallback;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::KuduValue;
using kudu::client::sp::shared_ptr;
using kudu::KuduPartialRow;
using kudu::MonoDelta;
using kudu::Status;
using std::ostringstream;
using std::string;
using std::vector;
static Status CreateClient(const vector<string>& master_addrs,
shared_ptr<KuduClient>* client) {
return KuduClientBuilder()
.master_server_addrs(master_addrs)
.default_admin_operation_timeout(MonoDelta::FromSeconds(20))
.Build(client);
}
static KuduSchema CreateSchema() {
KuduSchema schema;
KuduSchemaBuilder b;
b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->NotNull();
b.AddColumn("non_null_with_default")->Type(KuduColumnSchema::INT32)->NotNull()
->Default(KuduValue::FromInt(12345));
KUDU_CHECK_OK(b.Build(&schema));
return schema;
}
static Status DoesTableExist(const shared_ptr<KuduClient>& client,
const string& table_name,
bool *exists) {
shared_ptr<KuduTable> table;
Status s = client->OpenTable(table_name, &table);
if (s.ok()) {
*exists = true;
} else if (s.IsNotFound()) {
*exists = false;
s = Status::OK();
}
return s;
}
static Status CreateTable(const shared_ptr<KuduClient>& client,
const string& table_name,
const KuduSchema& schema,
int num_tablets) {
vector<string> column_names;
column_names.push_back("key");
// Set the schema and range partition columns.
KuduTableCreator* table_creator = client->NewTableCreator();
table_creator->table_name(table_name)
.schema(&schema)
.set_range_partition_columns(column_names);
// Generate and add the range partition splits for the table.
int32_t increment = 1000 / num_tablets;
for (int32_t i = 1; i < num_tablets; i++) {
KuduPartialRow* row = schema.NewRow();
KUDU_CHECK_OK(row->SetInt32(0, i * increment));
table_creator->add_range_partition_split(row);
}
Status s = table_creator->Create();
delete table_creator;
return s;
}
static Status AlterTable(const shared_ptr<KuduClient>& client,
const string& table_name) {
KuduTableAlterer* table_alterer = client->NewTableAlterer(table_name);
table_alterer->AlterColumn("int_val")->RenameTo("integer_val");
table_alterer->AddColumn("another_val")->Type(KuduColumnSchema::BOOL);
table_alterer->DropColumn("string_val");
Status s = table_alterer->Alter();
delete table_alterer;
return s;
}
static void StatusCB(void* unused, const Status& status) {
KUDU_LOG(INFO) << "Asynchronous flush finished with status: "
<< status.ToString();
}
static Status InsertRows(const shared_ptr<KuduTable>& table, int num_rows) {
shared_ptr<KuduSession> session = table->client()->NewSession();
KUDU_RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
session->SetTimeoutMillis(5000);
for (int i = 0; i < num_rows; i++) {
KuduInsert* insert = table->NewInsert();
KuduPartialRow* row = insert->mutable_row();
KUDU_CHECK_OK(row->SetInt32("key", i));
KUDU_CHECK_OK(row->SetInt32("integer_val", i * 2));
KUDU_CHECK_OK(row->SetInt32("non_null_with_default", i * 5));
KUDU_CHECK_OK(session->Apply(insert));
}
Status s = session->Flush();
if (s.ok()) {
return s;
}
// Test asynchronous flush.
KuduStatusFunctionCallback<void*> status_cb(&StatusCB, NULL);
session->FlushAsync(&status_cb);
// Look at the session's errors.
vector<KuduError*> errors;
bool overflow;
session->GetPendingErrors(&errors, &overflow);
if (!errors.empty()) {
s = overflow ? Status::IOError("Overflowed pending errors in session") :
errors.front()->status();
while (!errors.empty()) {
delete errors.back();
errors.pop_back();
}
}
KUDU_RETURN_NOT_OK(s);
// Close the session.
return session->Close();
}
static Status ScanRows(const shared_ptr<KuduTable>& table) {
const int kLowerBound = 5;
const int kUpperBound = 600;
KuduScanner scanner(table.get());
// To be guaranteed results are returned in primary key order, make the
// scanner fault-tolerant. This also means the scanner can recover if,
// for example, the server it is scanning fails in the middle of a scan.
KUDU_RETURN_NOT_OK(scanner.SetFaultTolerant());
// Add a predicate: WHERE key >= 5
KuduPredicate* p = table->NewComparisonPredicate(
"key", KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(kLowerBound));
KUDU_RETURN_NOT_OK(scanner.AddConjunctPredicate(p));
// Add a predicate: WHERE key <= 600
p = table->NewComparisonPredicate(
"key", KuduPredicate::LESS_EQUAL, KuduValue::FromInt(kUpperBound));
KUDU_RETURN_NOT_OK(scanner.AddConjunctPredicate(p));
KUDU_RETURN_NOT_OK(scanner.Open());
KuduScanBatch batch;
int next_row = kLowerBound;
while (scanner.HasMoreRows()) {
KUDU_RETURN_NOT_OK(scanner.NextBatch(&batch));
for (KuduScanBatch::const_iterator it = batch.begin();
it != batch.end();
++it, ++next_row) {
KuduScanBatch::RowPtr row(*it);
int32_t val;
KUDU_RETURN_NOT_OK(row.GetInt32("key", &val));
if (val != next_row) {
ostringstream out;
out << "Scan returned the wrong results. Expected key "
<< next_row << " but got " << val;
return Status::IOError(out.str());
}
}
}
// next_row is now one past the last row we read.
int last_row_seen = next_row - 1;
if (last_row_seen != kUpperBound) {
ostringstream out;
out << "Scan returned the wrong results. Expected last row to be "
<< kUpperBound << " rows but got " << last_row_seen;
return Status::IOError(out.str());
}
return Status::OK();
}
// A helper class providing custom logging callback. It also manages
// automatic callback installation and removal.
class LogCallbackHelper {
public:
LogCallbackHelper() : log_cb_(&LogCallbackHelper::LogCb, NULL) {
kudu::client::InstallLoggingCallback(&log_cb_);
}
~LogCallbackHelper() {
kudu::client::UninstallLoggingCallback();
}
static void LogCb(void* unused,
kudu::client::KuduLogSeverity severity,
const char* filename,
int line_number,
const struct ::tm* time,
const char* message,
size_t message_len) {
KUDU_LOG(INFO) << "Received log message from Kudu client library";
KUDU_LOG(INFO) << " Severity: " << severity;
KUDU_LOG(INFO) << " Filename: " << filename;
KUDU_LOG(INFO) << " Line number: " << line_number;
char time_buf[32];
// Example: Tue Mar 24 11:46:43 2015.
KUDU_CHECK(strftime(time_buf, sizeof(time_buf), "%a %b %d %T %Y", time));
KUDU_LOG(INFO) << " Time: " << time_buf;
KUDU_LOG(INFO) << " Message: " << string(message, message_len);
}
private:
kudu::client::KuduLoggingFunctionCallback<void*> log_cb_;
};
int main(int argc, char* argv[]) {
KUDU_LOG(INFO) << "Running with Kudu client version: " <<
kudu::client::GetShortVersionString();
KUDU_LOG(INFO) << "Long version info: " <<
kudu::client::GetAllVersionInfo();
// This is to install and automatically un-install custom logging callback.
LogCallbackHelper log_cb_helper;
if (argc < 2) {
KUDU_LOG(FATAL) << "usage: " << argv[0] << " <master host> ...";
}
vector<string> master_addrs;
for (int i = 1; i < argc; i++) {
master_addrs.push_back(argv[i]);
}
const string kTableName = "test_table";
// Enable verbose debugging for the client library.
kudu::client::SetVerboseLogLevel(2);
// Create and connect a client.
shared_ptr<KuduClient> client;
KUDU_CHECK_OK(CreateClient(master_addrs, &client));
KUDU_LOG(INFO) << "Created a client connection";
// Disable the verbose logging.
kudu::client::SetVerboseLogLevel(0);
// Create a schema.
KuduSchema schema(CreateSchema());
KUDU_LOG(INFO) << "Created a schema";
// Create a table with that schema.
bool exists;
KUDU_CHECK_OK(DoesTableExist(client, kTableName, &exists));
if (exists) {
client->DeleteTable(kTableName);
KUDU_LOG(INFO) << "Deleting old table before creating new one";
}
KUDU_CHECK_OK(CreateTable(client, kTableName, schema, 10));
KUDU_LOG(INFO) << "Created a table";
// Alter the table.
KUDU_CHECK_OK(AlterTable(client, kTableName));
KUDU_LOG(INFO) << "Altered a table";
// Insert some rows into the table.
shared_ptr<KuduTable> table;
KUDU_CHECK_OK(client->OpenTable(kTableName, &table));
KUDU_CHECK_OK(InsertRows(table, 1000));
KUDU_LOG(INFO) << "Inserted some rows into a table";
// Scan some rows.
KUDU_CHECK_OK(ScanRows(table));
KUDU_LOG(INFO) << "Scanned some rows out of a table";
// Delete the table.
KUDU_CHECK_OK(client->DeleteTable(kTableName));
KUDU_LOG(INFO) << "Deleted a table";
// Done!
KUDU_LOG(INFO) << "Done";
return 0;
}