blob: ad1d861aa6b5396f66e703326802926c7a99773c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <iostream>
#include <string>
#include <mesos/executor.hpp>
#include <stout/duration.hpp>
#include <stout/numify.hpp>
#include <stout/os.hpp>
using namespace mesos;
// Size (in MB) of each individual allocation step performed while the
// balloon grows.
static const size_t BALLOON_STEP_MB = 64;
// Gradually increases the memory footprint of this process.
//
// The parameter limit specifies the upper limit (in MB) of the memory
// footprint. Memory is acquired in fixed steps of BALLOON_STEP_MB
// megabytes, with a one second pause after each step. The number of
// steps is 'limit / BALLOON_STEP_MB' (integer division), so a remainder
// smaller than one step is not allocated and a limit below
// BALLOON_STEP_MB allocates nothing.
//
// The buffers obtained here are never freed or unlocked by this
// function. If an allocation or mlock call fails, the error is printed
// to stderr and the process is aborted.
static void balloon(size_t limit)
{
  const size_t chunk = BALLOON_STEP_MB * 1024 * 1024;
  const size_t steps = limit / BALLOON_STEP_MB;

  for (size_t i = 0; i < steps; i++) {
    std::cout << "Increasing memory footprint by "
              << BALLOON_STEP_MB << " MB" << std::endl;

    // Allocate page-aligned virtual memory.
    //
    // posix_memalign reports failure through its return value and does
    // not set errno, so the diagnostic is built from the returned code
    // instead of using perror.
    void* buffer = NULL;
    const int error = posix_memalign(&buffer, getpagesize(), chunk);
    if (error != 0) {
      fprintf(
          stderr,
          "Failed to allocate page-aligned memory, posix_memalign: %s\n",
          strerror(error));
      abort();
    }

    // We use mlock and memset here to make sure that the memory
    // actually gets paged in and thus accounted for.
    if (mlock(buffer, chunk) != 0) {
      perror("Failed to lock memory, mlock");
      abort();
    }

    // memset always returns its first argument and has no failure mode,
    // so there is no result to check.
    memset(buffer, 1, chunk);

    // Try not to increase the memory footprint too fast.
    os::sleep(Seconds(1));
  }
}
// An Executor implementation whose single real job is to inflate the
// memory footprint of its own process when a task is launched. Every
// callback logs the event it received to stdout; only launchTask does
// any additional work.
class BalloonExecutor : public Executor
{
public:
  virtual ~BalloonExecutor() {}

  // Logs the registration event. The arguments are not inspected.
  virtual void registered(ExecutorDriver* driver,
                          const ExecutorInfo& executorInfo,
                          const FrameworkInfo& frameworkInfo,
                          const SlaveInfo& slaveInfo)
  {
    std::cout << "Registered" << std::endl;
  }

  // Logs the re-registration event. The arguments are not inspected.
  virtual void reregistered(ExecutorDriver* driver,
                            const SlaveInfo& slaveInfo)
  {
    std::cout << "Reregistered" << std::endl;
  }

  // Logs the disconnection event.
  virtual void disconnected(ExecutorDriver* driver)
  {
    std::cout << "Disconnected" << std::endl;
  }

  // Runs the balloon for the given task.
  //
  // Sends a TASK_RUNNING update, interprets task.data() as the balloon
  // limit in MB, grows the memory footprint up to that limit, and then
  // sends a TASK_FINISHED update. The call does not return until the
  // balloon has finished growing. If task.data() cannot be parsed as a
  // size_t, the problem is reported on stderr and the process aborts.
  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
  {
    std::cout << "Starting task " << task.task_id().value() << std::endl;

    TaskStatus status;
    status.mutable_task_id()->MergeFrom(task.task_id());
    status.set_state(TASK_RUNNING);

    driver->sendStatusUpdate(status);

    // Get the balloon limit (in MB).
    //
    // The value comes from the task description, so it is validated
    // with an explicit check rather than assert: assert is compiled out
    // when NDEBUG is defined, which would let a failed parse reach
    // limit.get().
    Try<size_t> limit = numify<size_t>(task.data());
    if (!limit.isSome()) {
      std::cerr << "Failed to parse balloon limit from task data '"
                << task.data() << "'" << std::endl;
      abort();
    }

    size_t balloonLimit = limit.get();

    // Artificially increase the memory usage gradually. The
    // balloonLimit specifies the upper limit. The balloonLimit can be
    // larger than the amount of memory allocated to this executor. In
    // that case, the isolator (e.g. cgroups) should be able to detect
    // that and the task should not be able to reach TASK_FINISHED
    // state.
    balloon(balloonLimit);

    std::cout << "Finishing task " << task.task_id().value() << std::endl;

    status.mutable_task_id()->MergeFrom(task.task_id());
    status.set_state(TASK_FINISHED);

    driver->sendStatusUpdate(status);
  }

  // Logs the kill request. No other action is taken here.
  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
  {
    std::cout << "Kill task " << taskId.value() << std::endl;
  }

  // Logs the payload of a framework message.
  virtual void frameworkMessage(ExecutorDriver* driver, const std::string& data)
  {
    std::cout << "Framework message: " << data << std::endl;
  }

  // Logs the shutdown request. No other action is taken here.
  virtual void shutdown(ExecutorDriver* driver)
  {
    std::cout << "Shutdown" << std::endl;
  }

  // Logs an error message delivered by the driver.
  virtual void error(ExecutorDriver* driver, const std::string& message)
  {
    std::cout << "Error message: " << message << std::endl;
  }
};
int main(int argc, char** argv)
{
BalloonExecutor executor;
MesosExecutorDriver driver(&executor);
return driver.run() == DRIVER_STOPPED ? 0 : 1;
}