// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <thread>
#include <vector>

#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/client/client-test-util.h"
#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_configuration.h"
#include "kudu/client/scan_predicate.h"
#include "kudu/client/scanner-internal.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/value.h"
#include "kudu/client/write_op.h"
#include "kudu/clock/clock.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/clock/mock_ntp.h"
#include "kudu/clock/time_service.h"
#include "kudu/common/partial_row.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/internal_mini_cluster-itest-base.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

DECLARE_int32(heartbeat_interval_ms);
DECLARE_int32(max_clock_sync_error_usec);
DECLARE_int32(scanner_gc_check_interval_us);
DECLARE_string(time_source);

using kudu::client::ScanConfiguration;
using kudu::client::sp::shared_ptr;
using kudu::master::CatalogManager;
using kudu::master::GetTableLocationsRequestPB;
using kudu::master::GetTableLocationsResponsePB;
using kudu::clock::HybridClock;
using kudu::tablet::TabletReplica;
using kudu::tserver::MiniTabletServer;
using kudu::tserver::TabletServer;
using std::string;
using std::thread;
using std::vector;
using std::unique_ptr;
using strings::Substitute;

namespace kudu {
namespace client {


class ConsistencyITest : public MiniClusterITestBase {
 public:
  ConsistencyITest()
      : num_tablet_servers_(3),
        table_name_("timestamp_propagation_test_table"),
        key_column_name_("key"),
        key_split_value_(8) {

    // Using the mock clock: need to advance the clock for tablet servers.
    FLAGS_time_source = "mock";

    // Reduce the TS<->Master heartbeat interval: this speeds up testing,
    // saving about 700ms per test.
    FLAGS_heartbeat_interval_ms = 10;
    FLAGS_scanner_gc_check_interval_us = 50 * 1000;

    KuduSchemaBuilder b;
    b.AddColumn(key_column_name_)->Type(KuduColumnSchema::INT32)
        ->NotNull()->PrimaryKey();
    b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull();
    b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->Nullable();
    CHECK_OK(b.Build(&schema_));
  }

  virtual void SetUp() override {
    MiniClusterITestBase::SetUp();
    StartCluster(num_tablet_servers_);

    // Since we're using mock NTP rather than the hybrid clock, it's possible
    // that the first timestamp assigned to a tablet message is the initial
    // timestamp (0). For correctness of scans, it is illegal to scan in this
    // state. As such, we bump the clock up front so when we create tablets,
    // they start out with more natural, non-0 values.
    for (int i = 0; i < num_tablet_servers_; i++) {
      cluster_->mini_tablet_server(i)->server()->clock()->Now();
    }
  }

  void ScannerThread(KuduClient::ReplicaSelection selection,
                     int rows_to_insert,
                     int first_row,
                     int scans_to_perform) {
    client::sp::shared_ptr<KuduClient> client;
    CHECK_OK(cluster_->CreateClient(nullptr, &client));

    shared_ptr<KuduTable> table;
    CHECK_OK(client->OpenTable(table_name_, &table));

    size_t row_count;
    for (int i = 0; i < 3; i++) {
      // Insert multiple rows into the tablets.
      InsertTestRows(client.get(), table.get(), rows_to_insert, first_row * i);
      int expected_count = rows_to_insert * (i + 1);
      // Perform a bunch of READ_YOUR_WRITES scans to all the replicas
      // that count the rows. And verify that the count of the rows
      // never go down from what previously observed, to ensure subsequent
      // reads will not "go back in time" regarding writes that other
      // clients have done.
      for (int j = 0; j < scans_to_perform; j++) {
        CHECK_OK(GetRowCount(table.get(), KuduScanner::READ_YOUR_WRITES,
                             selection, 0, &row_count));
        EXPECT_LE(expected_count, row_count);
        expected_count = row_count;
      }
    }
  }

