blob: 8ca28695562c61a8de0eb942b547cbfefc61fb84 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include <google/protobuf/message.h>
#include <google/protobuf/io/zero_copy_stream_impl.h> // For ArrayInputStream.
#include <list>
#include <set>
#include <string>
#include <mesos/log/log.hpp>
#include <mesos/state/log.hpp>
#include <process/defer.hpp>
#include <process/dispatch.hpp>
#include <process/future.hpp>
#include <process/id.hpp>
#include <process/mutex.hpp>
#include <process/process.hpp>
#include <process/metrics/metrics.hpp>
#include <process/metrics/timer.hpp>
#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/foreach.hpp>
#include <stout/lambda.hpp>
#include <stout/hashmap.hpp>
#include <stout/nothing.hpp>
#include <stout/option.hpp>
#include <stout/svn.hpp>
#include <stout/uuid.hpp>
#include "messages/state.hpp"
using namespace mesos::internal::log;
using namespace process;
// Note that we don't add 'using std::set' here because we need
// 'std::' to disambiguate the 'set' member.
using std::list;
using std::string;
using mesos::log::Log;
using mesos::internal::state::Entry;
using mesos::internal::state::Operation;
namespace mesos {
namespace state {
// A storage implementation for State that uses the replicated
// log. The log is made up of appended operations. Each state entry is
// mapped to a log "snapshot".
//
// All operations are gated by 'start()' which makes sure that a
// Log::Writer has been started and all positions in the log have been
// read and cached in memory. All reads are served from this cache
// (for now). If the Log::Writer gets demoted (i.e., because another
// writer started) then the current operation will return false,
// implying the operation was not atomic, and subsequent operations
// will re-'start()' which will again read all positions to make sure
// operations are consistent.
// TODO(benh): Log demotion does not necessarily imply a non-atomic
// read/modify/write. An alternative strategy might be to retry after
// restarting via 'start' (and holding on to the mutex so no other
// operations are attempted).
class LogStorageProcess : public Process<LogStorageProcess>
{
public:
  LogStorageProcess(Log* log, size_t diffsBetweenSnapshots);

  ~LogStorageProcess() override;

  // Storage implementation.
  Future<Option<Entry>> get(const string& name);
  Future<bool> set(const Entry& entry, const id::UUID& uuid);
  Future<bool> expunge(const Entry& entry);
  Future<std::set<string>> names();

protected:
  void finalize() override;

private:
  // Starts the Log::Writer (or returns the cached 'starting' future)
  // and replays the log into 'snapshots'.
  Future<Nothing> start();
  Future<Nothing> _start(const Option<Log::Position>& position);
  Future<Nothing> __start(
      const Log::Position& beginning,
      const Log::Position& position);

  // Helper for applying log entries.
  Future<Nothing> apply(const list<Log::Entry>& entries);

  // Helper for performing truncation.
  void truncate();
  Future<Nothing> _truncate();
  Future<Nothing> __truncate(
      const Log::Position& minimum,
      const Option<Log::Position>& position);

  // Continuations.
  Future<Option<Entry>> _get(const string& name);

  Future<bool> _set(const Entry& entry, const id::UUID& uuid);
  Future<bool> __set(const Entry& entry, const id::UUID& uuid);
  Future<bool> ___set(
      const Entry& entry,
      size_t diff,
      Option<Log::Position> position);

  Future<bool> _expunge(const Entry& entry);
  Future<bool> __expunge(const Entry& entry);
  Future<bool> ___expunge(
      const Entry& entry,
      const Option<Log::Position>& position);

  Future<std::set<string>> _names();

  Log::Reader reader;
  Log::Writer writer;

  // Upper bound on the number of Operation::DIFFs written on top of a
  // snapshot before '__set' writes a full Operation::SNAPSHOT again.
  const size_t diffsBetweenSnapshots;

  // Used to serialize Log::Writer::append/truncate operations.
  Mutex mutex;

