| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <ctime> |
| #include <iostream> |
| #include <sstream> |
| |
| #include "kudu/client/array_cell.h" |
| #include "kudu/client/callbacks.h" |
| #include "kudu/client/client.h" |
| #include "kudu/client/row_result.h" |
| #include "kudu/client/stubs.h" |
| #include "kudu/client/value.h" |
| #include "kudu/client/write_op.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/util/monotime.h" |
| |
| using kudu::client::KuduArrayCellView; |
| using kudu::client::KuduClient; |
| using kudu::client::KuduClientBuilder; |
| using kudu::client::KuduColumnSchema; |
| using kudu::client::KuduError; |
| using kudu::client::KuduInsert; |
| using kudu::client::KuduPredicate; |
| using kudu::client::KuduScanBatch; |
| using kudu::client::KuduScanner; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSchemaBuilder; |
| using kudu::client::KuduSession; |
| using kudu::client::KuduStatusFunctionCallback; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableAlterer; |
| using kudu::client::KuduTableCreator; |
| using kudu::client::KuduValue; |
| using kudu::client::sp::shared_ptr; |
| using kudu::KuduPartialRow; |
| using kudu::MonoDelta; |
| using kudu::Status; |
| |
| using std::ostringstream; |
| using std::string; |
| using std::vector; |
| |
| static Status CreateClient(const vector<string>& master_addrs, |
| shared_ptr<KuduClient>* client) { |
| return KuduClientBuilder() |
| .master_server_addrs(master_addrs) |
| .default_admin_operation_timeout(MonoDelta::FromSeconds(20)) |
| .Build(client); |
| } |
| |
| static KuduSchema CreateSchema() { |
| KuduSchemaBuilder b; |
| b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); |
| b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull(); |
| b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->NotNull(); |
| b.AddColumn("non_null_with_default")-> |
| Type(KuduColumnSchema::INT32)-> |
| NotNull()-> |
| Default(KuduValue::FromInt(12345)); |
| |
| // In addition to scalar type columns, add one nullable nested type column |
| // to contain one-dimensional ararys of INT64 values. |
| const KuduColumnSchema::KuduArrayTypeDescriptor desc(KuduColumnSchema::INT64); |
| b.AddColumn("arr_int64")-> |
| Type(KuduColumnSchema::NESTED)-> |
| NestedType(KuduColumnSchema::KuduNestedTypeDescriptor(desc)); |
| |
| KuduSchema schema; |
| KUDU_CHECK_OK(b.Build(&schema)); |
| return schema; |
| } |
| |
| static Status DoesTableExist(const shared_ptr<KuduClient>& client, |
| const string& table_name, |
| bool *exists) { |
| shared_ptr<KuduTable> table; |
| Status s = client->OpenTable(table_name, &table); |
| if (s.ok()) { |
| *exists = true; |
| } else if (s.IsNotFound()) { |
| *exists = false; |
| s = Status::OK(); |
| } |
| return s; |
| } |
| |
| static Status CreateTable(const shared_ptr<KuduClient>& client, |
| const string& table_name, |
| const KuduSchema& schema, |
| int num_tablets) { |
| vector<string> column_names; |
| column_names.push_back("key"); |
| |
| // Set the schema and range partition columns. |
| KuduTableCreator* table_creator = client->NewTableCreator(); |
| table_creator->table_name(table_name) |
| .schema(&schema) |
| .set_range_partition_columns(column_names); |
| |
| // Generate and add the range partition splits for the table. |
| int32_t increment = 1000 / num_tablets; |
| for (int32_t i = 1; i < num_tablets; i++) { |
| KuduPartialRow* row = schema.NewRow(); |
| KUDU_CHECK_OK(row->SetInt32(0, i * increment)); |
| table_creator->add_range_partition_split(row); |
| } |
| |
| Status s = table_creator->Create(); |
| delete table_creator; |
| return s; |
| } |
| |
| static Status AlterTable(const shared_ptr<KuduClient>& client, |
| const string& table_name) { |
| KuduTableAlterer* table_alterer = client->NewTableAlterer(table_name); |
| table_alterer->AlterColumn("int_val")->RenameTo("integer_val"); |
| table_alterer->AddColumn("another_val")->Type(KuduColumnSchema::BOOL); |
| table_alterer->DropColumn("string_val"); |
| Status s = table_alterer->Alter(); |
| delete table_alterer; |
| return s; |
| } |
| |
| static void StatusCB(void* unused, const Status& status) { |
| KUDU_LOG(INFO) << "Asynchronous flush finished with status: " |
| << status.ToString(); |
| } |
| |
| static Status InsertRows(const shared_ptr<KuduTable>& table, int num_rows) { |
| shared_ptr<KuduSession> session = table->client()->NewSession(); |
| KUDU_RETURN_NOT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); |
| session->SetTimeoutMillis(5000); |
| |
| for (int i = 0; i < num_rows; i++) { |
| KuduInsert* insert = table->NewInsert(); |
| KuduPartialRow* row = insert->mutable_row(); |
| KUDU_CHECK_OK(row->SetInt32("key", i)); |
| KUDU_CHECK_OK(row->SetInt32("integer_val", i * 2)); |
| KUDU_CHECK_OK(row->SetInt32("non_null_with_default", i * 5)); |
| { |
| // An array with three values, two non-nulls and one null in the middle. |
| vector<int64_t> values(3, 0); |
| values[0] = i * 6; |
| values[2] = i * 7; |
| vector<bool> validity(3, false); |
| validity[0] = true; |
| validity[2] = true; |
| KUDU_CHECK_OK(row->SetArrayInt64("arr_int64", values, validity)); |
| } |
| KUDU_CHECK_OK(session->Apply(insert)); |
| } |
| Status s = session->Flush(); |
| if (s.ok()) { |
| return s; |
| } |
| |
| // Test asynchronous flush. |
| KuduStatusFunctionCallback<void*> status_cb(&StatusCB, NULL); |
| session->FlushAsync(&status_cb); |
| |
| // Look at the session's errors. |
| vector<KuduError*> errors; |
| bool overflow; |
| session->GetPendingErrors(&errors, &overflow); |
| if (!errors.empty()) { |
| s = overflow ? Status::IOError("Overflowed pending errors in session") : |
| errors.front()->status(); |
| while (!errors.empty()) { |
| delete errors.back(); |
| errors.pop_back(); |
| } |
| } |
| KUDU_RETURN_NOT_OK(s); |
| |
| // Close the session. |
| return session->Close(); |
| } |
| |
| static Status ScanRows(const shared_ptr<KuduTable>& table) { |
| const int kLowerBound = 5; |
| const int kUpperBound = 600; |
| |
| KuduScanner scanner(table.get()); |
| |
| // To be guaranteed results are returned in primary key order, make the |
| // scanner fault-tolerant. This also means the scanner can recover if, |
| // for example, the server it is scanning fails in the middle of a scan. |
| KUDU_RETURN_NOT_OK(scanner.SetFaultTolerant()); |
| |
| // Add a predicate: WHERE key >= 5 |
| KuduPredicate* p = table->NewComparisonPredicate( |
| "key", KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(kLowerBound)); |
| KUDU_RETURN_NOT_OK(scanner.AddConjunctPredicate(p)); |
| |
| // Add a predicate: WHERE key <= 600 |
| p = table->NewComparisonPredicate( |
| "key", KuduPredicate::LESS_EQUAL, KuduValue::FromInt(kUpperBound)); |
| KUDU_RETURN_NOT_OK(scanner.AddConjunctPredicate(p)); |
| |
| KUDU_RETURN_NOT_OK(scanner.Open()); |
| KuduScanBatch batch; |
| |
| int next_row = kLowerBound; |
| while (scanner.HasMoreRows()) { |
| KUDU_RETURN_NOT_OK(scanner.NextBatch(&batch)); |
| for (KuduScanBatch::const_iterator it = batch.begin(); |
| it != batch.end(); |
| ++it, ++next_row) { |
| KuduScanBatch::RowPtr row(*it); |
| int32_t val; |
| KUDU_RETURN_NOT_OK(row.GetInt32("key", &val)); |
| if (val != next_row) { |
| ostringstream out; |
| out << "Scan returned the wrong results. Expected key " |
| << next_row << " but got " << val; |
| return Status::IOError(out.str()); |
| } |
| |
| // An example of accessing a column by index in a row scan projection. |
| vector<int64_t> arr; |
| vector<bool> arr_notnull; |
| KUDU_RETURN_NOT_OK(row.GetArrayInt64(3, &arr, &arr_notnull)); |
| KUDU_CHECK(arr.size() == arr_notnull.size()); |
| |
| // An example of accessing raw cell data in an array column. |
| const void* cell_ptr = row.cell(3); |
| KuduArrayCellView view(cell_ptr); |
| KUDU_RETURN_NOT_OK(view.Init()); |
| KUDU_CHECK(view.elem_num() == arr.size()); |
| const uint8_t* arr_raw_notnull_bitmap = view.not_null_bitmap(); |
| KUDU_CHECK(arr_raw_notnull_bitmap); |
| const int64_t* arr_raw = reinterpret_cast<const int64_t*>( |
| view.data(KuduColumnSchema::INT64)); |
| KUDU_CHECK(arr_raw); |
| for (size_t i = 0; i < arr_notnull.size(); ++i) { |
| if (arr_notnull[i]) { |
| // That's the same data for non-null array elements regardless |
| // of the way accessing it. |
| KUDU_CHECK(arr[i] == *(arr_raw + i)); |
| } |
| // Check the validity bit: it should correspond to the boolean value |
| // at the same position in the 'arr_notnull' non-nullness array. |
| const bool arr_raw_elem_validity = |
| *(arr_raw_notnull_bitmap + (i >> 3)) & (1 << (i & 7)); |
| KUDU_CHECK(arr_notnull[i] == arr_raw_elem_validity); |
| } |
| } |
| } |
| |
| // next_row is now one past the last row we read. |
| int last_row_seen = next_row - 1; |
| |
| if (last_row_seen != kUpperBound) { |
| ostringstream out; |
| out << "Scan returned the wrong results. Expected last row to be " |
| << kUpperBound << " rows but got " << last_row_seen; |
| return Status::IOError(out.str()); |
| } |
| return Status::OK(); |
| } |
| |
| // A helper class providing custom logging callback. It also manages |
| // automatic callback installation and removal. |
| class LogCallbackHelper { |
| public: |
| LogCallbackHelper() : log_cb_(&LogCallbackHelper::LogCb, NULL) { |
| kudu::client::InstallLoggingCallback(&log_cb_); |
| } |
| |
| ~LogCallbackHelper() { |
| kudu::client::UninstallLoggingCallback(); |
| } |
| |
| static void LogCb(void* unused, |
| kudu::client::KuduLogSeverity severity, |
| const char* filename, |
| int line_number, |
| const struct ::tm* time, |
| const char* message, |
| size_t message_len) { |
| KUDU_LOG(INFO) << "Received log message from Kudu client library"; |
| KUDU_LOG(INFO) << " Severity: " << severity; |
| KUDU_LOG(INFO) << " Filename: " << filename; |
| KUDU_LOG(INFO) << " Line number: " << line_number; |
| char time_buf[32]; |
| // Example: Tue Mar 24 11:46:43 2015. |
| KUDU_CHECK(strftime(time_buf, sizeof(time_buf), "%a %b %d %T %Y", time)); |
| KUDU_LOG(INFO) << " Time: " << time_buf; |
| KUDU_LOG(INFO) << " Message: " << string(message, message_len); |
| } |
| |
| private: |
| kudu::client::KuduLoggingFunctionCallback<void*> log_cb_; |
| }; |
| |
| int main(int argc, char* argv[]) { |
| KUDU_LOG(INFO) << "Running with Kudu client version: " << |
| kudu::client::GetShortVersionString(); |
| KUDU_LOG(INFO) << "Long version info: " << |
| kudu::client::GetAllVersionInfo(); |
| |
| // This is to install and automatically un-install custom logging callback. |
| LogCallbackHelper log_cb_helper; |
| |
| if (argc < 2) { |
| KUDU_LOG(FATAL) << "usage: " << argv[0] << " <master host> ..."; |
| } |
| vector<string> master_addrs; |
| for (int i = 1; i < argc; i++) { |
| master_addrs.push_back(argv[i]); |
| } |
| |
| // Kudu doesn't have a notion of a database namespace, but if the integration |
| // with HMS (Hive MetaStore) is enabled, there are restriction on the name |
| // of tables that can be created, so this example adds the 'db' prefix to |
| // make it work both with and without HMS integration. |
| const string kTableName = "db.test_table"; |
| |
| // Enable verbose debugging for the client library. |
| kudu::client::SetVerboseLogLevel(2); |
| |
| // Create and connect a client. |
| shared_ptr<KuduClient> client; |
| KUDU_CHECK_OK(CreateClient(master_addrs, &client)); |
| KUDU_LOG(INFO) << "Created a client connection"; |
| |
| // Disable the verbose logging. |
| kudu::client::SetVerboseLogLevel(0); |
| |
| // Create a schema. |
| KuduSchema schema(CreateSchema()); |
| KUDU_LOG(INFO) << "Created a schema"; |
| |
| // Create a table with that schema. |
| bool exists; |
| KUDU_CHECK_OK(DoesTableExist(client, kTableName, &exists)); |
| if (exists) { |
| KUDU_CHECK_OK(client->DeleteTable(kTableName)); |
| KUDU_LOG(INFO) << "Deleting old table before creating new one"; |
| } |
| KUDU_CHECK_OK(CreateTable(client, kTableName, schema, 10)); |
| KUDU_LOG(INFO) << "Created a table"; |
| |
| // Alter the table. |
| KUDU_CHECK_OK(AlterTable(client, kTableName)); |
| KUDU_LOG(INFO) << "Altered a table"; |
| |
| // Insert some rows into the table. |
| shared_ptr<KuduTable> table; |
| KUDU_CHECK_OK(client->OpenTable(kTableName, &table)); |
| KUDU_CHECK_OK(InsertRows(table, 1000)); |
| KUDU_LOG(INFO) << "Inserted some rows into a table"; |
| |
| // Scan some rows. |
| KUDU_CHECK_OK(ScanRows(table)); |
| KUDU_LOG(INFO) << "Scanned some rows out of a table"; |
| |
| // Delete the table. |
| KUDU_CHECK_OK(client->DeleteTable(kTableName)); |
| KUDU_LOG(INFO) << "Deleted a table"; |
| |
| // Done! |
| KUDU_LOG(INFO) << "Done"; |
| |
| return 0; |
| } |