// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/clock/logical_clock.h"

#include <functional>
#include <memory>
#include <ostream>
#include <string>

#include <glog/logging.h>

#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"

METRIC_DEFINE_gauge_uint64(server, logical_clock_timestamp,
                           "Logical Clock Timestamp",
                           kudu::MetricUnit::kUnits,
                           "Logical clock timestamp.",
                           kudu::MetricLevel::kInfo);

using base::subtle::Atomic64;
using base::subtle::Barrier_AtomicIncrement;
using base::subtle::NoBarrier_CompareAndSwap;
using base::subtle::NoBarrier_Load;
using std::unique_ptr;

namespace kudu {
namespace clock {

LogicalClock::LogicalClock(const Timestamp& timestamp,
                           const scoped_refptr<MetricEntity>& metric_entity)
    : now_(timestamp.value() - 1) {
  if (metric_entity) {
    METRIC_logical_clock_timestamp.InstantiateFunctionGauge(
        metric_entity, [this]() { return this->GetCurrentTime(); })->
            AutoDetachToLastValue(&metric_detacher_);
  }
}

Timestamp LogicalClock::Now() {
  return Timestamp(Barrier_AtomicIncrement(&now_, 1));
}

Timestamp LogicalClock::NowLatest() {
  return Now();
}

Status LogicalClock::Update(const Timestamp& to_update) {
  DCHECK_NE(to_update.value(), Timestamp::kInvalidTimestamp.value())
      << "Updating the clock with an invalid timestamp";
  Atomic64 new_value = to_update.value();

  while (true) {
    Atomic64 current_value = NoBarrier_Load(&now_);
    // if the incoming value is less than the current one, or we've failed the
    // CAS because the current clock increased to higher than the incoming value,
    // we can stop the loop now.
    if (new_value <= current_value) return Status::OK();
    // otherwise try a CAS
    if (PREDICT_TRUE(NoBarrier_CompareAndSwap(&now_, current_value, new_value)
        == current_value))
      break;
  }
  return Status::OK();
}

Status LogicalClock::WaitUntilAfter(const Timestamp& then,
                                    const MonoTime& /* deadline */) {
  return Status::ServiceUnavailable(
      "Logical clock does not support WaitUntilAfter()");
}

Status LogicalClock::WaitUntilAfterLocally(const Timestamp& then,
                                           const MonoTime& /* deadline */) {
  if (IsAfter(then)) return Status::OK();
  return Status::ServiceUnavailable(
      "Logical clock does not support WaitUntilAfterLocally()");
}

bool LogicalClock::IsAfter(Timestamp t) {
  return base::subtle::Acquire_Load(&now_) >= t.value();
}

uint64_t LogicalClock::GetCurrentTime() {
  // We don't want reading metrics to change the clock.
  return NoBarrier_Load(&now_);
}

std::string LogicalClock::Stringify(Timestamp timestamp) {
  return strings::Substitute("L: $0", timestamp.ToUint64());
}

}  // namespace clock
}  // namespace kudu