  // Whether or not we've started the ability to append to log.
  Option<Future<Nothing>> starting;

  // Last position in the log that we've read or written.
  Option<Log::Position> index;

  // Last position in the log up to which we've truncated.
  Option<Log::Position> truncated;

  // Note that while it would be nice to just use Operation::Snapshot
  // modified to include a required field called 'position' we don't
  // know the position (nor can we determine it) before we've done the
  // actual appending of the data.
  struct Snapshot
  {
    Snapshot(const Log::Position& position,
             const Entry& entry,
             size_t diffs = 0)
      : position(position),
        entry(entry),
        diffs(diffs) {}

    // Returns a snapshot after having applied the specified diff. The
    // returned snapshot keeps this snapshot's 'position' and has
    // 'diffs' incremented by one.
    Try<Snapshot> patch(const Operation::Diff& diff) const
    {
      if (diff.entry().name() != entry.name()) {
        return Error("Attempted to patch the wrong snapshot");
      }

      // NOTE: locals are deliberately not named 'patch' or 'entry' so
      // they don't shadow this method or the 'entry' member.
      Try<string> patched = svn::patch(
          entry.value(),
          svn::Diff(diff.entry().value()));

      if (patched.isError()) {
        return Error(patched.error());
      }

      // Take every other field (e.g., name and UUID) from the diff
      // record, and the value from the patched result.
      Entry result(diff.entry());
      result.set_value(patched.get());

      return Snapshot(position, result, diffs + 1);
    }

    // Position in the log where this snapshot is located. NOTE: if
    // 'diffs' is greater than 0 this still represents the location of
    // the snapshot, not the last DIFF record in the log.
    const Log::Position position;

    // TODO(benh): Rather than storing the entire Entry we
    // should just store the position, name, and UUID and cache the
    // data so we don't use too much memory.
    const Entry entry;

    // This value represents the number of Operation::DIFFs in the
    // underlying log that make up this "snapshot". If this snapshot
    // is actually represented in the log this value is 0.
    const size_t diffs;
  };

  // All known snapshots indexed by name. Note that 'hashmap::get'
  // must be used instead of 'operator[]' since Snapshot doesn't have
  // a default/empty constructor.
  hashmap<string, Snapshot> snapshots;

  // Registers the "log_storage/diff" timer on construction and
  // removes it on destruction.
  struct Metrics
  {
    Metrics()
      : diff("log_storage/diff")
    {
      process::metrics::add(diff);
    }

    ~Metrics()
    {
      process::metrics::remove(diff);
    }