 protected:
  static void UpdateClock(HybridClock* clock, MonoDelta delta) {
    const uint64_t new_time(HybridClock::GetPhysicalValueMicros(clock->Now()) +
                            delta.ToMicroseconds());
    auto* ntp = down_cast<clock::MockNtp*>(clock->time_service());
    ntp->SetMockClockWallTimeForTests(new_time);
  }

  // Creates a table with the specified name and replication factor.
  Status CreateTable(KuduClient* client,
                     const string& table_name,
                     int num_replicas = 1) {
    unique_ptr<KuduPartialRow> split_row(schema_.NewRow());
    RETURN_NOT_OK(split_row->SetInt32(0, key_split_value_));

    unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
    RETURN_NOT_OK(table_creator->table_name(table_name)
                  .schema(&schema_)
                  .add_range_partition_split(split_row.release())
                  .set_range_partition_columns({ key_column_name_ })
                  .num_replicas(num_replicas)
                  .Create());
    return Status::OK();
  }

  unique_ptr<KuduInsert> BuildTestRow(KuduTable* table, int index) {
    unique_ptr<KuduInsert> insert(table->NewInsert());
    KuduPartialRow* row = insert->mutable_row();
    CHECK_OK(row->SetInt32(0, index));
    CHECK_OK(row->SetInt32(1, index * 2));
    CHECK_OK(row->SetStringCopy(2, Slice(StringPrintf("hello %d", index))));
    return insert;
  }

  // Inserts given number of tests rows into the default test table
  // in the context of the specified session.
  Status InsertTestRows(KuduClient* client, KuduTable* table,
                        int num_rows, int first_row = 0) {
    shared_ptr<KuduSession> session = client->NewSession();
    RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
    session->SetTimeoutMillis(60000);
    for (int i = first_row; i < num_rows + first_row; ++i) {
      unique_ptr<KuduInsert> insert(BuildTestRow(table, i));
      RETURN_NOT_OK(session->Apply(insert.release()));
    }
    RETURN_NOT_OK(session->Flush());
    return Status::OK();
  }

  Status GetRowCount(KuduTable* table, KuduScanner::ReadMode read_mode,
                     uint64_t ts, size_t* row_count) {
    return GetRowCount(table, read_mode, KuduClient::LEADER_ONLY, ts, row_count);
  }

  Status GetRowCount(KuduTable* table, KuduScanner::ReadMode read_mode,
                     KuduClient::ReplicaSelection selection, uint64_t ts,
                     size_t* row_count) {
    KuduScanner scanner(table);
    RETURN_NOT_OK(scanner.SetReadMode(read_mode));
    RETURN_NOT_OK(scanner.SetSelection(selection));
    if (read_mode == KuduScanner::READ_AT_SNAPSHOT && ts != 0) {
      RETURN_NOT_OK(scanner.SetSnapshotRaw(ts + 1));
    }
    RETURN_NOT_OK(CountRowsWithRetries(&scanner, row_count));
    return Status::OK();
  }

  Status GetTabletIdForKey(int32_t key_value, string* tablet_id) {
    if (!tablet_id) {
      return Status::InvalidArgument("null output string");
    }
    const int32_t key_value_begin = key_value;
    const int32_t key_value_end = key_value_begin + 1;

    unique_ptr<KuduPartialRow> split_row_start(schema_.NewRow());
    RETURN_NOT_OK(split_row_start->SetInt32(0, key_value_begin));
    string partition_key_start;
    RETURN_NOT_OK(split_row_start->EncodeRowKey(&partition_key_start));

    unique_ptr<KuduPartialRow> split_row_end(schema_.NewRow());
    RETURN_NOT_OK(split_row_end->SetInt32(0, key_value_end));
    string partition_key_end;
    RETURN_NOT_OK(split_row_end->EncodeRowKey(&partition_key_end));

    GetTableLocationsRequestPB req;
    req.mutable_table()->set_table_name(table_name_);
    req.set_partition_key_start(partition_key_start);
    req.set_partition_key_end(partition_key_end);
    master::CatalogManager* catalog =
        cluster_->mini_master()->master()->catalog_manager();
    GetTableLocationsResponsePB resp;
    CatalogManager::ScopedLeaderSharedLock l(catalog);
    RETURN_NOT_OK(l.first_failed_status());
    RETURN_NOT_OK(catalog->GetTableLocations(&req, &resp, /*user=*/std::nullopt));
    if (resp.tablet_locations_size() < 1) {
      return Status::NotFound(Substitute("$0: no tablets for key", key_value));
    }
    if (resp.tablet_locations_size() > 1) {
      return Status::IllegalState(
          Substitute("$0: multiple tablet servers for key", key_value));
    }
    *tablet_id = resp.tablet_locations(0).tablet_id();

    return Status::OK();
  }

