blob: c96edf2b2bf098df0e1a2da6b1aa9bf287aa152a [file] [log] [blame]
// Copyright 2013 Cloudera, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "kudu/server/logical_clock.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
namespace server {
METRIC_DEFINE_gauge_uint64(server, logical_clock_timestamp,
"Logical Clock Timestamp",
kudu::MetricUnit::kUnits,
"Logical clock timestamp.");
using base::subtle::Atomic64;
using base::subtle::Barrier_AtomicIncrement;
using base::subtle::NoBarrier_CompareAndSwap;
Timestamp LogicalClock::Now() {
return Timestamp(Barrier_AtomicIncrement(&now_, 1));
}
Timestamp LogicalClock::NowLatest() {
return Now();
}
Status LogicalClock::Update(const Timestamp& to_update) {
DCHECK_NE(to_update.value(), Timestamp::kInvalidTimestamp.value())
<< "Updating the clock with an invalid timestamp";
Atomic64 new_value = to_update.value();
while (true) {
Atomic64 current_value = NoBarrier_Load(&now_);
// if the incoming value is less than the current one, or we've failed the
// CAS because the current clock increased to higher than the incoming value,
// we can stop the loop now.
if (new_value <= current_value) return Status::OK();
// otherwise try a CAS
if (PREDICT_TRUE(NoBarrier_CompareAndSwap(&now_, current_value, new_value)
== current_value))
break;
}
return Status::OK();
}
Status LogicalClock::WaitUntilAfter(const Timestamp& then,
const MonoTime& deadline) {
return Status::ServiceUnavailable(
"Logical clock does not support WaitUntilAfter()");
}
Status LogicalClock::WaitUntilAfterLocally(const Timestamp& then,
const MonoTime& deadline) {
if (IsAfter(then)) return Status::OK();
return Status::ServiceUnavailable(
"Logical clock does not support WaitUntilAfterLocally()");
}
bool LogicalClock::IsAfter(Timestamp t) {
return base::subtle::Acquire_Load(&now_) >= t.value();
}
LogicalClock* LogicalClock::CreateStartingAt(const Timestamp& timestamp) {
// initialize at 'timestamp' - 1 so that the first output value is 'timestamp'.
return new LogicalClock(timestamp.value() - 1);
}
uint64_t LogicalClock::NowForMetrics() {
// We don't want reading metrics to change the clock.
return NoBarrier_Load(&now_);
}
void LogicalClock::RegisterMetrics(const scoped_refptr<MetricEntity>& metric_entity) {
METRIC_logical_clock_timestamp.InstantiateFunctionGauge(
metric_entity,
Bind(&LogicalClock::NowForMetrics, Unretained(this)))
->AutoDetachToLastValue(&metric_detacher_);
}
string LogicalClock::Stringify(Timestamp timestamp) {
return strings::Substitute("L: $0", timestamp.ToUint64());
}
} // namespace server
} // namespace kudu