| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/transactions/txn_status_tablet.h" |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <optional> |
| #include <ostream> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include "kudu/common/column_predicate.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/row_operations.h" |
| #include "kudu/common/row_operations.pb.h" |
| #include "kudu/common/rowblock.h" |
| #include "kudu/common/rowblock_memory.h" |
| #include "kudu/common/scan_spec.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/types.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/tablet/ops/op.h" |
| #include "kudu/tablet/ops/write_op.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/transactions/transactions.pb.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/once.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/slice.h" |
| |
| using kudu::tablet::LatchOpCompletionCallback; |
| using kudu::tablet::OpCompletionCallback; |
| using kudu::tablet::WriteOpState; |
| using kudu::tserver::TabletServerErrorPB; |
| using kudu::tserver::WriteRequestPB; |
| using kudu::tserver::WriteResponsePB;; |
| using std::nullopt; |
| using std::optional; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace transactions { |
| |
| namespace { |
| |
| int kTxnIdColIdx = -1; |
| int kEntryTypeColIdx = -1; |
| int kIdentifierColIdx = -1; |
| int kMetadataColIdx = -1; |
| // Initializes the column indices of the transaction status tablet. |
| Status InitTxnStatusColIdxs() { |
| static KuduOnceLambda col_idx_initializer; |
| return col_idx_initializer.Init([] { |
| const auto& schema = TxnStatusTablet::GetSchemaWithoutIds(); |
| kTxnIdColIdx = schema.find_column(TxnStatusTablet::kTxnIdColName); |
| kEntryTypeColIdx = schema.find_column(TxnStatusTablet::kEntryTypeColName); |
| kIdentifierColIdx = schema.find_column(TxnStatusTablet::kIdentifierColName); |
| kMetadataColIdx = schema.find_column(TxnStatusTablet::kMetadataColName); |
| return Status::OK(); |
| }); |
| } |
| // NOTE: these column index getters should only be used once a TxnStatusTablet |
| // has been constructed. |
| int TxnIdColIdx() { |
| DCHECK_NE(-1, kTxnIdColIdx); |
| return kTxnIdColIdx; |
| } |
| int EntryTypeColIdx() { |
| DCHECK_NE(-1, kEntryTypeColIdx); |
| return kEntryTypeColIdx; |
| } |
| int IdentifierColIdx() { |
| DCHECK_NE(-1, kIdentifierColIdx); |
| return kIdentifierColIdx; |
| } |
| int MetadataColIdx() { |
| DCHECK_NE(-1, kMetadataColIdx); |
| return kMetadataColIdx; |
| } |
| |
| Schema kTxnStatusSchema; |
| Schema kTxnStatusSchemaNoIds; |
| // Populates the schema of the transaction status table. |
| Status PopulateTxnStatusSchema(SchemaBuilder* builder) { |
| RETURN_NOT_OK(builder->AddKeyColumn(TxnStatusTablet::kTxnIdColName, INT64)); |
| RETURN_NOT_OK(builder->AddKeyColumn(TxnStatusTablet::kEntryTypeColName, INT8)); |
| RETURN_NOT_OK(builder->AddKeyColumn(TxnStatusTablet::kIdentifierColName, STRING)); |
| return builder->AddColumn(TxnStatusTablet::kMetadataColName, STRING); |
| } |
| // Initializes the static transaction status schema. |
| Status InitTxnStatusSchemaOnce() { |
| static KuduOnceLambda schema_initializer; |
| return schema_initializer.Init([] { |
| SchemaBuilder builder; |
| RETURN_NOT_OK(PopulateTxnStatusSchema(&builder)); |
| kTxnStatusSchema = builder.Build(); |
| return Status::OK(); |
| }); |
| } |
| Status InitTxnStatusSchemaWithNoIdsOnce() { |
| static KuduOnceLambda schema_initializer; |
| return schema_initializer.Init([] { |
| SchemaBuilder builder; |
| RETURN_NOT_OK(PopulateTxnStatusSchema(&builder)); |
| kTxnStatusSchemaNoIds = builder.BuildWithoutIds(); |
| return Status::OK(); |
| }); |
| } |
| |
| WriteRequestPB kTxnStatusWriteReqPB; |
| // Initializes the static transaction status tablet write request. |
| Status InitWriteRequestPBOnce() { |
| static KuduOnceLambda write_initializer; |
| return write_initializer.Init([] { |
| return SchemaToPB(TxnStatusTablet::GetSchemaWithoutIds(), |
| kTxnStatusWriteReqPB.mutable_schema()); |
| }); |
| } |
| |
| // Returns a write request for the transaction status tablet of the given ID. |
| WriteRequestPB BuildWriteReqPB(const string& tablet_id) { |
| CHECK_OK(InitWriteRequestPBOnce()); |
| WriteRequestPB req = kTxnStatusWriteReqPB; |
| req.set_tablet_id(tablet_id); |
| return req; |
| } |
| |
| // Return the values of the keys of the given transaction status tablet row. |
| void ExtractKeys(const RowBlockRow& row, int64_t* txn_id, int8_t* entry_type, Slice* identifier) { |
| const auto& schema = TxnStatusTablet::GetSchemaWithoutIds(); |
| *txn_id = *schema.ExtractColumnFromRow<INT64>(row, TxnIdColIdx()); |
| *entry_type = *schema.ExtractColumnFromRow<INT8>(row, EntryTypeColIdx()); |
| *identifier = *schema.ExtractColumnFromRow<STRING>(row, IdentifierColIdx()); |
| } |
| |
| template <typename T> |
| Status ExtractMetadataEntry(const RowBlockRow& row, T* pb) { |
| const auto& schema = TxnStatusTablet::GetSchemaWithoutIds(); |
| const Slice* entry = schema.ExtractColumnFromRow<STRING>(row, MetadataColIdx()); |
| Status s = pb_util::ParseFromArray(pb, entry->data(), entry->size()); |
| if (PREDICT_FALSE(!s.ok())) { |
| int64_t txn_id; |
| int8_t entry_type; |
| Slice identifier; |
| ExtractKeys(row, &txn_id, &entry_type, &identifier); |
| VLOG(2) << Substitute("bad entry: $0", entry->ToString()); |
| return s.CloneAndPrepend( |
| Substitute("unable to parse entry for $0 record of transaction ID $1 ($2)", |
| txn_id, entry_type, identifier.ToString())); |
| } |
| return Status::OK(); |
| } |
| |
| Status PopulateTransactionEntryRow(int64_t txn_id, const faststring& entry, KuduPartialRow* row) { |
| RETURN_NOT_OK(row->SetInt64(TxnStatusTablet::kTxnIdColName, txn_id)); |
| RETURN_NOT_OK(row->SetInt8(TxnStatusTablet::kEntryTypeColName, TxnStatusTablet::TRANSACTION)); |
| RETURN_NOT_OK(row->SetString(TxnStatusTablet::kIdentifierColName, "")); |
| return row->SetString(TxnStatusTablet::kMetadataColName, entry); |
| } |
| |
| Status PopulateParticipantEntryRow(int64_t txn_id, const string& tablet_id, const faststring& entry, |
| KuduPartialRow* row) { |
| RETURN_NOT_OK(row->SetInt64(TxnStatusTablet::kTxnIdColName, txn_id)); |
| RETURN_NOT_OK(row->SetInt8(TxnStatusTablet::kEntryTypeColName, TxnStatusTablet::PARTICIPANT)); |
| RETURN_NOT_OK(row->SetString(TxnStatusTablet::kIdentifierColName, tablet_id)); |
| return row->SetString(TxnStatusTablet::kMetadataColName, entry); |
| } |
| |
| } // anonymous namespace |
| |
| const char* const TxnStatusTablet::kTxnIdColName = "txn_id"; |
| const char* const TxnStatusTablet::kEntryTypeColName = "entry_type"; |
| const char* const TxnStatusTablet::kIdentifierColName = "identifier"; |
| const char* const TxnStatusTablet::kMetadataColName = "metadata"; |
| const char* const TxnStatusTablet::kTxnStatusTableName = "kudu_system.kudu_transactions"; |
| |
| TxnStatusTablet::TxnStatusTablet(tablet::TabletReplica* tablet_replica) |
| : tablet_replica_(DCHECK_NOTNULL(tablet_replica)) { |
| CHECK_OK(InitTxnStatusColIdxs()); |
| } |
| |
| const Schema& TxnStatusTablet::GetSchema() { |
| CHECK_OK(InitTxnStatusSchemaOnce()); |
| return kTxnStatusSchema; |
| } |
| const Schema& TxnStatusTablet::GetSchemaWithoutIds() { |
| CHECK_OK(InitTxnStatusSchemaWithNoIdsOnce()); |
| return kTxnStatusSchemaNoIds; |
| } |
| |
| Status TxnStatusTablet::VisitTransactions(TransactionsVisitor* visitor) { |
| const auto& schema = GetSchemaWithoutIds(); |
| // There are only TRANSACTION and PARTICIPANT entries today, but this filter |
| // is conservative in case we add more entry types in the future. |
| faststring record_types; |
| record_types.push_back(TRANSACTION); |
| record_types.push_back(PARTICIPANT); |
| vector<const void*> pred_record_types = { &record_types.at(0), &record_types.at(1) }; |
| auto pred = ColumnPredicate::InList(schema.column(EntryTypeColIdx()), &pred_record_types); |
| |
| ScanSpec spec; |
| spec.AddPredicate(pred); |
| unique_ptr<RowwiseIterator> iter; |
| RETURN_NOT_OK(tablet_replica_->tablet()->NewOrderedRowIterator(schema, &iter)); |
| RETURN_NOT_OK(iter->Init(&spec)); |
| |
| // Keep track of the current transaction ID so we know when to start a new |
| // transaction. |
| optional<int64_t> prev_txn_id = nullopt; |
| TxnStatusEntryPB prev_status_entry_pb; |
| vector<ParticipantIdAndPB> prev_participants; |
| RowBlockMemory mem; |
| RowBlock block(&iter->schema(), 512, &mem); |
| // Iterate over the transaction and participant entries, notifying the |
| // visitor once a transaction and all its participants have been found. |
| while (iter->HasNext()) { |
| RETURN_NOT_OK(iter->NextBlock(&block)); |
| const size_t nrows = block.nrows(); |
| for (size_t i = 0; i < nrows; ++i) { |
| if (!block.selection_vector()->IsRowSelected(i)) { |
| continue; |
| } |
| const auto& row = block.row(i); |
| int64_t txn_id; |
| int8_t entry_type; |
| Slice identifier; |
| ExtractKeys(row, &txn_id, &entry_type, &identifier); |
| switch (entry_type) { |
| case TRANSACTION: { |
| if (PREDICT_FALSE(prev_txn_id && *prev_txn_id == txn_id)) { |
| return Status::Corruption( |
| Substitute("duplicate transaction entry: $0", txn_id)); |
| } |
| if (prev_txn_id) { |
| // We've previously collected the state for a transaction. Signal |
| // to the visitor what the state of the previous transaction was. |
| visitor->VisitTransactionEntries(*prev_txn_id, std::move(prev_status_entry_pb), |
| std::move(prev_participants)); |
| |
| // Sanity check: we're iterating in increasing txn_id order. |
| DCHECK_GT(txn_id, *prev_txn_id); |
| } |
| prev_txn_id = txn_id; |
| prev_participants.clear(); |
| RETURN_NOT_OK(ExtractMetadataEntry(row, &prev_status_entry_pb)); |
| continue; |
| } |
| case PARTICIPANT: { |
| if (PREDICT_FALSE(!prev_txn_id || *prev_txn_id != txn_id)) { |
| return Status::Corruption( |
| Substitute("missing transaction status entry for $0$1", txn_id, |
| prev_txn_id ? Substitute(", currently on ID $0", *prev_txn_id) : "")); |
| } |
| TxnParticipantEntryPB pb; |
| RETURN_NOT_OK(ExtractMetadataEntry(row, &pb)); |
| prev_participants.emplace_back( |
| schema.ExtractColumnFromRow<STRING>(row, IdentifierColIdx())->ToString(), |
| std::move(pb)); |
| continue; |
| } |
| default: |
| LOG(DFATAL) << "Unknown entry type: " << entry_type; |
| continue; |
| } |
| } |
| } |
| if (prev_txn_id) { |
| visitor->VisitTransactionEntries(*prev_txn_id, std::move(prev_status_entry_pb), |
| std::move(prev_participants)); |
| } |
| return Status::OK(); |
| } |
| |
| Status TxnStatusTablet::AddNewTransaction(int64_t txn_id, const string& user, |
| int64_t start_timestamp, TabletServerErrorPB* ts_error) { |
| WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id()); |
| |
| TxnStatusEntryPB entry; |
| entry.set_state(OPEN); |
| entry.set_user(user); |
| entry.set_start_timestamp(start_timestamp); |
| entry.set_last_transition_timestamp(start_timestamp); |
| faststring metadata_buf; |
| pb_util::SerializeToString(entry, &metadata_buf); |
| |
| KuduPartialRow row(&GetSchemaWithoutIds()); |
| RETURN_NOT_OK(PopulateTransactionEntryRow(txn_id, metadata_buf, &row)); |
| RowOperationsPBEncoder enc(req.mutable_row_operations()); |
| enc.Add(RowOperationsPB::INSERT_IGNORE, row); |
| return SyncWrite(req, ts_error); |
| } |
| |
| Status TxnStatusTablet::UpdateTransaction(int64_t txn_id, const TxnStatusEntryPB& pb, |
| TabletServerErrorPB* ts_error) { |
| WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id()); |
| |
| faststring metadata_buf; |
| pb_util::SerializeToString(pb, &metadata_buf); |
| |
| KuduPartialRow row(&GetSchemaWithoutIds()); |
| RETURN_NOT_OK(PopulateTransactionEntryRow(txn_id, metadata_buf, &row)); |
| RowOperationsPBEncoder enc(req.mutable_row_operations()); |
| enc.Add(RowOperationsPB::UPDATE, row); |
| return SyncWrite(req, ts_error); |
| } |
| |
| Status TxnStatusTablet::AddNewParticipant(int64_t txn_id, const string& tablet_id, |
| TabletServerErrorPB* ts_error) { |
| WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id()); |
| |
| TxnParticipantEntryPB entry; |
| entry.set_state(OPEN); |
| faststring metadata_buf; |
| pb_util::SerializeToString(entry, &metadata_buf); |
| |
| KuduPartialRow row(&TxnStatusTablet::GetSchemaWithoutIds()); |
| PopulateParticipantEntryRow(txn_id, tablet_id, metadata_buf, &row); |
| RowOperationsPBEncoder enc(req.mutable_row_operations()); |
| enc.Add(RowOperationsPB::INSERT_IGNORE, row); |
| return SyncWrite(req, ts_error); |
| } |
| |
| Status TxnStatusTablet::UpdateParticipant(int64_t txn_id, const string& tablet_id, |
| const TxnParticipantEntryPB& pb, |
| TabletServerErrorPB* ts_error) { |
| WriteRequestPB req = BuildWriteReqPB(tablet_replica_->tablet_id()); |
| |
| faststring metadata_buf; |
| pb_util::SerializeToString(pb, &metadata_buf); |
| |
| KuduPartialRow row(&GetSchemaWithoutIds()); |
| RETURN_NOT_OK(PopulateParticipantEntryRow(txn_id, tablet_id, metadata_buf, &row)); |
| RowOperationsPBEncoder enc(req.mutable_row_operations()); |
| enc.Add(RowOperationsPB::UPDATE, row); |
| return SyncWrite(req, ts_error); |
| } |
| |
| Status TxnStatusTablet::SyncWrite(const WriteRequestPB& req, TabletServerErrorPB* ts_error) { |
| DCHECK(req.has_tablet_id()); |
| DCHECK(req.has_schema()); |
| CountDownLatch latch(1); |
| WriteResponsePB resp; |
| unique_ptr<OpCompletionCallback> op_cb( |
| new LatchOpCompletionCallback<WriteResponsePB>(&latch, &resp)); |
| unique_ptr<WriteOpState> op_state( |
| new WriteOpState(tablet_replica_, |
| &req, |
| nullptr, // RequestIdPB |
| &resp)); |
| op_state->set_completion_callback(std::move(op_cb)); |
| RETURN_NOT_OK(tablet_replica_->SubmitWrite(std::move(op_state))); |
| latch.Wait(); |
| if (resp.has_error()) { |
| DCHECK(ts_error); |
| *ts_error = std::move(resp.error()); |
| return StatusFromPB(ts_error->status()); |
| } |
| if (resp.per_row_errors_size() > 0) { |
| for (const auto& error : resp.per_row_errors()) { |
| LOG(ERROR) << Substitute( |
| "row $0: $1", error.row_index(), StatusFromPB(error.error()).ToString()); |
| } |
| return Status::Incomplete( |
| Substitute("failed to write $0 rows to transaction status tablet $1", |
| resp.per_row_errors_size(), tablet_replica_->tablet_id())); |
| } |
| return Status::OK(); |
| } |
| |
| } // namespace transactions |
| } // namespace kudu |