// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

#include <boost/optional/optional.hpp>

#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/txn_coordinator.h"
#include "kudu/transactions/txn_status_entry.h"
#include "kudu/transactions/txn_status_tablet.h"
#include "kudu/util/locks.h"
#include "kudu/util/status.h"

namespace kudu {

namespace tablet {
class TabletReplica;
} // namespace tablet

namespace tserver {
class TabletServerErrorPB;
} // namespace tserver

namespace transactions {

class TxnStatusEntryPB;

// Maps the transaction ID to the corresponding TransactionEntry.
typedef std::unordered_map<int64_t, scoped_refptr<TransactionEntry>> TransactionsMap;

// Visitor used to iterate over and load into memory the existing state from a
// status tablet.
class TxnStatusManagerBuildingVisitor : public TransactionsVisitor {
 public:
  TxnStatusManagerBuildingVisitor();
  ~TxnStatusManagerBuildingVisitor() = default;
  // Builds a TransactionEntry for the given metadata and keeps track of it in
  // txns_by_id_. This is not thread-safe -- callers should ensure only a
  // single thread calls it at once.
  void VisitTransactionEntries(int64_t txn_id, TxnStatusEntryPB status_entry_pb,
                               std::vector<ParticipantIdAndPB> participants) override;

  // Releases the transactions map to the caller. Should only be called once
  // per call to VisitTransactionEntries().
  void Release(int64_t* highest_txn_id, TransactionsMap* txns_by_id);
 private:
  int64_t highest_txn_id_;
  TransactionsMap txns_by_id_;
};

// Manages ongoing transactions and participants therein, backed by an
// underlying tablet.
class TxnStatusManager final : public tablet::TxnCoordinator {
 public:
  explicit TxnStatusManager(tablet::TabletReplica* tablet_replica);
  ~TxnStatusManager() = default;

  // Loads the contents of the status tablet into memory.
  Status LoadFromTablet() override;

  // Writes an entry to the status tablet and creates a transaction in memory.
  // Returns an error if a higher transaction ID has already been attempted
  // (even if that attempt failed), which helps ensure that at most one call to
  // this method will succeed for a given transaction ID. The
  // 'highest_seen_txn_id' output parameter, if not null, is populated in both
  // success and failure cases, except for the case when returning
  // Status::ServiceUnavailable() due to not-yet-loaded data from the
  // backing transaction status tablet.
  //
  // TODO(awong): consider computing the next available transaction ID in this
  // partition and using it in case this transaction is already used, or having
  // callers forward a request for the next-highest transaction ID.
  Status BeginTransaction(int64_t txn_id,
                          const std::string& user,
                          int64_t* highest_seen_txn_id,
                          tserver::TabletServerErrorPB* ts_error) override;

  // Begins committing the given transaction, returning an error if the
  // transaction doesn't exist, isn't open, or isn't owned by the given user.
  Status BeginCommitTransaction(int64_t txn_id, const std::string& user,
                                tserver::TabletServerErrorPB* ts_error) override;

  // Finalizes the commit of the transaction, returning an error if the
  // transaction isn't in an appropraite state.
  //
  // Unlike the other transaction life-cycle calls, this isn't user-initiated,
  // so it doesn't take a user.
  //
  // TODO(awong): add a commit timestamp.
  Status FinalizeCommitTransaction(int64_t txn_id,
                                   tserver::TabletServerErrorPB* ts_error) override;

  // Aborts the given transaction, returning an error if the transaction
  // doesn't exist, is committed or not yet opened, or isn't owned by the given
  // user.
  Status AbortTransaction(int64_t txn_id, const std::string& user,
                          tserver::TabletServerErrorPB* ts_error) override;

  // Retrieves the status of the specified transaction, returning an error if
  // the transaction doesn't exist or isn't owned by the specified user.
  Status GetTransactionStatus(int64_t txn_id,
                              const std::string& user,
                              transactions::TxnStatusEntryPB* txn_status,
                              tserver::TabletServerErrorPB* ts_error) override;

  // Processes keep-alive heartbeat for the specified transaction.
  Status KeepTransactionAlive(int64_t txn_id,
                              const std::string& user,
                              tserver::TabletServerErrorPB* ts_error) override;

  // Creates an in-memory participant, writes an entry to the status table, and
  // attaches the in-memory participant to the transaction.
  //
  // If the transaction is open, it is ensured to be active for the duration of
  // this call. Returns an error if the given transaction isn't open.
  Status RegisterParticipant(int64_t txn_id, const std::string& tablet_id,
                             const std::string& user,
                             tserver::TabletServerErrorPB* ts_error) override;

  // Abort transactions which are still in non-terminal state but haven't
  // received keep-alive updates (see KeepTransactionAlive()) for a long time.
  void AbortStaleTransactions() override;

  int64_t highest_txn_id() const override {
    std::lock_guard<simple_spinlock> l(lock_);
    return highest_txn_id_;
  }

  // Populates a map from transaction ID to the sorted list of participants
  // associated with that transaction ID.
  tablet::ParticipantIdsByTxnId GetParticipantsByTxnIdForTests() const override;

 private:
  // Verifies that the transaction status data has already been loaded from the
  // underlying tablet and the replica is a leader. Returns Status::OK() if the
  // data is loaded and the replica is a leader. Otherwise, if the data hasn't
  // been loaded yet, return Status::ServiceUnavailable().  If the data has
  // been loaded, but the replica isn't a leader, returns
  // Status::ServiceUnavailable() and sets the code in 'ts_error'
  // to TabletServerErrorPB::NOT_THE_LEADER.
  Status CheckTxnStatusDataLoadedUnlocked(
      tserver::TabletServerErrorPB* ts_error) const;

  // Returns the transaction entry, returning an error if the transaction ID
  // doesn't exist or if 'user' is specified but isn't the owner of the
  // transaction. In addition, if the underlying replica isn't a leader,
  // sets the code in 'ts_error' to TabletServerErrorPB::NOT_THE_LEADER
  // correspondingly.
  Status GetTransaction(int64_t txn_id, const boost::optional<std::string>& user,
                        scoped_refptr<TransactionEntry>* txn,
                        tserver::TabletServerErrorPB* ts_error) const;

  // Protects 'highest_txn_id_' and 'txns_by_id_'.
  mutable simple_spinlock lock_;

  // The highest transaction ID seen by this status manager so far. Requests to
  // create a new transaction must provide an ID higher than this ID.
  int64_t highest_txn_id_;

  // Tracks the currently on-going transactions.
  TransactionsMap txns_by_id_;

  // The access to underlying storage.
  TxnStatusTablet status_tablet_;
};

class TxnStatusManagerFactory : public tablet::TxnCoordinatorFactory {
 public:
  TxnStatusManagerFactory() {}

  std::unique_ptr<tablet::TxnCoordinator> Create(tablet::TabletReplica* replica) override {
    return std::unique_ptr<tablet::TxnCoordinator>(new TxnStatusManager(replica));
  }
};

} // namespace transactions
} // namespace kudu