    // Time spent computing SVN diffs in '__set'.
    process::metrics::Timer<Milliseconds> diff;
  } metrics;
};
LogStorageProcess::LogStorageProcess(
    Log* log,
    size_t diffsBetweenSnapshots)
  : ProcessBase(process::ID::generate("log-storage")),
    reader(log),
    writer(log),
    diffsBetweenSnapshots(diffsBetweenSnapshots)
{
  // The remaining members ('starting', 'index', 'truncated',
  // 'snapshots', 'metrics') are default-constructed; the log itself
  // is only read once 'start()' is first invoked.
}
// Nothing to release explicitly; members clean up after themselves.
LogStorageProcess::~LogStorageProcess() = default;
void LogStorageProcess::finalize()
{
if (starting.isSome()) {
Future<Nothing>(starting.get()).discard();
}
}
Future<Nothing> LogStorageProcess::start()
{
  // Only kick off a new writer election when no start is cached;
  // otherwise every caller shares the same 'starting' future.
  if (starting.isNone()) {
    VLOG(2) << "Starting the writer";

    starting = writer.start()
      .then(defer(self(), &Self::_start, lambda::_1));
  }

  return starting.get();
}
Future<Nothing> LogStorageProcess::_start(
    const Option<Log::Position>& position)
{
  CHECK_SOME(starting);

  // A None position means the writer was not elected.
  if (position.isNone()) {
    VLOG(2) << "Writer failed to get elected, retrying";
    starting = None(); // Reset 'starting' so we try again.
    return start(); // TODO(benh): Don't try again forever?
  }

  const Log::Position& elected = position.get();

  VLOG(2) << "Writer got elected at position " << elected.identity();

  // Now read and apply log entries. Since 'start' can be called
  // multiple times (i.e., since we reset 'starting' after getting a
  // None position returned after 'set', 'expunge', etc) we need to
  // check and see if we've already successfully read the log at least
  // once by checking 'index'. If we haven't yet read the log (i.e.,
  // this is the first call to 'start' and 'index' is None) then we
  // get the beginning of the log first so we can read from that up to
  // whatever position was known at the time we started the
  // writer. Note that it should always be safe to read a truncated
  // entry since a subsequent operation in the log should invalidate
  // that entry when we read it instead.
  if (index.isNone()) {
    return reader.beginning()
      .then(defer(self(), &Self::__start, lambda::_1, elected));
  }

  // If we've started before (i.e., have an 'index' position) we
  // should also expect to know the last 'truncated' position.
  CHECK_SOME(truncated);

  return reader.read(index.get(), elected)
    .then(defer(self(), &Self::apply, lambda::_1));
}
Future<Nothing> LogStorageProcess::__start(
    const Log::Position& beginning,
    const Log::Position& position)
{
  CHECK_SOME(starting);

  // Remember where the log begins; '_truncate' compares against it.
  truncated = beginning;

  // Replay everything from the beginning up to the elected position.
  auto entries = reader.read(beginning, position);
  return entries.then(defer(self(), &Self::apply, lambda::_1));
}
Future<Nothing> LogStorageProcess::apply(const list<Log::Entry>& entries)
{
  VLOG(2) << "Applying operations (" << entries.size() << " entries)";

  for (const Log::Entry& entry : entries) {
    // Only apply entries strictly past our index; everything at or
    // before it has already been applied.
    if (index.isSome() && !(index.get() < entry.position)) {
      continue;
    }

    // Parse the Operation from the Log::Entry.
    google::protobuf::io::ArrayInputStream stream(
        entry.data.data(),
        entry.data.size());

    Operation operation;
    if (!operation.ParseFromZeroCopyStream(&stream)) {
      return Failure("Failed to deserialize Operation");
    }

    switch (operation.type()) {
      case Operation::SNAPSHOT: {
        CHECK(operation.has_snapshot());

        // Add or update (override) the snapshot.
        const Snapshot snapshot(entry.position, operation.snapshot().entry());
        snapshots.put(snapshot.entry.name(), snapshot);
        break;
      }

      case Operation::DIFF: {
        CHECK(operation.has_diff());

        // A diff is always applied on top of an existing snapshot.
        const string& name = operation.diff().entry().name();
        Option<Snapshot> current = snapshots.get(name);
        CHECK_SOME(current);

        Try<Snapshot> patched = current->patch(operation.diff());
        if (patched.isError()) {
          return Failure("Failed to apply the diff: " + patched.error());
        }

        // Replace the snapshot with the patched snapshot.
        snapshots.put(patched->entry.name(), patched.get());
        break;
      }

      case Operation::EXPUNGE: {
        CHECK(operation.has_expunge());
        snapshots.erase(operation.expunge().name());
        break;
      }

      default:
        return Failure("Unknown operation: " + stringify(operation.type()));
    }

    index = entry.position;
  }

  return Nothing();
}
// TODO(benh): Truncation could be optimized by saving the "oldest"
// snapshot and only doing a truncation if/when we update that
// snapshot.
// TODO(benh): Truncation is not enough to keep the log size small as
// the log could get very fragmented. We'll need a way to defragment
// the log as some state entries might not get set over a long period
// of time and their associated snapshots are causing the log to grow
// very big.
void LogStorageProcess::truncate()
{
// We lock the truncation since it includes a call to
// Log::Writer::truncate which must be serialized with calls to
// Log::Writer::append.
mutex.lock()
.then(defer(self(), &Self::_truncate))
.onAny(lambda::bind(&Mutex::unlock, mutex));
}
Future<Nothing> LogStorageProcess::_truncate()
{
  // Find the smallest snapshot position; that is the position the log
  // gets truncated up to.
  Option<Log::Position> minimum = None();
  foreachvalue (const Snapshot& snapshot, snapshots) {
    minimum = min(minimum, snapshot.position);
  }

  // TODO(benh): It's possible that the minimum position we've found
  // will leave a lot of "unnecessary" entries in the log (e.g., a
  // snapshot that has been overwritten at a later position). In this
  // circumstance we should "compact/defrag" the log by writing/moving
  // snapshots to the end of the log first applying any diffs as
  // necessary.

  CHECK_SOME(truncated);

  // Nothing to do without snapshots, or if we've already truncated
  // up to (or beyond) the minimum position.
  if (minimum.isNone() || !(minimum.get() > truncated.get())) {
    return Nothing();
  }

  // NOTE: Any failure from Log::Writer::truncate doesn't propagate to
  // callers ('truncate()' only unlocks the mutex when this completes)
  // since the expectation is any subsequent Log::Writer::append would
  // cause a failure. Furthermore, if the failure was temporary any
  // subsequent Log::Writer::truncate should rectify a "missing"
  // truncation.
  return writer.truncate(minimum.get())
    .then(defer(self(), &Self::__truncate, minimum.get(), lambda::_1));
}
Future<Nothing> LogStorageProcess::__truncate(
    const Log::Position& minimum,
    const Option<Log::Position>& position)
{
  // A None position means we were demoted. Don't bother retrying the
  // truncation; we'll just try again the next time 'truncate()' gets
  // called (after we've done what's necessary to append again).
  if (position.isNone()) {
    return Nothing();
  }

  truncated = max(truncated, minimum);
  index = max(index, position);

  return Nothing();
}
Future<Option<Entry>> LogStorageProcess::get(const string& name)
{
  // Reads are served from 'snapshots', so make sure the log has been
  // replayed first.
  Future<Nothing> started = start();
  return started.then(defer(self(), &Self::_get, name));
}
Future<Option<Entry>> LogStorageProcess::_get(const string& name)
{
  Option<Snapshot> snapshot = snapshots.get(name);

  // Known entry: hand back the cached value.
  if (snapshot.isSome()) {
    return snapshot->entry;
  }

  return None();
}
Future<bool> LogStorageProcess::set(
    const Entry& entry,
    const id::UUID& uuid)
{
  // Hold 'mutex' for the whole read/modify/write so appends (and
  // truncations) are serialized; always release it afterwards.
  Future<Nothing> locked = mutex.lock();

  return locked
    .then(defer(self(), &Self::_set, entry, uuid))
    .onAny(lambda::bind(&Mutex::unlock, mutex));
}
Future<bool> LogStorageProcess::_set(
    const Entry& entry,
    const id::UUID& uuid)
{
  // The version check in '__set' needs an up-to-date 'snapshots'.
  Future<Nothing> started = start();
  return started.then(defer(self(), &Self::__set, entry, uuid));
}
// Appends either a DIFF (when it is smaller than the full value and
// fewer than 'diffsBetweenSnapshots' diffs sit on top of the current
// snapshot) or a full SNAPSHOT for 'entry'. Returns false if 'uuid'
// does not match the stored version. Returns a Failure if the stored
// UUID cannot be parsed, or if serialization or diffing fails.
Future<bool> LogStorageProcess::__set(
    const Entry& entry,
    const id::UUID& uuid)
{
  Option<Snapshot> snapshot = snapshots.get(entry.name());

  // Check the version first (if we've already got a snapshot). Parse
  // the stored UUID explicitly rather than calling 'get()' on a
  // possibly-errored Try (stout treats that as a fatal error).
  if (snapshot.isSome()) {
    Try<id::UUID> current = id::UUID::fromBytes(snapshot->entry.uuid());
    if (current.isError()) {
      return Failure(
          "Failed to parse UUID of stored entry '" + entry.name() + "': " +
          current.error());
    }

    if (current.get() != uuid) {
      return false;
    }
  }

  // Check if we should try to compute a diff.
  if (snapshot.isSome() && snapshot->diffs < diffsBetweenSnapshots) {
    // Keep metrics for the time to calculate diffs.
    metrics.diff.start();

    // Construct the diff of the last snapshot.
    Try<svn::Diff> diff = svn::diff(snapshot->entry.value(), entry.value());

    Duration elapsed = metrics.diff.stop();

    if (diff.isError()) {
      // TODO(benh): Fallback and try and write a whole snapshot?
      return Failure("Failed to construct diff: " + diff.error());
    }

    VLOG(1) << "Created an SVN diff in " << elapsed
            << " of size " << Bytes(diff->data.size()) << " which is "
            << (static_cast<double>(diff->data.size()) /
                static_cast<double>(entry.value().size())) * 100.0
            << "% the original size (" << Bytes(entry.value().size()) << ")";

    // Only write the diff if it provides a reduction in size.
    if (diff->data.size() < entry.value().size()) {
      // Append a diff operation.
      Operation operation;
      operation.set_type(Operation::DIFF);
      operation.mutable_diff()->mutable_entry()->CopyFrom(entry);
      operation.mutable_diff()->mutable_entry()->set_value(diff->data);

      string value;
      if (!operation.SerializeToString(&value)) {
        return Failure("Failed to serialize DIFF Operation");
      }

      return writer.append(value)
        .then(defer(self(),
                    &Self::___set,
                    entry,
                    snapshot->diffs + 1,
                    lambda::_1));
    }
  }

  // Write the full snapshot.
  Operation operation;
  operation.set_type(Operation::SNAPSHOT);
  operation.mutable_snapshot()->mutable_entry()->CopyFrom(entry);

  string value;
  if (!operation.SerializeToString(&value)) {
    return Failure("Failed to serialize SNAPSHOT Operation");
  }

  return writer.append(value)
    .then(defer(self(), &Self::___set, entry, 0, lambda::_1));
}
Future<bool> LogStorageProcess::___set(
    const Entry& entry,
    size_t diffs,
    Option<Log::Position> position)
{
  // A None position means the append did not happen (we were
  // demoted), so force the next operation to re-'start()'.
  if (position.isNone()) {
    starting = None(); // Reset 'starting' so we try again.
    return false;
  }

  // Advance 'index' so a later re-'start()' doesn't re-read anything
  // up to the position we just wrote (if we don't have to).
  index = max(index, position);

  // When a DIFF was written ('diffs' > 0) the snapshot still lives at
  // its existing position; after a full SNAPSHOT the returned
  // position is the snapshot's new location.
  if (diffs > 0) {
    CHECK(snapshots.contains(entry.name()));
    position = snapshots.at(entry.name()).position;
  }

  snapshots.put(entry.name(), Snapshot(position.get(), entry, diffs));

  // And truncate the log if necessary.
  truncate();

  return true;
}
Future<bool> LogStorageProcess::expunge(const Entry& entry)
{
  // Serialize with other appends/truncations via 'mutex', releasing
  // it regardless of the outcome.
  Future<Nothing> locked = mutex.lock();

  return locked
    .then(defer(self(), &Self::_expunge, entry))
    .onAny(lambda::bind(&Mutex::unlock, mutex));
}
Future<bool> LogStorageProcess::_expunge(const Entry& entry)
{
  // '__expunge' consults 'snapshots', which requires a replayed log.
  Future<Nothing> started = start();
  return started.then(defer(self(), &Self::__expunge, entry));
}
// Appends an EXPUNGE operation for 'entry' if a snapshot exists and
// its version matches 'entry'. Returns false for an unknown entry or
// a version mismatch. Returns a Failure if either UUID cannot be
// parsed or the operation cannot be serialized.
Future<bool> LogStorageProcess::__expunge(const Entry& entry)
{
  Option<Snapshot> snapshot = snapshots.get(entry.name());
  if (snapshot.isNone()) {
    return false;
  }

  // Check the version first. Both UUIDs are parsed explicitly rather
  // than calling 'get()' on a possibly-errored Try (stout treats that
  // as a fatal error). 'entry' comes from the caller, so its UUID
  // bytes are not guaranteed to be well-formed.
  Try<id::UUID> current = id::UUID::fromBytes(snapshot->entry.uuid());
  if (current.isError()) {
    return Failure(
        "Failed to parse UUID of stored entry '" + entry.name() + "': " +
        current.error());
  }

  Try<id::UUID> expected = id::UUID::fromBytes(entry.uuid());
  if (expected.isError()) {
    return Failure(
        "Failed to parse UUID of entry '" + entry.name() + "': " +
        expected.error());
  }

  if (current.get() != expected.get()) {
    return false;
  }

  // Now serialize and append an expunge operation.
  Operation operation;
  operation.set_type(Operation::EXPUNGE);
  operation.mutable_expunge()->set_name(entry.name());

  string value;
  if (!operation.SerializeToString(&value)) {
    return Failure("Failed to serialize Operation");
  }

  return writer.append(value)
    .then(defer(self(), &Self::___expunge, entry, lambda::_1));
}
// Completes an expunge once the EXPUNGE operation has been appended.
// Returns false if the append did not happen because the writer was
// demoted.
Future<bool> LogStorageProcess::___expunge(
    const Entry& entry,
    const Option<Log::Position>& position)
{
  // A None position means we were demoted; force the next operation
  // to re-'start()'.
  if (position.isNone()) {
    starting = None(); // Reset 'starting' so we try again.
    return false;
  }

  // Update index so we don't bother reading anything before this
  // position again (if we don't have to), the same as after a
  // successful set.
  index = max(index, position);

  // Remove from snapshots and truncate the log if possible.
  CHECK(snapshots.contains(entry.name()));
  snapshots.erase(entry.name());

  truncate();

  return true;
}
Future<std::set<string>> LogStorageProcess::names()
{
  // Names come from 'snapshots', so replay the log first.
  Future<Nothing> started = start();
  return started.then(defer(self(), &Self::_names));
}
Future<std::set<string>> LogStorageProcess::_names()
{
  // Collect every known entry name into an ordered set.
  std::set<string> result;
  foreachkey (const string& name, snapshots) {
    result.insert(name);
  }

  return result;
}
LogStorage::LogStorage(Log* log, size_t diffsBetweenSnapshots)
  : process(new LogStorageProcess(log, diffsBetweenSnapshots))
{
  // Start the actor; all public calls are dispatched to it.
  spawn(process);
}
// Tears down the backing actor. The order matters: ask the process to
// terminate, wait until it has actually exited, and only then free it.
LogStorage::~LogStorage()
{
  terminate(process);
  wait(process);
  delete process;
}
Future<Option<Entry>> LogStorage::get(const string& name)
{
  // Forward to the actor; the result arrives as a future.
  return dispatch(this->process, &LogStorageProcess::get, name);
}
Future<bool> LogStorage::set(const Entry& entry, const id::UUID& uuid)
{
  // Forward to the actor; resolves to false on a version mismatch or
  // a lost write.
  return dispatch(this->process, &LogStorageProcess::set, entry, uuid);
}
Future<bool> LogStorage::expunge(const Entry& entry)
{
  // Forward to the actor; the result arrives as a future.
  return dispatch(this->process, &LogStorageProcess::expunge, entry);
}
Future<std::set<string>> LogStorage::names()
{
  // Forward to the actor; the result arrives as a future.
  return dispatch(this->process, &LogStorageProcess::names);
}
} // namespace state {
} // namespace mesos {