blob: 07bd5422af2662dc052e5b1be92124e9a87d106e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License
#include <google/protobuf/message.h>
#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
#include <list>
#include <set>
#include <string>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/mutex.hpp>
#include <process/process.hpp>
#include <process/metrics/metrics.hpp>
#include <process/metrics/timer.hpp>
#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
#include <stout/hashmap.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/svn.hpp>
#include <stout/uuid.hpp>
#include "log/log.hpp"
#include "state/log.hpp"
using namespace mesos::internal::log;
using namespace process;
// Note that we don't add 'using std::set' here because we need
// 'std::' to disambiguate the 'set' member.
using std::list;
using std::string;
namespace mesos {
namespace internal {
namespace state {
// A storage implementation for State that uses the replicated
// log. The log is made up of appended operations. Each state entry is
// mapped to a log "snapshot".
// All operations are gated by 'start()' which makes sure that a
// Log::Writer has been started and all positions in the log have been
// read and cached in memory. All reads are performed by this cache
// (for now). If the Log::Writer gets demoted (i.e., because another
// writer started) then the current operation will return false
// implying the operation was not atomic and subsequent operations
// will re-'start()' which will again read all positions to make sure
// operations are consistent.
// TODO(benh): Log demotion does not necessarily imply a non-atomic
// read/modify/write. An alternative strategy might be to retry after
// restarting via 'start' (and holding on to the mutex so no other
// operations are attempted).
class LogStorageProcess : public Process<LogStorageProcess>
LogStorageProcess(Log* log, size_t diffsBetweenSnapshots);
virtual ~LogStorageProcess();
// Storage implementation.
Future<Option<state::Entry> > get(const string& name);
Future<bool> set(const state::Entry& entry, const UUID& uuid);
Future<bool> expunge(const state::Entry& entry);
Future<std::set<string> > names();
virtual void finalize();
Future<Nothing> start();
Future<Nothing> _start(const Option<Log::Position>& position);
Future<Nothing> __start(
const Log::Position& beginning,
const Log::Position& position);
// Helper for applying log entries.
Future<Nothing> apply(const list<Log::Entry>& entries);
// Helper for performing truncation.
void truncate();
Future<Nothing> _truncate();
Future<Nothing> __truncate(
const Log::Position& minimum,
const Option<Log::Position>& position);
// Continuations.
Future<Option<state::Entry> > _get(const string& name);
Future<bool> _set(const state::Entry& entry, const UUID& uuid);
Future<bool> __set(const state::Entry& entry, const UUID& uuid);
Future<bool> ___set(
const state::Entry& entry,
size_t diff,
Option<Log::Position> position);
Future<bool> _expunge(const state::Entry& entry);
Future<bool> __expunge(const state::Entry& entry);
Future<bool> ___expunge(
const state::Entry& entry,
const Option<Log::Position>& position);
Future<std::set<string> > _names();
Log::Reader reader;
Log::Writer writer;
const size_t diffsBetweenSnapshots;
// Used to serialize Log::Writer::append/truncate operations.
Mutex mutex;
// Whether or not we've started the ability to append to log.
Option<Future<Nothing> > starting;
// Last position in the log that we've read or written.
Option<Log::Position> index;
// Last position in the log up to which we've truncated.
Option<Log::Position> truncated;
// Note that while it would be nice to just use Operation::Snapshot
// modified to include a required field called 'position' we don't
// know the position (nor can we determine it) before we've done the
// actual appending of the data.
struct Snapshot
Snapshot(const Log::Position& position,
const state::Entry& entry,
size_t diffs = 0)
: position(position),
diffs(diffs) {}
// Returns a snapshot after having applied the specified diff.
Try<Snapshot> patch(const Operation::Diff& diff) const
if (diff.entry().name() != {
return Error("Attempted to patch the wrong snapshot");
Try<string> patch = svn::patch(
if (patch.isError()) {
return Error(patch.error());
Entry entry(diff.entry());
return Snapshot(position, entry, diffs + 1);
// Position in the log where this snapshot is located. NOTE: if
// 'diffs' is greater than 0 this still represents the location of
// the snapshot, not the last DIFF record in the log.
const Log::Position position;
// TODO(benh): Rather than storing the entire state::Entry we
// should just store the position, name, and UUID and cache the
// data so we don't use too much memory.
const state::Entry entry;
// This value represents the number of Operation::DIFFs in the
// underlying log that make up this "snapshot". If this snapshot
// is actually represented in the log this value is 0.
const size_t diffs;
// All known snapshots indexed by name. Note that 'hashmap::get'
// must be used instead of 'operator[]' since Snapshot doesn't have
// a default/empty constructor.
hashmap<string, Snapshot> snapshots;
struct Metrics
: diff("log_storage/diff")
process::metrics::Timer<Milliseconds> diff;
} metrics;
LogStorageProcess::LogStorageProcess(Log* log, size_t diffsBetweenSnapshots)
: reader(log),
diffsBetweenSnapshots(diffsBetweenSnapshots) {}
LogStorageProcess::~LogStorageProcess() {}
void LogStorageProcess::finalize()
if (starting.isSome()) {
Future<Nothing> LogStorageProcess::start()
if (starting.isSome()) {
return starting.get();
VLOG(2) << "Starting the writer";
starting = writer.start()
.then(defer(self(), &Self::_start, lambda::_1));
return starting.get();
Future<Nothing> LogStorageProcess::_start(
const Option<Log::Position>& position)
if (position.isNone()) {
VLOG(2) << "Writer failed to get elected, retrying";
starting = None(); // Reset 'starting' so we try again.
return start(); // TODO(benh): Don't try again forever?
VLOG(2) << "Writer got elected at position "
<< position.get().identity();
// Now read and apply log entries. Since 'start' can be called
// multiple times (i.e., since we reset 'starting' after getting a
// None position returned after 'set', 'expunge', etc) we need to
// check and see if we've already successfully read the log at least
// once by checking 'index'. If we haven't yet read the log (i.e.,
// this is the first call to 'start' and 'index' is None) then we
// get the beginning of the log first so we can read from that up to
// what ever position was known at the time we started the
// writer. Note that it should always be safe to read a truncated
// entry since a subsequent operation in the log should invalidate
// that entry when we read it instead.
if (index.isSome()) {
// If we've started before (i.e., have an 'index' position) we
// should also expect to know the last 'truncated' position.
return, position.get())
.then(defer(self(), &Self::apply, lambda::_1));
return reader.beginning()
.then(defer(self(), &Self::__start, lambda::_1, position.get()));
Future<Nothing> LogStorageProcess::__start(
const Log::Position& beginning,
const Log::Position& position)
truncated = beginning; // Cache for future truncations.
return, position)
.then(defer(self(), &Self::apply, lambda::_1));
Future<Nothing> LogStorageProcess::apply(const list<Log::Entry>& entries)
VLOG(2) << "Applying operations (" << entries.size() << " entries)";
// Only read and apply entries past our index.
foreach (const Log::Entry& entry, entries) {
if (index.isNone() || index.get() < entry.position) {
// Parse the Operation from the Log::Entry.
Operation operation;
google::protobuf::io::ArrayInputStream stream(,;
if (!operation.ParseFromZeroCopyStream(&stream)) {
return Failure("Failed to deserialize Operation");
switch (operation.type()) {
case Operation::SNAPSHOT: {
// Add or update (override) the snapshot.
Snapshot snapshot(entry.position, operation.snapshot().entry());
snapshots.put(, snapshot);
case Operation::DIFF: {
Option<Snapshot> snapshot =
Try<Snapshot> patched = snapshot.get().patch(operation.diff());
if (patched.isError()) {
return Failure("Failed to apply the diff: " + patched.error());
// Replace the snapshot with the patched snapshot.
snapshots.put(patched.get(), patched.get());
case Operation::EXPUNGE: {
return Failure("Unknown operation: " + stringify(operation.type()));
index = entry.position;
return Nothing();
// TODO(benh): Truncation could be optimized by saving the "oldest"
// snapshot and only doing a truncation if/when we update that
// snapshot.
// TODO(benh): Truncation is not enough to keep the log size small as
// the log could get very fragmented. We'll need a way to defragment
// the log as some state entries might not get set over a long period
// of time and their associated snapshots are causing the log to grow
// very big.
void LogStorageProcess::truncate()
// We lock the truncation since it includes a call to
// Log::Writer::truncate which must be serialized with calls to
// Log::Writer::append.
.then(defer(self(), &Self::_truncate))
.onAny(lambda::bind(&Mutex::unlock, mutex));
Future<Nothing> LogStorageProcess::_truncate()
// Determine the minimum necessary position for all the snapshots.
Option<Log::Position> minimum = None();
foreachvalue (const Snapshot& snapshot, snapshots) {
minimum = min(minimum, snapshot.position);
// TODO(benh): It's possible that the minimum position we've found
// will leave a lot of "unnecessary" entries in the log (e.g., a
// snapshot that has been overwritten at a later position). In this
// circumstance we should "compact/defrag" the log by writing/moving
// snapshots to the end of the log first applying any diffs as
// necessary.
if (minimum.isSome() && minimum.get() > truncated.get()) {
return writer.truncate(minimum.get())
.then(defer(self(), &Self::__truncate, minimum.get(), lambda::_1));
// NOTE: Any failure from Log::Writer::truncate doesn't propagate
// since the expectation is any subsequent Log::Writer::append
// would cause a failure. Furthermore, if the failure was
// temporary any subsequent Log::Writer::truncate should rectify a
// "missing" truncation.
return Nothing();
Future<Nothing> LogStorageProcess::__truncate(
const Log::Position& minimum,
const Option<Log::Position>& position)
// Don't bother retrying truncation if we're demoted, we'll
// just try again the next time 'truncate()' gets called
// (after we've done what's necessary to append again).
if (position.isSome()) {
truncated = max(truncated, minimum);
index = max(index, position);
return Nothing();
Future<Option<state::Entry> > LogStorageProcess::get(const string& name)
return start()
.then(defer(self(), &Self::_get, name));
Future<Option<state::Entry> > LogStorageProcess::_get(const string& name)
Option<Snapshot> snapshot = snapshots.get(name);
if (snapshot.isNone()) {
return None();
return snapshot.get().entry;
Future<bool> LogStorageProcess::set(
const state::Entry& entry,
const UUID& uuid)
return mutex.lock()
.then(defer(self(), &Self::_set, entry, uuid))
.onAny(lambda::bind(&Mutex::unlock, mutex));
Future<bool> LogStorageProcess::_set(
const state::Entry& entry,
const UUID& uuid)
return start()
.then(defer(self(), &Self::__set, entry, uuid));
Future<bool> LogStorageProcess::__set(
const state::Entry& entry,
const UUID& uuid)
Option<Snapshot> snapshot = snapshots.get(;
// Check the version first (if we've already got a snapshot).
if (snapshot.isSome() &&
UUID::fromBytes(snapshot.get().entry.uuid()) != uuid) {
return false;
// Check if we should try to compute a diff.
if (snapshot.isSome() && snapshot.get().diffs < diffsBetweenSnapshots) {
// Keep metrics for the time to calculate diffs.
// Construct the diff of the last snapshot.
Try<svn::Diff> diff = svn::diff(
Duration elapsed = metrics.diff.stop();
if (diff.isError()) {
// TODO(benh): Fallback and try and write a whole snapshot?
return Failure("Failed to construct diff: " + diff.error());
VLOG(1) << "Created an SVN diff in " << elapsed
<< " of size " << Bytes(diff.get().data.size()) << " which is "
<< (diff.get().data.size() / (double) entry.value().size()) * 100.0
<< "% the original size (" << Bytes(entry.value().size()) << ")";
// Only write the diff if it provides a reduction in size.
if (diff.get().data.size() < entry.value().size()) {
// Append a diff operation.
Operation operation;
string value;
if (!operation.SerializeToString(&value)) {
return Failure("Failed to serialize DIFF Operation");
return writer.append(value)
snapshot.get().diffs + 1,
// Write the full snapshot.
Operation operation;
string value;
if (!operation.SerializeToString(&value)) {
return Failure("Failed to serialize SNAPSHOT Operation");
return writer.append(value)
.then(defer(self(), &Self::___set, entry, 0, lambda::_1));
Future<bool> LogStorageProcess::___set(
const state::Entry& entry,
size_t diffs,
Option<Log::Position> position)
if (position.isNone()) {
starting = None(); // Reset 'starting' so we try again.
return false;
// Update index so we don't bother reading anything before this
// position again (if we don't have to).
index = max(index, position);
// Determine the position that represents the snapshot: if we just
// wrote a diff then we want to use the existing position of the
// snapshot, otherwise we just overwrote the snapshot so we should
// use the returned position (i.e., do nothing).
if (diffs > 0) {
position = snapshots.get(;
Snapshot snapshot(position.get(), entry, diffs);
snapshots.put(, snapshot);
// And truncate the log if necessary.
return true;
Future<bool> LogStorageProcess::expunge(const state::Entry& entry)
return mutex.lock()
.then(defer(self(), &Self::_expunge, entry))
.onAny(lambda::bind(&Mutex::unlock, mutex));
Future<bool> LogStorageProcess::_expunge(const state::Entry& entry)
return start()
.then(defer(self(), &Self::__expunge, entry));
Future<bool> LogStorageProcess::__expunge(const state::Entry& entry)
Option<Snapshot> snapshot = snapshots.get(;
if (snapshot.isNone()) {
return false;
// Check the version first.
if (UUID::fromBytes(snapshot.get().entry.uuid()) !=
UUID::fromBytes(entry.uuid())) {
return false;
// Now serialize and append an expunge operation.
Operation operation;
string value;
if (!operation.SerializeToString(&value)) {
return Failure("Failed to serialize Operation");
return writer.append(value)
.then(defer(self(), &Self::___expunge, entry, lambda::_1));
Future<bool> LogStorageProcess::___expunge(
const state::Entry& entry,
const Option<Log::Position>& position)
if (position.isNone()) {
starting = None(); // Reset 'starting' so we try again.
return false;
// Remove from snapshots and truncate the log if possible.
return true;
Future<std::set<string> > LogStorageProcess::names()
return start()
.then(defer(self(), &Self::_names));
Future<std::set<string> > LogStorageProcess::_names()
const hashset<string>& keys = snapshots.keys();
return std::set<string>(keys.begin(), keys.end());
LogStorage::LogStorage(Log* log, size_t diffsBetweenSnapshots)
process = new LogStorageProcess(log, diffsBetweenSnapshots);
delete process;
Future<Option<state::Entry> > LogStorage::get(const string& name)
return dispatch(process, &LogStorageProcess::get, name);
Future<bool> LogStorage::set(const state::Entry& entry, const UUID& uuid)
return dispatch(process, &LogStorageProcess::set, entry, uuid);
Future<bool> LogStorage::expunge(const state::Entry& entry)
return dispatch(process, &LogStorageProcess::expunge, entry);
Future<std::set<string> > LogStorage::names()
return dispatch(process, &LogStorageProcess::names);
} // namespace state {
} // namespace internal {
} // namespace mesos {