  Status FindPeerForTablet(const string& tablet_id,
                           scoped_refptr<TabletReplica>* replica) {
    bool found = false;
    for (size_t i = 0; i < num_tablet_servers_; ++i) {
      MiniTabletServer* mts = cluster_->mini_tablet_server(i);
      TabletServer* ts = mts->server();

      scoped_refptr<TabletReplica> r;
      if (!ts->tablet_manager()->LookupTablet(tablet_id, &r)) {
        // Not this one, continue.
        continue;
      }
      replica->swap(r);
      found = true;
      break;
    }
    if (!found) {
      return Status::NotFound(
          Substitute("$0: cannot find replica for tablet"), tablet_id);
    }
    return Status::OK();
  }

  Status UpdateClockForTabletHostingKey(int32_t key, const MonoDelta& offset) {
    string tablet_id;
    RETURN_NOT_OK(GetTabletIdForKey(key, &tablet_id));
    scoped_refptr<TabletReplica> r;
    RETURN_NOT_OK(FindPeerForTablet(tablet_id, &r));

    HybridClock* clock = CHECK_NOTNULL(dynamic_cast<HybridClock*>(r->clock()));
    UpdateClock(clock, offset);
    return Status::OK();
  }

  const size_t num_tablet_servers_;
  const string table_name_;
  const string key_column_name_;
  const int key_split_value_;
  KuduSchema schema_;
};

// This is a test that exposes the necessity of propagating timestamp
// between reads (scans) if consistent results are desired.
//
// Let T1, T2 be reads from the same client where T2 starts after the response
// from T1 is received and neither are assigned timestamps by the client.
// It might be the case where T2’s observed value actually precedes T1’s value
// in the row history if T1 and T2 are performed in different servers,
// as T2 can be assigned a timestamp that is lower than T1.
//
// The scenario to expose inconsistency when not propagating the timestamp
// for the scan operations:
//
//   * Set the flag to use the mock wall clock.
//
//   * Start two mini cluster tservers and create a table with two tablets
//     ({Ta, Tb} order matters), single replica.
//
//   * Advance the clock in tablet Ta's tserver by some amount.
//
//   * Write a row to the tablet Ta, discard the client.
//
//   * With a new client, read the row from the Ta tablet. Here we are using
//     READ_LATEST read mode because with READ_AT_SNAPSHOT mode the read
//     operation would block and wait for the clock to advance (which in this
//     case is possible only if doing that manually).
//     Then write a row to tablet Tb. Take note of write timestamp and
//     discard the client.
//
//   * Now scan both tablets using READ_AT_SNAPSHOT mode and the timestamp
//     of the write to the Tb tablet.
//     Since the write to the Tb tablet followed the write to the Ta tablet
//     (quite literally, we read those rows from the Ta tablet before writing
//     to the Tb tablet), we should see all the rows written.
//
//     However, that would not be the case if the scan timestamp from the Ta's
//     scan were not propagated to the Tb's write: Tb server's time is lagging
//     behind Ta server's time, and scanning at Tb's write time would not
//     include the rows inserted into Ta.
//
TEST_F(ConsistencyITest, TestTimestampPropagationFromScans) {
  uint64_t ts_a;
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    ASSERT_OK(CreateTable(client.get(), table_name_));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));

