blob: 56608c9f556cd1aad09a128a2e9335ba30c6dd3c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/common.h>
#include <google/protobuf/stubs/status.h>
#include <google/protobuf/stubs/stringpiece.h>
#include <google/protobuf/util/json_util.h>
#include <rapidjson/document.h>
#include "kudu/client/client-internal.h"
#include "kudu/client/client.h"
#include "kudu/client/replica_controller-internal.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_predicate.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/table_alterer-internal.h"
#include "kudu/client/value.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/partition.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tools/table_scanner.h"
#include "kudu/tools/tool.pb.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/util/flag_validators.h"
#include "kudu/util/jsonreader.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/status.h"
#include "kudu/util/string_case.h"
using google::protobuf::util::JsonStringToMessage;
using google::protobuf::util::JsonParseOptions;
using google::protobuf::RepeatedPtrField;
using kudu::client::KuduClient;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduColumnSpec;
using kudu::client::KuduColumnStorageAttributes;
using kudu::client::KuduPredicate;
using kudu::client::KuduRangePartition;
using kudu::client::KuduScanToken;
using kudu::client::KuduScanTokenBuilder;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::KuduTableStatistics;
using kudu::client::KuduValue;
using kudu::client::internal::ReplicaController;
using kudu::iequals;
using kudu::tools::PartitionPB;
using std::cerr;
using std::cout;
using std::endl;
using std::find_if;
using std::make_unique;
using std::map;
using std::ostringstream;
using std::pair;
using std::set;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
using strings::Split;
using strings::Substitute;
// Used by LocateRow(): after locating the tablet, also scan its leader
// replica for the row and print it (or fail with NotFound).
DEFINE_bool(check_row_existence, false,
"Also check for the existence of the row on the leader replica of "
"the tablet. If found, the full row will be printed; if not found, "
"an error message will be printed and the command will return a "
"non-zero status.");
// Used by CopyTable(): name of the destination table; empty means reuse the
// source table's name.
DEFINE_string(dst_table, "",
"The name of the destination table the data will be copied to. "
"If the empty string, use the same name as the source table.");
// The following flags shape the output of TableLister::ListTablets().
DEFINE_bool(list_tablets, false,
"Include tablet and replica UUIDs in the output");
DEFINE_bool(show_table_info, false,
"Include extra information such as number of tablets, replicas, "
"and live row count for a table in the output");
// Only meaningful together with --list_tablets (enforced by the group
// validator that follows).
DEFINE_bool(show_tablet_partition_info, false,
"Include partition keys information corresponding to tablet in the output.");
bool ValidateShowTabletPartitionInfo() {
if (!FLAGS_list_tablets && FLAGS_show_tablet_partition_info) {
LOG(ERROR) << Substitute("--show_tablet_partition_info is meaningless "
"when --list_tablets=false");
return false;
}
return true;
}
GROUP_FLAG_VALIDATOR(show_tablet_partition_info, ValidateShowTabletPartitionInfo);
DEFINE_bool(show_hash_partition_info, false,
"Include hash partition keys information corresponding to tablet in the output.");
bool ValidateShowHashPartitionInfo() {
if (!FLAGS_show_tablet_partition_info && FLAGS_show_hash_partition_info) {
LOG(ERROR) << Substitute("--show_hash_partition_info is meaningless "
"when --show_tablet_partition_info=false");
return false;
}
return true;
}
GROUP_FLAG_VALIDATOR(show_hash_partition_info, ValidateShowHashPartitionInfo);
// Passed to ListTablesWithInfo() by TableLister::ListTablets(): list
// soft-deleted tables instead of regular ones.
DEFINE_bool(soft_deleted_only, false,
"Show only soft-deleted tables if set true, otherwise show regular tables.");
// Passed to KuduClient::RecallTable() by RecallTable(); empty keeps the
// original name.
DEFINE_string(new_table_name, "",
"The new name for the recalled table. "
"Leave empty to recall the table under its original name.");
// Output format for TableLister::ListTablets(); compared case-insensitively
// and checked by ValidateListTableOutput().
DEFINE_string(list_table_output_format, "pretty",
"One of 'json', 'json_compact' or 'pretty'. Pretty output flattens "
"the table list hierarchy.");
static bool ValidateListTableOutput(const char* flag_name,
const string& flag_value) {
const vector<string> allowed_values = { "pretty", "json", "json_compact" };
if (find_if(allowed_values.begin(), allowed_values.end(),
[&](const string& allowed_value) {
return iequals(allowed_value, flag_value);
}) != allowed_values.end()) {
return true;
}
LOG(ERROR) << Substitute("'$0' : unsupported value for -- $1 flag; "
"should be one of $2.",
flag_value, flag_name,
JoinStrings(allowed_values, " "));
return false;
}
DEFINE_validator(list_table_output_format, ValidateListTableOutput);
// Read by DeleteTable() and RenameTable().
DEFINE_bool(modify_external_catalogs, true,
"Whether to modify external catalogs, such as the Hive Metastore, "
"when renaming or dropping a table.");
// Read by GetExtraConfigs(): comma-separated filter; empty entries are skipped.
DEFINE_string(config_names, "",
"Comma-separated list of configurations to display. "
"An empty value displays all configs.");
// Bound types for range-partition actions; not referenced in the visible part
// of this file (presumably consumed by the add/drop range partition actions).
DEFINE_string(lower_bound_type, "INCLUSIVE_BOUND",
"The type of the lower bound, either inclusive or exclusive. "
"Defaults to inclusive. This flag is case-insensitive.");
DEFINE_string(upper_bound_type, "EXCLUSIVE_BOUND",
"The type of the upper bound, either inclusive or exclusive. "
"Defaults to exclusive. This flag is case-insensitive.");
// JSON hash schema for a single range; not referenced in the visible part of
// this file (presumably used when adding a range with a custom hash schema).
DEFINE_string(hash_schema, "",
"String representation of range-specific hash schema as a JSON "
"object, e.g. "
"{\"hash_schema\": [{\"columns\": [\"c0\"], \"num_buckets\": 5}]}");
// Passed to TableScanner::SetScanBatchSize() by ScanTable() and CopyTable().
DEFINE_int32(scan_batch_size, -1,
"The size for scan results batches, in bytes. A negative value "
"means the server-side default is used, where the server-side "
"default is controlled by the tablet server's "
"--scanner_default_batch_size_bytes flag.");
// Read by DescribeTable(): when set, only the Avro rendering of the schema is
// printed and the remaining description is skipped.
DEFINE_bool(show_avro_format_schema, false,
            "Display the table schema in avro format. When enabled it only outputs the "
            "table schema in Avro format without any other information like "
            "partition/owner/comments. It cannot be used in conjunction with other flags.");
// Passed to KuduClient::DeleteTableInCatalogs() by DeleteTable().
DEFINE_uint32(reserve_seconds, 0,
              "Grace period before purging a soft-deleted table, in seconds. "
              "If set to 0, the table is purged as soon as it is dropped/deleted.");
// Flags defined in other tool source files and reused here.
// --create_table and --create_table_replication_factor are cross-checked by
// ValidateCreateTable(); ScanTable() reads --row_count_only.
DECLARE_bool(create_table);
DECLARE_bool(fault_tolerant);
DECLARE_int32(create_table_replication_factor);
DECLARE_bool(row_count_only);
DECLARE_bool(show_scanner_stats);
// Column attribute flags; not referenced in the visible part of this file
// (presumably consumed by column-alteration actions defined later).
DEFINE_string(encoding_type, "AUTO_ENCODING",
"Type of encoding for the column including AUTO_ENCODING, PLAIN_ENCODING, "
"PREFIX_ENCODING, RLE, DICT_ENCODING, BIT_SHUFFLE, GROUP_VARINT");
DEFINE_string(compression_type, "DEFAULT_COMPRESSION",
"Type of compression for the column including DEFAULT_COMPRESSION, "
"NO_COMPRESSION, SNAPPY, LZ4, ZLIB");
DEFINE_string(default_value, "", "Default value for this column.");
DEFINE_string(comment, "", "Comment for this column.");
// More flags defined elsewhere: ScanTable() forces --show_values on and reads
// --replica_selection; TableLister::ListTablets() filters tables by --tables.
DECLARE_bool(show_values);
DECLARE_string(replica_selection);
DECLARE_string(tables);
bool ValidateCreateTable() {
if (!FLAGS_create_table && FLAGS_create_table_replication_factor != -1) {
LOG(ERROR) << Substitute("--create_table_replication_factor is meaningless "
"when --create_table=false");
return false;
}
return true;
}
GROUP_FLAG_VALIDATOR(create_table, ValidateCreateTable);
namespace kudu {
namespace tools {
// This class only exists so that ListTables() can easily be friended by
// KuduReplica, KuduReplica::Data, and KuduClientBuilder.
class TableLister {
 public:
  // Returns the debug string of the partition recorded in 'table_info' for
  // 'tablet_id' (first matching entry), rendered against 'table''s schema and
  // partition schema. Hash-bucket details are shown iff
  // --show_hash_partition_info is set. Returns "" if no entry matches.
  static string SearchPartitionInfo(const KuduClient::Data::TableInfo& table_info,
                                    const client::sp::shared_ptr<KuduTable>& table,
                                    const string& tablet_id) {
    const auto& entries = table_info.partition_with_tablet_info;
    const auto match = find_if(entries.begin(), entries.end(),
                               [&](const auto& entry) {
                                 return tablet_id == entry.tablet_id;
                               });
    if (match == entries.end()) {
      return string();
    }
    const auto show_hp = FLAGS_show_hash_partition_info
        ? PartitionSchema::HashPartitionInfo::SHOW
        : PartitionSchema::HashPartitionInfo::HIDE;
    const Schema internal_schema = KuduSchema::ToSchema(table->schema());
    return table->partition_schema().PartitionDebugString(match->partition,
                                                          internal_schema,
                                                          show_hp);
  }

  // Renders 'tables_info' as text: one line per table (with optional
  // statistics), each followed by a line per tablet and a line per replica.
  static string ToPrettyFormat(const TablesInfoPB& tables_info) {
    string text;
    for (const auto& table : tables_info.tables()) {
      if (!text.empty()) {
        text.append("\n");
      }
      text.append(table.name());
      if (table.has_num_tablets()) {
        text.append(Substitute(" num_tablets:$0 num_replicas:$1 live_row_count:$2",
                               table.num_tablets(),
                               table.num_replicas(),
                               table.live_row_count()));
      }
      for (const auto& tablet : table.tablet_with_partition()) {
        text.append("\n").append(" T ").append(tablet.tablet_id());
        if (tablet.has_partition_info()) {
          text.append(tablet.partition_info());
        }
        for (const auto& replica : tablet.replica_info()) {
          text.append(Substitute("\n $0 $1 $2",
                                 replica.role(),
                                 replica.uuid(),
                                 replica.host_port()));
        }
      }
    }
    return text;
  }

