blob: e6175700caefb47a95b1edc4ae41755f06781fb4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <gtest/gtest.h>
#include <mesos/v1/mesos.hpp>
#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/owned.hpp>
#include <stout/lambda.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/uuid.hpp>
#include <stout/os/ftruncate.hpp>
#include "common/protobuf_utils.hpp"
#include "slave/constants.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"
#include "status_update_manager/operation.hpp"
using lambda::function;
using process::Clock;
using process::Future;
using process::Owned;
using process::Promise;
using std::string;
namespace mesos {
namespace internal {
namespace tests {
// This class will be the target of the forward callbacks passed to the
// operation status update managers in this test suite.
//
// It uses gmock, so that we can easily set expectations and check how
// often/which status updates are forwarded.
class MockUpdateOperationStatusMessageProcessor
{
public:
MOCK_METHOD1(update, void(const UpdateOperationStatusMessage&));
};
class OperationStatusUpdateManagerTest : public MesosTest
{
protected:
OperationStatusUpdateManagerTest()
: statusUpdateManager(new OperationStatusUpdateManager())
{
Clock::pause();
const function<void(const UpdateOperationStatusMessage&)> forward =
[&](const UpdateOperationStatusMessage& update) {
statusUpdateProcessor.update(update);
};
statusUpdateManager->initialize(
forward, OperationStatusUpdateManagerTest::getPath);
}
void TearDown() override
{
Clock::resume();
statusUpdateManager.reset();
MesosTest::TearDown();
}
UpdateOperationStatusMessage createUpdateOperationStatusMessage(
const id::UUID& uuid,
const id::UUID& operationUuid,
const OperationState& state,
const Option<FrameworkID>& frameworkId = None())
{
UpdateOperationStatusMessage statusUpdate;
statusUpdate.mutable_operation_uuid()->CopyFrom(
protobuf::createUUID(operationUuid));
if (frameworkId.isSome()) {
statusUpdate.mutable_framework_id()->CopyFrom(frameworkId.get());
}
OperationStatus* status = statusUpdate.mutable_status();
status->set_state(state);
status->mutable_uuid()->CopyFrom(protobuf::createUUID(uuid));
return statusUpdate;
}
void resetStatusUpdateManager()
{
statusUpdateManager.reset(new OperationStatusUpdateManager());
const function<void(const UpdateOperationStatusMessage&)> forward =
[&](const UpdateOperationStatusMessage& update) {
statusUpdateProcessor.update(update);
};
statusUpdateManager->initialize(
forward, OperationStatusUpdateManagerTest::getPath);
}
static const string getPath(const id::UUID& operationUuid)
{
return path::join(os::getcwd(), "streams", operationUuid.toString());
}
Owned<OperationStatusUpdateManager> statusUpdateManager;
MockUpdateOperationStatusMessageProcessor statusUpdateProcessor;
};
// This test verifies that the status update manager will not retry a terminal
// status update after it has been acknowledged.
TEST_F(OperationStatusUpdateManagerTest, UpdateAndAck)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a terminal update, so `acknowledgement`
// should return `false`.
AWAIT_EXPECT_FALSE(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
// Advance the clock, the status update has been acknowledged, so it shouldn't
// trigger a retry.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
}
// This test verifies that the status update manager will not retry a
// non-terminal status update after it has been acknowledged.
TEST_F(OperationStatusUpdateManagerTest, UpdateAndAckNonTerminalUpdate)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_PENDING);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
AWAIT_EXPECT_TRUE(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
// Advance the clock, the status update has been acknowledged, so it shouldn't
// trigger a retry.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
}
// This test verifies that the status update manager resends status updates
// until they are acknowledged.
TEST_F(OperationStatusUpdateManagerTest, ResendUnacknowledged)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate1;
Future<UpdateOperationStatusMessage> forwardedStatusUpdate2;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
EXPECT_FALSE(forwardedStatusUpdate2.isReady());
// Advance the clock to trigger a retry.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// Verify that the status update is forwarded again.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
// Acknowledge the update, this is a terminal update, so `acknowledgement`
// should return `false`.
AWAIT_EXPECT_FALSE(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
// Advance the clock, the status update has been acknowledged, so it shouldn't
// trigger a retry.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
}
// This test verifies that after the updates belonging to a framework are
// cleaned up from the status update manager, the status update manager stops
// resending them.
TEST_F(OperationStatusUpdateManagerTest, Cleanup)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
FrameworkID frameworkId;
frameworkId.set_value("frameworkId");
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid,
operationUuid,
OperationState::OPERATION_FINISHED,
frameworkId);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Cleanup the framework.
statusUpdateManager->cleanup(frameworkId);
// Advance the clock enough to trigger a retry if the update hasn't been
// cleaned up.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
}
// This test verifies that the status update manager is able to recover
// checkpointed status updates, and that it resends the recovered updates that
// haven't been acknowledged.
TEST_F(OperationStatusUpdateManagerTest, RecoverCheckpointedStream)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate1;
Future<UpdateOperationStatusMessage> forwardedStatusUpdate2;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
resetStatusUpdateManager();
// Advance the clock enough to trigger a retry if the update hasn't been
// cleaned up.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
EXPECT_FALSE(forwardedStatusUpdate2.isReady());
// Recover the checkpointed stream.
Future<OperationStatusUpdateManagerState> state =
statusUpdateManager->recover({operationUuid}, true);
AWAIT_READY(state);
EXPECT_EQ(0u, state->errors);
EXPECT_TRUE(state->streams.contains(operationUuid));
EXPECT_SOME(state->streams.at(operationUuid));
const Option<UpdateOperationStatusMessage> recoveredUpdate =
state->streams.at(operationUuid)->updates.front();
ASSERT_SOME(recoveredUpdate);
EXPECT_EQ(statusUpdate, recoveredUpdate.get());
// The stream should NOT be terminated.
EXPECT_FALSE(state->streams.at(operationUuid)->terminated);
// Check that the status update is resent.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
}
// This test verifies that the status update manager returns a `Failure` when
// trying to recover a stream that isn't checkpointed.
TEST_F(OperationStatusUpdateManagerTest, RecoverNotCheckpointedStream)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
// Send a non-checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, false));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Verify that the stream file is NOT created.
EXPECT_TRUE(!os::exists(getPath(operationUuid)));
resetStatusUpdateManager();
// Trying to recover the non-checkpointed stream should fail.
AWAIT_EXPECT_FAILED(statusUpdateManager->recover({operationUuid}, true));
}
// This test verifies that the status update manager doesn't return a
// `Failure` when trying to recover a stream from an empty file.
//
// This can happen when the checkpointing failed between opening the file and
// writing the first update. In this case `recover()` should succeed, but return
// `None` as the operation's state.
TEST_F(OperationStatusUpdateManagerTest, RecoverEmptyFile)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
resetStatusUpdateManager();
// Advance the clock enough to trigger a retry if the update hasn't been
// cleaned up.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// Truncate the status updates to zero bytes, simulating a failure in
// checkpointing, where the file was created, but the status update manager
// crashed before any data was written to disk.
Try<int_fd> fd = os::open(getPath(operationUuid), O_RDWR);
ASSERT_SOME(os::ftruncate(fd.get(), 0));
os::close(fd.get());
// The recovery of the empty stream should not fail, but the stream state
// should be empty.
Future<OperationStatusUpdateManagerState> state =
statusUpdateManager->recover({operationUuid}, false);
AWAIT_READY(state);
EXPECT_EQ(0u, state->errors);
EXPECT_TRUE(state->streams.contains(operationUuid));
EXPECT_NONE(state->streams.at(operationUuid));
}
// This test verifies that the status update manager doesn't return a `Failure`
// when trying to recover a stream from a parent directory that doesn't contain
// the status updates file.
//
// This can happen when the checkpointing failed after creating the parent
// directory, but before the file was opened. In this case `recover()` should
// succeed, but return `None` as the operation's state.
TEST_F(OperationStatusUpdateManagerTest, RecoverEmptyDirectory)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
resetStatusUpdateManager();
// Advance the clock enough to trigger a retry if the update hasn't been
// cleaned up.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// Leave the parent directory, but remove the status updates file, simulating
// a failure in checkpointing, where the parent directory was created, but the
// checkpointing process crashed before opening the status updates file.
ASSERT_SOME(os::rm(getPath(operationUuid)));
// The recovery of the empty stream should not fail, but the stream state
// should be empty.
Future<OperationStatusUpdateManagerState> state =
statusUpdateManager->recover({operationUuid}, false);
AWAIT_READY(state);
EXPECT_EQ(0u, state->errors);
EXPECT_TRUE(state->streams.contains(operationUuid));
EXPECT_NONE(state->streams.at(operationUuid));
}
// This test verifies that the status update manager is able to recover a
// terminated stream, and that the stream's state reflects that it is
// terminated.
TEST_F(OperationStatusUpdateManagerTest, RecoverTerminatedStream)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a terminal update, so `acknowledgement`
// should return `false`.
AWAIT_EXPECT_FALSE(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
resetStatusUpdateManager();
// Recover the checkpointed stream.
Future<OperationStatusUpdateManagerState> state =
statusUpdateManager->recover({operationUuid}, true);
AWAIT_ASSERT_READY(state);
EXPECT_EQ(0u, state->errors);
EXPECT_TRUE(state->streams.contains(operationUuid));
EXPECT_SOME(state->streams.at(operationUuid));
// The stream should be terminated.
EXPECT_TRUE(state->streams.at(operationUuid)->terminated);
const Option<UpdateOperationStatusMessage> recoveredUpdate =
state->streams.at(operationUuid)->updates.front();
ASSERT_SOME(recoveredUpdate);
EXPECT_EQ(statusUpdate, recoveredUpdate.get());
// Advance the clock, the status update has been acknowledged, so it shouldn't
// trigger a retry.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
}
// This test verifies that the status update manager silently ignores duplicate
// updates.
TEST_F(OperationStatusUpdateManagerTest, IgnoreDuplicateUpdate)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_PENDING);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
AWAIT_EXPECT_TRUE(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
// Check that a duplicated update is ignored.
AWAIT_EXPECT_READY(statusUpdateManager->update(statusUpdate, true));
// Advance the clock enough to trigger a retry if the update hasn't been
// acknowledged.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
}
// This test verifies that after recovery the status update manager still
// silently ignores duplicate updates.
TEST_F(OperationStatusUpdateManagerTest, IgnoreDuplicateUpdateAfterRecover)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_PENDING);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
AWAIT_EXPECT_TRUE(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
resetStatusUpdateManager();
// Recover the checkpointed stream.
AWAIT_ASSERT_READY(statusUpdateManager->recover({operationUuid}, true));
// Check that a duplicated update is ignored.
AWAIT_EXPECT_READY(statusUpdateManager->update(statusUpdate, true));
// Advance the clock enough to trigger a retry if the update hasn't been
// acknowledged.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
}
// This test verifies that the status update manager rejects duplicated
// acknowledgements.
TEST_F(OperationStatusUpdateManagerTest, RejectDuplicateAck)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_PENDING);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
AWAIT_EXPECT_TRUE(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
// Advance the clock enough to trigger a retry if the update hasn't been
// ignored.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// Try to acknowledge the same update, the operation should fail.
AWAIT_EXPECT_FAILED(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
}
// This test verifies that after recovery the status update manager still
// rejects duplicated acknowledgements.
TEST_F(OperationStatusUpdateManagerTest, RejectDuplicateAckAfterRecover)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_PENDING);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
// Acknowledge the update, this is a non-terminal update, so `acknowledgement`
// should return `true`.
AWAIT_EXPECT_TRUE(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
resetStatusUpdateManager();
// Recover the checkpointed stream.
AWAIT_ASSERT_READY(statusUpdateManager->recover({operationUuid}, true));
// Try to acknowledge the same update, the operation should fail.
AWAIT_EXPECT_FAILED(
statusUpdateManager->acknowledgement(operationUuid, statusUuid));
// Advance the clock enough to trigger a retry if the update hasn't been
// ignored.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
}
// This test verifies that non-strict recovery of a status updates file
// containing a corrupted record at the end succeeds.
TEST_F(OperationStatusUpdateManagerTest, NonStrictRecoveryCorruptedFile)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate1;
Future<UpdateOperationStatusMessage> forwardedStatusUpdate2;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate2));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
resetStatusUpdateManager();
// Advance the clock enough to trigger a retry if the update hasn't been
// cleaned up.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// The file should contain only `UpdateOperationStatusRecord` messages.
// Write a `UpdateOperationStatusMessage` to the end of the file, so that the
// recovery process encounters an error.
Try<int_fd> fd = os::open(getPath(operationUuid), O_APPEND | O_RDWR);
ASSERT_SOME(fd);
ASSERT_SOME(::protobuf::write(fd.get(), statusUpdate));
ASSERT_SOME(os::close(fd.get()));
EXPECT_FALSE(forwardedStatusUpdate2.isReady());
// The non-strict recovery of the corrupted stream should not fail, but the
// state should reflect that the status update file contained a corrupted
// record.
Future<OperationStatusUpdateManagerState> state =
statusUpdateManager->recover({operationUuid}, false);
AWAIT_READY(state);
EXPECT_EQ(1u, state->errors);
EXPECT_TRUE(state->streams.contains(operationUuid));
EXPECT_SOME(state->streams.at(operationUuid));
// Check that the status update could be recovered.
const Option<UpdateOperationStatusMessage> recoveredUpdate =
state->streams.at(operationUuid)->updates.front();
ASSERT_SOME(recoveredUpdate);
EXPECT_EQ(statusUpdate, recoveredUpdate.get());
// Check that the status update is resent.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
}
// This test verifies that strict recovery of a status updates file containing
// a corrupted record at the end fails.
TEST_F(OperationStatusUpdateManagerTest, StrictRecoveryCorruptedFile)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid = id::UUID::random();
UpdateOperationStatusMessage statusUpdate =
createUpdateOperationStatusMessage(
statusUuid, operationUuid, OperationState::OPERATION_FINISHED);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate, true));
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(statusUpdate.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate);
resetStatusUpdateManager();
// Advance the clock enough to trigger a retry if the update hasn't been
// cleaned up.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// The file should contain only `UpdateOperationStatusMessageRecord` messages.
// Write a `UpdateOperationStatusMessage` to the end of the file, so that the
// recovery process encounters an error.
Try<int_fd> fd = os::open(getPath(operationUuid), O_APPEND | O_RDWR);
ASSERT_SOME(fd);
ASSERT_SOME(::protobuf::write(fd.get(), statusUpdate));
ASSERT_SOME(os::close(fd.get()));
// The strict recovery of the corrupted stream should fail.
AWAIT_ASSERT_FAILED(statusUpdateManager->recover({operationUuid}, true));
}
// This test verifies that the status update manager correctly fills in the
// latest status when (re)sending status updates.
TEST_F(OperationStatusUpdateManagerTest, UpdateLatestWhenResending)
{
Future<UpdateOperationStatusMessage> forwardedStatusUpdate1;
Future<UpdateOperationStatusMessage> forwardedStatusUpdate2;
Future<UpdateOperationStatusMessage> forwardedStatusUpdate3;
EXPECT_CALL(statusUpdateProcessor, update(_))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate1))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate2))
.WillOnce(FutureArg<0>(&forwardedStatusUpdate3));
const id::UUID operationUuid = id::UUID::random();
const id::UUID statusUuid1 = id::UUID::random();
UpdateOperationStatusMessage statusUpdate1 =
createUpdateOperationStatusMessage(
statusUuid1, operationUuid, OperationState::OPERATION_PENDING);
// Send a checkpointed operation status update.
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate1, true));
// The status update manager should fill in the `latest_status` field with the
// status update we just sent.
UpdateOperationStatusMessage expectedStatusUpdate(statusUpdate1);
expectedStatusUpdate.mutable_latest_status()->CopyFrom(
statusUpdate1.status());
// Verify that the status update is forwarded.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate1);
EXPECT_FALSE(forwardedStatusUpdate2.isReady());
// Send another status update.
const id::UUID statusUuid2 = id::UUID::random();
UpdateOperationStatusMessage statusUpdate2 =
createUpdateOperationStatusMessage(
statusUuid2, operationUuid, OperationState::OPERATION_PENDING);
AWAIT_ASSERT_READY(statusUpdateManager->update(statusUpdate2, true));
// Advance the clock to trigger a retry of the first update.
Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
Clock::settle();
// Now that another status update was sent, the status update manager should
// fill in the `latest_status` field with this new status update.
expectedStatusUpdate.mutable_latest_status()->CopyFrom(
statusUpdate2.status());
// Verify that the status update is forwarded again.
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate2);
EXPECT_FALSE(forwardedStatusUpdate3.isReady());
// Acknowledge the first update, it is NOT a terminal update, so
// `acknowledgement` should return `true`. The status update manager
// should now send the second status update.
AWAIT_EXPECT_TRUE(
statusUpdateManager->acknowledgement(operationUuid, statusUuid1));
// The status update manager should then forward the latest status update.
expectedStatusUpdate = statusUpdate2;
expectedStatusUpdate.mutable_latest_status()->CopyFrom(
statusUpdate2.status());
AWAIT_EXPECT_EQ(expectedStatusUpdate, forwardedStatusUpdate3);
}
} // namespace tests {
} // namespace internal {
} // namespace mesos {