| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/consensus/time_manager.h" |
| |
| #include <algorithm> |
| #include <cstdint> |
| #include <mutex> |
| #include <ostream> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| |
| #include "kudu/clock/clock.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/util/countdown_latch.h" |
| #include "kudu/util/flag_tags.h" |
| |
| DEFINE_bool(safe_time_advancement_without_writes, true, |
| "Whether to enable the advancement of \"safe\" time in the absense of write " |
| "operations"); |
| TAG_FLAG(safe_time_advancement_without_writes, advanced); |
| |
| DEFINE_double(missed_heartbeats_before_rejecting_snapshot_scans, 1.5, |
| "The maximum raft heartbeat periods since the tablet has seen safe time advanced " |
| "before refusing scans at snapshots that aren't yet safe and forcing clients to " |
| "try again."); |
| TAG_FLAG(missed_heartbeats_before_rejecting_snapshot_scans, experimental); |
| |
| DEFINE_int32(safe_time_max_lag_ms, 30 * 1000, |
| "The maximum amount of time we allow safe time to lag behind the requested timestamp" |
| "before forcing the client to retry, in milliseconds."); |
| TAG_FLAG(safe_time_max_lag_ms, experimental); |
| |
| DECLARE_int32(raft_heartbeat_interval_ms); |
| |
| using kudu::clock::Clock; |
| using std::string; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace consensus { |
| |
| typedef std::lock_guard<simple_spinlock> Lock; |
| |
| ExternalConsistencyMode TimeManager::GetMessageConsistencyMode(const ReplicateMsg& message) { |
| // TODO(dralves): We should have no-ops (?) and config changes be COMMIT_WAIT |
| // ops. See KUDU-798. |
| // TODO(dralves) Move external consistency mode to ReplicateMsg. This will be useful |
| // for consistent alter table ops. |
| if (PREDICT_TRUE(message.has_write_request())) { |
| return message.write_request().external_consistency_mode(); |
| } |
| return CLIENT_PROPAGATED; |
| } |
| |
| TimeManager::TimeManager(Clock* clock, Timestamp initial_safe_time) |
| : last_serial_ts_assigned_(initial_safe_time), |
| last_safe_ts_(initial_safe_time), |
| last_advanced_safe_time_(MonoTime::Now()), |
| mode_(NON_LEADER), |
| clock_(clock) {} |
| |
| void TimeManager::SetLeaderMode() { |
| Lock l(lock_); |
| mode_ = LEADER; |
| AdvanceSafeTimeAndWakeUpWaitersUnlocked(clock_->Now()); |
| } |
| |
| void TimeManager::SetNonLeaderMode() { |
| Lock l(lock_); |
| mode_ = NON_LEADER; |
| } |
| |
| Status TimeManager::AssignTimestamp(ReplicateMsg* message) { |
| Lock l(lock_); |
| if (PREDICT_FALSE(mode_ == NON_LEADER)) { |
| return Status::IllegalState(Substitute( |
| "Cannot assign timestamp to op. Tablet is not " |
| "in leader mode. Last heard from a leader: $0 ago.", |
| (MonoTime::Now() - last_advanced_safe_time_).ToString())); |
| } |
| Timestamp t; |
| switch (GetMessageConsistencyMode(*message)) { |
| case COMMIT_WAIT: t = GetSerialTimestampPlusMaxError(); break; |
| case CLIENT_PROPAGATED: t = GetSerialTimestampUnlocked(); break; |
| default: return Status::NotSupported("Unsupported external consistency mode."); |
| } |
| message->set_timestamp(t.value()); |
| return Status::OK(); |
| } |
| |
| Status TimeManager::MessageReceivedFromLeader(const ReplicateMsg& message) { |
| // NOTE: Currently this method just updates the clock and stores the message's timestamp. |
| // It always returns Status::OK() if the clock returns an OK status on Update(). |
| // |
| // When we have leader leases we can trust that the timestamps of messages sent by |
| // any valid leader are safe and we could increase safe time here. However, since |
| // this is not yet the case we will only increase safe time later, when the message is |
| // committed, at the cost of additional delay in moving safe time. |
| // |
| // This greatly reduces the opportunity for non-repeatable reads. On a busy cluster |
| // with a lot of writes (i.e. no empty, "heartbeat" messages) safe time moves only |
| // with committed message timestamps, which are forcibly 'safe'. |
| // |
| // The only opportunity for unrepeatable reads in this setup is if an old leader sends |
| // an (accepted) empty heartbeat message to a follower that immediately afterwards |
| // receives a non-empty message from another higher term leader but with a lower timestamp |
| // than the empty heartbeat. |
| DCHECK(message.has_timestamp()); |
| Timestamp t(message.timestamp()); |
| RETURN_NOT_OK(clock_->Update(t)); |
| { |
| Lock l(lock_); |
| CHECK_EQ(mode_, NON_LEADER) << "Cannot receive messages from a leader in leader mode."; |
| if (GetMessageConsistencyMode(message) == CLIENT_PROPAGATED) { |
| last_serial_ts_assigned_ = t; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status TimeManager::UpdateClockAndLastAssignedTimestamp(const Timestamp& timestamp) { |
| RETURN_NOT_OK(clock_->Update(timestamp)); |
| Lock l(lock_); |
| if (PREDICT_FALSE(mode_ == NON_LEADER)) { |
| return Status::IllegalState(Substitute( |
| "Cannot bump the last assigned timestamp. Tablet is not " |
| "in leader mode. Last heard from a leader: $0 ago.", |
| (MonoTime::Now() - last_advanced_safe_time_).ToString())); |
| } |
| last_serial_ts_assigned_ = std::max(timestamp, last_serial_ts_assigned_); |
| return Status::OK(); |
| } |
| |
| void TimeManager::AdvanceSafeTimeWithMessage(const ReplicateMsg& message) { |
| Lock l(lock_); |
| if (GetMessageConsistencyMode(message) == CLIENT_PROPAGATED) { |
| AdvanceSafeTimeAndWakeUpWaitersUnlocked(Timestamp(message.timestamp())); |
| } |
| } |
| |
| void TimeManager::AdvanceSafeTime(Timestamp safe_time) { |
| Lock l(lock_); |
| CHECK_EQ(mode_, NON_LEADER) << "Cannot advance safe time by timestamp in leader mode."; |
| AdvanceSafeTimeAndWakeUpWaitersUnlocked(safe_time); |
| } |
| |
| bool TimeManager::HasAdvancedSafeTimeRecentlyUnlocked(string* error_message) { |
| DCHECK(lock_.is_locked()); |
| |
| MonoDelta time_since_last_advance = MonoTime::Now() - last_advanced_safe_time_; |
| int64_t max_last_advanced = FLAGS_missed_heartbeats_before_rejecting_snapshot_scans * |
| FLAGS_raft_heartbeat_interval_ms; |
| // Clamp max_last_advanced to 100 ms. Some tests set leader election timeouts really |
| // low and don't necessarily want to stress scanners. |
| max_last_advanced = std::max<int64_t>(max_last_advanced, 100LL); |
| MonoDelta max_delta = MonoDelta::FromMilliseconds(max_last_advanced); |
| if (time_since_last_advance > max_delta) { |
| *error_message = Substitute("Tablet hasn't heard from leader, or there hasn't been a stable " |
| "leader for: $0 secs, (max is $1):", |
| time_since_last_advance.ToString(), max_delta.ToString()); |
| return false; |
| } |
| return true; |
| } |
| |
| bool TimeManager::IsSafeTimeLaggingUnlocked(Timestamp timestamp, string* error_message) { |
| DCHECK(lock_.is_locked()); |
| |
| // Can't calculate safe time lag for the logical clock. |
| if (PREDICT_FALSE(!clock_->HasPhysicalComponent())) return false; |
| MonoDelta safe_time_diff = clock_->GetPhysicalComponentDifference(timestamp, |
| last_safe_ts_); |
| if (safe_time_diff.ToMilliseconds() > FLAGS_safe_time_max_lag_ms) { |
| *error_message = Substitute("Tablet is lagging too much to be able to serve snapshot scan. " |
| "Lagging by: $0 ms, (max is $1 ms):", |
| safe_time_diff.ToMilliseconds(), |
| FLAGS_safe_time_max_lag_ms); |
| return true; |
| } |
| return false; |
| } |
| |
| void TimeManager::MakeWaiterTimeoutMessageUnlocked(Timestamp timestamp, string* error_message) { |
| DCHECK(lock_.is_locked()); |
| |
| string mode = mode_ == LEADER ? "LEADER" : "NON-LEADER"; |
| string clock_diff = clock_->HasPhysicalComponent() ? clock_->GetPhysicalComponentDifference( |
| timestamp, last_safe_ts_).ToString() : "None (Logical clock)"; |
| *error_message = Substitute("Timed out waiting for ts: $0 to be safe (mode: $1). Current safe " |
| "time: $2 Physical time difference: $3", clock_->Stringify(timestamp), |
| mode, clock_->Stringify(last_safe_ts_), clock_diff); |
| } |
| |
| Status TimeManager::WaitUntilSafe(Timestamp timestamp, const MonoTime& deadline) { |
| string error_message; |
| |
| // Pre-flight checks: |
| // - If this timestamp is before the last safe time return. |
| // - If we're not the leader make sure we've heard from the leader recently. |
| // - If we're not the leader make sure safe time isn't lagging too much. |
| { |
| Lock l(lock_); |
| if (timestamp < GetSafeTimeUnlocked()) return Status::OK(); |
| |
| if (mode_ == NON_LEADER) { |
| if (IsSafeTimeLaggingUnlocked(timestamp, &error_message)) { |
| return Status::TimedOut(error_message); |
| } |
| |
| if (!HasAdvancedSafeTimeRecentlyUnlocked(&error_message)) { |
| return Status::TimedOut(error_message); |
| } |
| } |
| } |
| |
| // First wait for the clock to be past 'timestamp'. |
| RETURN_NOT_OK(clock_->WaitUntilAfterLocally(timestamp, deadline)); |
| |
| if (PREDICT_FALSE(MonoTime::Now() > deadline)) { |
| return Status::TimedOut("Timed out waiting for the local clock."); |
| } |
| |
| CountDownLatch latch(1); |
| WaitingState waiter; |
| waiter.timestamp = timestamp; |
| waiter.latch = &latch; |
| |
| // Register a waiter in waiters_ |
| { |
| Lock l(lock_); |
| if (IsTimestampSafeUnlocked(timestamp)) return Status::OK(); |
| waiters_.push_back(&waiter); |
| } |
| |
| // Wait until we get notified or 'deadline' elapses. |
| if (waiter.latch->WaitUntil(deadline)) return Status::OK(); |
| |
| // Timed out, clean up. |
| { |
| Lock l(lock_); |
| // Address the case where we were notified after the timeout. |
| if (waiter.latch->count() == 0) return Status::OK(); |
| |
| waiters_.erase(std::find(waiters_.begin(), waiters_.end(), &waiter)); |
| |
| MakeWaiterTimeoutMessageUnlocked(waiter.timestamp, &error_message); |
| return Status::TimedOut(error_message); |
| } |
| } |
| |
| void TimeManager::AdvanceSafeTimeAndWakeUpWaitersUnlocked(Timestamp safe_time) { |
| DCHECK(lock_.is_locked()); |
| |
| if (safe_time <= last_safe_ts_) { |
| return; |
| } |
| last_safe_ts_ = safe_time; |
| last_advanced_safe_time_ = MonoTime::Now(); |
| |
| auto iter = waiters_.begin(); |
| while (iter != waiters_.end()) { |
| WaitingState* waiter = *iter; |
| if (IsTimestampSafeUnlocked(waiter->timestamp)) { |
| iter = waiters_.erase(iter); |
| waiter->latch->CountDown(); |
| continue; |
| } |
| ++iter; |
| } |
| } |
| |
| bool TimeManager::IsTimestampSafe(Timestamp timestamp) { |
| Lock l(lock_); |
| return IsTimestampSafeUnlocked(timestamp); |
| } |
| |
| bool TimeManager::IsTimestampSafeUnlocked(Timestamp timestamp) { |
| return timestamp <= GetSafeTimeUnlocked(); |
| } |
| |
| Timestamp TimeManager::GetSafeTime() { |
| Lock l(lock_); |
| return GetSafeTimeUnlocked(); |
| } |
| |
| Timestamp TimeManager::GetSafeTimeUnlocked() { |
| DCHECK(lock_.is_locked()); |
| |
| switch (mode_) { |
| case LEADER: { |
| // In ASCII form, where 'S' represents a safe timestamp, 'A' represents the last assigned |
| // timestamp, and 'N' represents the current clock value, the internal state can look like |
| // the following diagrams (time moves from left to right): |
| // |
| // a) |
| // SSSSSSSSSSSSSSSSSS N |
| // | \- last_safe_ts_ |
| // | |
| // \- last_serial_ts_assigned_ |
| // or like: |
| // b) |
| // SSSSSSSSSSSSSSSSSS A N |
| // | \- last_serial_ts_assigned_ |
| // | |
| // \- last_safe_ts_ |
| // |
| // If the current internal state is a), then we can advance safe time to 'N'. We know the |
| // leader will never assign a new timestamp lower than it. |
| if (PREDICT_TRUE(last_serial_ts_assigned_ <= last_safe_ts_)) { |
| last_safe_ts_ = clock_->Now(); |
| last_advanced_safe_time_ = MonoTime::Now(); |
| return last_safe_ts_; |
| } |
| // If the current state is b), then there might be an op with a timestamp |
| // that is lower than 'N' in between assignment and being appended to the |
| // queue. We can't consider 'N' safe and thus have to return the last |
| // known safe timestamp. |
| // Note that there can be at most one single op in this state, because |
| // prepare is single threaded. |
| return last_safe_ts_; |
| } |
| case NON_LEADER: |
| return last_safe_ts_; |
| } |
| __builtin_unreachable(); // silence gcc warnings |
| } |
| |
| Timestamp TimeManager::GetSerialTimestamp() { |
| Lock l(lock_); |
| return GetSerialTimestampUnlocked(); |
| } |
| |
| Timestamp TimeManager::GetSerialTimestampUnlocked() { |
| DCHECK(lock_.is_locked()); |
| |
| last_serial_ts_assigned_ = clock_->Now(); |
| return last_serial_ts_assigned_; |
| } |
| |
| Timestamp TimeManager::GetSerialTimestampPlusMaxError() { |
| return clock_->NowLatest(); |
| } |
| |
| |
| } // namespace consensus |
| } // namespace kudu |