  // Lists the tables matching --tables, optionally with per-table statistics
  // (--show_table_info) and per-tablet/replica details (--list_tablets,
  // --show_tablet_partition_info), and prints them to stdout in the format
  // chosen by --list_table_output_format.
  static Status ListTablets(const vector<string>& master_addresses) {
    client::sp::shared_ptr<KuduClient> client;
    RETURN_NOT_OK(CreateKuduClient(master_addresses,
                                   &client,
                                   /*can_see_all_replicas=*/true));
    vector<KuduClient::Data::TableInfo> all_tables;
    RETURN_NOT_OK(client->data_->ListTablesWithInfo(client.get(),
                                                    &all_tables,
                                                    /*filter=*/"",
                                                    FLAGS_show_tablet_partition_info,
                                                    FLAGS_soft_deleted_only));
    const vector<string> patterns = Split(FLAGS_tables, ",", strings::SkipEmpty());
    const bool pretty = iequals(FLAGS_list_table_output_format, "pretty");

    TablesInfoPB result;
    for (const auto& info : all_tables) {
      const auto& name = info.table_name;
      if (!MatchesAnyPattern(patterns, name)) {
        continue;
      }
      auto* table_pb = result.add_tables();
      table_pb->set_name(name);
      if (FLAGS_show_table_info) {
        table_pb->set_num_tablets(info.num_tablets);
        table_pb->set_num_replicas(info.num_replicas);
        table_pb->set_live_row_count(info.live_row_count);
      }
      if (!FLAGS_list_tablets) {
        continue;
      }

      client::sp::shared_ptr<KuduTable> opened;
      RETURN_NOT_OK(client->OpenTable(name, &opened));
      // One scan token per tablet of the table; the deleter frees them.
      vector<KuduScanToken*> tokens;
      ElementDeleter deleter(&tokens);
      KuduScanTokenBuilder token_builder(opened.get());
      RETURN_NOT_OK(token_builder.Build(&tokens));
      for (const auto* token : tokens) {
        const string tablet_id = token->tablet().id();
        string partition_desc;
        if (FLAGS_show_tablet_partition_info) {
          partition_desc = SearchPartitionInfo(info, opened, tablet_id);
          if (pretty) {
            // Pretty output appends the description right after the tablet id.
            partition_desc = " : " + partition_desc;
          }
        }
        auto* tablet_pb = table_pb->add_tablet_with_partition();
        tablet_pb->set_tablet_id(tablet_id);
        tablet_pb->set_partition_info(partition_desc);
        for (const auto* replica : token->tablet().replicas()) {
          const bool voter = ReplicaController::is_voter(*replica);
          const bool leader = replica->is_leader();
          const char* role = "N";
          if (leader) {
            role = "L";
          } else if (voter) {
            role = "V";
          }
          auto* replica_pb = tablet_pb->add_replica_info();
          replica_pb->set_role(role);
          replica_pb->set_uuid(replica->ts().uuid());
          replica_pb->set_host_port(replica->ts().hostname() + ":" +
                                    std::to_string(replica->ts().port()));
        }
      }
    }

    if (pretty) {
      cout << ToPrettyFormat(result) << endl;
      return Status::OK();
    }
    DCHECK(iequals(FLAGS_list_table_output_format, "json") ||
           iequals(FLAGS_list_table_output_format, "json_compact"));
    const auto json_mode = iequals(FLAGS_list_table_output_format, "json")
        ? JsonWriter::Mode::PRETTY
        : JsonWriter::Mode::COMPACT;
    cout << JsonWriter::ToJson(result, json_mode) << endl;
    return Status::OK();
  }
};
// This class only exists so that it can easily be friended by KuduTableAlterer.
class TableAlter {
 public:
  // Connects to 'master_addresses' and alters 'table_name' so that its
  // replication factor becomes 'replication_factor'. The new value is written
  // into the alterer's private data_ member, which this friend class can
  // reach. Returns the status of the Alter() call.
  static Status SetReplicationFactor(const vector<string>& master_addresses,
                                     const string& table_name,
                                     int32_t replication_factor) {
    client::sp::shared_ptr<KuduClient> kudu_client;
    RETURN_NOT_OK(CreateKuduClient(master_addresses, &kudu_client));
    auto table_alterer = unique_ptr<KuduTableAlterer>(kudu_client->NewTableAlterer(table_name));
    auto* alter_data = table_alterer->data_;
    alter_data->set_replication_factor_to_ = replication_factor;
    return table_alterer->Alter();
  }
};
namespace {
// Keys of positional (required) arguments, plus shared strings, used by the
// table actions in this file.
constexpr const char* const kBlockSizeArg = "block_size";
constexpr const char* const kColumnCommentArg = "column_comment";
constexpr const char* const kColumnNameArg = "column_name";
constexpr const char* const kCompressionTypeArg = "compression_type";
constexpr const char* const kConfigNameArg = "config_name";
constexpr const char* const kConfigValueArg = "config_value";
// User-visible text containing a sample JSON description of a table to
// create; presumably attached as extra help to the create-table action
// defined elsewhere in this file. Note that the raw-string contents are
// emitted as-is.
constexpr const char* const kCreateTableExtraDescription =
R"*(provide parameters for the table to create as a JSON object, e.g.
'{
"table_name": "test",
"schema": {
"columns": [
{
"column_name": "id",
"column_type": "INT32",
"default_value": "1"
},
{
"column_name": "key",
"column_type": "INT64",
"is_nullable": false,
"comment": "range partition column"
},
{
"column_name": "name",
"column_type": "STRING",
"is_nullable": false,
"comment": "user name"
}
],
"key_column_names": ["id", "key"]
},
"partition": {
"hash_partitions": [{"columns": ["id"], "num_buckets": 2, "seed": 8}],
"range_partition": {
"columns": ["key"],
"range_bounds": [
{
"lower_bound": {"bound_type": "inclusive", "bound_values": ["2"]},
"upper_bound": {"bound_type": "exclusive", "bound_values": ["3"]}
},
{
"lower_bound": {"bound_type": "inclusive", "bound_values": ["3"]}
}
]
}
},
"extra_configs": {
"configs": { "kudu.table.history_max_age_sec": "3600" }
},
"comment": "a test table",
"num_replicas": 3
}')*";
constexpr const char* const kCreateTableJSONArg = "create_table_json";
constexpr const char* const kDataTypeArg = "data_type";
constexpr const char* const kDefaultValueArg = "default_value";
constexpr const char* const kEncodingTypeArg = "encoding_type";
// Substitute() template for value-parsing errors, as used by
// ConvertToKuduPartialRow(): $0 = the offending value, $1 = column name,
// $2 = column type name.
constexpr const char* const kErrorMsgArg =
"unable to parse value $0 for column $1 of type $2";
constexpr const char* const kKeyArg = "primary_key";
constexpr const char* const kNewColumnNameArg = "new_column_name";
constexpr const char* const kNewTableNameArg = "new_table_name";
constexpr const char* const kReplicationFactorArg = "replication_factor";
constexpr const char* const kTableRangeLowerBoundArg = "table_range_lower_bound";
constexpr const char* const kTableRangeUpperBoundArg = "table_range_upper_bound";
// Whether a range-partition operation adds or drops a partition. Not
// referenced in the visible part of this file; presumably used by the
// add/drop range partition actions.
enum PartitionAction {
ADD,
DROP,
};
// Writes the Avro "type" value for a column that maps to an Avro logical type,
// i.e. an object {"type": <type>, "logicalType": <logical_type>}. If the column
// has a read default, a "default" entry holding its stringified value follows.
//
// Like AddPrimitiveType(), a nullable column is written as the union
// ["null", {...}] so that its nullability is preserved in the Avro schema; a
// non-nullable column is written as the bare logical-type object.
//
// Always returns Status::OK().
Status AddLogicalType(JsonWriter* writer, const string& type, const string& logical_type,
                      const ColumnSchema& col_schema) {
  const bool nullable = col_schema.is_nullable();
  if (nullable) {
    writer->StartArray();
    writer->String("null");
  }
  writer->StartObject();
  writer->String("type");
  writer->String(type);
  writer->String("logicalType");
  writer->String(logical_type);
  writer->EndObject();
  if (nullable) {
    writer->EndArray();
  }
  if (col_schema.has_read_default()) {
    writer->String("default");
    writer->String(col_schema.Stringify(col_schema.read_default_value()));
  }
  return Status::OK();
}
// Emits the Avro "type" value for a column that maps to the primitive Avro
// type 'type'. A non-nullable column gets just the type name; a nullable one
// gets the union ["null", type]. When the column has a read default, a
// "default" key with the stringified default value is emitted afterwards.
//
// Always returns Status::OK().
Status AddPrimitiveType(const ColumnSchema& col_schema, const string& type, JsonWriter* writer) {
  if (!col_schema.is_nullable()) {
    writer->String(type);
  } else {
    writer->StartArray();
    writer->String("null");
    writer->String(type);
    writer->EndArray();
  }
  if (!col_schema.has_read_default()) {
    return Status::OK();
  }
  const string default_text = col_schema.Stringify(col_schema.read_default_value());
  writer->String("default");
  writer->String(default_text);
  return Status::OK();
}
// Prints to stdout an Avro JSON schema describing 'kudu_schema': a record
// named 'table_name' in namespace "kudu.cluster.<cluster_id>", with one field
// per column in schema order.
//
// Returns NotSupported (after a DFATAL log) if a column's type has no Avro
// mapping, instead of printing a field whose "type" key has no value.
//
// NOTE(review): Avro's "decimal" logical type requires a "precision"
// attribute (and usually "scale"); these are not emitted yet.
Status PopulateAvroSchema(const string& table_name,
                          const string& cluster_id,
                          const KuduSchema& kudu_schema) {
  ostringstream out;
  JsonWriter writer(&out, JsonWriter::Mode::PRETTY);
  writer.StartObject();
  // Avro's named complex type for a set of fields is "record".
  writer.String("type");
  writer.String("record");
  writer.String("name");
  writer.String(table_name);
  writer.String("namespace");
  writer.String("kudu.cluster." + cluster_id);
  writer.String("fields");
  writer.StartArray();
  const Schema schema = KuduSchema::ToSchema(kudu_schema);
  for (size_t i = 0; i < schema.num_columns(); ++i) {
    const KuduColumnSchema kudu_col(kudu_schema.Column(i));
    const ColumnSchema& col = schema.column(i);
    writer.StartObject();
    writer.String("name");
    writer.String(kudu_col.name());
    writer.String("type");
    switch (kudu_col.type()) {
      case KuduColumnSchema::INT8:
      case KuduColumnSchema::INT16:
      case KuduColumnSchema::INT32:
        RETURN_NOT_OK(AddPrimitiveType(col, "int", &writer));
        break;
      case KuduColumnSchema::INT64:
        RETURN_NOT_OK(AddPrimitiveType(col, "long", &writer));
        break;
      case KuduColumnSchema::STRING:
      case KuduColumnSchema::VARCHAR:
        RETURN_NOT_OK(AddPrimitiveType(col, "string", &writer));
        break;
      case KuduColumnSchema::BOOL:
        // Avro's primitive boolean type is spelled "boolean".
        RETURN_NOT_OK(AddPrimitiveType(col, "boolean", &writer));
        break;
      case KuduColumnSchema::FLOAT:
        RETURN_NOT_OK(AddPrimitiveType(col, "float", &writer));
        break;
      case KuduColumnSchema::DOUBLE:
        RETURN_NOT_OK(AddPrimitiveType(col, "double", &writer));
        break;
      case KuduColumnSchema::BINARY:
        RETURN_NOT_OK(AddPrimitiveType(col, "bytes", &writer));
        break;
      // Logical types are written as nested {"type", "logicalType"} objects.
      case KuduColumnSchema::UNIXTIME_MICROS:
        // Microseconds since the Unix epoch is Avro "timestamp-micros";
        // "time-micros" would mean microseconds since midnight.
        RETURN_NOT_OK(AddLogicalType(&writer, "long", "timestamp-micros", col));
        break;
      case KuduColumnSchema::DATE:
        RETURN_NOT_OK(AddLogicalType(&writer, "int", "date", col));
        break;
      case KuduColumnSchema::DECIMAL:
        RETURN_NOT_OK(AddLogicalType(&writer, "bytes", "decimal", col));
        break;
      default:
        LOG(DFATAL) << kudu_col.name() << ": Invalid column type";
        return Status::NotSupported(Substitute(
            "column '$0' has type $1, which has no Avro mapping",
            kudu_col.name(), KuduColumnSchema::DataTypeToString(kudu_col.type())));
    }
    writer.EndObject();
  }
  writer.EndArray();
  writer.EndObject();
  cout << out.str() << endl;
  return Status::OK();
}
// Deletes the table named by the table-name argument via
// KuduClient::DeleteTableInCatalogs(). --modify_external_catalogs decides
// whether external catalogs (e.g. the Hive Metastore) are updated as well, and
// --reserve_seconds is passed through as the grace period before purging.
Status DeleteTable(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& doomed_table = FindOrDie(args, kTableNameArg);
  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));
  const bool touch_catalogs = FLAGS_modify_external_catalogs;
  const uint32_t grace_seconds = FLAGS_reserve_seconds;
  return kudu_client->DeleteTableInCatalogs(doomed_table, touch_catalogs, grace_seconds);
}
// Prints a description of the table named by the table-name argument: its
// schema, its partition schema together with the current range partitions,
// its owner, replication factor and comment.
//
// With --show_avro_format_schema, only the Avro rendering of the schema is
// printed (see PopulateAvroSchema()) and the rest is skipped.
Status DescribeTable(const RunnerContext& context) {
  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
  client::sp::shared_ptr<KuduTable> table;
  RETURN_NOT_OK(client->OpenTable(table_name, &table));

  // The schema.
  const KuduSchema& schema = table->schema();
  if (FLAGS_show_avro_format_schema) {
    return PopulateAvroSchema(table_name, client->cluster_id(), schema);
  }
  cout << "TABLE " << table_name << " " << schema.ToString() << endl;

  // The partition schema with current range partitions.
  vector<Partition> partitions;
  RETURN_NOT_OK_PREPEND(table->ListPartitions(&partitions),
                        "failed to retrieve current partitions");
  const auto& schema_internal = KuduSchema::ToSchema(schema);
  const auto& partition_schema = table->partition_schema();
  vector<string> partition_strs;
  partition_strs.reserve(partitions.size());
  for (const auto& partition : partitions) {
    // Deduplicate by hash bucket to get a unique entry per range partition:
    // keep only the partition whose hash buckets are all 0.
    const auto& hash_buckets = partition.hash_buckets();
    if (!std::all_of(hash_buckets.begin(),
                     hash_buckets.end(),
                     [](int32_t bucket) { return bucket == 0; })) {
      continue;
    }
    partition_strs.emplace_back(
        partition_schema.RangeWithCustomHashPartitionDebugString(partition.begin().range_key(),
                                                                 partition.end().range_key(),
                                                                 schema_internal));
  }
  cout << partition_schema.DisplayString(schema_internal, partition_strs)
       << endl;

  // The owner.
  cout << "OWNER " << table->owner() << endl;
  // Finally, the replication factor.
  cout << "REPLICAS " << table->num_replicas() << endl;
  // The comment.
  cout << "COMMENT " << table->comment() << endl;
  return Status::OK();
}
// Prints the id of the tablet that would hold the row whose primary key is
// given by the primary-key argument: a JSON array with one value per primary
// key column, in key-column order.
//
// With --check_row_existence, additionally scans that tablet's leader replica
// and prints the matching row(s); returns NotFound if the row is absent.
//
// Returns InvalidArgument if the number of values does not match the number
// of key columns, NotSupported for key column types this tool cannot handle,
// and NotFound if no current tablet covers the key.
Status LocateRow(const RunnerContext& context) {
  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
  client::sp::shared_ptr<KuduTable> table;
  RETURN_NOT_OK(client->OpenTable(table_name, &table));

  // Parse the key values from the JSON array argument.
  const string& row_str = FindOrDie(context.required_args, kKeyArg);
  JsonReader reader(row_str);
  RETURN_NOT_OK(reader.Init());
  vector<const rapidjson::Value*> values;
  RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(),
                                          /*field=*/nullptr,
                                          &values));
  const auto& schema = table->schema();
  vector<int> key_indexes;
  schema.GetPrimaryKeyColumnIndexes(&key_indexes);
  if (values.size() != key_indexes.size()) {
    return Status::InvalidArgument(
        Substitute("wrong number of key columns specified: expected $0 but received $1",
                   key_indexes.size(),
                   values.size()));
  }

  // Create an equality predicate for each primary key column.
  vector<unique_ptr<KuduPredicate>> predicates;
  predicates.reserve(values.size());
  for (size_t i = 0; i < values.size(); ++i) {
    const auto& column = schema.Column(key_indexes[i]);
    const auto& col_name = column.name();
    const auto type = column.type();
    // Error context prepended when this column's JSON value fails to parse.
    const auto parse_error = [&]() {
      return Substitute("unable to parse value for column '$0' of type $1",
                        col_name,
                        KuduColumnSchema::DataTypeToString(type));
    };
    switch (type) {
      case KuduColumnSchema::INT8:
      case KuduColumnSchema::INT16:
      case KuduColumnSchema::INT32:
      case KuduColumnSchema::INT64:
      case KuduColumnSchema::DATE:
      case KuduColumnSchema::UNIXTIME_MICROS: {
        int64_t value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractInt64(values[i], /*field=*/nullptr, &value),
            parse_error());
        predicates.emplace_back(
            table->NewComparisonPredicate(col_name,
                                          KuduPredicate::EQUAL,
                                          KuduValue::FromInt(value)));
        break;
      }
      case KuduColumnSchema::BINARY:
      case KuduColumnSchema::STRING:
      case KuduColumnSchema::VARCHAR: {
        string value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractString(values[i], /*field=*/nullptr, &value),
            parse_error());
        predicates.emplace_back(
            table->NewComparisonPredicate(col_name,
                                          KuduPredicate::EQUAL,
                                          KuduValue::CopyString(value)));
        break;
      }
      case KuduColumnSchema::BOOL: {
        // As of the writing of this tool, BOOL is not a supported key column
        // type, but just in case it becomes one, we pre-load support for it.
        bool value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractBool(values[i], /*field=*/nullptr, &value),
            parse_error());
        predicates.emplace_back(
            table->NewComparisonPredicate(col_name,
                                          KuduPredicate::EQUAL,
                                          KuduValue::FromBool(value)));
        break;
      }
      case KuduColumnSchema::FLOAT:
      case KuduColumnSchema::DOUBLE: {
        // Like BOOL, as of the writing of this tool, floating point types are
        // not supported for key columns, but we can pre-load support for them
        // in case they become supported.
        double value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractDouble(values[i], /*field=*/nullptr, &value),
            parse_error());
        predicates.emplace_back(
            table->NewComparisonPredicate(col_name,
                                          KuduPredicate::EQUAL,
                                          KuduValue::FromDouble(value)));
        break;
      }
      case KuduColumnSchema::DECIMAL:
        return Status::NotSupported(
            Substitute("unsupported type $0 for key column '$1': "
                       "$0 key columns are not supported by this tool",
                       KuduColumnSchema::DataTypeToString(type),
                       col_name));
      default:
        return Status::NotSupported(
            Substitute("unsupported type $0 for key column '$1': "
                       "is this tool out of date?",
                       KuduColumnSchema::DataTypeToString(type),
                       col_name));
    }
  }

  // Find the tablet by constructing scan tokens for a scan with equality
  // predicates on all key columns. At most one tablet will match, so there
  // will be at most one token, and we can report the id of its tablet.
  vector<KuduScanToken*> tokens;
  ElementDeleter deleter(&tokens);
  KuduScanTokenBuilder builder(table.get());
  // In case we go on to check for existence of the row.
  RETURN_NOT_OK(builder.SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY));
  for (auto& predicate : predicates) {
    // release(): ownership of each predicate is handed to the builder.
    RETURN_NOT_OK(builder.AddConjunctPredicate(predicate.release()));
  }
  RETURN_NOT_OK(builder.Build(&tokens));
  if (tokens.empty()) {
    // Must be in a non-covered range partition.
    return Status::NotFound("row does not belong to any currently existing tablet");
  }
  if (tokens.size() > 1) {
    // This should be impossible. But if it does happen, we'd like to know what
    // all the matching tablets were.
    for (const auto* token : tokens) {
      cerr << token->tablet().id() << endl;
    }
    return Status::IllegalState(Substitute(
        "all primary key columns specified but found $0 matching tablets!",
        tokens.size()));
  }
  cout << tokens[0]->tablet().id() << endl;

  if (FLAGS_check_row_existence) {
    KuduScanner* scanner_ptr = nullptr;
    RETURN_NOT_OK(tokens[0]->IntoKuduScanner(&scanner_ptr));
    unique_ptr<KuduScanner> scanner(scanner_ptr);
    RETURN_NOT_OK(scanner->Open());
    // Named distinctly from the key argument 'row_str' above to avoid shadowing.
    vector<string> found_rows;
    client::KuduScanBatch batch;
    while (scanner->HasMoreRows()) {
      RETURN_NOT_OK(scanner->NextBatch(&batch));
      for (const auto& row : batch) {
        found_rows.emplace_back(row.ToString());
      }
    }
    if (found_rows.empty()) {
      return Status::NotFound("row does not exist");
    }
    // There should be exactly one result, but if somehow there are more, print
    // them all before returning an error.
    cout << JoinStrings(found_rows, "\n") << endl;
    if (found_rows.size() != 1) {
      // This should be impossible.
      return Status::IllegalState(
          Substitute("expected 1 row but received $0", found_rows.size()));
    }
  }
  return Status::OK();
}
// Parses a table limit argument shared by SetDiskSizeLimit() and
// SetRowCountLimit(): the case-insensitive literal "unlimited" maps to -1;
// anything else must parse as a signed 64-bit integer. 'what' names the limit
// in the error message (e.g. "disk size limit").
// Returns InvalidArgument if 'limit_str' cannot be parsed.
Status ParseTableLimitArg(const string& limit_str, const char* what, int64_t* limit) {
  if (iequals(limit_str, "unlimited")) {
    *limit = -1;
    return Status::OK();
  }
  if (!safe_strto64(limit_str, limit)) {
    return Status::InvalidArgument(Substitute(
        "Could not parse $0 as $1", limit_str, what));
  }
  return Status::OK();
}

