blob: f0d3e8a69567ef89fed2c6745b88810836be2a18 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "status_update_manager/operation.hpp"
#include <list>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <stout/hashmap.hpp>
#include <stout/uuid.hpp>
using lambda::function;
using process::Future;
using process::Owned;
using process::wait; // Necessary on some OS's to disambiguate.
namespace mesos {
namespace internal {
OperationStatusUpdateManager::OperationStatusUpdateManager()
: process(
new StatusUpdateManagerProcess<
id::UUID,
UpdateOperationStatusRecord,
UpdateOperationStatusMessage>(
"operation-status-update-manager",
"operation status update"))
{
spawn(process.get());
}
OperationStatusUpdateManager::~OperationStatusUpdateManager()
{
terminate(process.get());
wait(process.get());
}
void OperationStatusUpdateManager::initialize(
const function<void(const UpdateOperationStatusMessage&)>& forward,
const function<const std::string(const id::UUID&)>& getPath)
{
dispatch(
process.get(),
&StatusUpdateManagerProcess<
id::UUID,
UpdateOperationStatusRecord,
UpdateOperationStatusMessage>::initialize,
forward,
getPath);
}
Future<Nothing> OperationStatusUpdateManager::update(
const UpdateOperationStatusMessage& update,
bool checkpoint)
{
Try<id::UUID> operationUuid =
id::UUID::fromBytes(update.operation_uuid().value());
CHECK_SOME(operationUuid);
return dispatch(
process.get(),
&StatusUpdateManagerProcess<
id::UUID,
UpdateOperationStatusRecord,
UpdateOperationStatusMessage>::update,
update,
operationUuid.get(),
checkpoint);
}
Future<bool> OperationStatusUpdateManager::acknowledgement(
const id::UUID& operationUuid,
const id::UUID& statusUuid)
{
return dispatch(
process.get(),
&StatusUpdateManagerProcess<
id::UUID,
UpdateOperationStatusRecord,
UpdateOperationStatusMessage>::acknowledgement,
operationUuid,
statusUuid);
}
process::Future<OperationStatusUpdateManagerState>
OperationStatusUpdateManager::recover(
const std::list<id::UUID>& operationUuids,
bool strict)
{
return dispatch(
process.get(),
&StatusUpdateManagerProcess<
id::UUID,
UpdateOperationStatusRecord,
UpdateOperationStatusMessage>::recover,
operationUuids,
strict);
}
void OperationStatusUpdateManager::cleanup(const FrameworkID& frameworkId)
{
dispatch(
process.get(),
&StatusUpdateManagerProcess<
id::UUID,
UpdateOperationStatusRecord,
UpdateOperationStatusMessage>::cleanup,
frameworkId);
}
void OperationStatusUpdateManager::pause()
{
dispatch(
process.get(),
&StatusUpdateManagerProcess<
id::UUID,
UpdateOperationStatusRecord,
UpdateOperationStatusMessage>::pause);
}
void OperationStatusUpdateManager::resume()
{
dispatch(
process.get(),
&StatusUpdateManagerProcess<
id::UUID,
UpdateOperationStatusRecord,
UpdateOperationStatusMessage>::resume);
}
} // namespace internal {
} // namespace mesos {