    // Advance tablet server's clock hosting the first key range
    // (i.e. for the row which is about to be inserted below).
    ASSERT_OK(UpdateClockForTabletHostingKey(
        0, MonoDelta::FromMilliseconds(100)));

    // Insert data into the first tablet (a.k.a. Ta).
    const int rows_num = key_split_value_;  // fill in the partition completely
    ASSERT_OK(InsertTestRows(client.get(), table.get(), rows_num, 0));
    size_t row_count;
    ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_LATEST, 0,
                          &row_count));
    ASSERT_EQ(key_split_value_, row_count);
    // Retrieve the latest observed timestamp.
    ts_a = client->GetLatestObservedTimestamp();
  }

  uint64_t ts_b;
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));

    // Running the prior version of the client code and using READ_AT_SNAPSHOT
    // for the scan would lead to unintentional propagating of the scan time
    // (i.e. Ta server's time) to the latest observed timestamp of the client.
    // Besides, as already mentioned in the comment for the test, the scan
    // in READ_AT_SNAPSHOT mode would block and wait for the clock to advance.
    size_t row_count;
    ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_LATEST, 0,
                          &row_count));
    // Check we see the first batch of inserted rows.
    ASSERT_EQ(key_split_value_, row_count);

    // Inserting data into the second tablet (a.k.a. Tb): using the second
    // key range partition.
    const int rows_num = key_split_value_;
    ASSERT_OK(InsertTestRows(client.get(), table.get(),
                             rows_num, key_split_value_));
    // Retrieve the latest observed timestamp.
    ts_b = client->GetLatestObservedTimestamp();
  }
  EXPECT_GT(ts_b, ts_a);

  // We are expecting to see all the inserted rows. It might not be the case
  // if the the timestamps were not propagated.
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));

    size_t row_count;
    ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_AT_SNAPSHOT, ts_b,
                          &row_count));
    // In total, inserted 2 * key_split_value_ rows and expecting to see
    // the total row count to reflect that: using snapshot timestamp that
    // is taken at insertng the second batch when the first batch already
    // was there.
    const size_t total_rows = 2UL * key_split_value_;
    ASSERT_EQ(total_rows, row_count);
  }
}

// Make sure the client propagates the timestamp for write operations.
//
// The idea of verification is simple:
//
//   * Let's get two tablet servers, where the clock of the first server
//     is ahead of the second one.
//
//   * Create a client object.
//
//   * Using the newly created client object, insert some data into the tablet
//     hosted by the first server.
//
//   * Record the client's latest observed timestamp.
//
//   * Using the same client object, insert some data into the tablet
//     hosted by the second server.
//
//   * Get the client's latest observed timestamp: it should be strictly greater
//     than the recorded timestamp.
//
//   * Make a full table scan in the READ_AT_SNAPSHOT mode at 'ts_ref'
//     timestamp: the scan should retrieve only the first row.
//
// If the client propates the timestamps, the second server should receive
// the recorded timestamp value in write request in the 'propagated_timestamp'
// field and adjust its clock first. After that it should perform the requested
// write operation. Since a write operation should always advance the server
// clock, the resulting timestamp returned to the client should be strictly
// greater than the propagated one.
TEST_F(ConsistencyITest, TestTimestampPropagationForWriteOps) {
  const int32_t offset_usec = FLAGS_max_clock_sync_error_usec;
  // Assuming the offset is specified as a positive number.
  ASSERT_GT(offset_usec, 0);
  // Need to have at least one row in the first partition starting with key 0.
  ASSERT_LE(1, key_split_value_);

  uint64_t ts_ref;
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    ASSERT_OK(CreateTable(client.get(), table_name_));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));

    // Advance tablet server's clock hosting the first key range
    // (i.e. for the row which is about to be inserted below).
    ASSERT_OK(UpdateClockForTabletHostingKey(
        0, MonoDelta::FromMicroseconds(offset_usec)));

    // Insert 1 row into the first tablet.
    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, 0));
    // Retrieve the latest observed timestamp.
    ts_ref = client->GetLatestObservedTimestamp();

    // Insert 1 row into the second tablet.
    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, key_split_value_));
    // Retrieve the latest observed timestamp.
    const uint64_t ts = client->GetLatestObservedTimestamp();

    // If the client propagates the timestamp with write operations,
    // the timestamp received from the second server should be greater
    // than the timestamp received from the first server.
    EXPECT_GT(ts, ts_ref);
  }

  // An additional check: scan the table at the 'ts_ref' timestamp and
  // make sure only the first row is visible.
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));

    size_t row_count;
    ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_AT_SNAPSHOT,
                          ts_ref, &row_count));
    EXPECT_EQ(1, row_count);
  }
}

