// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <memory>
#include <string>
#include <gtest/gtest_prod.h>
#include "kudu/clock/clock.h"
#include "kudu/clock/time_service.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/timestamp.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/cloud/instance_metadata.h"
#include "kudu/util/locks.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
namespace clock {
// The HybridTime clock: a Clock implementation whose timestamps embed both a
// physical component (in microseconds) and a logical component; see the
// static helpers GetPhysicalValueMicros() and GetLogicalValue().
//
// HybridTime should not be used on a distributed cluster running on OS X hosts,
// since NTP clock error is not available.
class HybridClock : public Clock {
// Create an instance, registering HybridClock's metrics with the specified
// metric entity.
//
// If 'wall_clock_jump_threshold_usec' is greater than 0, enable the logic to
// detect sudden jumps of the wall clock with the corresponding threshold in
// microseconds. The default value of 0 does not enable that logic.
//
// When InstanceMetadata is already available from a prior run of
// InstanceDetector, pass it with the 'im' parameter for deducing the effective
// time source during Init() instead of running instance detection one more
// time there. 'im' is a std::unique_ptr taken by value, so the caller hands
// over ownership of the metadata; by default it is null (nothing supplied).
explicit HybridClock(const scoped_refptr<MetricEntity>& metric_entity,
uint64_t wall_clock_jump_threshold_usec = 0,
std::unique_ptr<cloud::InstanceMetadata> im = {});
// Initializes the clock. Should be called only once.
// As described for the constructor, the effective time source is deduced
// during this call.
Status Init() override;
// Obtains the timestamp corresponding to the current time.
Timestamp Now() override;
// Obtains the timestamp corresponding to the latest possible current
// time (compare WaitUntilAfter(), which describes such a value as
// now + max_error).
Timestamp NowLatest() override;
// Obtain a timestamp which is guaranteed to be later than the current time
// on any machine in the cluster. The timestamp is passed back through the
// output parameter 't'.
// NOTE: this is not a very tight bound.
Status GetGlobalLatest(Timestamp* t) override;
// Updates the clock with a timestamp originating on another machine.
// 'to_update' is the timestamp received from that machine.
Status Update(const Timestamp& to_update) override;
// HybridClock supports all external consistency modes, so the answer does not
// depend on the requested mode: the argument is intentionally left unnamed
// and the method always returns true.
bool SupportsExternalConsistencyMode(
    ExternalConsistencyMode /* mode */) const override {
  return true;
}
// Hybrid timestamps always carry a physical (wall clock) part in addition to
// the logical one, hence this unconditionally returns true.
bool HasPhysicalComponent() const override {
  return true;
}
// Returns the difference between the physical components of 'lhs' and 'rhs'
// as a MonoDelta.
// NOTE(review): presumably computed as lhs - rhs; the sign convention is set
// by the implementation, which is not visible in this header -- confirm.
MonoDelta GetPhysicalComponentDifference(Timestamp lhs, Timestamp rhs) const override;
// Blocks the caller thread until the true time is after 'then'.
// In other words, waits until the HybridClock::Now() on _all_ nodes
// will return a value greater than 'then'.
//
// The incoming time 'then' is assumed to be the latest time possible
// at the time the read was performed, i.e. 'then' = now + max_error.
//
// This method can be used to make Kudu behave like Spanner/TrueTime.
// This is implemented by possibly making the caller thread wait for
// a certain period of time.
//
// As an example, the following cases might happen:
//
// 1 - 'then' is lower than now.earliest() -> Definitely in
//     the past, no wait necessary.
//
// 2 - 'then' is greater than now.earliest(): need to wait until
//     'then' <= now.earliest()
//
// Returns OK if it waited long enough or if no wait was necessary.
//
// Returns Status::ServiceUnavailable if the system clock was not
// synchronized and therefore it couldn't wait out the error.
//
// Returns Status::TimedOut() if 'deadline' will pass before the specified
// timestamp. NOTE: unlike most "wait" methods, this may return _immediately_
// with a timeout, rather than actually waiting for the timeout to expire.
// This is because, by looking at the current clock, we can know how long
// we'll have to wait, in contrast to most Wait() methods which are waiting
// on some external condition to become true.
Status WaitUntilAfter(const Timestamp& then,
const MonoTime& deadline) override;
// Blocks the caller thread until the local time is after 'then'.
// This is in contrast to WaitUntilAfter(), which waits until the time
// on _all_ machines is past the given time.
//
// 'then' is the timestamp to wait out; 'deadline' bounds how long the caller
// is willing to wait.
//
// Returns Status::TimedOut() if 'deadline' will pass before the specified
// timestamp. NOTE: unlike most "wait" methods, this may return _immediately_
// with a timeout. See WaitUntilAfter() for details.
Status WaitUntilAfterLocally(const Timestamp& then,
const MonoTime& deadline) override;
// Return true if the given time has passed (i.e. any future call
// to Now() would return a higher value than 't').
//
// NOTE: this only refers to the _local_ clock, and is not a guarantee
// that other nodes' clocks have definitely passed this timestamp.
// This is in contrast to WaitUntilAfter() above.
bool IsAfter(Timestamp t) override;
// Returns a string representation of 'timestamp'.
// NOTE(review): presumably the same physical/logical rendering that
// StringifyTimestamp() produces -- confirm against the implementation.
std::string Stringify(Timestamp timestamp) override;
// Obtains the timestamp corresponding to the current time and the associated
// error in micros. The results are passed back through the output parameters
// 'timestamp' and 'max_error_usec'.
//
// If the clock is unsynchronized or synchronized but the error is too high,
// a non-OK status is returned.
Status NowWithError(Timestamp* timestamp, uint64_t* max_error_usec);
// Static encoding/decoding methods for timestamps. Public mostly
// for testing/debugging purposes.

// Returns the logical value embedded in 'timestamp'.
static uint64_t GetLogicalValue(const Timestamp& timestamp);
// Returns the physical value embedded in 'timestamp', in microseconds.
static uint64_t GetPhysicalValueMicros(const Timestamp& timestamp);
// Obtains a new Timestamp with the logical value zeroed out, i.e. one whose
// physical value is 'micros' (in microseconds).
static Timestamp TimestampFromMicroseconds(uint64_t micros);
// Obtains a new Timestamp that embeds both the physical ('micros') and
// logical ('logical_value') values.
static Timestamp TimestampFromMicrosecondsAndLogicalValue(uint64_t micros,
uint64_t logical_value);
// Creates a new timestamp whose physical time is
// GetPhysicalValueMicros(original) + 'to_add' and which retains the same
// logical value.
static Timestamp AddPhysicalTimeToTimestamp(const Timestamp& original,
const MonoDelta& to_add);
// Outputs a string containing the physical and logical values of the timestamp,
// separated.
static std::string StringifyTimestamp(const Timestamp& timestamp);
// Accessor for the time service backing this clock. The returned pointer is
// non-owning: the object stays owned by 'time_service_' (a std::unique_ptr),
// and the result is null while that member is empty.
clock::TimeService* time_service() const {
  return time_service_.get();
}
// Befriends the AutoTimeSourceNoDedicatedNtpServer test case of
// HybridClockTest (FRIEND_TEST comes from <gtest/gtest_prod.h>) so that the
// test can reach non-public members of this class.
FRIEND_TEST(HybridClockTest, AutoTimeSourceNoDedicatedNtpServer);
// The kinds of time source the hybrid clock can be driven by; see
// SelectTimeSource() and InitWithTimeSource().
// NOTE(review): only the per-enumerator descriptions are visible below; the
// enumerator names themselves and the closing brace are not -- confirm against
// the complete header.
enum class TimeSource {
// Internal Kudu clock synchronized by built-in NTP client.
// Local machine clock synchronized by NTP.
// Local machine clock with no requirement of NTP synchronization.
// Mock clock (used for tests only).
// Unknown/invalid time source.
// How many bits to left shift a microseconds clock read. The remainder
// of the timestamp will be reserved for logical values. Left shifting 12 bits
// gives us 12 bits for the logical value and should still keep accurate
// microseconds time until 2100+
static constexpr const int kBitsToShift = 12;
// Mask to extract the pure logical bits, i.e. the low 'kBitsToShift' bits.
// The shift is performed on a uint64_t operand rather than on an int literal,
// so the mask is computed in its own type and stays correct for any
// kBitsToShift below 64 instead of depending on the width of int.
static constexpr const uint64_t kLogicalBitMask =
    (uint64_t{1} << kBitsToShift) - 1;
// Convert time source to string representation. The result is returned as a
// C string (const char*).
static const char* TimeSourceToString(HybridClock::TimeSource time_source);
// Select particular time source for the hybrid clock given the
// 'time_source_str' parameter which can be a pseudo-source such as 'auto'.
//
// On success, the 'time_source' output parameter contains time source that
// determines particular time service to use. If the 'builtin' time source is
// selected, the --builtin_ntp_servers flag's value is used to build the set
// of reference servers for the built-in NTP client.
//
// If non-null, the optional 'instance_metadata' is used for selecting the
// effective time source. It is taken as a pointer to const and defaults to
// nullptr.
static Status SelectTimeSource(
const std::string& time_source_str,
TimeSource* time_source,
const cloud::InstanceMetadata* instance_metadata = nullptr);
// Initialize hybrid clock with the specified time source, e.g. one chosen by
// SelectTimeSource().
// If 'time_source' is TimeSource::BUILTIN_NTP_SYNC, the set of reference
// servers for the built-in NTP client is sourced from --builtin_ntp_servers.
Status InitWithTimeSource(TimeSource time_source);
// Variant of NowWithError() that requires 'lock_' to be held already.
// Takes the same output parameters as NowWithError().
Status NowWithErrorUnlocked(Timestamp* timestamp, uint64_t* max_error_usec);
// Variant of NowWithError() that calls LOG(FATAL) if the clock is
// unsynchronized or synchronized but the error is too high. Accordingly it
// returns void rather than a Status.
void NowWithErrorOrDie(Timestamp* timestamp, uint64_t* max_error_usec);
// Obtains the current wallclock time and maximum error in microseconds
// (output parameters 'now_usec' and 'error_usec' respectively),
// and checks if the clock is synchronized.
// On OS X, the error will always be 0.
Status WalltimeWithError(uint64_t* now_usec, uint64_t* error_usec);
// Used to get the timestamp for metrics. Returned as a raw 64-bit value
// rather than a Timestamp object.
uint64_t NowForMetrics();
// Used to get the current error, for metrics.
// NOTE(review): presumably expressed in microseconds like the other error
// values in this class -- confirm against the implementation.
uint64_t ErrorForMetrics();
// If non-null, this instance metadata is used to detect effective time source
// during Init(). Owned by the clock (see the 'im' constructor parameter).
std::unique_ptr<cloud::InstanceMetadata> instance_metadata_;
// Whether to run a sanity check on the wall clock readings used for the
// physical part of the hybrid timestamp. Constant for the lifetime of the
// object.
const bool is_wall_clock_jump_check_enabled_;
// The threshold for the difference of deltas in the wall clock's and the
// CLOCK_MONOTONIC_RAW clock's timestamps captured almost at the same time,
// in microseconds.
// The --wall_clock_jump_threshold_sec flag's value is used as the source
// for this constant.
const uint64_t wall_clock_jump_threshold_usec_;
// Used to fetch the current time and error bound from the system or NTP
// service. Exposed to callers as a non-owning pointer via time_service().
std::unique_ptr<clock::TimeService> time_service_;
// Guards access to 'state_', 'next_timestamp_', 'prev_{mono,wall}_time_usec_'.
// This is the lock NowWithErrorUnlocked() expects its caller to hold.
simple_spinlock lock_;
// The next timestamp to be generated from this clock, assuming that
// the physical clock hasn't advanced beyond the value stored here.
// Protected by 'lock_'.
uint64_t next_timestamp_;
// Prior value of the CLOCK_MONOTONIC_RAW captured almost the same moment
// when next_timestamp_'s physical part was captured, in microseconds.
// Protected by 'lock_'.
int64_t prev_mono_time_usec_;
// Prior value of the physical part of 'next_timestamp_', in microseconds.
// Protected by 'lock_'.
int64_t prev_wall_time_usec_;
// The last valid clock reading we got from the time source, along
// with the monotime that we took that reading. The 'is_extrapolating' field
// tracks whether extrapolated or real readings of the underlying clock are
// used to generate hybrid timestamps.
//
// These fields have their own spinlock, separate from 'lock_'.
simple_spinlock last_clock_read_lock_; // protects four fields below
// Monotonic time at which the last valid reading was taken.
MonoTime last_clock_read_time_;
// Physical time value of the last valid reading.
uint64_t last_clock_read_physical_;
// Error value of the last valid reading.
uint64_t last_clock_read_error_;
// True while extrapolated readings are in use; starts out false.
bool is_extrapolating_ = false;
// The state of the object. Guarded by 'lock_'.
// NOTE(review): the enumerators of State and the closing brace of the enum
// are not visible here -- confirm against the complete header.
enum State {
State state_;
// Metric entity the clock's metrics are registered with (see the
// constructor).
scoped_refptr<MetricEntity> metric_entity_;
// Whether the hybrid clock is extrapolating the readings of the underlying
// clock instead of using the real ones. It's important to know whether
// the extrapolation is happening, but 'extrapolation_intervals_histogram_'
// metric doesn't allow for exposing this fact prior to the end of current
// extrapolation interval.
scoped_refptr<AtomicGauge<bool>> extrapolating_;
// Stats on the underlying clock's 'maximum error' metric sampled every
// NowWithError() call (essentially, every call when requesting a hybrid clock
// timestamp).
scoped_refptr<Histogram> max_errors_histogram_;
// Stats on time intervals when the underlying clock was extrapolated
// instead of using the actual readings. Extrapolation happens when an attempt
// to read the clock yields an error (clock might be unsynchronized, etc.).
scoped_refptr<Histogram> extrapolation_intervals_histogram_;
// Clock metrics are set to detach to their last value. This means
// that, during our destructor, we'll need to access other class members
// declared above this. Hence, this member must be declared last: keep any
// new data member above this line.
FunctionGaugeDetacher metric_detacher_;
} // namespace clock
} // namespace kudu