blob: 0e94272ea9ffa14e6f524fc3e202a8c7866f804b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef KUDU_TABLET_LOCAL_TABLET_WRITER_H
#define KUDU_TABLET_LOCAL_TABLET_WRITER_H
#include <vector>
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/tablet/row_op.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/transactions/write_transaction.h"
#include "kudu/gutil/macros.h"
namespace kudu {
namespace tablet {
// Helper class to write directly into a local tablet, without going
// through TabletPeer, consensus, etc.
//
// This is useful for unit-testing the Tablet code paths with no consensus
// implementation or thread pools.
class LocalTabletWriter {
public:
struct Op {
Op(RowOperationsPB::Type type,
const KuduPartialRow* row)
: type(type),
row(row) {
}
RowOperationsPB::Type type;
const KuduPartialRow* row;
};
explicit LocalTabletWriter(Tablet* tablet,
const Schema* client_schema)
: tablet_(tablet),
client_schema_(client_schema) {
CHECK(!client_schema->has_column_ids());
CHECK_OK(SchemaToPB(*client_schema, req_.mutable_schema()));
}
~LocalTabletWriter() {}
Status Insert(const KuduPartialRow& row) {
return Write(RowOperationsPB::INSERT, row);
}
Status Delete(const KuduPartialRow& row) {
return Write(RowOperationsPB::DELETE, row);
}
Status Update(const KuduPartialRow& row) {
return Write(RowOperationsPB::UPDATE, row);
}
// Perform a write against the local tablet.
// Returns a bad Status if the applied operation had a per-row error.
Status Write(RowOperationsPB::Type type,
const KuduPartialRow& row) {
vector<Op> ops;
ops.push_back(Op(type, &row));
return WriteBatch(ops);
}
Status WriteBatch(const std::vector<Op>& ops) {
req_.mutable_row_operations()->Clear();
RowOperationsPBEncoder encoder(req_.mutable_row_operations());
for (const Op& op : ops) {
encoder.Add(op.type, *op.row);
}
tx_state_.reset(new WriteTransactionState(NULL, &req_, NULL));
RETURN_NOT_OK(tablet_->DecodeWriteOperations(client_schema_, tx_state_.get()));
RETURN_NOT_OK(tablet_->AcquireRowLocks(tx_state_.get()));
tablet_->StartTransaction(tx_state_.get());
// Create a "fake" OpId and set it in the TransactionState for anchoring.
tx_state_->mutable_op_id()->CopyFrom(consensus::MaximumOpId());
tablet_->ApplyRowOperations(tx_state_.get());
tx_state_->ReleaseTxResultPB(&result_);
tx_state_->CommitOrAbort(Transaction::COMMITTED);
// Return the status of first failed op.
int op_idx = 0;
for (const OperationResultPB& result : result_.ops()) {
if (result.has_failed_status()) {
return StatusFromPB(result.failed_status())
.CloneAndPrepend(ops[op_idx].row->ToString());
break;
}
op_idx++;
}
return Status::OK();
}
// Return the result of the last row operation run against the tablet.
const OperationResultPB& last_op_result() {
CHECK_GE(result_.ops_size(), 1);
return result_.ops(result_.ops_size() - 1);
}
private:
Tablet* const tablet_;
const Schema* client_schema_;
TxResultPB result_;
tserver::WriteRequestPB req_;
gscoped_ptr<WriteTransactionState> tx_state_;
DISALLOW_COPY_AND_ASSIGN(LocalTabletWriter);
};
} // namespace tablet
} // namespace kudu
#endif /* KUDU_TABLET_LOCAL_TABLET_WRITER_H */