// This is a test for KUDU-1189. It verifies that in case of a READ_AT_SNAPSHOT
// scan with unspecified snapshot timestamp, the scanner picks timestamp from
// the first server that the data is read from. If the scan spans multiple
// tablets, the timestamp picked when scanning the first tablet is then used
// when scanning following tablets.
//
// The idea of the test is simple: have a scan spanned across two tablets
// where the clocks of the corresponding tablet servers are skewed. The sequence
// of actions is as following:
//
//   1. Create a table which spans across two tablets.
//
//   2. Run the first scenario:
//      * Advance the clock of the second tablet's server.
//      * Create a client object and with it:
//        ** Insert a row into the first tablet.
//        ** Insert a row into the second tablet.
//      * Discard the client object.
//      * Create a new client object and perform a scan at READ_AT_SNAPSHOT
//        mode, no timestamp specified.
//      * Given the tight timings on the after-the-insert scan and difference in
//        server clocks, there should only one row in the result if the snapshot
//        timestamp is taken from the first server. Otherwise, if the snapshot
//        timestamp was taken from the second server, both rows would be visible
//        for the scan.
//      * Discard the client object.
//
//   3. Run the second scenario:
//      * Advance the clock of the first tablet's server, so the clock of the
//        first tablet is ahead of the clock of the second one.
//      * Create a client object and with it:
//        ** Insert an additional row into the first tablet.
//      * Discard the client object.
//      * Create a new client object and perform a scan at READ_AT_SNAPSHOT
//        mode, no timestamp specified.
//      * All the inserted rows should be visible to the scan because we
//        expect the snapshot timestamp to be taken from the first tablet
//        server. If the snapshot timestamp was taken from the second server,
//        given the tight timings on the scan following the prior insert into
//        the first tablet and difference in server clocks, not all rows would
//        be visible the the scan.
//
TEST_F(ConsistencyITest, TestSnapshotScanTimestampReuse) {
  const int32_t offset_usec = FLAGS_max_clock_sync_error_usec / 2;
  // Assuming the offset is specified as a positive number.
  ASSERT_GT(offset_usec, 0);
  // Need to have two rows in the first partition; the values start at 0.
  ASSERT_LT(2, key_split_value_);

  // Prepare the setup: create a proper disposition for tablet servers' clocks
  // and populate the table with appropriate data.
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    ASSERT_OK(CreateTable(client.get(), table_name_));
    // Advance second partition's tablet server clock.
    ASSERT_OK(UpdateClockForTabletHostingKey(
        key_split_value_, MonoDelta::FromMicroseconds(offset_usec)));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));
    // Insert a row into the first tablet.
    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, 0));
    // Insert a row into the second tablet.
    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, key_split_value_));
  }

  // Discarding the prior client object: if using it to perform scans, due
  // to the scan timestamp propagation the lagging tablet server's clock
  // would be advanced and it was not possible to distinguish between
  // the timestamps coming from the first and the second tablet servers.

  // Now, perform the scan at READ_AT_SNAPSHOT where a timestamp is not
  // specified: make sure the snapshot timestamp is taken from the first tablet
  // server among those the data was fetched from. For this scenario, perform
  // a scan which would try to fetch all the table's data
  // (i.e. make calls to all tablet servers which host table's data).
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    // Scan the table at a snapshot: let the servers pick the timestamp.
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));
    KuduScanner scanner(table.get());
    ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
    const ScanConfiguration& cfg(scanner.data_->configuration());
    ASSERT_FALSE(cfg.has_snapshot_timestamp());

    size_t row_count;
    ASSERT_OK(CountRowsWithRetries(&scanner, &row_count));

    // At this point, we have inserted 2 rows in total, where the second row
    // was inserted into the tablet which server's clock was advanced
    // (i.e. shifted into the future). We are expecting to get the timestamp
    // for the scan from the first tablet server, so the second row should not
    // be visible at that timestamp: from the second tablet server's view,
    // it was inserted after the specified timestamp. Instead, if the timestamp
    // for the scan were sampled at the second server's clock, then both rows
    // would be visible to the scan.
    ASSERT_EQ(1UL, row_count);
    ASSERT_TRUE(cfg.has_snapshot_timestamp());
  }

  // Advance the clock of the first server even further, leaving the clock
  // of the second server behind. Also, insert an additional row into the first
  // tablet.
  {
    // Find the tablet for the first range to advance its server's clock.
    ASSERT_OK(UpdateClockForTabletHostingKey(
        0, MonoDelta::FromMicroseconds(2 * offset_usec)));
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));
    // Insert an additional row into the first tablet.
    // This is to check that the timestamp is taken from the first tablet
    // server: since now the clocks of both tablet servers are ahead of the
    // timestamps of the inserted rows so far, there would be no way to tell
    // which server's clock is used for the scan using the number of rows
    // returned by the scan. In either case, there will be two rows.
    //
    // Now, once we add a new row into the first tablet, given the big time
    // margin provided by the current clock offset, we should see different
    // outcomes from the subsequent scan:
    //   * if the timestamp is taken from the first server, there should be
    //     three rows in the result
    //   * if the timestamp is taken from the second server, there should be
    //     just two rows in the result
    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, 1));
  }

  // Scan the table again and make sure the snapshot scan's timestamp is taken
  // from the first tablet server, as before. However, now the clock of the
  // first tablet server is ahead of the second tablet server's clock. If the
  // timestamp was taken from the second server, there would be 2 rows
  // in the result. The expected result is 3 rows, since the timestamp should
  // be taken from the first server.
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    // Scan the table at snapshot: let the servers pick the timestamp.
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));
    KuduScanner scanner(table.get());
    ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
    const ScanConfiguration& cfg(scanner.data_->configuration());
    // Check the snapshot timestamp is unset -- it's a fresh object for a
    // READ_AT_SNAPSHOT scan where the snapshot timestamp is not specified
    // explicitly.
    ASSERT_FALSE(cfg.has_snapshot_timestamp());

    size_t row_count;
    ASSERT_OK(CountRowsWithRetries(&scanner, &row_count));

    // At this point, we have inserted 3 rows in total. Since the snapshot
    // timestamp is taken from the first server's clock, all 3 rows should be
    // visible to the scan at that timestamp. Given the tight timings on the
    // after-the-intsert scan and difference in server clocks, that would not be
    // the case if the snapshot was taken from the second server.
    ASSERT_EQ(3UL, row_count);
    // Check that the timestamp returned by the tablet server is set into the
    // scan configuration.
    ASSERT_TRUE(cfg.has_snapshot_timestamp());
  }
}