// Sets the disk size limit of the table named by the table-name argument from
// the "disk_size" argument (an integer, or "unlimited" for -1).
Status SetDiskSizeLimit(const RunnerContext& context) {
  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
  int64_t disk_size_limit = 0;
  RETURN_NOT_OK(ParseTableLimitArg(FindOrDie(context.required_args, "disk_size"),
                                   "disk size limit", &disk_size_limit));
  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  alterer->SetTableDiskSizeLimit(disk_size_limit);
  return alterer->Alter();
}

// Sets the row count limit of the table named by the table-name argument from
// the "row_count" argument (an integer, or "unlimited" for -1).
Status SetRowCountLimit(const RunnerContext& context) {
  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
  int64_t row_count_limit = 0;
  RETURN_NOT_OK(ParseTableLimitArg(FindOrDie(context.required_args, "row_count"),
                                   "row count limit", &row_count_limit));
  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  alterer->SetTableRowCountLimit(row_count_limit);
  return alterer->Alter();
}
// Recalls a soft-deleted table, identified by its table ID, via
// KuduClient::RecallTable(); a non-empty --new_table_name renames it on the
// way back.
//
// NOTE(review): the table ID is read from the argument keyed by kTabletIdArg;
// presumably the action registers its parameter under that key — confirm
// before switching to a table-specific key.
Status RecallTable(const RunnerContext& context) {
  const auto& id_of_table = FindOrDie(context.required_args, kTabletIdArg);
  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));
  const string& rename_to = FLAGS_new_table_name;
  return kudu_client->RecallTable(id_of_table, rename_to);
}
// Renames the table named by the table-name argument to the new-table-name
// argument. --modify_external_catalogs controls whether external catalogs are
// updated along with Kudu.
Status RenameTable(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& old_name = FindOrDie(args, kTableNameArg);
  const string& new_name = FindOrDie(args, kNewTableNameArg);
  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));
  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(old_name));
  KuduTableAlterer* configured =
      table_alterer->RenameTo(new_name)->modify_external_catalogs(
          FLAGS_modify_external_catalogs);
  return configured->Alter();
}
// Renames column column-name of the table named by the table-name argument to
// new-column-name, as a single alter-table request.
Status RenameColumn(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& old_column = FindOrDie(args, kColumnNameArg);
  const string& new_column = FindOrDie(args, kNewColumnNameArg);
  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));
  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(table_name));
  KuduColumnSpec* column_spec = table_alterer->AlterColumn(old_column);
  column_spec->RenameTo(new_column);
  return table_alterer->Alter();
}
// Action entry point for listing tables: extracts the master addresses from
// the runner context and delegates to TableLister::ListTablets().
Status ListTables(const RunnerContext& context) {
  vector<string> masters;
  const Status parse_status = ParseMasterAddresses(context, &masters);
  if (!parse_status.ok()) {
    return parse_status;
  }
  return TableLister::ListTablets(masters);
}
// Scans the table named by the table-name argument, writing results to
// stdout. Unless --row_count_only is set, --show_values is forced on so that
// row contents are printed. --scan_batch_size is always applied;
// --replica_selection only when non-empty.
Status ScanTable(const RunnerContext& context) {
  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));
  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
  const bool print_values = !FLAGS_row_count_only;
  if (print_values) {
    FLAGS_show_values = true;
  }
  TableScanner table_scanner(kudu_client, table_name);
  table_scanner.SetOutput(&cout);
  table_scanner.SetScanBatchSize(FLAGS_scan_batch_size);
  if (const string& selection = FLAGS_replica_selection; !selection.empty()) {
    RETURN_NOT_OK(table_scanner.SetReplicaSelection(selection));
  }
  return table_scanner.StartScan();
}
// Copies the table named by the table-name argument into the cluster given by
// the destination-master-addresses argument. The destination table is
// --dst_table, or the source table's name when that flag is empty. When both
// master-address arguments are identical strings, the source client is reused
// for the destination.
Status CopyTable(const RunnerContext& context) {
  client::sp::shared_ptr<KuduClient> src_client;
  RETURN_NOT_OK(CreateKuduClient(context, &src_client));
  const auto& args = context.required_args;
  const string& src_table_name = FindOrDie(args, kTableNameArg);
  const bool same_cluster =
      FindOrDie(args, kMasterAddressesArg) == FindOrDie(args, kDestMasterAddressesArg);
  client::sp::shared_ptr<KuduClient> dst_client;
  if (!same_cluster) {
    RETURN_NOT_OK(CreateKuduClient(context, kDestMasterAddressesArg, &dst_client));
  } else {
    dst_client = src_client;
  }
  const string& dst_table_name =
      !FLAGS_dst_table.empty() ? FLAGS_dst_table : src_table_name;
  TableScanner copier(src_client, src_table_name, dst_client, dst_table_name);
  copier.SetOutput(&cout);
  copier.SetScanBatchSize(FLAGS_scan_batch_size);
  return copier.StartCopy();
}
// Sets one extra configuration property on the table named by the table-name
// argument: config-name is set to config-value in a single alter request.
Status SetExtraConfig(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& key = FindOrDie(args, kConfigNameArg);
  const string& value = FindOrDie(args, kConfigValueArg);
  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));
  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(table_name));
  KuduTableAlterer* configured = table_alterer->AlterExtraConfig({ { key, value } });
  return configured->Alter();
}
// Prints the extra configuration properties of the table named by the
// table-name argument as a two-column table ("Configuration", "Value").
// With an empty --config_names every property is printed; otherwise only the
// listed names that are actually set, in sorted, de-duplicated order.
Status GetExtraConfigs(const RunnerContext& context) {
  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
  const set<string> wanted = Split(FLAGS_config_names, ",", strings::SkipEmpty());
  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));
  client::sp::shared_ptr<KuduTable> table;
  RETURN_NOT_OK(kudu_client->OpenTable(table_name, &table));
  const auto& configs = table->extra_configs();
  DataTable output({ "Configuration", "Value" });
  if (!wanted.empty()) {
    for (const auto& name : wanted) {
      if (const string* value = FindOrNull(configs, name)) {
        output.AddRow({ name, *value });
      }
    }
  } else {
    for (const auto& [name, value] : configs) {
      output.AddRow({ name, value });
    }
  }
  return output.PrintTo(cout);
}
// Parses 'range_bound_str', a JSON array of scalar values, and sets those
// values on 'range_bound_partial_row'. Element i is assigned to the column
// named range_columns[i].first and is interpreted according to the type in
// range_columns[i].second.
//
// An empty JSON array leaves the row untouched and returns OK. Otherwise the
// array must contain exactly one element per range column.
//
// INT8/INT16 values that do not fit the column width are rejected with
// InvalidArgument instead of being silently truncated. BOOL, FLOAT, DOUBLE,
// DECIMAL and unrecognized column types yield NotSupported.
Status ConvertToKuduPartialRow(
    const vector<pair<string, KuduColumnSchema::DataType>>& range_columns,
    const string& range_bound_str,
    KuduPartialRow* range_bound_partial_row) {
  JsonReader reader(range_bound_str);
  RETURN_NOT_OK(reader.Init());
  vector<const rapidjson::Value*> values;
  RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(),
                                          /*field=*/nullptr,
                                          &values));
  // An empty array (e.g. "[]") sets no column of the row.
  if (values.empty()) {
    return Status::OK();
  }
  if (values.size() != range_columns.size()) {
    return Status::InvalidArgument(
        Substitute("wrong number of range columns specified: expected $0 but received $1",
                   range_columns.size(),
                   values.size()));
  }

  // Renders a JSON scalar for error messages, so they show the offending value
  // itself rather than the address of its rapidjson node.
  const auto describe = [](const rapidjson::Value& v) -> string {
    if (v.IsString()) return string(v.GetString(), v.GetStringLength());
    if (v.IsBool()) return v.GetBool() ? "true" : "false";
    if (v.IsInt64()) return std::to_string(v.GetInt64());
    if (v.IsUint64()) return std::to_string(v.GetUint64());
    if (v.IsNumber()) return std::to_string(v.GetDouble());
    if (v.IsNull()) return "null";
    return "<non-scalar JSON value>";
  };

  for (size_t i = 0; i < values.size(); ++i) {
    const rapidjson::Value* json_val = values[i];
    const auto& col_name = range_columns[i].first;
    const auto type = range_columns[i].second;
    const string type_name = KuduColumnSchema::DataTypeToString(type);
    const auto error_msg = Substitute(kErrorMsgArg, describe(*json_val), col_name, type_name);
    // Error for an integer that does not fit the column's width.
    const auto out_of_range = [&](int64_t v) {
      return Status::InvalidArgument(
          error_msg, Substitute("value $0 is out of range for a $1 column", v, type_name));
    };
    switch (type) {
      case KuduColumnSchema::INT8: {
        int64_t value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractInt64(json_val, /*field=*/nullptr, &value),
            error_msg);
        // JSON integers are read as int64; reject instead of truncating.
        if (value != static_cast<int8_t>(value)) {
          return out_of_range(value);
        }
        RETURN_NOT_OK(range_bound_partial_row->SetInt8(col_name, static_cast<int8_t>(value)));
        break;
      }
      case KuduColumnSchema::INT16: {
        int64_t value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractInt64(json_val, /*field=*/nullptr, &value),
            error_msg);
        if (value != static_cast<int16_t>(value)) {
          return out_of_range(value);
        }
        RETURN_NOT_OK(range_bound_partial_row->SetInt16(col_name, static_cast<int16_t>(value)));
        break;
      }
      case KuduColumnSchema::INT32: {
        int32_t value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractInt32(json_val, /*field=*/nullptr, &value),
            error_msg);
        RETURN_NOT_OK(range_bound_partial_row->SetInt32(col_name, value));
        break;
      }
      case KuduColumnSchema::INT64: {
        int64_t value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractInt64(json_val, /*field=*/nullptr, &value),
            error_msg);
        RETURN_NOT_OK(range_bound_partial_row->SetInt64(col_name, value));
        break;
      }
      case KuduColumnSchema::DATE: {
        int32_t value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractInt32(json_val, /*field=*/nullptr, &value),
            error_msg);
        RETURN_NOT_OK(range_bound_partial_row->SetDate(col_name, value));
        break;
      }
      case KuduColumnSchema::UNIXTIME_MICROS: {
        int64_t value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractInt64(json_val, /*field=*/nullptr, &value),
            error_msg);
        RETURN_NOT_OK(range_bound_partial_row->SetUnixTimeMicros(col_name, value));
        break;
      }
      case KuduColumnSchema::BINARY: {
        string value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractString(json_val, /*field=*/nullptr, &value),
            error_msg);
        RETURN_NOT_OK(range_bound_partial_row->SetBinary(col_name, value));
        break;
      }
      case KuduColumnSchema::STRING: {
        string value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractString(json_val, /*field=*/nullptr, &value),
            error_msg);
        RETURN_NOT_OK(range_bound_partial_row->SetString(col_name, value));
        break;
      }
      case KuduColumnSchema::VARCHAR: {
        string value;
        RETURN_NOT_OK_PREPEND(
            reader.ExtractString(json_val, /*field=*/nullptr, &value),
            error_msg);
        RETURN_NOT_OK(range_bound_partial_row->SetVarchar(col_name, value));
        break;
      }
      case KuduColumnSchema::BOOL:
      case KuduColumnSchema::FLOAT:
      case KuduColumnSchema::DOUBLE:
      case KuduColumnSchema::DECIMAL:
      default:
        return Status::NotSupported(
            Substitute("unsupported type $0 for key column '$1': "
                       "is this tool out of date?",
                       type_name,
                       col_name));
    }
  }
  return Status::OK();
}
// Adds or drops a single range partition of a table.
//
// The lower and upper bounds are JSON arrays given as required arguments and
// are matched positionally against the table's range partition columns.
// --lower_bound_type / --upper_bound_type must be INCLUSIVE_BOUND or
// EXCLUSIVE_BOUND (case-insensitive). When adding, a non-empty --hash_schema
// (a JSON HashSchemaPB) gives the new range its own hash schema; otherwise the
// range uses the table-wide hash schema.
Status ModifyRangePartition(const RunnerContext& context, PartitionAction action) {
  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
  const string& lower_bound_json = FindOrDie(context.required_args,
                                             kTableRangeLowerBoundArg);
  const string& upper_bound_json = FindOrDie(context.required_args,
                                             kTableRangeUpperBoundArg);

  // Maps a bound-type flag value onto the client enum; 'which' names the
  // bound ("lower bound"/"upper bound") in the error message.
  const auto parse_bound_type = [](const string& which,
                                   const string& flag_value,
                                   KuduTableCreator::RangePartitionBound* out) {
    if (iequals(flag_value, "INCLUSIVE_BOUND")) {
      *out = KuduTableCreator::INCLUSIVE_BOUND;
      return Status::OK();
    }
    if (iequals(flag_value, "EXCLUSIVE_BOUND")) {
      *out = KuduTableCreator::EXCLUSIVE_BOUND;
      return Status::OK();
    }
    return Status::InvalidArgument(Substitute(
        "wrong type of range $0 : only 'INCLUSIVE_BOUND' or "
        "'EXCLUSIVE_BOUND' can be received", which));
  };

  client::sp::shared_ptr<KuduClient> client;
  client::sp::shared_ptr<KuduTable> table;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  RETURN_NOT_OK(client->OpenTable(table_name, &table));

  const auto& schema = table->schema();
  unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
  unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());

  // Collect (name, type) of each range column in the order the partition
  // schema reports them; the bound arrays are matched against this order.
  const Schema& internal_schema = KuduSchema::ToSchema(schema);
  vector<int32_t> range_col_idxs;
  RETURN_NOT_OK(table->partition_schema().GetRangeSchemaColumnIndexes(
      internal_schema, &range_col_idxs));
  vector<pair<string, KuduColumnSchema::DataType>> range_columns;
  range_columns.reserve(range_col_idxs.size());
  for (const auto idx : range_col_idxs) {
    const auto& col = schema.Column(idx);
    range_columns.emplace_back(col.name(), col.type());
  }

  RETURN_NOT_OK(ConvertToKuduPartialRow(range_columns, lower_bound_json, lower_bound.get()));
  RETURN_NOT_OK(ConvertToKuduPartialRow(range_columns, upper_bound_json, upper_bound.get()));

  KuduTableCreator::RangePartitionBound lower_bound_type;
  RETURN_NOT_OK(parse_bound_type("lower bound", FLAGS_lower_bound_type, &lower_bound_type));
  KuduTableCreator::RangePartitionBound upper_bound_type;
  RETURN_NOT_OK(parse_bound_type("upper bound", FLAGS_upper_bound_type, &upper_bound_type));

  const bool has_custom_hash_schema = !FLAGS_hash_schema.empty();
  PartitionPB::HashSchemaPB hash_schema;
  if (has_custom_hash_schema) {
    JsonParseOptions opts;
    opts.case_insensitive_enum_parsing = true;
    const auto parse_status = JsonStringToMessage(FLAGS_hash_schema, &hash_schema, opts);
    if (!parse_status.ok()) {
      return Status::InvalidArgument(
          Substitute("unable to parse JSON: $0", FLAGS_hash_schema),
          parse_status.error_message().ToString());
    }
  }

  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  if (action == PartitionAction::ADD) {
    if (!has_custom_hash_schema) {
      // The new range inherits the table-wide hash schema.
      return alterer->AddRangePartition(lower_bound.release(),
                                        upper_bound.release(),
                                        lower_bound_type,
                                        upper_bound_type)->Alter();
    }
    // The new range carries its own hash schema from --hash_schema.
    auto range = make_unique<KuduRangePartition>(lower_bound.release(),
                                                 upper_bound.release(),
                                                 lower_bound_type,
                                                 upper_bound_type);
    for (const auto& dimension : hash_schema.hash_schema()) {
      const vector<string> columns(dimension.columns().begin(), dimension.columns().end());
      range->add_hash_partitions(columns, dimension.num_buckets(), dimension.seed());
    }
    return alterer->AddRangePartition(range.release())->Alter();
  }

  DCHECK_EQ(PartitionAction::DROP, action);
  return alterer->DropRangePartition(lower_bound.release(),
                                     upper_bound.release(),
                                     lower_bound_type,
                                     upper_bound_type)->Alter();
}
// Drops the range partition delimited by the bounds given on the command line.
Status DropRangePartition(const RunnerContext& context) {
  constexpr auto kAction = PartitionAction::DROP;
  return ModifyRangePartition(context, kAction);
}
// Adds a range partition delimited by the bounds given on the command line.
Status AddRangePartition(const RunnerContext& context) {
  constexpr auto kAction = PartitionAction::ADD;
  return ModifyRangePartition(context, kAction);
}
// Parses 'default_value', a JSON array holding exactly one element, into a
// newly allocated KuduValue for a column of the given 'type'; on success the
// result is stored in '*value' and the caller takes ownership of it.
//
// INT8/INT16/INT32/INT64/DATE/UNIXTIME_MICROS are read as a JSON int64,
// BINARY/STRING/VARCHAR as a JSON string, BOOL as a JSON bool, FLOAT and
// DOUBLE as JSON numbers. DECIMAL and any other type yield NotSupported.
Status ParseValueOfType(const string& default_value,
                        KuduColumnSchema::DataType type,
                        KuduValue** value) {
  JsonReader reader(default_value);
  RETURN_NOT_OK(reader.Init());
  vector<const rapidjson::Value*> elements;
  RETURN_NOT_OK(reader.ExtractObjectArray(reader.root(), /*field=*/nullptr, &elements));
  if (elements.size() != 1) {
    return Status::InvalidArgument(Substitute(
        "We got $0 value(s), you should provide one default value.",
        std::to_string(elements.size())));
  }

  const rapidjson::Value* const elem = elements.front();
  const string err_prefix = Substitute("unable to parse value for column type $0",
                                       KuduColumnSchema::DataTypeToString(type));
  switch (type) {
    case KuduColumnSchema::INT8:
    case KuduColumnSchema::INT16:
    case KuduColumnSchema::INT32:
    case KuduColumnSchema::INT64:
    case KuduColumnSchema::DATE:
    case KuduColumnSchema::UNIXTIME_MICROS: {
      int64_t parsed;
      RETURN_NOT_OK_PREPEND(reader.ExtractInt64(elem, /*field=*/nullptr, &parsed), err_prefix);
      *value = KuduValue::FromInt(parsed);
      return Status::OK();
    }
    case KuduColumnSchema::BINARY:
    case KuduColumnSchema::STRING:
    case KuduColumnSchema::VARCHAR: {
      string parsed;
      RETURN_NOT_OK_PREPEND(reader.ExtractString(elem, /*field=*/nullptr, &parsed), err_prefix);
      *value = KuduValue::CopyString(parsed);
      return Status::OK();
    }
    case KuduColumnSchema::BOOL: {
      bool parsed;
      RETURN_NOT_OK_PREPEND(reader.ExtractBool(elem, /*field=*/nullptr, &parsed), err_prefix);
      *value = KuduValue::FromBool(parsed);
      return Status::OK();
    }
    case KuduColumnSchema::FLOAT: {
      float parsed;
      RETURN_NOT_OK_PREPEND(reader.ExtractFloat(elem, /*field=*/nullptr, &parsed), err_prefix);
      *value = KuduValue::FromFloat(parsed);
      return Status::OK();
    }
    case KuduColumnSchema::DOUBLE: {
      double parsed;
      RETURN_NOT_OK_PREPEND(reader.ExtractDouble(elem, /*field=*/nullptr, &parsed), err_prefix);
      *value = KuduValue::FromDouble(parsed);
      return Status::OK();
    }
    case KuduColumnSchema::DECIMAL:
    default:
      break;
  }
  return Status::NotSupported(Substitute(
      "$0 columns are not supported for setting default value by this tool, "
      "is this tool out of date?",
      KuduColumnSchema::DataTypeToString(type)));
}
// Sets the default value of an existing column. The value is a JSON array with
// one element, parsed according to the column's current type.
// Returns NotFound if the table has no column with the given name.
Status ColumnSetDefault(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& column_name = FindOrDie(args, kColumnNameArg);
  const string& default_value = FindOrDie(args, kDefaultValueArg);

  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  KuduSchema schema;
  RETURN_NOT_OK(client->GetTableSchema(table_name, &schema));

  // KuduColumnSchema has no default constructor, so seed it with column 0;
  // HasColumn() fills in the requested column when it exists.
  KuduColumnSchema target_col = schema.Column(0);
  if (!schema.HasColumn(column_name, &target_col)) {
    return Status::NotFound(Substitute("Couldn't find column $0", column_name));
  }

  KuduValue* new_default = nullptr;
  RETURN_NOT_OK(ParseValueOfType(default_value, target_col.type(), &new_default));
  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  alterer->AlterColumn(column_name)->Default(new_default);
  return alterer->Alter();
}
// Removes the default value of a column.
Status ColumnRemoveDefault(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& column_name = FindOrDie(args, kColumnNameArg);

  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));

  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  KuduColumnSpec* column_spec = alterer->AlterColumn(column_name);
  column_spec->RemoveDefault();
  return alterer->Alter();
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
// Changes the compression codec of a column. The codec name is validated
// before any connection to the cluster is made.
Status ColumnSetCompression(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& column_name = FindOrDie(args, kColumnNameArg);
  const string& codec_name = FindOrDie(args, kCompressionTypeArg);

  KuduColumnStorageAttributes::CompressionType codec;
  RETURN_NOT_OK(KuduColumnStorageAttributes::StringToCompressionType(codec_name, &codec));

  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  KuduColumnSpec* column_spec = alterer->AlterColumn(column_name);
  column_spec->Compression(codec);
  return alterer->Alter();
}
// Changes the encoding of a column. The encoding name is validated before any
// connection to the cluster is made.
Status ColumnSetEncoding(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& column_name = FindOrDie(args, kColumnNameArg);
  const string& encoding_name = FindOrDie(args, kEncodingTypeArg);

  KuduColumnStorageAttributes::EncodingType encoding;
  RETURN_NOT_OK(KuduColumnStorageAttributes::StringToEncodingType(encoding_name, &encoding));

  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  KuduColumnSpec* column_spec = alterer->AlterColumn(column_name);
  column_spec->Encoding(encoding);
  return alterer->Alter();
}
#pragma GCC diagnostic pop
// Changes the block size of a column. The value must parse as a 32-bit
// integer and be strictly positive; both checks happen before connecting.
Status ColumnSetBlockSize(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& column_name = FindOrDie(args, kColumnNameArg);
  const string& str_block_size = FindOrDie(args, kBlockSizeArg);

  int32_t size_bytes;
  const bool parsed = safe_strto32(str_block_size, &size_bytes);
  if (!parsed) {
    return Status::InvalidArgument(Substitute(
        "Unable to parse block_size value: $0.", str_block_size));
  }
  if (size_bytes < 1) {
    return Status::InvalidArgument(Substitute(
        "Invalid block size: $0, it should be set higher than 0.", str_block_size));
  }

  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  KuduColumnSpec* column_spec = alterer->AlterColumn(column_name);
  column_spec->BlockSize(size_bytes);
  return alterer->Alter();
}
// Clears the table comment by setting it to the empty string.
Status ClearComment(const RunnerContext& context) {
  const string& table_name = FindOrDie(context.required_args, kTableNameArg);

  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));

  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(table_name));
  const string empty_comment;
  table_alterer->SetComment(empty_comment);
  return table_alterer->Alter();
}
// Sets the table comment. The comment text is read from the argument
// registered under kColumnCommentArg.
Status SetComment(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& new_comment = FindOrDie(args, kColumnCommentArg);

  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));

  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(table_name));
  table_alterer->SetComment(new_comment);
  return table_alterer->Alter();
}
// Sets the comment of a single column.
Status ColumnSetComment(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& column_name = FindOrDie(args, kColumnNameArg);
  const string& new_comment = FindOrDie(args, kColumnCommentArg);

  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));

  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(table_name));
  KuduColumnSpec* column_spec = table_alterer->AlterColumn(column_name);
  column_spec->Comment(new_comment);
  return table_alterer->Alter();
}
// Adds a column of the given data type to a table. These optional flags are
// applied only when non-empty:
//  --default_value (a one-element JSON array), --encoding_type,
//  --compression_type, --comment.
Status AddColumn(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& column_name = FindOrDie(args, kColumnNameArg);
  const string& data_type_name = FindOrDie(args, kDataTypeArg);

  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  unique_ptr<KuduTableAlterer> alterer(client->NewTableAlterer(table_name));
  KuduColumnSpec* new_column = alterer->AddColumn(column_name);

  KuduColumnSchema::DataType column_type;
  RETURN_NOT_OK(KuduColumnSchema::StringToDataType(data_type_name, &column_type));
  new_column->Type(column_type);

  if (!FLAGS_default_value.empty()) {
    KuduValue* default_val = nullptr;
    RETURN_NOT_OK(ParseValueOfType(FLAGS_default_value, column_type, &default_val));
    new_column->Default(default_val);
  }
  if (!FLAGS_encoding_type.empty()) {
    KuduColumnStorageAttributes::EncodingType encoding;
    RETURN_NOT_OK(KuduColumnStorageAttributes::StringToEncodingType(FLAGS_encoding_type,
                                                                    &encoding));
    new_column->Encoding(encoding);
  }
  if (!FLAGS_compression_type.empty()) {
    KuduColumnStorageAttributes::CompressionType codec;
    RETURN_NOT_OK(KuduColumnStorageAttributes::StringToCompressionType(FLAGS_compression_type,
                                                                       &codec));
    new_column->Compression(codec);
  }
  if (!FLAGS_comment.empty()) {
    new_column->Comment(FLAGS_comment);
  }
  return alterer->Alter();
}
// Drops a column from a table.
Status DeleteColumn(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& column_name = FindOrDie(args, kColumnNameArg);

  client::sp::shared_ptr<KuduClient> kudu_client;
  RETURN_NOT_OK(CreateKuduClient(context, &kudu_client));

  unique_ptr<KuduTableAlterer> table_alterer(kudu_client->NewTableAlterer(table_name));
  table_alterer->DropColumn(column_name);
  return table_alterer->Alter();
}
// Changes the replication factor of a table. The factor must parse as a
// 32-bit integer; further validation is left to TableAlter.
Status SetReplicationFactor(const RunnerContext& context) {
  vector<string> masters;
  RETURN_NOT_OK(ParseMasterAddresses(context, &masters));

  const auto& args = context.required_args;
  const string& table_name = FindOrDie(args, kTableNameArg);
  const string& rf_str = FindOrDie(args, kReplicationFactorArg);

  int32_t rf;
  if (safe_strto32(rf_str, &rf)) {
    return TableAlter::SetReplicationFactor(masters, table_name, rf);
  }
  return Status::InvalidArgument(Substitute(
      "Unable to parse replication factor value: $0.", rf_str));
}
// Fetches and prints the statistics of a table to stdout, preceded by a
// "TABLE <name>" header line.
Status GetTableStatistics(const RunnerContext& context) {
  const string& table_name = FindOrDie(context.required_args, kTableNameArg);
  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));

  KuduTableStatistics* raw_stats;
  RETURN_NOT_OK_PREPEND(client->GetTableStatistics(table_name, &raw_stats),
                        "failed to get table statistics.");
  // Take ownership of the statistics object returned by the client.
  const unique_ptr<KuduTableStatistics> stats(raw_stats);

  cout << "TABLE " << table_name << endl;
  cout << stats->ToString() << endl;
  return Status::OK();
}
// Renders 'values' as a JSON array suitable for ConvertToKuduPartialRow(),
// pairing each value positionally with 'range_columns'.
//
// Values of STRING/VARCHAR/BINARY columns are raw strings (e.g. protobuf
// fields already decoded from the create-table JSON). They are emitted as
// properly escaped JSON string literals, so embedded '"', '\' and control
// characters survive the round trip. All other values are emitted verbatim and
// must already be valid JSON scalars (numbers).
//
// Returns InvalidArgument if 'values' is empty or its size differs from the
// number of range columns. '*json_value' is cleared first in every case.
Status ToJsonPartialRow(const RepeatedPtrField<string>& values,
                        const vector<pair<string, KuduColumnSchema::DataType>>& range_columns,
                        string* json_value) {
  json_value->clear();
  if (values.empty() || values.size() != range_columns.size()) {
    return Status::InvalidArgument(
        "Invalid range value size, value size should be equal to number of range keys.");
  }

  // Quotes 's' as a JSON string literal (RFC 8259 section 7).
  const auto quote_json = [](const string& s) {
    static const char kHexDigits[] = "0123456789abcdef";
    string quoted;
    quoted.reserve(s.size() + 2);
    quoted.push_back('"');
    for (const char c : s) {
      switch (c) {
        case '"':  quoted += "\\\""; break;
        case '\\': quoted += "\\\\"; break;
        case '\b': quoted += "\\b"; break;
        case '\f': quoted += "\\f"; break;
        case '\n': quoted += "\\n"; break;
        case '\r': quoted += "\\r"; break;
        case '\t': quoted += "\\t"; break;
        default: {
          const auto uc = static_cast<unsigned char>(c);
          if (uc < 0x20) {
            // Remaining control characters must be \u-escaped in JSON.
            quoted += "\\u00";
            quoted.push_back(kHexDigits[uc >> 4]);
            quoted.push_back(kHexDigits[uc & 0xF]);
          } else {
            quoted.push_back(c);
          }
        }
      }
    }
    quoted.push_back('"');
    return quoted;
  };

  size_t col_idx = 0;
  const string joined = JoinMapped(values, [&](const string& v) -> string {
    const auto data_type = range_columns[col_idx++].second;
    if (data_type == KuduColumnSchema::STRING ||
        data_type == KuduColumnSchema::VARCHAR ||
        data_type == KuduColumnSchema::BINARY) {
      return quote_json(v);
    }
    return v;
  }, ",");
  *json_value = "[" + joined + "]";
  return Status::OK();
}
// Translates a ColumnPB encoding enum into the client-side
// KuduColumnStorageAttributes::EncodingType. On an unknown value '*type' is
// left untouched and InvalidArgument is returned.
Status ToClientEncodingType(
    ColumnPB::EncodingType type_pb,
    KuduColumnStorageAttributes::EncodingType* type) {
  switch (type_pb) {
    case ColumnPB::AUTO_ENCODING:
      *type = KuduColumnStorageAttributes::AUTO_ENCODING;
      return Status::OK();
    case ColumnPB::PLAIN_ENCODING:
      *type = KuduColumnStorageAttributes::PLAIN_ENCODING;
      return Status::OK();
    case ColumnPB::PREFIX_ENCODING:
      *type = KuduColumnStorageAttributes::PREFIX_ENCODING;
      return Status::OK();
    case ColumnPB::DICT_ENCODING:
      *type = KuduColumnStorageAttributes::DICT_ENCODING;
      return Status::OK();
    case ColumnPB::RLE:
      *type = KuduColumnStorageAttributes::RLE;
      return Status::OK();
    case ColumnPB::BIT_SHUFFLE:
      *type = KuduColumnStorageAttributes::BIT_SHUFFLE;
      return Status::OK();
    default:
      break;
  }
  return Status::InvalidArgument(Substitute("Unexpected encoding type: $0", type_pb));
}
// Translates a ColumnPB compression enum into the client-side
// KuduColumnStorageAttributes::CompressionType. On an unknown value '*type' is
// left untouched and InvalidArgument is returned.
Status ToClientCompressionType(
    ColumnPB::CompressionType type_pb,
    KuduColumnStorageAttributes::CompressionType* type) {
  switch (type_pb) {
    case ColumnPB::DEFAULT_COMPRESSION:
      *type = KuduColumnStorageAttributes::DEFAULT_COMPRESSION;
      return Status::OK();
    case ColumnPB::NO_COMPRESSION:
      *type = KuduColumnStorageAttributes::NO_COMPRESSION;
      return Status::OK();
    case ColumnPB::SNAPPY:
      *type = KuduColumnStorageAttributes::SNAPPY;
      return Status::OK();
    case ColumnPB::LZ4:
      *type = KuduColumnStorageAttributes::LZ4;
      return Status::OK();
    case ColumnPB::ZLIB:
      *type = KuduColumnStorageAttributes::ZLIB;
      return Status::OK();
    default:
      break;
  }
  return Status::InvalidArgument(Substitute("Unexpected compression type: $0", type_pb));
}
// Translates a range-partition bound type from its protobuf form into
// KuduTableCreator::RangePartitionBound. UNKNOWN_BOUND and any unrecognized
// value yield InvalidArgument naming the offending value, and '*type' is left
// untouched.
Status ToClientRangePartitionBound(
    PartitionPB_RangePartitionPB_BoundPB::Type type_pb,
    KuduTableCreator::RangePartitionBound* type) {
  switch (type_pb) {
    case PartitionPB_RangePartitionPB_BoundPB::EXCLUSIVE:
      *type = KuduTableCreator::EXCLUSIVE_BOUND;
      return Status::OK();
    case PartitionPB_RangePartitionPB_BoundPB::INCLUSIVE:
      *type = KuduTableCreator::INCLUSIVE_BOUND;
      return Status::OK();
    case PartitionPB_RangePartitionPB_BoundPB::UNKNOWN_BOUND:
    default:
      break;
  }
  // The format string needs a '$0' placeholder, otherwise the offending value
  // is dropped from the message.
  return Status::InvalidArgument(
      Substitute("Unexpected range partition bound type: $0", type_pb));
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
// Builds '*kudu_schema' from the SchemaPB of a create-table JSON request.
//
// For each column: the type is resolved with
// KuduColumnSchema::StringToDataType(). DECIMAL precision/scale and VARCHAR
// length come from 'type_attributes' when present. Nullability, default
// value, comment, encoding and compression are applied when present. The
// primary key is taken from 'key_column_names'.
//
// STRING/BINARY/VARCHAR defaults are used verbatim. Other defaults must be
// JSON scalars, which are parsed with ParseValueOfType(); that function
// rejects DECIMAL defaults with NotSupported.
Status ParseTableSchema(const SchemaPB& schema,
                        KuduSchema* kudu_schema) {
  KuduSchemaBuilder b;
  for (const auto& column : schema.columns()) {
    KuduColumnSpec* spec = b.AddColumn(column.column_name());
    KuduColumnSchema::DataType type;
    RETURN_NOT_OK(KuduColumnSchema::StringToDataType(
        column.column_type(), &type));
    spec->Type(type);
    if (column.has_type_attributes()) {
      if (type == KuduColumnSchema::DataType::DECIMAL) {
        spec->Precision(column.type_attributes().precision());
        spec->Scale(column.type_attributes().scale());
      } else if (type == KuduColumnSchema::DataType::VARCHAR) {
        spec->Length(column.type_attributes().length());
      }
    }
    if (!column.is_nullable()) {
      spec->NotNull();
    }
    if (column.has_default_value()) {
      KuduValue* value = nullptr;
      const string& default_str = column.default_value();
      // Decide on the parsed enum, not the raw 'column_type' string, so every
      // spelling StringToDataType() accepts is handled the same way.
      if (type == KuduColumnSchema::STRING ||
          type == KuduColumnSchema::BINARY ||
          type == KuduColumnSchema::VARCHAR) {
        // The protobuf JSON parser has already unescaped this string; wrapping
        // it in quotes and re-parsing it as JSON would break on embedded '"'
        // or '\' and unescape it a second time.
        value = KuduValue::CopyString(default_str);
      } else {
        // DECIMAL stays quoted so ParseValueOfType() reports NotSupported
        // rather than a JSON syntax error.
        const string default_json = type == KuduColumnSchema::DECIMAL
            ? "[\"" + default_str + "\"]"
            : "[" + default_str + "]";
        RETURN_NOT_OK(ParseValueOfType(default_json, type, &value));
      }
      spec->Default(value);
    }
    if (column.has_comment()) {
      spec->Comment(column.comment());
    }
    // If no valid encoding is provided, AUTO_ENCODING will be used by default.
    if (column.has_encoding()) {
      KuduColumnStorageAttributes::EncodingType encoding;
      RETURN_NOT_OK(ToClientEncodingType(column.encoding(), &encoding));
      spec->Encoding(encoding);
    }
    // If no valid compression is provided, DEFAULT_COMPRESSION will be used.
    if (column.has_compression()) {
      KuduColumnStorageAttributes::CompressionType compression;
      RETURN_NOT_OK(ToClientCompressionType(column.compression(), &compression));
      spec->Compression(compression);
    }
  }
  b.SetPrimaryKey(vector<string>(schema.key_column_names().begin(),
                                 schema.key_column_names().end()));
  RETURN_NOT_OK(b.Build(kudu_schema));
  return Status::OK();
}
#pragma GCC diagnostic pop
// Applies the partitioning described by 'partition' (from a create-table JSON
// request) to 'table_creator'. It adds, in order: the table-wide hash
// dimensions, the plain range bounds, the ranges with a custom hash schema,
// and the range splits.
//
// Without a range_partition, the range-partition column list is set to empty.
// Bound and split values are given as strings. They are matched positionally
// against the range columns taken in 'kudu_schema' column order. A missing
// lower bound defaults to an empty INCLUSIVE row; a missing upper bound
// defaults to an empty EXCLUSIVE row.
Status ParseTablePartition(const PartitionPB& partition,
                           const KuduSchema& kudu_schema,
                           KuduTableCreator* table_creator) {
  for (const auto& hash_partition : partition.hash_partitions()) {
    const vector<string> hash_keys(hash_partition.columns().begin(),
                                   hash_partition.columns().end());
    const int32_t seed = hash_partition.has_seed() ? hash_partition.seed() : 0;
    table_creator->add_hash_partitions(hash_keys, hash_partition.num_buckets(), seed);
  }

  // Generate and add the range partition splits for the table.
  if (!partition.has_range_partition()) {
    table_creator->set_range_partition_columns({});
    return Status::OK();
  }
  const auto& range_pb = partition.range_partition();

  // NOTE(review): std::set sorts the range columns by name and drops
  // duplicates, so the order given in the JSON is not preserved; confirm this
  // is intended.
  const set<string> range_keys(range_pb.columns().begin(), range_pb.columns().end());
  table_creator->set_range_partition_columns(
      vector<string>(range_keys.begin(), range_keys.end()));

  // (name, type) of every range column, in schema order.
  vector<pair<string, KuduColumnSchema::DataType>> range_col_names_and_types;
  for (size_t idx = 0; idx < kudu_schema.num_columns(); ++idx) {
    const KuduColumnSchema column = kudu_schema.Column(idx);
    if (ContainsKey(range_keys, column.name())) {
      range_col_names_and_types.emplace_back(column.name(), column.type());
    }
  }

  // Fills 'row' and '*bound_type' from one bound message ('bound_values' plus
  // 'bound_type').
  const auto parse_bound = [&range_col_names_and_types](
      const auto& bound_pb,
      KuduPartialRow* row,
      KuduTableCreator::RangePartitionBound* bound_type) -> Status {
    string json;
    RETURN_NOT_OK(ToJsonPartialRow(bound_pb.bound_values(), range_col_names_and_types, &json));
    RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types, json, row));
    return ToClientRangePartitionBound(bound_pb.bound_type(), bound_type);
  };

  // Allocates both bound rows and fills in whichever of the optional
  // lower/upper bounds 'bounds_pb' carries. Bound types that are absent keep
  // the caller's defaults.
  const auto parse_range = [&](const auto& bounds_pb,
                               unique_ptr<KuduPartialRow>* lower,
                               KuduTableCreator::RangePartitionBound* lower_type,
                               unique_ptr<KuduPartialRow>* upper,
                               KuduTableCreator::RangePartitionBound* upper_type) -> Status {
    lower->reset(kudu_schema.NewRow());
    if (bounds_pb.has_lower_bound()) {
      RETURN_NOT_OK(parse_bound(bounds_pb.lower_bound(), lower->get(), lower_type));
    }
    upper->reset(kudu_schema.NewRow());
    if (bounds_pb.has_upper_bound()) {
      RETURN_NOT_OK(parse_bound(bounds_pb.upper_bound(), upper->get(), upper_type));
    }
    return Status::OK();
  };

  for (const auto& bound : range_pb.range_bounds()) {
    unique_ptr<KuduPartialRow> lower_bound;
    auto lower_bound_type = KuduTableCreator::INCLUSIVE_BOUND;
    unique_ptr<KuduPartialRow> upper_bound;
    auto upper_bound_type = KuduTableCreator::EXCLUSIVE_BOUND;
    RETURN_NOT_OK(parse_range(bound, &lower_bound, &lower_bound_type,
                              &upper_bound, &upper_bound_type));
    table_creator->add_range_partition(lower_bound.release(), upper_bound.release(),
                                       lower_bound_type, upper_bound_type);
  }

  for (const auto& range : range_pb.custom_hash_schema_ranges()) {
    unique_ptr<KuduPartialRow> lower_bound;
    auto lower_bound_type = KuduTableCreator::INCLUSIVE_BOUND;
    unique_ptr<KuduPartialRow> upper_bound;
    auto upper_bound_type = KuduTableCreator::EXCLUSIVE_BOUND;
    RETURN_NOT_OK(parse_range(range.range_bounds(), &lower_bound, &lower_bound_type,
                              &upper_bound, &upper_bound_type));
    auto custom_range = make_unique<KuduRangePartition>(lower_bound.release(),
                                                        upper_bound.release(),
                                                        lower_bound_type,
                                                        upper_bound_type);
    for (const auto& hash_dimension : range.hash_schema()) {
      const vector<string> hash_columns(hash_dimension.columns().begin(),
                                        hash_dimension.columns().end());
      const int32_t seed = hash_dimension.has_seed() ? hash_dimension.seed() : 0;
      custom_range->add_hash_partitions(hash_columns, hash_dimension.num_buckets(), seed);
    }
    table_creator->add_custom_range_partition(custom_range.release());
  }

  for (const auto& split_pb : range_pb.range_splits()) {
    string split_json;
    RETURN_NOT_OK(ToJsonPartialRow(split_pb.split_values(),
                                   range_col_names_and_types,
                                   &split_json));
    unique_ptr<KuduPartialRow> split(kudu_schema.NewRow());
    RETURN_NOT_OK(ConvertToKuduPartialRow(range_col_names_and_types,
                                          split_json,
                                          split.get()));
    table_creator->add_range_partition_split(split.release());
  }
  return Status::OK();
}
// Creates a table from a JSON-encoded CreateTablePB (enum names are matched
// case-insensitively). Optional properties (replica count, extra configs,
// dimension label, owner, comment) are forwarded only when present in the
// request.
Status CreateTable(const RunnerContext& context) {
  const string& json_str = FindOrDie(context.required_args, kCreateTableJSONArg);
  CreateTablePB table_req;
  JsonParseOptions opts;
  opts.case_insensitive_enum_parsing = true;
  if (const auto parse_status = JsonStringToMessage(json_str, &table_req, opts);
      !parse_status.ok()) {
    return Status::InvalidArgument(
        Substitute("unable to parse JSON: $0", json_str),
        parse_status.error_message().ToString());
  }

  client::sp::shared_ptr<KuduClient> client;
  RETURN_NOT_OK(CreateKuduClient(context, &client));
  KuduSchema kudu_schema;
  RETURN_NOT_OK(ParseTableSchema(table_req.schema(), &kudu_schema));

  unique_ptr<KuduTableCreator> creator(client->NewTableCreator());
  creator->table_name(table_req.table_name()).schema(&kudu_schema);
  RETURN_NOT_OK(ParseTablePartition(table_req.partition(), kudu_schema, creator.get()));

  if (table_req.has_num_replicas()) {
    creator->num_replicas(table_req.num_replicas());
  }
  if (table_req.has_extra_configs()) {
    const auto& configs_pb = table_req.extra_configs().configs();
    const map<string, string> extra_configs(configs_pb.begin(), configs_pb.end());
    creator->extra_configs(extra_configs);
  }
  if (table_req.has_dimension_label()) {
    creator->dimension_label(table_req.dimension_label());
  }
  if (table_req.has_owner()) {
    creator->set_owner(table_req.owner());
  }
  if (table_req.has_comment()) {
    creator->set_comment(table_req.comment());
  }
  return creator->Create();
}
} // anonymous namespace
// Builds the "set_limit" sub-mode of the "table" mode. It has one action per
// kind of write limit: "disk_size" and "row_count". Each action takes the
// table name and the new limit value.
unique_ptr<Mode> BuildSetTableLimitMode() {
  const char* const kLimitedTableDesc = "Name of the table to set limit";

  unique_ptr<Action> disk_size_action =
      ClusterActionBuilder("disk_size", &SetDiskSizeLimit)
      .Description("Set the disk size limit")
      .AddRequiredParameter({ kTableNameArg, kLimitedTableDesc })
      .AddRequiredParameter({ "disk_size",
                              "The disk size limit, 'unlimited' for no write limit" })
      .Build();

  unique_ptr<Action> row_count_action =
      ClusterActionBuilder("row_count", &SetRowCountLimit)
      .Description("Set the row count limit")
      .AddRequiredParameter({ kTableNameArg, kLimitedTableDesc })
      .AddRequiredParameter({ "row_count",
                              "The row count limit, 'unlimited' for no write limit" })
      .Build();

  ModeBuilder set_limit_mode("set_limit");
  set_limit_mode.Description("Set the write limit for a table");
  set_limit_mode.AddAction(std::move(disk_size_action));
  set_limit_mode.AddAction(std::move(row_count_action));
  return set_limit_mode.Build();
}
// Builds the top-level "table" mode of the kudu CLI. It registers the
// "set_limit" sub-mode and every table-level action (create/delete/describe,
// schema alteration, range partition management, scan/copy, etc.).
//
// Actions built with ClusterActionBuilder get their cluster/master address
// argument from that builder. The two actions built with plain ActionBuilder
// ("recall" and "add_column") declare kMasterAddressesArg explicitly.
unique_ptr<Mode> BuildTableMode() {
  unique_ptr<Action> delete_table =
      ClusterActionBuilder("delete", &DeleteTable)
      .Description("Delete a table")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to delete" })
      .AddOptionalParameter("modify_external_catalogs")
      .AddOptionalParameter("reserve_seconds")
      .Build();

  unique_ptr<Action> describe_table =
      ClusterActionBuilder("describe", &DescribeTable)
      .Description("Describe a table")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to describe" })
      .AddOptionalParameter("show_attributes")
      .AddOptionalParameter("show_column_comment")
      .AddOptionalParameter("show_avro_format_schema")
      .Build();

  unique_ptr<Action> list_tables =
      ClusterActionBuilder("list", &ListTables)
      .Description("List tables")
      .AddOptionalParameter("soft_deleted_only")
      .AddOptionalParameter("tables")
      .AddOptionalParameter("list_tablets")
      .AddOptionalParameter("show_tablet_partition_info")
      .AddOptionalParameter("show_hash_partition_info")
      .AddOptionalParameter("show_table_info")
      .AddOptionalParameter("list_table_output_format")
      .Build();

  unique_ptr<Action> locate_row =
      ClusterActionBuilder("locate_row", &LocateRow)
      .Description("Locate which tablet a row belongs to")
      .ExtraDescription("Provide the primary key as a JSON array of primary "
                        "key values, e.g. '[1, \"foo\", 2, \"bar\"]'. The "
                        "output will be the tablet id associated with the row "
                        "key. If there is no such tablet, an error message "
                        "will be printed and the command will return a "
                        "non-zero status")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to look up against" })
      .AddRequiredParameter({ kKeyArg,
                              "String representation of the row's primary key "
                              "as a JSON array" })
      .AddOptionalParameter("check_row_existence")
      .Build();

  unique_ptr<Action> rename_column =
      ClusterActionBuilder("rename_column", &RenameColumn)
      .Description("Rename a column")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnNameArg, "Name of the table column to rename" })
      .AddRequiredParameter({ kNewColumnNameArg, "New column name" })
      .Build();

  // NOTE(review): the table ID is declared under kTabletIdArg. RecallTable
  // presumably looks up that same key, so confirm before renaming the argument.
  unique_ptr<Action> recall =
      ActionBuilder("recall", &RecallTable)
      .Description("Recall a deleted but still reserved table")
      .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
      .AddRequiredParameter({ kTabletIdArg, "ID of the table to recall" })
      .AddOptionalParameter("new_table_name")
      .Build();

  unique_ptr<Action> rename_table =
      ClusterActionBuilder("rename_table", &RenameTable)
      .Description("Rename a table")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to rename" })
      .AddRequiredParameter({ kNewTableNameArg, "New table name" })
      .AddOptionalParameter("modify_external_catalogs")
      .Build();

  unique_ptr<Action> scan_table =
      ClusterActionBuilder("scan", &ScanTable)
      .Description("Scan rows from a table")
      .ExtraDescription("Scan rows from an existing table. See the help "
                        "for the --predicates flag on how predicates can be specified.")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to scan"})
      .AddOptionalParameter("columns")
      .AddOptionalParameter("row_count_only")
      .AddOptionalParameter("report_scanner_stats")
      .AddOptionalParameter("scan_batch_size")
      .AddOptionalParameter("fault_tolerant")
      .AddOptionalParameter("fill_cache")
      .AddOptionalParameter("num_threads")
      .AddOptionalParameter("predicates")
      .AddOptionalParameter("tablets")
      .AddOptionalParameter("replica_selection")
      .Build();

  unique_ptr<Action> copy_table =
      ClusterActionBuilder("copy", &CopyTable)
      .Description("Copy table data to another table")
      .ExtraDescription("Copy table data to another table; the two tables could be in the same "
                        "cluster or not. The two tables must have the same table schema, but "
                        "could have different partition schemas. Alternatively, the tool can "
                        "create the new table using the same table and partition schema as the "
                        "source table.")
      .AddRequiredParameter({ kTableNameArg, "Name of the source table" })
      .AddRequiredParameter({ kDestMasterAddressesArg, kDestMasterAddressesArgDesc })
      .AddOptionalParameter("create_table")
      .AddOptionalParameter("create_table_hash_bucket_nums")
      .AddOptionalParameter("create_table_replication_factor")
      .AddOptionalParameter("dst_table")
      .AddOptionalParameter("fault_tolerant")
      .AddOptionalParameter("fill_cache")
      .AddOptionalParameter("num_threads")
      .AddOptionalParameter("predicates")
      .AddOptionalParameter("scan_batch_size")
      .AddOptionalParameter("tablets")
      .AddOptionalParameter("write_type")
      .Build();

  unique_ptr<Action> set_extra_config =
      ClusterActionBuilder("set_extra_config", &SetExtraConfig)
      .Description("Change a extra configuration value on a table")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kConfigNameArg, "Name of the configuration" })
      .AddRequiredParameter({ kConfigValueArg, "New value for the configuration" })
      .Build();

  unique_ptr<Action> get_extra_configs =
      ClusterActionBuilder("get_extra_configs", &GetExtraConfigs)
      .Description("Get the extra configuration properties for a table")
      .AddRequiredParameter({ kTableNameArg,
                              "Name of the table for which to get extra configurations" })
      .AddOptionalParameter("config_names")
      .Build();

  unique_ptr<Action> drop_range_partition =
      ClusterActionBuilder("drop_range_partition", &DropRangePartition)
      .Description("Drop a range partition of table")
      .AddRequiredParameter({ kTableNameArg, "Name of the table" })
      .AddRequiredParameter({ kTableRangeLowerBoundArg,
                              "String representation of lower bound of "
                              "the table range partition as a JSON array" })
      .AddRequiredParameter({ kTableRangeUpperBoundArg,
                              "String representation of upper bound of "
                              "the table range partition as a JSON array" })
      .AddOptionalParameter("lower_bound_type")
      .AddOptionalParameter("upper_bound_type")
      .Build();

  // Each bound description is split across adjacent string literals. The
  // trailing space after "JSON array." keeps the sentences apart in help output.
  unique_ptr<Action> add_range_partition =
      ClusterActionBuilder("add_range_partition", &AddRangePartition)
      .Description("Add a range partition for table")
      .AddRequiredParameter({ kTableNameArg, "Name of the table" })
      .AddRequiredParameter({ kTableRangeLowerBoundArg,
                              "String representation of lower bound of "
                              "the table range partition as a JSON array. "
                              "If the parameter is an empty array, "
                              "the lower range partition will be unbounded" })
      .AddRequiredParameter({ kTableRangeUpperBoundArg,
                              "String representation of upper bound of "
                              "the table range partition as a JSON array. "
                              "If the parameter is an empty array, "
                              "the upper range partition will be unbounded" })
      .AddOptionalParameter("lower_bound_type")
      .AddOptionalParameter("upper_bound_type")
      .AddOptionalParameter("hash_schema")
      .Build();

  unique_ptr<Action> column_set_default =
      ClusterActionBuilder("column_set_default", &ColumnSetDefault)
      .Description("Set write_default value for a column")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" })
      .AddRequiredParameter({ kDefaultValueArg,
                              "Write default value of the column, should be provided as a "
                              "JSON array, e.g. [1] or [\"foo\"]" })
      .Build();

  unique_ptr<Action> column_remove_default =
      ClusterActionBuilder("column_remove_default", &ColumnRemoveDefault)
      .Description("Remove write_default value for a column")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" })
      .Build();

  unique_ptr<Action> column_set_compression =
      ClusterActionBuilder("column_set_compression", &ColumnSetCompression)
      .Description("Set compression type for a column")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" })
      .AddRequiredParameter({ kCompressionTypeArg, "Compression type of the column" })
      .Build();

  unique_ptr<Action> column_set_encoding =
      ClusterActionBuilder("column_set_encoding", &ColumnSetEncoding)
      .Description("Set encoding type for a column")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" })
      .AddRequiredParameter({ kEncodingTypeArg, "Encoding type of the column" })
      .Build();

  unique_ptr<Action> column_set_block_size =
      ClusterActionBuilder("column_set_block_size", &ColumnSetBlockSize)
      .Description("Set block size for a column")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" })
      .AddRequiredParameter({ kBlockSizeArg, "Block size of the column" })
      .Build();

  unique_ptr<Action> column_set_comment =
      ClusterActionBuilder("column_set_comment", &ColumnSetComment)
      .Description("Set comment for a column")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnNameArg, "Name of the table column to alter" })
      .AddRequiredParameter({ kColumnCommentArg, "Comment of the column" })
      .Build();

  unique_ptr<Action> add_column =
      ActionBuilder("add_column", &AddColumn)
      .Description("Add a column")
      .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnNameArg, "Name of the table column to add" })
      .AddRequiredParameter({ kDataTypeArg, "Data Type, eg: INT8, INT16, INT32, INT64, STRING,"
                              " BOOL, FLOAT, DOUBLE, BINARY, UNIXTIME_MICROS, DECIMAL, VARCHAR,"
                              " TIMESTAMP, DATE"})
      .AddOptionalParameter(kEncodingTypeArg)
      .AddOptionalParameter(kCompressionTypeArg)
      .AddOptionalParameter(kDefaultValueArg)
      .AddOptionalParameter("comment")
      .Build();

  unique_ptr<Action> delete_column =
      ClusterActionBuilder("delete_column", &DeleteColumn)
      .Description("Delete a column")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnNameArg, "Name of the table column to delete" })
      .Build();

  unique_ptr<Action> set_comment =
      ClusterActionBuilder("set_comment", &SetComment)
      .Description("Set the comment for a table")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kColumnCommentArg, "Comment of the table" })
      .Build();

  unique_ptr<Action> clear_comment =
      ClusterActionBuilder("clear_comment", &ClearComment)
      .Description("Clear the comment for a table")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .Build();

  unique_ptr<Action> set_replication_factor =
      ClusterActionBuilder("set_replication_factor", &SetReplicationFactor)
      .Description("Change a table's replication factor")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to alter" })
      .AddRequiredParameter({ kReplicationFactorArg, "New replication factor of the table" })
      .Build();

  unique_ptr<Action> statistics =
      ClusterActionBuilder("statistics", &GetTableStatistics)
      .Description("Get table statistics")
      .AddRequiredParameter({ kTableNameArg, "Name of the table to get statistics" })
      .Build();

  unique_ptr<Action> create_table =
      ClusterActionBuilder("create", &CreateTable)
      .Description("Create a new table")
      .ExtraDescription(kCreateTableExtraDescription)
      .AddRequiredParameter({ kCreateTableJSONArg,
                              "JSON object for creating table" })
      .Build();

  // Registration order determines the order of entries under the "table" mode.
  return ModeBuilder("table")
      .Description("Operate on Kudu tables")
      .AddMode(BuildSetTableLimitMode())
      .AddAction(std::move(add_column))
      .AddAction(std::move(add_range_partition))
      .AddAction(std::move(clear_comment))
      .AddAction(std::move(column_remove_default))
      .AddAction(std::move(column_set_block_size))
      .AddAction(std::move(column_set_compression))
      .AddAction(std::move(column_set_default))
      .AddAction(std::move(column_set_encoding))
      .AddAction(std::move(column_set_comment))
      .AddAction(std::move(copy_table))
      .AddAction(std::move(create_table))
      .AddAction(std::move(delete_column))
      .AddAction(std::move(delete_table))
      .AddAction(std::move(describe_table))
      .AddAction(std::move(drop_range_partition))
      .AddAction(std::move(get_extra_configs))
      .AddAction(std::move(list_tables))
      .AddAction(std::move(locate_row))
      .AddAction(std::move(recall))
      .AddAction(std::move(rename_column))
      .AddAction(std::move(rename_table))
      .AddAction(std::move(scan_table))
      .AddAction(std::move(set_comment))
      .AddAction(std::move(set_extra_config))
      .AddAction(std::move(set_replication_factor))
      .AddAction(std::move(statistics))
      .Build();
}
} // namespace tools
} // namespace kudu