blob: b8d63abcbe8c5d641f42fddca3a62068bbcb8cee [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <glog/logging.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <mesos/executor.hpp>
#include <stout/bytes.hpp>
#include <stout/duration.hpp>
#include <stout/error.hpp>
#include <stout/os.hpp>
#include <stout/os/pagesize.hpp>
using namespace mesos;
// Helper function to check whether the system uses swap.
//
// TODO(bbannier): Augment this implementation to explicitly check for swap
// on platforms other than Linux as well. It might make sense to move this
// function to stout once the implementation becomes more complete.
Try<bool> hasSwap() {
// We read `/proc/swaps` and check whether any active swap
// partitions are listed. The format of that file is e.g.,
//
// Filename Type Size Used Priority
// /dev/sda4 partition 1234567890 348 -1
// /dev/sdb4 partition 1234567890 0 -2
//
// If we find more than one line (including the header) the
// system likely uses swap.
int numberOfLines = 0;
std::ifstream swaps("/proc/swaps");
std::string line;
while (std::getline(swaps, line)) {
++numberOfLines;
}
// Check for read errors. This provides a default implementation
// for platforms without proc filesystem as well.
if (!swaps.eof()) {
return Error("Could not determine whether this system uses swap");
}
return numberOfLines > 1;
}
// The amount of memory in MB each balloon step consumes.
const static size_t BALLOON_STEP_MB = 64;
// This function will increase the memory footprint gradually.
// `TaskInfo.data` specifies the upper limit (in MB) of the memory
// footprint. The upper limit can be larger than the amount of memory
// allocated to this executor. In that case, the isolator (e.g. cgroups)
// may detect that and destroy the container before the executor can
// sent a TASK_FINISHED update.
void run(ExecutorDriver* driver, const TaskInfo& task)
{
TaskStatus status;
status.mutable_task_id()->MergeFrom(task.task_id());
status.set_state(TASK_RUNNING);
driver->sendStatusUpdate(status);
// Get the balloon limit (in MB).
Try<Bytes> _limit = Bytes::parse(task.data());
CHECK(_limit.isSome());
const size_t limit = _limit->bytes() / Bytes::MEGABYTES;
// On systems with swap partitions we explicitly prevent memory used by the
// executor from being swapped out with `mlock`. Since the amount of memory a
// process can lock is controlled by an rlimit, we only `mlock` when
// strictly necessary to prevent complicating the test setup.
Try<bool> hasSwap_ = hasSwap();
const bool lockMemory = hasSwap_.isError() || hasSwap_.get();
if (lockMemory) {
LOG(INFO)
<< "System might use swap partitions, will explicitly"
<< " lock memory to prevent swapping";
}
const size_t chunk = BALLOON_STEP_MB * 1024 * 1024;
for (size_t i = 0; i < limit / BALLOON_STEP_MB; i++) {
LOG(INFO)
<< "Increasing memory footprint by " << BALLOON_STEP_MB << " MB";
// Allocate page-aligned virtual memory.
void* buffer = nullptr;
if (posix_memalign(&buffer, os::pagesize(), chunk) != 0) {
LOG(FATAL) << ErrnoError(
"Failed to allocate page-aligned memory, posix_memalign").message;
}
// We use `memset` and possibly `mlock` here to make sure that the
// memory actually gets paged in and thus accounted for.
if (memset(buffer, 1, chunk) != buffer) {
LOG(FATAL) << ErrnoError("Failed to fill memory, memset").message;
}
if (lockMemory) {
if (mlock(buffer, chunk) != 0) {
LOG(FATAL) << ErrnoError("Failed to lock memory, mlock").message;
}
}
// Try not to increase the memory footprint too fast.
os::sleep(Seconds(1));
}
LOG(INFO) << "Finishing task " << task.task_id().value();
status.set_state(TASK_FINISHED);
driver->sendStatusUpdate(status);
// This is a hack to ensure the message is sent to the
// slave before we exit the process. Without this, we
// may exit before libprocess has sent the data over
// the socket. See MESOS-4111.
os::sleep(Seconds(1));
driver->stop();
}
class BalloonExecutor : public Executor
{
public:
BalloonExecutor()
: launched(false) {}
~BalloonExecutor() override {}
void registered(ExecutorDriver* driver,
const ExecutorInfo& executorInfo,
const FrameworkInfo& frameworkInfo,
const SlaveInfo& slaveInfo) override
{
LOG(INFO) << "Registered";
}
void reregistered(ExecutorDriver* driver,
const SlaveInfo& slaveInfo) override
{
LOG(INFO) << "Reregistered";
}
void disconnected(ExecutorDriver* driver) override
{
LOG(INFO) << "Disconnected";
}
void launchTask(ExecutorDriver* driver, const TaskInfo& task) override
{
if (launched) {
TaskStatus status;
status.mutable_task_id()->MergeFrom(task.task_id());
status.set_state(TASK_FAILED);
status.set_message(
"Attempted to run multiple balloon tasks with the same executor");
driver->sendStatusUpdate(status);
return;
}
LOG(INFO) << "Starting task " << task.task_id().value();
// NOTE: The executor driver calls `launchTask` synchronously, which
// means that calls such as `driver->sendStatusUpdate()` will not execute
// until `launchTask` returns.
std::thread thread([=]() {
run(driver, task);
});
thread.detach();
launched = true;
}
void killTask(ExecutorDriver* driver, const TaskID& taskId) override
{
LOG(INFO) << "Kill task " << taskId.value();
}
void frameworkMessage(
ExecutorDriver* driver, const std::string& data) override
{
LOG(INFO) << "Framework message: " << data;
}
void shutdown(ExecutorDriver* driver) override
{
LOG(INFO) << "Shutdown";
}
void error(ExecutorDriver* driver, const std::string& message) override
{
LOG(INFO) << "Error message: " << message;
}
private:
bool launched;
};
int main(int argc, char** argv)
{
BalloonExecutor executor;
MesosExecutorDriver driver(&executor);
return driver.run() == DRIVER_STOPPED ? 0 : 1;
}