// Verify that the propagated timestamp from a serialized scan token
// makes its way into corresponding tablet servers while performing a scan
// operation built from the token.
//
// The real-world use-cases behind this test assume a Kudu client (C++/Java)
// can get scan tokens for a scan operation, serialize those and pass them
// to the other Kudu client (C++/Java). Since de-serializing a scan token
// propagates the latest observed timestamp, the latter client will have
// the latest observed timestamp set accordingly if it de-serializes those
// scan tokens into corresponding scan operations.
//
// The test scenario uses a table split into two tablets, each hosted by a
// tablet server. The clock of the first tablet server is shifted into the
// future. The first client inserts a row into the first tablet. Then it creates
// a scan token to retrieve some "related" data from the second
// tablet hosted by the second server. Now, another client receives the
// serialized scan token and runs corresponding READ_AT_SNAPSHOT scan
// with the specified timestamp to retrieve the data: it should observe
// a timestamp which is not less than the propagated timestamp
// encoded in the token.
TEST_F(ConsistencyITest, TestScanTokenTimestampPropagation) {
  const int32_t offset_usec = FLAGS_max_clock_sync_error_usec;

  // Need to have at least one row in the first partition with
  // values starting at 0.
  ASSERT_GE(key_split_value_, 1);

  {
    // Prepare the setup: create a proper disposition for tablet servers' clocks
    // and populate the table with initial data.
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    ASSERT_OK(CreateTable(client.get(), table_name_));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));

    // Insert a single row into the second tablet: it's necessary to get
    // non-empty scan in the verification phase of the test.
    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, key_split_value_));
  }

  uint64_t ts_ref;
  string scan_token;
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));

    // Advance the clock of the server hosting the first partition tablet.
    const int32_t row_key = 0;
    ASSERT_OK(UpdateClockForTabletHostingKey(
        row_key, MonoDelta::FromMicroseconds(offset_usec)));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));
    // Insert just a single row into the first tablet.
    ASSERT_OK(InsertTestRows(client.get(), table.get(), 1, row_key));
    ts_ref = client->GetLatestObservedTimestamp();

    // Create and serialize a scan token: the scan selects a row by its key
    // from the other tablet at the timestamp at which the first row was
    // inserted.
    vector<KuduScanToken*> tokens;
    ElementDeleter deleter(&tokens);
    KuduScanTokenBuilder builder(table.get());
    ASSERT_OK(builder.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
    unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate(
        key_column_name_,
        KuduPredicate::EQUAL,
        KuduValue::FromInt(key_split_value_)));
    ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
    ASSERT_OK(builder.Build(&tokens));
    ASSERT_EQ(1, tokens.size());
    ASSERT_OK(tokens[0]->Serialize(&scan_token));
  }

  // The other client: scan the second tablet using a scanner built from
  // the serialized scanner token. If the client propagates timestamp from the
  // de-serialized scan token, upon fetching a batch of rows the client
  // should observe timestamp not less than the reference propagated timestamp
  // encoded in the token.
  {
    shared_ptr<KuduClient> client;
    ASSERT_OK(cluster_->CreateClient(nullptr, &client));
    shared_ptr<KuduTable> table;
    ASSERT_OK(client->OpenTable(table_name_, &table));
    KuduScanner* scanner_raw;
    ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client.get(), scan_token,
                                                    &scanner_raw));
    unique_ptr<KuduScanner> scanner(scanner_raw);
    ASSERT_OK(scanner->Open());
    ASSERT_TRUE(scanner->HasMoreRows());
    size_t row_count = 0;
    while (scanner->HasMoreRows()) {
      KuduScanBatch batch;
      ASSERT_OK(scanner->NextBatch(&batch));
      row_count += batch.NumRows();
      ASSERT_LE(ts_ref, client->GetLatestObservedTimestamp());
    }
    EXPECT_EQ(1, row_count);
  }
}

