blob: cce71dd578f5abe8022fa984e31f05747d2f838d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/benchmarks/tpch/rpc_line_item_dao.h"
#include <functional>
#include <memory>
#include <ostream>
#include <vector>
#include <utility>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/benchmarks/tpch/tpch-schemas.h"
#include "kudu/client/callbacks.h"
#include "kudu/client/client.h"
#include "kudu/client/scan_predicate.h"
#include "kudu/client/schema.h"
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/util/monotime.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
// Command-line flag consumed by RpcLineItemDAO::OpenScannerImpl(), which
// forwards its value to KuduScanner::SetCacheBlocks() for every scanner it
// opens. Defaults to true.
DEFINE_bool(tpch_cache_blocks_when_scanning, true,
"Whether the scanners should cache the blocks that are read or not");
using kudu::client::KuduInsert;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduError;
using kudu::client::KuduPredicate;
using kudu::client::KuduRowResult;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSession;
using kudu::client::KuduStatusCallback;
using kudu::client::KuduTableCreator;
using kudu::client::KuduUpdate;
using kudu::client::KuduValue;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
class KuduPartialRow;
namespace {
class FlushCallback : public KuduStatusCallback {
public:
FlushCallback(client::sp::shared_ptr<KuduSession> session, Semaphore* sem)
: session_(std::move(session)),
sem_(sem) {
sem_->Acquire();
}
virtual void Run(const Status& s) OVERRIDE {
BatchFinished();
CHECK_OK(s);
sem_->Release();
delete this;
}
private:
void BatchFinished() {
int nerrs = session_->CountPendingErrors();
if (nerrs > 0) {
LOG(WARNING) << nerrs << " errors occured during last batch.";
vector<KuduError*> errors;
ElementDeleter d(&errors);
bool overflow;
session_->GetPendingErrors(&errors, &overflow);
if (overflow) {
LOG(WARNING) << "Error overflow occured";
}
for (KuduError* error : errors) {
LOG(WARNING) << "FAILED: " << error->failed_op().ToString();
}
}
}
client::sp::shared_ptr<KuduSession> session_;
Semaphore *sem_;
};
} // anonymous namespace
// Upper bound for the ship-date column; the TPC-H Q1 scanners in this file
// apply it as a LESS_EQUAL comparison predicate.
const Slice RpcLineItemDAO::kScanUpperBound("1998-09-02");
// Destruction delegates to FinishWriting() so that the session is flushed
// before the DAO's members are torn down.
RpcLineItemDAO::~RpcLineItemDAO() {
  this->FinishWriting();
}
// Stores the configuration for a line-item DAO. No connection is made here;
// the client, table and session are set up later by Init().
//
// master_address:     master server address handed to the client builder.
// table_name:         table to open, or to create if it is not found.
// batch_op_num_max:   when > 0, Init() puts the session in MANUAL_FLUSH mode
//                     and HandleLine() flushes after this many operations;
//                     otherwise the session uses AUTO_FLUSH_BACKGROUND.
// timeout_ms:         used both as the client's default RPC timeout and as
//                     the session timeout.
// partition_strategy: RANGE selects range partitioning on creation; any other
//                     value selects hash partitioning on the order key.
// num_buckets:        hash bucket count (only used for hash partitioning).
// tablet_splits:      split rows (only used for range partitioning).
RpcLineItemDAO::RpcLineItemDAO(string master_address, string table_name,
int batch_op_num_max, int timeout_ms,
PartitionStrategy partition_strategy,
int num_buckets,
vector<const KuduPartialRow*> tablet_splits)
: master_address_(std::move(master_address)),
table_name_(std::move(table_name)),
timeout_(MonoDelta::FromMilliseconds(timeout_ms)),
batch_op_num_max_(batch_op_num_max),
partition_strategy_(partition_strategy),
num_buckets_(num_buckets),
tablet_splits_(std::move(tablet_splits)),
// No operations have been applied since the last flush yet.
batch_op_num_(0),
// NOTE(review): presumably an initial count of one, so that only a single
// FlushCallback can hold the semaphore at a time -- confirm against Semaphore.
semaphore_(1) {
}
void RpcLineItemDAO::Init() {
const KuduSchema schema = tpch::CreateLineItemSchema();
CHECK_OK(KuduClientBuilder()
.add_master_server_addr(master_address_)
.default_rpc_timeout(timeout_)
.Build(&client_));
Status s = client_->OpenTable(table_name_, &client_table_);
if (s.IsNotFound()) {
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
table_creator->table_name(table_name_)
.schema(&schema)
.num_replicas(1);
if (partition_strategy_ == RANGE) {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
table_creator
->set_range_partition_columns({tpch::kOrderKeyColName, tpch::kLineNumberColName })
.split_rows(tablet_splits_);
#pragma GCC diagnostic pop
} else {
table_creator->add_hash_partitions({ tpch::kOrderKeyColName }, num_buckets_);
}
CHECK_OK(table_creator->Create());
CHECK_OK(client_->OpenTable(table_name_, &client_table_));
} else {
CHECK_OK(s);
}
session_ = client_->NewSession();
session_->SetTimeoutMillis(timeout_.ToMilliseconds());
CHECK_OK(session_->SetFlushMode(batch_op_num_max_ > 0
? KuduSession::MANUAL_FLUSH
: KuduSession::AUTO_FLUSH_BACKGROUND));
}
// Inserts one row. 'f' is called with the new insert's mutable row so the
// caller can fill in the column values; the operation is then applied to the
// session (fatal on error) and counted towards the current batch.
void RpcLineItemDAO::WriteLine(const std::function<void(KuduPartialRow*)>& f) {
  unique_ptr<KuduInsert> op(client_table_->NewInsert());
  KuduPartialRow* const row = op->mutable_row();
  f(row);
  // Apply() is handed the raw pointer released from the unique_ptr.
  CHECK_OK(session_->Apply(op.release()));
  HandleLine();
}
// Updates one row. 'f' is called with the new update's mutable row so the
// caller can set the key and the columns to change; the operation is then
// applied to the session (fatal on error) and counted towards the current
// batch.
void RpcLineItemDAO::MutateLine(const std::function<void(KuduPartialRow*)>& f) {
  unique_ptr<KuduUpdate> op(client_table_->NewUpdate());
  KuduPartialRow* const row = op->mutable_row();
  f(row);
  // Apply() is handed the raw pointer released from the unique_ptr.
  CHECK_OK(session_->Apply(op.release()));
  HandleLine();
}
void RpcLineItemDAO::FinishWriting() {
FlushCallback* cb = new FlushCallback(session_, &semaphore_);
Status s = session_->Flush();
// Also deletes 'cb'.
cb->Run(s);
}
void RpcLineItemDAO::OpenScanner(const vector<string>& columns,
unique_ptr<Scanner>* out_scanner) {
vector<KuduPredicate*> preds;
OpenScannerImpl(columns, preds, out_scanner);
}
// Opens a scanner over the TPC-H Q1 query columns, restricted to rows whose
// ship date is <= kScanUpperBound. The opened scanner is returned through
// 'out_scanner'.
void RpcLineItemDAO::OpenTpch1Scanner(unique_ptr<Scanner>* out_scanner) {
  KuduPredicate* const ship_date_bound = client_table_->NewComparisonPredicate(
      tpch::kShipDateColName,
      KuduPredicate::LESS_EQUAL,
      KuduValue::CopyString(kScanUpperBound));

  vector<KuduPredicate*> preds;
  preds.push_back(ship_date_bound);

  OpenScannerImpl(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
}
void RpcLineItemDAO::OpenTpch1ScannerForOrderKeyRange(
int64_t min_orderkey, int64_t max_orderkey, unique_ptr<Scanner>* out_scanner) {
vector<KuduPredicate*> preds;
preds.push_back(client_table_->NewComparisonPredicate(
tpch::kShipDateColName, KuduPredicate::LESS_EQUAL,
KuduValue::CopyString(kScanUpperBound)));
preds.push_back(client_table_->NewComparisonPredicate(
tpch::kOrderKeyColName, KuduPredicate::GREATER_EQUAL,
KuduValue::FromInt(min_orderkey)));
preds.push_back(client_table_->NewComparisonPredicate(
tpch::kOrderKeyColName, KuduPredicate::LESS_EQUAL,
KuduValue::FromInt(max_orderkey)));
OpenScannerImpl(tpch::GetTpchQ1QueryColumns(), preds, out_scanner);
}
bool RpcLineItemDAO::IsTableEmpty() {
KuduScanner scanner(client_table_.get());
CHECK_OK(scanner.Open());
return !scanner.HasMoreRows();
}
// Creates, configures and opens a scanner on the line-item table.
//
// columns:     names of the columns to project.
// preds:       predicates that are all added as conjuncts.
//              NOTE(review): the Kudu client API is expected to take
//              ownership of each predicate in AddConjunctPredicate() --
//              confirm against the client documentation.
// out_scanner: receives the opened scanner; whatever it held before is
//              destroyed when this function returns.
//
// Block caching follows --tpch_cache_blocks_when_scanning. Every setup step,
// including the block-caching one, is fatal (CHECK_OK) on failure so that a
// misconfigured scanner is never handed back to the caller.
void RpcLineItemDAO::OpenScannerImpl(const vector<string>& columns,
                                     const vector<KuduPredicate*>& preds,
                                     unique_ptr<Scanner>* out_scanner) {
  unique_ptr<Scanner> ret(new Scanner);
  ret->scanner_.reset(new KuduScanner(client_table_.get()));
  // Previously the returned status was dropped; check it like its siblings.
  CHECK_OK(ret->scanner_->SetCacheBlocks(FLAGS_tpch_cache_blocks_when_scanning));
  CHECK_OK(ret->scanner_->SetProjectedColumnNames(columns));
  for (KuduPredicate* pred : preds) {
    CHECK_OK(ret->scanner_->AddConjunctPredicate(pred));
  }
  CHECK_OK(ret->scanner_->Open());
  out_scanner->swap(ret);
}
// Bookkeeping performed after each operation is applied to the session.
//
// When manual batching is enabled (batch_op_num_max_ > 0) this counts the
// applied operations and, once batch_op_num_max_ of them have accumulated,
// resets the counter and starts an asynchronous flush. Creating the
// FlushCallback acquires 'semaphore_', which the callback releases when it
// runs.
void RpcLineItemDAO::HandleLine() {
  // Init() selects MANUAL_FLUSH only for a strictly positive batch size; for
  // every other value the session is in AUTO_FLUSH_BACKGROUND mode and there
  // is nothing to do here. Testing '<= 0' (rather than '== 0') keeps a
  // negative batch size from triggering a flush after every single row.
  if (batch_op_num_max_ <= 0) {
    return;
  }
  if (++batch_op_num_ < batch_op_num_max_) {
    return;
  }
  batch_op_num_ = 0;
  // The callback object frees itself after it is invoked.
  session_->FlushAsync(new FlushCallback(session_, &semaphore_));
}
// Returns whether the underlying scanner reports more rows. When it does not,
// the scanner is closed before returning false.
bool RpcLineItemDAO::Scanner::HasMore() {
  if (scanner_->HasMoreRows()) {
    return true;
  }
  scanner_->Close();
  return false;
}
// Fetches the next batch from the underlying scanner into 'rows'. A failed
// fetch is fatal. The deprecation warning is silenced around the NextBatch()
// call.
void RpcLineItemDAO::Scanner::GetNext(vector<KuduRowResult> *rows) {
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
  const Status batch_status = scanner_->NextBatch(rows);
#pragma GCC diagnostic pop
  CHECK_OK(batch_status);
}
} // namespace kudu