| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <functional> |
| #include <iostream> |
| #include <map> |
| #include <memory> |
| #include <set> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include <boost/optional/optional.hpp> |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/stubs/common.h> |
| #include <google/protobuf/stubs/status.h> |
| #include <google/protobuf/stubs/stringpiece.h> |
| #include <google/protobuf/util/json_util.h> |
| #include <rapidjson/document.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/replica_controller-internal.h" |
| #include "kudu/client/scan_batch.h" |
| #include "kudu/client/scan_predicate.h" |
| #include "kudu/client/schema.h" |
| #include "kudu/client/shared_ptr.h" // IWYU pragma: keep |
| #include "kudu/client/value.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/partition.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/stl_util.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/numbers.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/tools/table_scanner.h" |
| #include "kudu/tools/tool.pb.h" |
| #include "kudu/tools/tool_action.h" |
| #include "kudu/tools/tool_action_common.h" |
| #include "kudu/util/jsonreader.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/string_case.h" |
| |
| using google::protobuf::util::JsonStringToMessage; |
| using google::protobuf::util::JsonParseOptions; |
| using google::protobuf::RepeatedPtrField; |
| using kudu::client::KuduClient; |
| using kudu::client::KuduColumnSchema; |
| using kudu::client::KuduColumnSpec; |
| using kudu::client::KuduColumnStorageAttributes; |
| using kudu::client::KuduPredicate; |
| using kudu::client::KuduScanToken; |
| using kudu::client::KuduScanTokenBuilder; |
| using kudu::client::KuduScanner; |
| using kudu::client::KuduSchema; |
| using kudu::client::KuduSchemaBuilder; |
| using kudu::client::KuduTable; |
| using kudu::client::KuduTableAlterer; |
| using kudu::client::KuduTableCreator; |
| using kudu::client::KuduTableStatistics; |
| using kudu::client::KuduValue; |
| using kudu::client::internal::ReplicaController; |
| using std::cerr; |
| using std::cout; |
| using std::endl; |
| using std::map; |
| using std::pair; |
| using std::set; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_map; |
| using std::vector; |
| using strings::Split; |
| using strings::Substitute; |
| |
| DEFINE_bool(check_row_existence, false, |
| "Also check for the existence of the row on the leader replica of " |
| "the tablet. If found, the full row will be printed; if not found, " |
| "an error message will be printed and the command will return a " |
| "non-zero status."); |
| DEFINE_string(dst_table, "", |
| "The name of the destination table the data will be copied to. " |
| "If the empty string, use the same name as the source table."); |
| DEFINE_bool(list_tablets, false, |
| "Include tablet and replica UUIDs in the output"); |
| DEFINE_bool(modify_external_catalogs, true, |
| "Whether to modify external catalogs, such as the Hive Metastore, " |
| "when renaming or dropping a table."); |
| DEFINE_string(config_names, "", |
| "Comma-separated list of configurations to display. " |
| "An empty value displays all configs."); |
| DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND", |
| "The type of the lower bound, either inclusive or exclusive. " |
| "Defaults to inclusive. This flag is case-insensitive."); |
| DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND", |
| "The type of the upper bound, either inclusive or exclusive. " |
| "Defaults to exclusive. This flag is case-insensitive."); |
| |
| DECLARE_bool(show_values); |
| DECLARE_string(tables); |
| |
| namespace kudu { |
| namespace tools { |
| |
| // This class only exists so that ListTables() can easily be friended by |
| // KuduReplica, KuduReplica::Data, and KuduClientBuilder. |
| class TableLister { |
| public: |
| static Status ListTablets(const vector<string>& master_addresses) { |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(master_addresses, |
| &client, |
| true /* can_see_all_replicas */)); |
| vector<string> table_names; |
| RETURN_NOT_OK(client->ListTables(&table_names)); |
| |
| vector<string> table_filters = Split(FLAGS_tables, ",", strings::SkipEmpty()); |
| for (const auto& tname : table_names) { |
| if (!MatchesAnyPattern(table_filters, tname)) continue; |
| cout << tname << endl; |
| if (!FLAGS_list_tablets) { |
| continue; |
| } |
| client::sp::shared_ptr<KuduTable> client_table; |
| RETURN_NOT_OK(client->OpenTable(tname, &client_table)); |
| vector<KuduScanToken*> tokens; |
| ElementDeleter deleter(&tokens); |
| KuduScanTokenBuilder builder(client_table.get()); |
| RETURN_NOT_OK(builder.Build(&tokens)); |
| |
| for (const auto* token : tokens) { |
| cout << " T " << token->tablet().id() << endl; |
| for (const auto* replica : token->tablet().replicas()) { |
| const bool is_voter = ReplicaController::is_voter(*replica); |
| const bool is_leader = replica->is_leader(); |
| cout << Substitute(" $0 $1 $2:$3", |
| is_leader ? "L" : (is_voter ? "V" : "N"), replica->ts().uuid(), |
| replica->ts().hostname(), replica->ts().port()) << endl; |
| } |
| cout << endl; |
| } |
| cout << endl; |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| namespace { |
| |
| const char* const kNewTableNameArg = "new_table_name"; |
| const char* const kColumnNameArg = "column_name"; |
| const char* const kNewColumnNameArg = "new_column_name"; |
| const char* const kKeyArg = "primary_key"; |
| const char* const kConfigNameArg = "config_name"; |
| const char* const kConfigValueArg = "config_value"; |
| const char* const kErrorMsgArg = "unable to parse value $0 for column $1 of type $2"; |
| const char* const kTableRangeLowerBoundArg = "table_range_lower_bound"; |
| const char* const kTableRangeUpperBoundArg = "table_range_upper_bound"; |
| const char* const kDefaultValueArg = "default_value"; |
| const char* const kCompressionTypeArg = "compression_type"; |
| const char* const kEncodingTypeArg = "encoding_type"; |
| const char* const kBlockSizeArg = "block_size"; |
| const char* const kColumnCommentArg = "column_comment"; |
| const char* const kCreateTableJSONArg = "create_table_json"; |
| |
| enum PartitionAction { |
| ADD, |
| DROP, |
| }; |
| |
| Status DeleteTable(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| return client->DeleteTableInCatalogs(table_name, FLAGS_modify_external_catalogs); |
| } |
| |
| Status DescribeTable(const RunnerContext& context) { |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| client::sp::shared_ptr<KuduTable> table; |
| RETURN_NOT_OK(client->OpenTable(table_name, &table)); |
| |
| // The schema. |
| const KuduSchema& schema = table->schema(); |
| cout << "TABLE " << table_name << " " << schema.ToString() << endl; |
| |
| // The partition schema with current range partitions. |
| vector<Partition> partitions; |
| RETURN_NOT_OK_PREPEND(table->ListPartitions(&partitions), |
| "failed to retrieve current partitions"); |
| const auto& schema_internal = KuduSchema::ToSchema(schema); |
| const auto& partition_schema = table->partition_schema(); |
| vector<string> partition_strs; |
| for (const auto& partition : partitions) { |
| // Deduplicate by hash bucket to get a unique entry per range partition. |
| const auto& hash_buckets = partition.hash_buckets(); |
| if (!std::all_of(hash_buckets.begin(), |
| hash_buckets.end(), |
| [](int32_t bucket) { return bucket == 0; })) { |
| continue; |
| } |
| auto range_partition_str = |
| partition_schema.RangePartitionDebugString(partition.range_key_start(), |
| partition.range_key_end(), |
| schema_internal); |
| partition_strs.emplace_back(std::move(range_partition_str)); |
| } |
| cout << partition_schema.DisplayString(schema_internal, partition_strs) |
| << endl; |
| |
| // The owner. |
| cout << "OWNER " << table->owner() << endl; |
| |
| // Finally, the replication factor. |
| cout << "REPLICAS " << table->num_replicas() << endl; |
| |
| return Status::OK(); |
| } |
| |
| Status LocateRow(const RunnerContext& context) { |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| client::sp::shared_ptr<KuduTable> table; |
| RETURN_NOT_OK(client->OpenTable(table_name, &table)); |
| |
| // Create an equality predicate for each primary key column. |
| const string& row_str = FindOrDie(context.required_args, kKeyArg); |
| JsonReader reader(row_str); |
| RETURN_NOT_OK(reader.Init()); |
| vector<const rapidjson::Value*> values; |
| RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(), |
| /*field=*/nullptr, |
| &values)); |
| |
| const auto& schema = table->schema(); |
| vector<int> key_indexes; |
| schema.GetPrimaryKeyColumnIndexes(&key_indexes); |
| if (values.size() != key_indexes.size()) { |
| return Status::InvalidArgument( |
| Substitute("wrong number of key columns specified: expected $0 but received $1", |
| key_indexes.size(), |
| values.size())); |
| } |
| |
| vector<unique_ptr<KuduPredicate>> predicates; |
| for (int i = 0; i < values.size(); i++) { |
| const auto key_index = key_indexes[i]; |
| const auto& column = schema.Column(key_index); |
| const auto& col_name = column.name(); |
| const auto type = column.type(); |
| switch (type) { |
| case KuduColumnSchema::INT8: |
| case KuduColumnSchema::INT16: |
| case KuduColumnSchema::INT32: |
| case KuduColumnSchema::INT64: |
| case KuduColumnSchema::DATE: |
| case KuduColumnSchema::UNIXTIME_MICROS: { |
| int64_t value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractInt64(values[i], /*field=*/nullptr, &value), |
| Substitute("unable to parse value for column '$0' of type $1", |
| col_name, |
| KuduColumnSchema::DataTypeToString(type))); |
| predicates.emplace_back( |
| table->NewComparisonPredicate(col_name, |
| client::KuduPredicate::EQUAL, |
| client::KuduValue::FromInt(value))); |
| break; |
| } |
| case KuduColumnSchema::BINARY: |
| case KuduColumnSchema::STRING: |
| case KuduColumnSchema::VARCHAR: { |
| string value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractString(values[i], /*field=*/nullptr, &value), |
| Substitute("unable to parse value for column '$0' of type $1", |
| col_name, |
| KuduColumnSchema::DataTypeToString(type))); |
| predicates.emplace_back( |
| table->NewComparisonPredicate(col_name, |
| client::KuduPredicate::EQUAL, |
| client::KuduValue::CopyString(value))); |
| break; |
| } |
| case KuduColumnSchema::BOOL: { |
| // As of the writing of this tool, BOOL is not a supported key column |
| // type, but just in case it becomes one, we pre-load support for it. |
| bool value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractBool(values[i], /*field=*/nullptr, &value), |
| Substitute("unable to parse value for column '$0' of type $1", |
| col_name, |
| KuduColumnSchema::DataTypeToString(type))); |
| predicates.emplace_back( |
| table->NewComparisonPredicate(col_name, |
| client::KuduPredicate::EQUAL, |
| client::KuduValue::FromBool(value))); |
| break; |
| } |
| case KuduColumnSchema::FLOAT: |
| case KuduColumnSchema::DOUBLE: { |
| // Like BOOL, as of the writing of this tool, floating point types are |
| // not supported for key columns, but we can pre-load support for them |
| // in case they become supported. |
| double value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractDouble(values[i], /*field=*/nullptr, &value), |
| Substitute("unable to parse value for column '$0' of type $1", |
| col_name, |
| KuduColumnSchema::DataTypeToString(type))); |
| predicates.emplace_back( |
| table->NewComparisonPredicate(col_name, |
| client::KuduPredicate::EQUAL, |
| client::KuduValue::FromDouble(value))); |
| break; |
| } |
| case KuduColumnSchema::DECIMAL: |
| return Status::NotSupported( |
| Substitute("unsupported type $0 for key column '$1': " |
| "$0 key columns are not supported by this tool", |
| KuduColumnSchema::DataTypeToString(type), |
| col_name)); |
| default: |
| return Status::NotSupported( |
| Substitute("unsupported type $0 for key column '$1': " |
| "is this tool out of date?", |
| KuduColumnSchema::DataTypeToString(type), |
| col_name)); |
| } |
| } |
| |
| // Find the tablet by constructing scan tokens for a scan with equality |
| // predicates on all key columns. At most one tablet will match, so there |
| // will be at most one token, and we can report the id of its tablet. |
| vector<KuduScanToken*> tokens; |
| ElementDeleter deleter(&tokens); |
| KuduScanTokenBuilder builder(table.get()); |
| // In case we go on to check for existence of the row. |
| RETURN_NOT_OK(builder.SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY)); |
| for (auto& predicate : predicates) { |
| RETURN_NOT_OK(builder.AddConjunctPredicate(predicate.release())); |
| } |
| RETURN_NOT_OK(builder.Build(&tokens)); |
| if (tokens.empty()) { |
| // Must be in a non-covered range partition. |
| return Status::NotFound("row does not belong to any currently existing tablet"); |
| } |
| if (tokens.size() > 1) { |
| // This should be impossible. But if it does happen, we'd like to know what |
| // all the matching tablets were. |
| for (const auto& token : tokens) { |
| cerr << token->tablet().id() << endl; |
| } |
| return Status::IllegalState(Substitute( |
| "all primary key columns specified but found $0 matching tablets!", |
| tokens.size())); |
| } |
| cout << tokens[0]->tablet().id() << endl; |
| |
| if (FLAGS_check_row_existence) { |
| KuduScanner* scanner_ptr; |
| RETURN_NOT_OK(tokens[0]->IntoKuduScanner(&scanner_ptr)); |
| unique_ptr<KuduScanner> scanner(scanner_ptr); |
| RETURN_NOT_OK(scanner->Open()); |
| vector<string> row_str; |
| client::KuduScanBatch batch; |
| while (scanner->HasMoreRows()) { |
| RETURN_NOT_OK(scanner->NextBatch(&batch)); |
| for (const auto& row : batch) { |
| row_str.emplace_back(row.ToString()); |
| } |
| } |
| if (row_str.empty()) { |
| return Status::NotFound("row does not exist"); |
| } |
| // There should be exactly one result, but if somehow there are more, print |
| // them all before returning an error. |
| cout << JoinStrings(row_str, "\n") << endl; |
| if (row_str.size() != 1) { |
| // This should be impossible. |
| return Status::IllegalState( |
| Substitute("expected 1 row but received $0", row_str.size())); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status RenameTable(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& new_table_name = FindOrDie(context.required_args, kNewTableNameArg); |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| return alterer->RenameTo(new_table_name) |
| ->modify_external_catalogs(FLAGS_modify_external_catalogs) |
| ->Alter(); |
| } |
| |
| Status RenameColumn(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& column_name = FindOrDie(context.required_args, kColumnNameArg); |
| const string& new_column_name = FindOrDie(context.required_args, kNewColumnNameArg); |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| alterer->AlterColumn(column_name)->RenameTo(new_column_name); |
| return alterer->Alter(); |
| } |
| |
| Status ListTables(const RunnerContext& context) { |
| vector<string> master_addresses; |
| RETURN_NOT_OK(ParseMasterAddresses(context, &master_addresses)); |
| return TableLister::ListTablets(master_addresses); |
| } |
| |
| Status ScanTable(const RunnerContext &context) { |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| |
| FLAGS_show_values = true; |
| TableScanner scanner(client, table_name); |
| scanner.SetOutput(&cout); |
| return scanner.StartScan(); |
| } |
| |
| Status CopyTable(const RunnerContext& context) { |
| client::sp::shared_ptr<KuduClient> src_client; |
| RETURN_NOT_OK(CreateKuduClient(context, &src_client)); |
| const string& src_table_name = FindOrDie(context.required_args, kTableNameArg); |
| |
| client::sp::shared_ptr<KuduClient> dst_client; |
| if (FindOrDie(context.required_args, kMasterAddressesArg) |
| == FindOrDie(context.required_args, kDestMasterAddressesArg)) { |
| dst_client = src_client; |
| } else { |
| RETURN_NOT_OK(CreateKuduClient(context, kDestMasterAddressesArg, &dst_client)); |
| } |
| |
| const string& dst_table_name = FLAGS_dst_table.empty() ? src_table_name : FLAGS_dst_table; |
| |
| TableScanner scanner(src_client, src_table_name, dst_client, dst_table_name); |
| scanner.SetOutput(&cout); |
| return scanner.StartCopy(); |
| } |
| |
| Status SetExtraConfig(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& config_name = FindOrDie(context.required_args, kConfigNameArg); |
| const string& config_value = FindOrDie(context.required_args, kConfigValueArg); |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| alterer->AlterExtraConfig({ { config_name, config_value} }); |
| return alterer->Alter(); |
| } |
| |
| Status GetExtraConfigs(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| set<string> config_names = strings::Split(FLAGS_config_names, ",", strings::SkipEmpty()); |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| client::sp::shared_ptr<KuduTable> table; |
| RETURN_NOT_OK(client->OpenTable(table_name, &table)); |
| |
| DataTable data_table({ "Configuration", "Value" }); |
| if (config_names.empty()) { |
| for (const auto& extra_config : table->extra_configs()) { |
| data_table.AddRow({ extra_config.first, extra_config.second }); |
| } |
| } else { |
| for (const auto& config_name : config_names) { |
| const string* config_value = FindOrNull(table->extra_configs(), config_name); |
| if (config_value) { |
| data_table.AddRow({ config_name, *config_value }); |
| } |
| } |
| } |
| return data_table.PrintTo(cout); |
| } |
| |
| Status ConvertToKuduPartialRow( |
| const vector<pair<string, KuduColumnSchema::DataType>>& range_columns, |
| const string& range_bound_str, |
| KuduPartialRow* range_bound_partial_row) { |
| JsonReader reader(range_bound_str); |
| RETURN_NOT_OK(reader.Init()); |
| vector<const rapidjson::Value *> values; |
| RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(), |
| /*field=*/nullptr, |
| &values)); |
| |
| // If range_bound_str is empty, an empty row will be used. |
| if (values.empty()) { |
| return Status::OK(); |
| } |
| if (values.size() != range_columns.size()) { |
| return Status::InvalidArgument( |
| Substitute("wrong number of range columns specified: expected $0 but received $1", |
| range_columns.size(), |
| values.size())); |
| } |
| |
| for (int i = 0; i < values.size(); i++) { |
| const auto& col_name = range_columns[i].first; |
| const auto type = range_columns[i].second; |
| const auto error_msg = Substitute(kErrorMsgArg, values[i], col_name, |
| KuduColumnSchema::DataTypeToString(type)); |
| switch (type) { |
| case KuduColumnSchema::INT8: { |
| int64_t value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractInt64(values[i], /*field=*/nullptr, &value), |
| error_msg); |
| RETURN_NOT_OK(range_bound_partial_row->SetInt8(col_name, static_cast<int8_t>(value))); |
| break; |
| } |
| case KuduColumnSchema::INT16: { |
| int64_t value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractInt64(values[i], /*field=*/nullptr, &value), |
| error_msg); |
| RETURN_NOT_OK(range_bound_partial_row->SetInt16(col_name, static_cast<int16_t>(value))); |
| break; |
| } |
| case KuduColumnSchema::INT32: { |
| int32_t value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractInt32(values[i], /*field=*/nullptr, &value), |
| error_msg); |
| RETURN_NOT_OK(range_bound_partial_row->SetInt32(col_name, value)); |
| break; |
| } |
| case KuduColumnSchema::INT64: { |
| int64_t value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractInt64(values[i], /*field=*/nullptr, &value), |
| error_msg); |
| RETURN_NOT_OK(range_bound_partial_row->SetInt64(col_name, value)); |
| break; |
| } |
| case KuduColumnSchema::DATE: { |
| int32_t value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractInt32(values[i], /*field=*/nullptr, &value), |
| error_msg); |
| RETURN_NOT_OK(range_bound_partial_row->SetDate(col_name, value)); |
| break; |
| } |
| case KuduColumnSchema::UNIXTIME_MICROS: { |
| int64_t value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractInt64(values[i], /*field=*/nullptr, &value), |
| error_msg); |
| RETURN_NOT_OK(range_bound_partial_row->SetUnixTimeMicros(col_name, value)); |
| break; |
| } |
| case KuduColumnSchema::BINARY: { |
| string value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractString(values[i], /*field=*/nullptr, &value), |
| error_msg); |
| RETURN_NOT_OK(range_bound_partial_row->SetBinary(col_name, value)); |
| break; |
| } |
| case KuduColumnSchema::STRING: { |
| string value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractString(values[i], /*field=*/nullptr, &value), |
| error_msg); |
| RETURN_NOT_OK(range_bound_partial_row->SetString(col_name, value)); |
| break; |
| } |
| case KuduColumnSchema::VARCHAR: { |
| string value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractString(values[i], /*field=*/nullptr, &value), |
| error_msg); |
| RETURN_NOT_OK(range_bound_partial_row->SetVarchar(col_name, value)); |
| break; |
| } |
| case KuduColumnSchema::BOOL: |
| case KuduColumnSchema::FLOAT: |
| case KuduColumnSchema::DOUBLE: |
| case KuduColumnSchema::DECIMAL: |
| default: |
| return Status::NotSupported( |
| Substitute("unsupported type $0 for key column '$1': " |
| "is this tool out of date?", |
| KuduColumnSchema::DataTypeToString(type), |
| col_name)); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status ModifyRangePartition(const RunnerContext& context, PartitionAction action) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& table_range_lower_bound = FindOrDie(context.required_args, |
| kTableRangeLowerBoundArg); |
| const string& table_range_upper_bound = FindOrDie(context.required_args, |
| kTableRangeUpperBoundArg); |
| |
| const auto convert_bounds_type = [&] (const string& range_bound, |
| const string& flags_range_bound_type, |
| KuduTableCreator::RangePartitionBound* range_bound_type) { |
| string inclusive_bound = iequals(flags_range_bound_type, "INCLUSIVE_BOUND") ? |
| "INCLUSIVE_BOUND" : ""; |
| string exclusive_bound = iequals(flags_range_bound_type, "EXCLUSIVE_BOUND") ? |
| "EXCLUSIVE_BOUND" : ""; |
| |
| if (inclusive_bound.empty() && exclusive_bound.empty()) { |
| return Status::InvalidArgument(Substitute( |
| "wrong type of range $0 : only 'INCLUSIVE_BOUND' or " |
| "'EXCLUSIVE_BOUND' can be received", range_bound)); |
| |
| } |
| *range_bound_type = exclusive_bound.empty() ? KuduTableCreator::INCLUSIVE_BOUND : |
| KuduTableCreator::EXCLUSIVE_BOUND; |
| |
| return Status::OK(); |
| }; |
| |
| client::sp::shared_ptr<KuduClient> client; |
| client::sp::shared_ptr<KuduTable> table; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| RETURN_NOT_OK(client->OpenTable(table_name, &table)); |
| const auto& schema = table->schema(); |
| |
| unique_ptr<KuduPartialRow> lower_bound(schema.NewRow()); |
| unique_ptr<KuduPartialRow> upper_bound(schema.NewRow()); |
| |
| vector<pair<string, KuduColumnSchema::DataType>> range_col_names_and_types; |
| const Schema& schema_tmp = KuduSchema::ToSchema(schema); |
| const auto& partition_schema = table->partition_schema(); |
| vector<int32_t> key_indexes; |
| RETURN_NOT_OK(partition_schema.GetRangeSchemaColumnIndexes(schema_tmp, &key_indexes)); |
| for (int i = 0; i < key_indexes.size(); i++) { |
| const auto key_index = key_indexes[i]; |
| const auto& column = schema.Column(key_index); |
| range_col_names_and_types.emplace_back(std::make_pair(column.name(), |
| column.type())); |
| } |
| RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types, |
| table_range_lower_bound, |
| lower_bound.get())); |
| RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types, |
| table_range_upper_bound, |
| upper_bound.get())); |
| |
| KuduTableCreator::RangePartitionBound lower_bound_type; |
| KuduTableCreator::RangePartitionBound upper_bound_type; |
| |
| RETURN_NOT_OK(convert_bounds_type("lower bound", FLAGS_lower_bound_type, &lower_bound_type)); |
| RETURN_NOT_OK(convert_bounds_type("upper bound", FLAGS_upper_bound_type, &upper_bound_type)); |
| |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| if (action == PartitionAction::ADD) { |
| return alterer->AddRangePartition(lower_bound.release(), |
| upper_bound.release(), |
| lower_bound_type, |
| upper_bound_type)->Alter(); |
| } |
| DCHECK_EQ(PartitionAction::DROP, action); |
| return alterer->DropRangePartition(lower_bound.release(), |
| upper_bound.release(), |
| lower_bound_type, |
| upper_bound_type)->Alter(); |
| } |
| |
| Status DropRangePartition(const RunnerContext& context) { |
| return ModifyRangePartition(context, PartitionAction::DROP); |
| } |
| |
| Status AddRangePartition(const RunnerContext& context) { |
| return ModifyRangePartition(context, PartitionAction::ADD); |
| } |
| |
| Status ParseValueOfType(const string& default_value, |
| KuduColumnSchema::DataType type, |
| KuduValue** value) { |
| JsonReader reader(default_value); |
| RETURN_NOT_OK(reader.Init()); |
| vector<const rapidjson::Value*> values; |
| RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(), |
| /*field=*/nullptr, |
| &values)); |
| if (values.size() != 1) { |
| return Status::InvalidArgument(Substitute( |
| "We got $0 value(s), you should provide one default value.", |
| std::to_string(values.size()))); |
| } |
| |
| string msg = Substitute("unable to parse value for column type $0", |
| KuduColumnSchema::DataTypeToString(type)); |
| switch (type) { |
| case KuduColumnSchema::DataType::INT8: |
| case KuduColumnSchema::DataType::INT16: |
| case KuduColumnSchema::DataType::INT32: |
| case KuduColumnSchema::DataType::INT64: |
| case KuduColumnSchema::DataType::DATE: |
| case KuduColumnSchema::DataType::UNIXTIME_MICROS: { |
| int64_t int_value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractInt64(values[0], /*field=*/nullptr, &int_value), msg); |
| *value = KuduValue::FromInt(int_value); |
| break; |
| } |
| case KuduColumnSchema::DataType::BINARY: |
| case KuduColumnSchema::DataType::STRING: |
| case KuduColumnSchema::DataType::VARCHAR: { |
| string str_value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractString(values[0], /*field=*/nullptr, &str_value), msg); |
| *value = KuduValue::CopyString(str_value); |
| break; |
| } |
| case KuduColumnSchema::DataType::BOOL: { |
| bool bool_value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractBool(values[0], /*field=*/nullptr, &bool_value), msg); |
| *value = KuduValue::FromBool(bool_value); |
| break; |
| } |
| case KuduColumnSchema::DataType::FLOAT: { |
| float float_value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractFloat(values[0], /*field=*/nullptr, &float_value), msg); |
| *value = KuduValue::FromFloat(float_value); |
| break; |
| } |
| case KuduColumnSchema::DataType::DOUBLE: { |
| double double_value; |
| RETURN_NOT_OK_PREPEND( |
| reader.ExtractDouble(values[0], /*field=*/nullptr, &double_value), msg); |
| *value = KuduValue::FromDouble(double_value); |
| break; |
| } |
| case KuduColumnSchema::DataType::DECIMAL: |
| default: |
| return Status::NotSupported(Substitute( |
| "$0 columns are not supported for setting default value by this tool," |
| "is this tool out of date?", |
| KuduColumnSchema::DataTypeToString(type))); |
| } |
| return Status::OK(); |
| } |
| |
| Status ColumnSetDefault(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& column_name = FindOrDie(context.required_args, kColumnNameArg); |
| const string& default_value = FindOrDie(context.required_args, kDefaultValueArg); |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| KuduSchema schema; |
| RETURN_NOT_OK(client->GetTableSchema(table_name, &schema)); |
| |
| // Here we use the first column to initialize an object of KuduColumnSchema |
| // for there is no default constructor for it. |
| KuduColumnSchema col_schema = schema.Column(0); |
| if (!schema.HasColumn(column_name, &col_schema)) { |
| return Status::NotFound(Substitute("Couldn't find column $0", column_name)); |
| } |
| |
| KuduValue* value = nullptr; |
| RETURN_NOT_OK(ParseValueOfType(default_value, col_schema.type(), &value)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| alterer->AlterColumn(column_name)->Default(value); |
| return alterer->Alter(); |
| } |
| |
| Status ColumnRemoveDefault(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& column_name = FindOrDie(context.required_args, kColumnNameArg); |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| alterer->AlterColumn(column_name)->RemoveDefault(); |
| return alterer->Alter(); |
| } |
| |
| #pragma GCC diagnostic push |
| #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" |
| Status ColumnSetCompression(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& column_name = FindOrDie(context.required_args, kColumnNameArg); |
| const string& compression_type_arg = FindOrDie(context.required_args, kCompressionTypeArg); |
| KuduColumnStorageAttributes::CompressionType compression_type; |
| RETURN_NOT_OK(KuduColumnStorageAttributes::StringToCompressionType( |
| compression_type_arg, &compression_type)); |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| alterer->AlterColumn(column_name)->Compression(compression_type); |
| return alterer->Alter(); |
| } |
| |
| Status ColumnSetEncoding(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& column_name = FindOrDie(context.required_args, kColumnNameArg); |
| const string& encoding_type_arg = FindOrDie(context.required_args, kEncodingTypeArg); |
| KuduColumnStorageAttributes::EncodingType encoding_type; |
| RETURN_NOT_OK(KuduColumnStorageAttributes::StringToEncodingType( |
| encoding_type_arg, &encoding_type)); |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| alterer->AlterColumn(column_name)->Encoding(encoding_type); |
| return alterer->Alter(); |
| } |
| #pragma GCC diagnostic pop |
| |
| Status ColumnSetBlockSize(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& column_name = FindOrDie(context.required_args, kColumnNameArg); |
| const string& str_block_size = FindOrDie(context.required_args, kBlockSizeArg); |
| |
| int32_t block_size; |
| if (!safe_strto32(str_block_size, &block_size)) { |
| return Status::InvalidArgument(Substitute( |
| "Unable to parse block_size value: $0.", str_block_size)); |
| } |
| if (block_size <= 0) { |
| return Status::InvalidArgument(Substitute( |
| "Invalid block size: $0, it should be set higher than 0.", str_block_size)); |
| } |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| alterer->AlterColumn(column_name)->BlockSize(block_size); |
| return alterer->Alter(); |
| } |
| |
| Status ColumnSetComment(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& column_name = FindOrDie(context.required_args, kColumnNameArg); |
| const string& column_comment = FindOrDie(context.required_args, kColumnCommentArg); |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| alterer->AlterColumn(column_name)->Comment(column_comment); |
| return alterer->Alter(); |
| } |
| |
| Status DeleteColumn(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| const string& column_name = FindOrDie(context.required_args, kColumnNameArg); |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name)); |
| alterer->DropColumn(column_name); |
| return alterer->Alter(); |
| } |
| |
| Status GetTableStatistics(const RunnerContext& context) { |
| const string& table_name = FindOrDie(context.required_args, kTableNameArg); |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| |
| unique_ptr<KuduTableStatistics> statistics; |
| KuduTableStatistics *table_statistics; |
| RETURN_NOT_OK_PREPEND(client->GetTableStatistics(table_name, &table_statistics), |
| "failed to get table statistics."); |
| statistics.reset(table_statistics); |
| cout << "TABLE " << table_name << endl; |
| cout << statistics->ToString() << endl; |
| |
| return Status::OK(); |
| } |
| |
| Status ToJsonPartialRow(const RepeatedPtrField<string>& values, |
| const vector<pair<string, KuduColumnSchema::DataType>>& range_columns, |
| string* json_value) { |
| json_value->clear(); |
| if (values.empty() || values.size() != range_columns.size()) { |
| return Status::InvalidArgument(Substitute( |
| "Invalid range value size, value size should be equal to number of range keys.")); |
| } |
| int i = 0; |
| string joined = JoinMapped(values, [&](const string& v) { |
| auto data_type = range_columns[i++].second; |
| if (data_type == KuduColumnSchema::STRING || |
| data_type == KuduColumnSchema::VARCHAR || |
| data_type == KuduColumnSchema::BINARY) { |
| return "\"" + v + "\""; |
| } |
| return v; |
| }, ","); |
| *json_value = "[" + joined + "]"; |
| return Status::OK(); |
| } |
| |
| Status ToClientEncodingType( |
| ColumnPB::EncodingType type_pb, |
| KuduColumnStorageAttributes::EncodingType* type) { |
| Status s; |
| switch (type_pb) { |
| case ColumnPB::AUTO_ENCODING : |
| *type = KuduColumnStorageAttributes::AUTO_ENCODING; |
| break; |
| case ColumnPB::PLAIN_ENCODING : |
| *type = KuduColumnStorageAttributes::PLAIN_ENCODING; |
| break; |
| case ColumnPB::PREFIX_ENCODING : |
| *type = KuduColumnStorageAttributes::PREFIX_ENCODING; |
| break; |
| case ColumnPB::DICT_ENCODING : |
| *type = KuduColumnStorageAttributes::DICT_ENCODING; |
| break; |
| case ColumnPB::RLE : |
| *type = KuduColumnStorageAttributes::RLE; |
| break; |
| case ColumnPB::BIT_SHUFFLE : |
| *type = KuduColumnStorageAttributes::BIT_SHUFFLE; |
| break; |
| default : |
| s = Status::InvalidArgument(Substitute("Unexpected encoding type: $0", type_pb)); |
| } |
| return s; |
| } |
| |
| Status ToClientCompressionType( |
| ColumnPB::CompressionType type_pb, |
| KuduColumnStorageAttributes::CompressionType* type) { |
| Status s; |
| switch (type_pb) { |
| case ColumnPB::DEFAULT_COMPRESSION : |
| *type = KuduColumnStorageAttributes::DEFAULT_COMPRESSION; |
| break; |
| case ColumnPB::NO_COMPRESSION : |
| *type = KuduColumnStorageAttributes::NO_COMPRESSION; |
| break; |
| case ColumnPB::SNAPPY : |
| *type = KuduColumnStorageAttributes::SNAPPY; |
| break; |
| case ColumnPB::LZ4 : |
| *type = KuduColumnStorageAttributes::LZ4; |
| break; |
| case ColumnPB::ZLIB : |
| *type = KuduColumnStorageAttributes::ZLIB; |
| break; |
| default : |
| s = Status::InvalidArgument(Substitute("Unexpected compression type: $0", type_pb)); |
| } |
| return s; |
| } |
| |
| Status ToClientRangePartitionBound( |
| PartitionPB_RangePartitionPB_BoundPB::Type type_pb, |
| KuduTableCreator::RangePartitionBound* type) { |
| Status s; |
| switch (type_pb) { |
| case PartitionPB_RangePartitionPB_BoundPB::EXCLUSIVE: |
| *type = KuduTableCreator::EXCLUSIVE_BOUND; |
| break; |
| case PartitionPB_RangePartitionPB_BoundPB::INCLUSIVE: |
| *type = KuduTableCreator::INCLUSIVE_BOUND; |
| break; |
| case PartitionPB_RangePartitionPB_BoundPB::UNKNOWN_BOUND : |
| default: |
| s = Status::InvalidArgument(Substitute("Unexpected range partition bound type: ", type_pb)); |
| } |
| return s; |
| } |
| |
| #pragma GCC diagnostic push |
| #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" |
| Status ParseTableSchema(const SchemaPB& schema, |
| KuduSchema* kudu_schema) { |
| KuduSchemaBuilder b; |
| for (const auto& column : schema.columns()) { |
| KuduColumnSpec* spec = b.AddColumn(column.column_name()); |
| KuduColumnSchema::DataType type; |
| RETURN_NOT_OK(KuduColumnSchema::StringToDataType( |
| column.column_type(), &type)); |
| spec->Type(type); |
| if (column.has_type_attributes()) { |
| if (type == KuduColumnSchema::DataType::DECIMAL) { |
| spec->Precision(column.type_attributes().precision()); |
| spec->Scale(column.type_attributes().scale()); |
| } else if (type == KuduColumnSchema::DataType::VARCHAR) { |
| spec->Length(column.type_attributes().length()); |
| } |
| } |
| if (!column.is_nullable()) { |
| spec->NotNull(); |
| } |
| if (column.has_default_value()) { |
| KuduValue* value = nullptr; |
| string default_v; |
| if (column.column_type() == "STRING" || |
| column.column_type() == "BINARY" || |
| column.column_type() == "VARCHAR" || |
| column.column_type() == "DECIMAL") { |
| default_v = "[\"" + column.default_value() + "\"]"; |
| } else { |
| default_v = "[" + column.default_value() + "]"; |
| } |
| RETURN_NOT_OK(ParseValueOfType(default_v, type, &value)); |
| spec->Default(value); |
| } |
| if (column.has_comment()) { |
| spec->Comment(column.comment()); |
| } |
| // If no valid encoding is provided, AUTO_ENCODING will be used by default. |
| if (column.has_encoding()) { |
| KuduColumnStorageAttributes::EncodingType type; |
| RETURN_NOT_OK(ToClientEncodingType(column.encoding(), &type)); |
| spec->Encoding(type); |
| } |
| // If no valid compression is provided, DEFAULT_COMPRESSION will be used. |
| if (column.has_compression()) { |
| KuduColumnStorageAttributes::CompressionType type; |
| RETURN_NOT_OK(ToClientCompressionType(column.compression(), &type)); |
| spec->Compression(type); |
| } |
| } |
| |
| b.SetPrimaryKey(vector<string>(schema.key_column_names().begin(), |
| schema.key_column_names().end())); |
| RETURN_NOT_OK(b.Build(kudu_schema)); |
| return Status::OK(); |
| } |
| #pragma GCC diagnostic pop |
| |
| Status ParseTablePartition(const PartitionPB& partition, |
| const KuduSchema& kudu_schema, |
| KuduTableCreator* table_creator) { |
| for (const auto& hash_partition : partition.hash_partitions()) { |
| vector<string> hash_keys; |
| for (const auto& hk : hash_partition.columns()) { |
| hash_keys.push_back(hk); |
| } |
| int32_t seed = 0; |
| if (hash_partition.has_seed()) { |
| seed = hash_partition.seed(); |
| } |
| table_creator->add_hash_partitions(hash_keys, hash_partition.num_buckets(), seed); |
| } |
| // Generate and add the range partition splits for the table. |
| if (!partition.has_range_partition()) { |
| table_creator->set_range_partition_columns({}); |
| return Status::OK(); |
| } |
| set<string> range_keys; |
| vector<pair<string, KuduColumnSchema::DataType>> range_col_names_and_types; |
| for (const auto& range_key : partition.range_partition().columns()) { |
| range_keys.insert(range_key); |
| } |
| table_creator->set_range_partition_columns( |
| vector<string>(range_keys.begin(), range_keys.end())); |
| for (int idx = 0; idx < kudu_schema.num_columns(); ++idx) { |
| // Find the range key type, |
| KuduColumnSchema column = kudu_schema.Column(idx); |
| if (ContainsKey(range_keys, column.name())) { |
| range_col_names_and_types.emplace_back( |
| std::make_pair(column.name(), column.type())); |
| } |
| } |
| string bound_partial_row_json; |
| for (const auto& bound : partition.range_partition().range_bounds()) { |
| unique_ptr<KuduPartialRow> lower_bound(kudu_schema.NewRow()); |
| unique_ptr<KuduPartialRow> upper_bound(kudu_schema.NewRow()); |
| KuduTableCreator::RangePartitionBound lower_bound_type = |
| KuduTableCreator::INCLUSIVE_BOUND; |
| KuduTableCreator::RangePartitionBound upper_bound_type = |
| KuduTableCreator::EXCLUSIVE_BOUND; |
| if (bound.has_lower_bound()) { |
| RETURN_NOT_OK(ToJsonPartialRow(bound.lower_bound().bound_values(), |
| range_col_names_and_types, |
| &bound_partial_row_json)); |
| RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types, |
| bound_partial_row_json, |
| lower_bound.get())); |
| RETURN_NOT_OK(ToClientRangePartitionBound(bound.lower_bound().bound_type(), |
| &lower_bound_type)); |
| } |
| if (bound.has_upper_bound()) { |
| RETURN_NOT_OK(ToJsonPartialRow(bound.upper_bound().bound_values(), |
| range_col_names_and_types, |
| &bound_partial_row_json)); |
| RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types, |
| bound_partial_row_json, |
| upper_bound.get())); |
| RETURN_NOT_OK(ToClientRangePartitionBound(bound.upper_bound().bound_type(), |
| &upper_bound_type)); |
| } |
| table_creator->add_range_partition(lower_bound.release(), upper_bound.release(), |
| lower_bound_type, upper_bound_type); |
| } |
| for (const auto& split_pb : partition.range_partition().range_splits()) { |
| RETURN_NOT_OK(ToJsonPartialRow(split_pb.split_values(), |
| range_col_names_and_types, |
| &bound_partial_row_json)); |
| unique_ptr<KuduPartialRow> split(kudu_schema.NewRow()); |
| RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types, |
| bound_partial_row_json, |
| split.get())); |
| table_creator->add_range_partition_split(split.release()); |
| } |
| return Status::OK(); |
| } |
| |
| Status CreateTable(const RunnerContext& context) { |
| const string& json_str = FindOrDie(context.required_args, kCreateTableJSONArg); |
| CreateTablePB table_req; |
| JsonParseOptions opts; |
| opts.case_insensitive_enum_parsing = true; |
| const auto& google_status = JsonStringToMessage(json_str, &table_req, opts); |
| if (!google_status.ok()) { |
| return Status::InvalidArgument( |
| Substitute("unable to parse JSON: $0", json_str), |
| google_status.error_message().ToString()); |
| } |
| |
| client::sp::shared_ptr<KuduClient> client; |
| RETURN_NOT_OK(CreateKuduClient(context, &client)); |
| KuduSchema kudu_schema; |
| RETURN_NOT_OK(ParseTableSchema(table_req.schema(), &kudu_schema)); |
| unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator()); |
| table_creator->table_name(table_req.table_name()) |
| .schema(&kudu_schema); |
| RETURN_NOT_OK(ParseTablePartition(table_req.partition(), kudu_schema, table_creator.get())); |
| if (table_req.has_num_replicas()) { |
| table_creator->num_replicas(table_req.num_replicas()); |
| } |
| if (table_req.has_extra_configs()) { |
| map<string, string> extra_configs(table_req.extra_configs().configs().begin(), |
| table_req.extra_configs().configs().end()); |
| table_creator->extra_configs(extra_configs); |
| } |
| if (table_req.has_dimension_label()) { |
| table_creator->dimension_label(table_req.dimension_label()); |
| } |
| return table_creator->Create(); |
| } |
| |
| |
| } // anonymous namespace |
| |
| unique_ptr<Mode> BuildTableMode() { |
| unique_ptr<Action> delete_table = |
| ClusterActionBuilder("delete", &DeleteTable) |
| .Description("Delete a table") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to delete" }) |
| .AddOptionalParameter("modify_external_catalogs") |
| .Build(); |
| |
| unique_ptr<Action> describe_table = |
| ClusterActionBuilder("describe", &DescribeTable) |
| .Description("Describe a table") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to describe" }) |
| .AddOptionalParameter("show_attributes") |
| .Build(); |
| |
| unique_ptr<Action> list_tables = |
| ClusterActionBuilder("list", &ListTables) |
| .Description("List tables") |
| .AddOptionalParameter("tables") |
| .AddOptionalParameter("list_tablets") |
| .Build(); |
| |
| unique_ptr<Action> locate_row = |
| ClusterActionBuilder("locate_row", &LocateRow) |
| .Description("Locate which tablet a row belongs to") |
| .ExtraDescription("Provide the primary key as a JSON array of primary " |
| "key values, e.g. '[1, \"foo\", 2, \"bar\"]'. The " |
| "output will be the tablet id associated with the row " |
| "key. If there is no such tablet, an error message " |
| "will be printed and the command will return a " |
| "non-zero status") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to look up against" }) |
| .AddRequiredParameter({ kKeyArg, |
| "String representation of the row's primary key " |
| "as a JSON array" }) |
| .AddOptionalParameter("check_row_existence") |
| .Build(); |
| |
| unique_ptr<Action> rename_column = |
| ClusterActionBuilder("rename_column", &RenameColumn) |
| .Description("Rename a column") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) |
| .AddRequiredParameter({ kColumnNameArg, "Name of the table column to rename" }) |
| .AddRequiredParameter({ kNewColumnNameArg, "New column name" }) |
| .Build(); |
| |
| unique_ptr<Action> rename_table = |
| ClusterActionBuilder("rename_table", &RenameTable) |
| .Description("Rename a table") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to rename" }) |
| .AddRequiredParameter({ kNewTableNameArg, "New table name" }) |
| .AddOptionalParameter("modify_external_catalogs") |
| .Build(); |
| |
| unique_ptr<Action> scan_table = |
| ClusterActionBuilder("scan", &ScanTable) |
| .Description("Scan rows from a table") |
| .ExtraDescription("Scan rows from an existing table. See the help " |
| "for the --predicates flag on how predicates can be specified.") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to scan"}) |
| .AddOptionalParameter("columns") |
| .AddOptionalParameter("fill_cache") |
| .AddOptionalParameter("num_threads") |
| .AddOptionalParameter("predicates") |
| .AddOptionalParameter("tablets") |
| .Build(); |
| |
| unique_ptr<Action> copy_table = |
| ClusterActionBuilder("copy", &CopyTable) |
| .Description("Copy table data to another table") |
| .ExtraDescription("Copy table data to another table; the two tables could be in the same " |
| "cluster or not. The two tables must have the same table schema, but " |
| "could have different partition schemas. Alternatively, the tool can " |
| "create the new table using the same table and partition schema as the " |
| "source table.") |
| .AddRequiredParameter({ kTableNameArg, "Name of the source table" }) |
| .AddRequiredParameter({ kDestMasterAddressesArg, kDestMasterAddressesArgDesc }) |
| .AddOptionalParameter("create_table") |
| .AddOptionalParameter("dst_table") |
| .AddOptionalParameter("num_threads") |
| .AddOptionalParameter("predicates") |
| .AddOptionalParameter("tablets") |
| .AddOptionalParameter("write_type") |
| .Build(); |
| |
| unique_ptr<Action> set_extra_config = |
| ClusterActionBuilder("set_extra_config", &SetExtraConfig) |
| .Description("Change a extra configuration value on a table") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) |
| .AddRequiredParameter({ kConfigNameArg, "Name of the configuration" }) |
| .AddRequiredParameter({ kConfigValueArg, "New value for the configuration" }) |
| .Build(); |
| |
| unique_ptr<Action> get_extra_configs = |
| ClusterActionBuilder("get_extra_configs", &GetExtraConfigs) |
| .Description("Get the extra configuration properties for a table") |
| .AddRequiredParameter({ kTableNameArg, |
| "Name of the table for which to get extra configurations" }) |
| .AddOptionalParameter("config_names") |
| .Build(); |
| |
| unique_ptr<Action> drop_range_partition = |
| ClusterActionBuilder("drop_range_partition", &DropRangePartition) |
| .Description("Drop a range partition of table") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table" }) |
| .AddRequiredParameter({ kTableRangeLowerBoundArg, |
| "String representation of lower bound of " |
| "the table range partition as a JSON array" }) |
| .AddRequiredParameter({ kTableRangeUpperBoundArg, |
| "String representation of upper bound of " |
| "the table range partition as a JSON array" }) |
| .AddOptionalParameter("lower_bound_type") |
| .AddOptionalParameter("upper_bound_type") |
| .Build(); |
| |
| unique_ptr<Action> add_range_partition = |
| ClusterActionBuilder("add_range_partition", &AddRangePartition) |
| .Description("Add a range partition for table") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table" }) |
| .AddRequiredParameter({ kTableRangeLowerBoundArg, |
| "String representation of lower bound of " |
| "the table range partition as a JSON array." |
| "If the parameter is an empty array, " |
| "the lower range partition will be unbounded" }) |
| .AddRequiredParameter({ kTableRangeUpperBoundArg, |
| "String representation of upper bound of " |
| "the table range partition as a JSON array." |
| "If the parameter is an empty array, " |
| "the upper range partition will be unbounded" }) |
| .AddOptionalParameter("lower_bound_type") |
| .AddOptionalParameter("upper_bound_type") |
| .Build(); |
| |
| unique_ptr<Action> column_set_default = |
| ClusterActionBuilder("column_set_default", &ColumnSetDefault) |
| .Description("Set write_default value for a column") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) |
| .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" }) |
| .AddRequiredParameter({ kDefaultValueArg, |
| "Write default value of the column, should be provided as a " |
| "JSON array, e.g. [1] or [\"foo\"]" }) |
| .Build(); |
| |
| unique_ptr<Action> column_remove_default = |
| ClusterActionBuilder("column_remove_default", &ColumnRemoveDefault) |
| .Description("Remove write_default value for a column") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) |
| .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" }) |
| .Build(); |
| |
| unique_ptr<Action> column_set_compression = |
| ClusterActionBuilder("column_set_compression", &ColumnSetCompression) |
| .Description("Set compression type for a column") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) |
| .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" }) |
| .AddRequiredParameter({ kCompressionTypeArg, "Compression type of the column" }) |
| .Build(); |
| |
| unique_ptr<Action> column_set_encoding = |
| ClusterActionBuilder("column_set_encoding", &ColumnSetEncoding) |
| .Description("Set encoding type for a column") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) |
| .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" }) |
| .AddRequiredParameter({ kEncodingTypeArg, "Encoding type of the column" }) |
| .Build(); |
| |
| unique_ptr<Action> column_set_block_size = |
| ClusterActionBuilder("column_set_block_size", &ColumnSetBlockSize) |
| .Description("Set block size for a column") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) |
| .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" }) |
| .AddRequiredParameter({ kBlockSizeArg, "Block size of the column" }) |
| .Build(); |
| |
| unique_ptr<Action> column_set_comment = |
| ClusterActionBuilder("column_set_comment", &ColumnSetComment) |
| .Description("Set comment for a column") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) |
| .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" }) |
| .AddRequiredParameter({ kColumnCommentArg, "Comment of the column" }) |
| .Build(); |
| |
| unique_ptr<Action> delete_column = |
| ClusterActionBuilder("delete_column", &DeleteColumn) |
| .Description("Delete a column") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" }) |
| .AddRequiredParameter({ kColumnNameArg, "Name of the table column to delete" }) |
| .Build(); |
| |
| |
| unique_ptr<Action> statistics = |
| ClusterActionBuilder("statistics", &GetTableStatistics) |
| .Description("Get table statistics") |
| .AddRequiredParameter({ kTableNameArg, "Name of the table to get statistics" }) |
| .Build(); |
| |
| unique_ptr<Action> create_table = |
| ClusterActionBuilder("create", &CreateTable) |
| .Description("Create a new table") |
| .ExtraDescription("Provide the table-build statements as a JSON object, e.g." |
| "'{\"table_name\":\"test\",\"schema\":{\"columns\":[{\"column_name" |
| "\":\"id\",\"column_type\":\"INT32\",\"default_value\":\"1\"},{" |
| "\"column_name\":\"key\",\"column_type\":\"INT64\",\"is_nullable\"" |
| ":false,\"comment\":\"range key\"},{\"column_name\":\"name\",\"" |
| "column_type\":\"STRING\",\"is_nullable\":false,\"comment\":\"" |
| "user name\"}],\"key_column_names\":[\"id\", \"key\"]},\"partition\"" |
| ":{\"hash_partitions\":[{\"columns\":[\"id\"],\"num_buckets\":2,\"seed" |
| "\":100}],\"range_partition\":{\"columns\":[\"key\"],\"range_bounds\":" |
| "[{\"upper_bound\":{\"bound_type\":\"inclusive\",\"bound_values\":[\"2" |
| "\"]}},{\"lower_bound\": {\"bound_type\":\"exclusive\",\"bound_values" |
| "\": [\"2\"]},\"upper_bound\":{\"bound_type\":\"inclusive\",\"" |
| "bound_values\":[\"3\"]}}]}},\"extra_configs\":{\"configs\":{\"" |
| "kudu.table.history_max_age_sec\":\"3600\"}},\"num_replicas\":3}'.") |
| .AddRequiredParameter({ kCreateTableJSONArg, "JSON object for creating table" }) |
| .Build(); |
| |
| return ModeBuilder("table") |
| .Description("Operate on Kudu tables") |
| .AddAction(std::move(add_range_partition)) |
| .AddAction(std::move(column_remove_default)) |
| .AddAction(std::move(column_set_block_size)) |
| .AddAction(std::move(column_set_compression)) |
| .AddAction(std::move(column_set_default)) |
| .AddAction(std::move(column_set_encoding)) |
| .AddAction(std::move(column_set_comment)) |
| .AddAction(std::move(copy_table)) |
| .AddAction(std::move(create_table)) |
| .AddAction(std::move(delete_column)) |
| .AddAction(std::move(delete_table)) |
| .AddAction(std::move(describe_table)) |
| .AddAction(std::move(drop_range_partition)) |
| .AddAction(std::move(get_extra_configs)) |
| .AddAction(std::move(list_tables)) |
| .AddAction(std::move(locate_row)) |
| .AddAction(std::move(rename_column)) |
| .AddAction(std::move(rename_table)) |
| .AddAction(std::move(scan_table)) |
| .AddAction(std::move(set_extra_config)) |
| .AddAction(std::move(statistics)) |
| .Build(); |
| } |
| |
| } // namespace tools |
| } // namespace kudu |
| |