const KuduClient::ReplicaSelection replica_selectors[] = {
    KuduClient::LEADER_ONLY,
    KuduClient::CLOSEST_REPLICA,
    KuduClient::FIRST_REPLICA,
};

class ScanYourWritesParamTest :
    public ConsistencyITest,
    public ::testing::WithParamInterface<KuduClient::ReplicaSelection> {
};

// Verify that no matter which replica is selected, a single client could
// achieve read-your-writes on READ_YOUR_WRITES scan mode.
TEST_P(ScanYourWritesParamTest, Test) {
  const KuduClient::ReplicaSelection sel = GetParam();
  shared_ptr<KuduClient> client;
  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
  ASSERT_OK(CreateTable(client.get(), table_name_, 3));
  shared_ptr<KuduTable> table;
  ASSERT_OK(client->OpenTable(table_name_, &table));

  // Insert multiple rows into the tablets.
  const uint64_t rows_to_insert = 20000;
  ASSERT_OK(InsertTestRows(client.get(), table.get(), rows_to_insert, 0));

  size_t row_count;
  ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_YOUR_WRITES,
                        sel, 0, &row_count));
  EXPECT_EQ(rows_to_insert, row_count);

  row_count = 0;
  ASSERT_OK(GetRowCount(table.get(), KuduScanner::READ_YOUR_WRITES,
                        sel, 0, &row_count));
  EXPECT_EQ(rows_to_insert, row_count);
}

INSTANTIATE_TEST_SUITE_P(Params, ScanYourWritesParamTest,
                         testing::ValuesIn(replica_selectors));

class ScanYourWritesMultiClientsParamTest :
    public ConsistencyITest,
    public ::testing::WithParamInterface<KuduClient::ReplicaSelection> {
};

// This is a test that verifies, when multiple clients running
// simultaneously, a client can get read-your-writes and
// read-your-reads session guarantees using READ_YOUR_WRITES
// scan mode, no matter which replica is selected.
//
// Read-your-writes guarantees that a client can see all previous
// writes that it performed.
//
// Read-your-reads guarantees all subsequent reads to a given object
// "never return any previous values" regarding writes that other
// clients have done.
//
// The test scenario is as the following:
//
// 1) Have multiple clients running concurrently,
//
// 2) From the same client performs multiple writes and
//    multiple sets of scans. Each client
//    continuously performs inserts to a tablet, and then
//    performs a bunch of READ_YOUR_WRITES scans to all the
//    replicas that count the rows. The count of the rows
//    should never go down from the previous observed one.
TEST_P(ScanYourWritesMultiClientsParamTest, Test) {
  const int kNumThreads = 5;
  const int rows_to_insert = 1000;
  const int scans_to_perform = AllowSlowTests() ? 10 : 3;
  const KuduClient::ReplicaSelection sel = GetParam();

  shared_ptr<KuduClient> client;
  ASSERT_OK(cluster_->CreateClient(nullptr, &client));
  ASSERT_OK(CreateTable(client.get(), table_name_, 3));

  for (int run = 1; run <= 3; run++) {
    vector<thread> threads;
    for (int i = 0; i < kNumThreads; i++) {
      // TODO(adar): this is broken: each call to Next32() yields the same
      // value. Fixing it is non-trivial because 'first_row' is multiplied in
      // various code paths, causing it to overflow an int32. It just so happens
      // that the first Next32() using this particular seed generates a low
      // enough value to avoid overflow.
      Random rng(rows_to_insert * kNumThreads);
      uint32_t first_row = rng.Next32();
      threads.emplace_back([=]() {
        this->ScannerThread(sel, rows_to_insert, first_row, scans_to_perform);
      });
    }
    SleepFor(MonoDelta::FromMilliseconds(50));

    for (auto& t : threads) {
      t.join();
    }
  }
}

INSTANTIATE_TEST_SUITE_P(Params, ScanYourWritesMultiClientsParamTest,
                         testing::ValuesIn(replica_selectors));
} // namespace client
} // namespace kudu
