blob: d47c19b65843252d51e9345c297d1a340ab2318e [file] [log] [blame]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
#include <errno.h>
#include <time.h>
#ifndef __WINDOWS__
#include <arpa/inet.h>
#endif // __WINDOWS__
#include <gmock/gmock.h>
#ifndef __WINDOWS__
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif // __WINDOWS__
#include <atomic>
#include <sstream>
#include <string>
#include <vector>
#include <process/async.hpp>
#include <process/clock.hpp>
#include <process/count_down_latch.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
#include <process/executor.hpp>
#include <process/filter.hpp>
#include <process/future.hpp>
#include <process/gmock.hpp>
#include <process/gtest.hpp>
#include <process/network.hpp>
#include <process/owned.hpp>
#include <process/process.hpp>
#include <process/reap.hpp>
#include <process/run.hpp>
#include <process/socket.hpp>
#include <process/subprocess.hpp>
#include <process/time.hpp>
#include <stout/duration.hpp>
#include <stout/gtest.hpp>
#include <stout/hashmap.hpp>
#include <stout/hashset.hpp>
#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include <stout/os.hpp>
#include <stout/stopwatch.hpp>
#include <stout/stringify.hpp>
#include <stout/try.hpp>
#include <stout/os/killtree.hpp>
#include <stout/os/write.hpp>
#include <stout/tests/utils.hpp>
#include "encoder.hpp"
namespace http = process::http;
namespace inject = process::inject;
namespace inet4 = process::network::inet4;
using process::async;
using process::Clock;
using process::CountDownLatch;
using process::defer;
using process::Deferred;
using process::Event;
using process::Executor;
using process::DispatchEvent;
using process::ExitedEvent;
using process::TerminateEvent;
using process::Failure;
using process::Future;
using process::Message;
using process::MessageEncoder;
using process::MessageEvent;
using process::Owned;
using process::PID;
using process::Process;
using process::ProcessBase;
using process::Promise;
using process::run;
using process::Subprocess;
using process::TerminateEvent;
using process::Time;
using process::UPID;
using process::firewall::DisabledEndpointsFirewallRule;
using process::firewall::FirewallRule;
using process::network::inet::Address;
using process::network::inet::Socket;
using std::move;
using std::string;
using std::vector;
using testing::_;
using testing::Assign;
using testing::DoAll;
using testing::InvokeWithoutArgs;
using testing::Return;
using testing::ReturnArg;
namespace process {
// Used by TimerAfterReinitialize.
void reinitialize(
const Option<string>& delegate,
const Option<string>& readonlyAuthenticationRealm,
const Option<string>& readwriteAuthenticationRealm);
}
// TODO(bmahler): Move tests into their own files as appropriate.
class ProcessTest : public TemporaryDirectoryTest {};
TEST_F(ProcessTest, Event)
{
Owned<Event> event(new TerminateEvent(UPID(), false));
EXPECT_FALSE(event->is<MessageEvent>());
EXPECT_FALSE(event->is<ExitedEvent>());
EXPECT_TRUE(event->is<TerminateEvent>());
}
class SpawnProcess : public Process<SpawnProcess>
{
public:
MOCK_METHOD0(initialize, void());
MOCK_METHOD0(finalize, void());
};
TEST_F(ProcessTest, Spawn)
{
SpawnProcess process;
EXPECT_CALL(process, initialize());
EXPECT_CALL(process, finalize());
PID<SpawnProcess> pid = spawn(process);
ASSERT_FALSE(!pid);
ASSERT_FALSE(wait(pid, Seconds(0)));
terminate(pid);
wait(pid);
}
struct MoveOnly
{
MoveOnly() {}
MoveOnly(const MoveOnly&) = delete;
MoveOnly(MoveOnly&&) = default;
MoveOnly& operator=(const MoveOnly&) = delete;
MoveOnly& operator=(MoveOnly&&) = default;
};
class DispatchProcess : public Process<DispatchProcess>
{
public:
MOCK_METHOD0(func0, void());
MOCK_METHOD1(func1, bool(bool));
MOCK_METHOD1(func2, Future<bool>(bool));
MOCK_METHOD1(func3, int(int));
MOCK_METHOD2(func4, Future<bool>(bool, int));
void func5(MoveOnly&& mo) { func5_(mo); }
MOCK_METHOD1(func5_, void(const MoveOnly&));
bool func6(MoveOnly&& m1, MoveOnly&& m2, bool b) { return func6_(m1, m2, b); }
MOCK_METHOD3(func6_, bool(const MoveOnly&, const MoveOnly&, bool));
};
TEST_F(ProcessTest, Dispatch)
{
DispatchProcess process;
EXPECT_CALL(process, func0());
EXPECT_CALL(process, func1(_))
.WillOnce(ReturnArg<0>());
EXPECT_CALL(process, func2(_))
.WillOnce(ReturnArg<0>());
EXPECT_CALL(process, func5_(_));
PID<DispatchProcess> pid = spawn(&process);
ASSERT_FALSE(!pid);
dispatch(pid, &DispatchProcess::func0);
dispatch(pid, &DispatchProcess::func5, MoveOnly());
Future<bool> future;
future = dispatch(pid, &DispatchProcess::func1, true);
EXPECT_TRUE(future.get());
future = dispatch(pid, &DispatchProcess::func2, true);
EXPECT_TRUE(future.get());
terminate(pid);
wait(pid);
}
TEST_F(ProcessTest, Defer1)
{
DispatchProcess process;
EXPECT_CALL(process, func0());
EXPECT_CALL(process, func1(_))
.WillOnce(ReturnArg<0>());
EXPECT_CALL(process, func2(_))
.WillOnce(ReturnArg<0>());
EXPECT_CALL(process, func4(_, _))
.WillRepeatedly(ReturnArg<0>());
EXPECT_CALL(process, func5_(_));
EXPECT_CALL(process, func6_(_, _, _))
.WillRepeatedly(ReturnArg<2>());
PID<DispatchProcess> pid = spawn(&process);
ASSERT_FALSE(!pid);
{
Deferred<void()> func0 =
defer(pid, &DispatchProcess::func0);
func0();
}
Future<bool> future;
{
Deferred<Future<bool>()> func1 =
defer(pid, &DispatchProcess::func1, true);
future = func1();
EXPECT_TRUE(future.get());
}
{
Deferred<Future<bool>()> func2 =
defer(pid, &DispatchProcess::func2, true);
future = func2();
EXPECT_TRUE(future.get());
}
{
Deferred<Future<bool>()> func4 =
defer(pid, &DispatchProcess::func4, true, 42);
future = func4();
EXPECT_TRUE(future.get());
}
{
Deferred<Future<bool>(bool)> func4 =
defer(pid, &DispatchProcess::func4, lambda::_1, 42);
future = func4(false);
EXPECT_FALSE(future.get());
}
{
Deferred<Future<bool>(int)> func4 =
defer(pid, &DispatchProcess::func4, true, lambda::_1);
future = func4(42);
EXPECT_TRUE(future.get());
}
{
lambda::CallableOnce<void()> func5 =
defer(pid, &DispatchProcess::func5, MoveOnly());
std::move(func5)();
}
{
lambda::CallableOnce<Future<bool>(MoveOnly&&)> func6 =
defer(pid, &DispatchProcess::func6, MoveOnly(), lambda::_1, true);
future = std::move(func6)(MoveOnly());
EXPECT_TRUE(future.get());
}
{
lambda::CallableOnce<Future<bool>(MoveOnly&&)> func6 =
defer(pid, &DispatchProcess::func6, MoveOnly(), lambda::_1, false);
future = std::move(func6)(MoveOnly());
EXPECT_FALSE(future.get());
}
// Only take const &!
terminate(pid);
wait(pid);
}
class DeferProcess : public Process<DeferProcess>
{
public:
Future<string> func1(const Future<int>& f)
{
return f.then(defer(self(), &Self::_func1, lambda::_1));
}
Future<string> func2(const Future<int>& f)
{
return f.then(defer(self(), &Self::_func2));
}
private:
Future<string> _func1(int i)
{
return stringify(i);
}
Future<string> _func2()
{
return string("42");
}
};
TEST_F(ProcessTest, Defer2)
{
DeferProcess process;
PID<DeferProcess> pid = spawn(process);
Future<string> f = dispatch(pid, &DeferProcess::func1, 41);
f.await();
ASSERT_TRUE(f.isReady());
EXPECT_EQ("41", f.get());
f = dispatch(pid, &DeferProcess::func2, 41);
f.await();
ASSERT_TRUE(f.isReady());
EXPECT_EQ("42", f.get());
terminate(pid);
wait(pid);
}
template <typename T>
void set(T* t1, const T& t2)
{
*t1 = t2;
}
TEST_F(ProcessTest, Defer3)
{
std::atomic_bool bool1(false);
std::atomic_bool bool2(false);
Deferred<void(bool)> set1 =
defer([&bool1](bool b) { bool1.store(b); });
set1(true);
Deferred<void(bool)> set2 =
defer([&bool2](bool b) { bool2.store(b); });
set2(true);
while (bool1.load() == false);
while (bool2.load() == false);
}
class HandlersProcess : public Process<HandlersProcess>
{
public:
HandlersProcess()
{
install("func", &HandlersProcess::func);
}
MOCK_METHOD2(func, void(const UPID&, const string&));
};
TEST_F(ProcessTest, Handlers)
{
HandlersProcess process;
Future<Nothing> func;
EXPECT_CALL(process, func(_, _))
.WillOnce(FutureSatisfy(&func));
PID<HandlersProcess> pid = spawn(&process);
ASSERT_FALSE(!pid);
post(pid, "func");
AWAIT_READY(func);
terminate(pid, false);
wait(pid);
}
// Tests DROP_MESSAGE and DROP_DISPATCH and in particular that an
// event can get dropped before being processed.
TEST_F(ProcessTest, Expect)
{
HandlersProcess process;
EXPECT_CALL(process, func(_, _))
.Times(0);
PID<HandlersProcess> pid = spawn(&process);
ASSERT_FALSE(!pid);
Future<Message> message = DROP_MESSAGE("func", _, _);
post(pid, "func");
AWAIT_EXPECT_READY(message);
Future<Nothing> func = DROP_DISPATCH(pid, &HandlersProcess::func);
dispatch(pid, &HandlersProcess::func, pid, "");
AWAIT_EXPECT_READY(func);
terminate(pid, false);
wait(pid);
}
// Tests the FutureArg<N> action.
TEST_F(ProcessTest, Action)
{
HandlersProcess process;
PID<HandlersProcess> pid = spawn(&process);
ASSERT_FALSE(!pid);
Future<string> future1;
Future<Nothing> future2;
EXPECT_CALL(process, func(_, _))
.WillOnce(FutureArg<1>(&future1))
.WillOnce(FutureSatisfy(&future2));
dispatch(pid, &HandlersProcess::func, pid, "hello world");
AWAIT_EXPECT_EQ("hello world", future1);
EXPECT_TRUE(future2.isPending());
dispatch(pid, &HandlersProcess::func, pid, "hello world");
AWAIT_EXPECT_READY(future2);
terminate(pid, false);
wait(pid);
}
class BaseProcess : public Process<BaseProcess>
{
public:
virtual void func() = 0;
MOCK_METHOD0(foo, void());
};
class DerivedProcess : public BaseProcess
{
public:
DerivedProcess() {}
MOCK_METHOD0(func, void());
};
TEST_F(ProcessTest, Inheritance)
{
DerivedProcess process;
EXPECT_CALL(process, func())
.Times(2);
EXPECT_CALL(process, foo());
PID<DerivedProcess> pid1 = spawn(&process);
ASSERT_FALSE(!pid1);
dispatch(pid1, &DerivedProcess::func);
PID<BaseProcess> pid2(process);
PID<BaseProcess> pid3 = pid1;
ASSERT_EQ(pid2, pid3);
dispatch(pid3, &BaseProcess::func);
dispatch(pid3, &BaseProcess::foo);
terminate(pid1, false);
wait(pid1);
}
TEST_F(ProcessTest, Thunk)
{
struct Thunk
{
static int run(int i)
{
return i;
}
static int run(int i, int j)
{
return run(i + j);
}
};
int result = run(&Thunk::run, 21, 21).get();
EXPECT_EQ(42, result);
}
class DelegatorProcess : public Process<DelegatorProcess>
{
public:
explicit DelegatorProcess(const UPID& delegatee)
{
delegate("func", delegatee);
}
};
class DelegateeProcess : public Process<DelegateeProcess>
{
public:
DelegateeProcess()
{
install("func", &DelegateeProcess::func);
}
MOCK_METHOD2(func, void(const UPID&, const string&));
};
TEST_F(ProcessTest, Delegate)
{
DelegateeProcess delegatee;
DelegatorProcess delegator(delegatee.self());
Future<Nothing> func;
EXPECT_CALL(delegatee, func(_, _))
.WillOnce(FutureSatisfy(&func));
spawn(&delegator);
spawn(&delegatee);
post(delegator.self(), "func");
AWAIT_READY(func);
terminate(delegator, false);
wait(delegator);
terminate(delegatee, false);
wait(delegatee);
}
class TimeoutProcess : public Process<TimeoutProcess>
{
public:
MOCK_METHOD0(timeout, void());
};
TEST_F(ProcessTest, Delay)
{
Clock::pause();
std::atomic_bool timeoutCalled(false);
TimeoutProcess process;
EXPECT_CALL(process, timeout())
.WillOnce(Assign(&timeoutCalled, true));
spawn(process);
delay(Seconds(5), process.self(), &TimeoutProcess::timeout);
Clock::advance(Seconds(5));
while (timeoutCalled.load() == false);
terminate(process);
wait(process);
Clock::resume();
}
class OrderProcess : public Process<OrderProcess>
{
public:
void order(const PID<TimeoutProcess>& pid)
{
// TODO(benh): Add a test which uses 'send' instead of dispatch.
dispatch(pid, &TimeoutProcess::timeout);
}
};
TEST_F(ProcessTest, Order)
{
Clock::pause();
TimeoutProcess process1;
std::atomic_bool timeoutCalled(false);
EXPECT_CALL(process1, timeout())
.WillOnce(Assign(&timeoutCalled, true));
spawn(process1);
Time now = Clock::now(&process1);
Seconds seconds(1);
Clock::advance(Seconds(1));
EXPECT_EQ(now, Clock::now(&process1));
OrderProcess process2;
spawn(process2);
dispatch(process2, &OrderProcess::order, process1.self());
while (timeoutCalled.load() == false);
EXPECT_EQ(now + seconds, Clock::now(&process1));
terminate(process1);
wait(process1);
terminate(process2);
wait(process2);
Clock::resume();
}
class DonateProcess : public Process<DonateProcess>
{
public:
void donate()
{
DonateProcess process;
spawn(process);
terminate(process);
wait(process);
}
};
TEST_F(ProcessTest, Donate)
{
DonateProcess process;
spawn(process);
dispatch(process, &DonateProcess::donate);
terminate(process, false);
wait(process);
}
// TODO(bmahler): Use an RAII-wrapper here to prevent crashes
// during test assertion failures.
class ExitedProcess : public Process<ExitedProcess>
{
public:
explicit ExitedProcess(const UPID& _pid) : pid(_pid) {}
void initialize() override
{
link(pid);
}
MOCK_METHOD1(exited, void(const UPID&));
private:
const UPID pid;
};
TEST_F(ProcessTest, Exited)
{
UPID pid = spawn(new ProcessBase(), true);
ExitedProcess process(pid);
Future<UPID> exitedPid;
EXPECT_CALL(process, exited(pid))
.WillOnce(FutureArg<0>(&exitedPid));
spawn(process);
terminate(pid);
AWAIT_ASSERT_EQ(pid, exitedPid);
terminate(process);
wait(process);
}
TEST_F(ProcessTest, InjectExited)
{
UPID pid = spawn(new ProcessBase(), true);
ExitedProcess process(pid);
Future<UPID> exitedPid;
EXPECT_CALL(process, exited(pid))
.WillOnce(FutureArg<0>(&exitedPid));
spawn(process);
inject::exited(pid, process.self());
AWAIT_ASSERT_EQ(pid, exitedPid);
terminate(process);
wait(process);
}
// TODO(bmahler): Move all message interception / dropping tests
// (of the gmock.hpp functionality) into a separate file.
TEST_F(ProcessTest, FutureExited)
{
UPID linkee = spawn(new ProcessBase(), true);
ExitedProcess linker(linkee);
Future<Nothing> exited = FUTURE_EXITED(linkee, linker.self());
Future<UPID> exitedPid;
EXPECT_CALL(linker, exited(linkee))
.WillOnce(FutureArg<0>(&exitedPid));
spawn(linker);
terminate(linkee);
AWAIT_READY(exited);
AWAIT_ASSERT_EQ(linkee, exitedPid);
terminate(linker);
wait(linker);
}
// TODO(bmahler): Move all message interception / dropping tests
// (of the gmock.hpp functionality) into a separate file.
TEST_F(ProcessTest, DropExited)
{
UPID linkee = spawn(new ProcessBase(), true);
ExitedProcess linker(linkee);
Future<Nothing> exited = DROP_EXITED(linkee, linker.self());
Future<UPID> exitedPid;
EXPECT_CALL(linker, exited(linkee))
.WillRepeatedly(FutureArg<0>(&exitedPid));
spawn(linker);
terminate(linkee);
AWAIT_READY(exited);
EXPECT_TRUE(exitedPid.isPending());
terminate(linker);
wait(linker);
}
class MessageEventProcess : public Process<MessageEventProcess>
{
public:
// This is a workaround for mocking methods taking
// rvalue reference parameters.
// See https://github.com/google/googletest/issues/395
void consume(MessageEvent&& event) override { consume_(event.message); }
MOCK_METHOD1(consume_, void(const Message&));
};
class ProcessRemoteLinkTest : public ::testing::Test
{
protected:
void SetUp() override
{
// Spawn a process to coordinate with the subprocess (test-linkee).
// The `test-linkee` will send us a message when it has finished
// initializing and is itself ready to receive messages.
MessageEventProcess coordinator;
spawn(coordinator);
Future<Message> message;
EXPECT_CALL(coordinator, consume_(_))
.WillOnce(FutureArg<0>(&message));
// TODO(andschwa): Clean this up so that `BUILD_DIR` has the correct
// separator at compilation time.
#ifdef __WINDOWS__
const std::string buildDir = strings::replace(BUILD_DIR, "/", "\\");
#else
const std::string buildDir = BUILD_DIR;
#endif // __WINDOWS__
#ifdef __WINDOWS__
constexpr char LINKEENAME[] = "test-linkee.exe";
#else
constexpr char LINKEENAME[] = "test-linkee";
#endif // __WINDOWS__
const std::string linkeePath = path::join(buildDir, LINKEENAME);
ASSERT_TRUE(os::exists(linkeePath));
// NOTE: Because of the differences between Windows and POSIX
// shells when interpreting quotes, we use the second form of
// `subprocess` to call `test-linkee` directly with a set of
// arguments, rather than through the shell.
Try<Subprocess> s = process::subprocess(
linkeePath, {linkeePath, stringify(coordinator.self())});
ASSERT_SOME(s);
linkee = s.get();
// Wait until the subprocess sends us a message.
AWAIT_ASSERT_READY(message);
// Save the PID of the linkee.
pid = message->from;
terminate(coordinator);
wait(coordinator);
}
// Helper method to quickly reap the `linkee`.
// Subprocesses are reaped (via a non-blocking `waitpid` call) on
// a regular interval. We can speed up the internal reaper by
// advancing the clock.
void reap_linkee()
{
if (linkee.isSome()) {
bool paused = Clock::paused();
Clock::pause();
while (linkee->status().isPending()) {
Clock::advance(process::MAX_REAP_INTERVAL());
Clock::settle();
}
if (!paused) {
Clock::resume();
}
}
}
void TearDown() override
{
if (linkee.isSome()) {
os::kill(linkee->pid(), SIGKILL);
reap_linkee();
linkee = None();
}
}
public:
Option<Subprocess> linkee;
UPID pid;
};
// Verifies that linking to a remote process will correctly detect
// the associated `ExitedEvent`.
TEST_F(ProcessRemoteLinkTest, RemoteLink)
{
// Link to the remote subprocess.
ExitedProcess process(pid);
Future<UPID> exitedPid;
EXPECT_CALL(process, exited(pid))
.WillOnce(FutureArg<0>(&exitedPid));
spawn(process);
os::kill(linkee->pid(), SIGKILL);
reap_linkee();
linkee = None();
AWAIT_ASSERT_EQ(pid, exitedPid);
terminate(process);
wait(process);
}
class RemoteLinkTestProcess : public Process<RemoteLinkTestProcess>
{
public:
explicit RemoteLinkTestProcess(const UPID& pid) : pid(pid) {}
void linkup()
{
link(pid);
}
void relink()
{
link(pid, RemoteConnection::RECONNECT);
}
void ping_linkee()
{
send(pid, "whatever");
}
MOCK_METHOD1(exited, void(const UPID&));
private:
const UPID pid;
};
// Verifies that calling `link` with "relink" semantics will have the
// same behavior as `link` with "normal" semantics, when there is no
// existing persistent connection.
TEST_F(ProcessRemoteLinkTest, RemoteRelink)
{
RemoteLinkTestProcess process(pid);
Future<UPID> exitedPid;
EXPECT_CALL(process, exited(pid))
.WillOnce(FutureArg<0>(&exitedPid));
spawn(process);
process.relink();
os::kill(linkee->pid(), SIGKILL);
reap_linkee();
linkee = None();
AWAIT_ASSERT_EQ(pid, exitedPid);
terminate(process);
wait(process);
}
// Verifies that linking and relinking a process will retain monitoring
// on the linkee.
TEST_F(ProcessRemoteLinkTest, RemoteLinkRelink)
{
RemoteLinkTestProcess process(pid);
Future<UPID> exitedPid;
EXPECT_CALL(process, exited(pid))
.WillOnce(FutureArg<0>(&exitedPid));
spawn(process);
process.linkup();
process.relink();
os::kill(linkee->pid(), SIGKILL);
reap_linkee();
linkee = None();
AWAIT_ASSERT_EQ(pid, exitedPid);
terminate(process);
wait(process);
}
// Verifies that relinking a remote process will not affect the
// monitoring of the process by other linkers.
TEST_F(ProcessRemoteLinkTest, RemoteDoubleLinkRelink)
{
ExitedProcess linker(pid);
RemoteLinkTestProcess relinker(pid);
Future<UPID> linkerExitedPid;
Future<UPID> relinkerExitedPid;
EXPECT_CALL(linker, exited(pid))
.WillOnce(FutureArg<0>(&linkerExitedPid));
EXPECT_CALL(relinker, exited(pid))
.WillOnce(FutureArg<0>(&relinkerExitedPid));
spawn(linker);
spawn(relinker);
relinker.linkup();
relinker.relink();
os::kill(linkee->pid(), SIGKILL);
reap_linkee();
linkee = None();
AWAIT_ASSERT_EQ(pid, linkerExitedPid);
AWAIT_ASSERT_EQ(pid, relinkerExitedPid);
terminate(linker);
wait(linker);
terminate(relinker);
wait(relinker);
}
// Verifies that remote links will trigger an `ExitedEvent` if the link
// fails during socket creation. The test instigates a socket creation
// failure by hogging all available file descriptors.
//
// TODO(andschwa): Enable this test. The current logic will not work on Windows
// as " The Microsoft Winsock provider limits the maximum number of sockets
// supported only by available memory on the local computer." See MESOS-9093.
//
// https://docs.microsoft.com/en-us/windows/desktop/WinSock/maximum-number-of-sockets-supported-2 // NOLINT(whitespace/line_length)
TEST_F_TEMP_DISABLED_ON_WINDOWS(ProcessRemoteLinkTest, RemoteLinkLeak)
{
RemoteLinkTestProcess relinker(pid);
Future<UPID> relinkerExitedPid;
EXPECT_CALL(relinker, exited(pid))
.WillOnce(FutureArg<0>(&relinkerExitedPid));
spawn(relinker);
// Open enough sockets to fill up all available FDs.
vector<Socket> fdHogs;
while (true) {
Try<Socket> hog = Socket::create();
if (hog.isError()) {
break;
}
fdHogs.push_back(hog.get());
}
relinker.linkup();
AWAIT_ASSERT_EQ(pid, relinkerExitedPid);
terminate(relinker);
wait(relinker);
}
namespace process {
// Forward declare the `get_persistent_socket` function since we want
// to programatically mess with "link" FDs during tests.
Option<int> get_persistent_socket(const UPID& to);
} // namespace process {
// TODO(hausdorff): Test disabled temporarily because `SHUT_WR` does not exist
// on Windows. See MESOS-5817.
#ifndef __WINDOWS__
// Verifies that sending a message over a socket will fail if the
// link to the target is broken (i.e. closed) outside of the
// `SocketManager`s knowledge.
// Emulates the error behind MESOS-5576. In this case, the socket
// becomes "stale", but libprocess does not receive a TCP RST either.
// A `send` later will trigger a socket error and thereby discover
// the socket's staleness.
TEST_F(ProcessRemoteLinkTest, RemoteUseStaleLink)
{
RemoteLinkTestProcess process(pid);
Future<UPID> exitedPid;
EXPECT_CALL(process, exited(pid))
.WillOnce(FutureArg<0>(&exitedPid));
spawn(process);
process.linkup();
// Dig out the link from the `SocketManager`.
Option<int> linkfd = get_persistent_socket(pid);
ASSERT_SOME(linkfd);
// Disable further writes on this socket without telling the
// `SocketManager`! This will cause a `send` to fail later.
// NOTE: This is done in a loop as the `shutdown` call will fail
// while the socket is connecting.
Duration waited = Duration::zero();
do {
if (::shutdown(linkfd.get(), SHUT_WR) != 0) {
// These errors are expected as we are racing against the code
// responsible for setting up the persistent socket.
ASSERT_TRUE(errno == EINPROGRESS || errno == ENOTCONN)
<< ErrnoError().message;
continue;
}
break;
} while (waited < Seconds(5));
EXPECT_LE(waited, Seconds(5));
ASSERT_TRUE(exitedPid.isPending());
// Now try to send a message over the dead link.
process.ping_linkee();
// The dead link should be detected and trigger an `ExitedEvent`.
AWAIT_ASSERT_EQ(pid, exitedPid);
terminate(process);
wait(process);
}
#endif // __WINDOWS__
// TODO(hausdorff): Test disabled temporarily because `SHUT_WR` does not exist
// on Windows. See MESOS-5817.
#ifndef __WINDOWS__
// Verifies that, in a situation where an existing remote link has become
// "stale", "relinking" prior to sending a message will lead to successful
// message passing. The existing remote link is broken in the same way as
// the test `RemoteUseStaleLink`.
TEST_F(ProcessRemoteLinkTest, RemoteStaleLinkRelink)
{
RemoteLinkTestProcess process(pid);
Future<UPID> exitedPid;
EXPECT_CALL(process, exited(pid))
.WillOnce(FutureArg<0>(&exitedPid));
spawn(process);
process.linkup();
// Dig out the link from the `SocketManager`.
Option<int> linkfd = get_persistent_socket(pid);
ASSERT_SOME(linkfd);
// Disable further writes on this socket without telling the
// `SocketManager`! This would cause a `send` to fail later,
// but this test will "relink" before calling `send`.
// NOTE: This is done in a loop as the `shutdown` call will fail
// while the socket is connecting.
Duration waited = Duration::zero();
do {
if (::shutdown(linkfd.get(), SHUT_WR) != 0) {
// These errors are expected as we are racing against the code
// responsible for setting up the persistent socket.
ASSERT_TRUE(errno == EINPROGRESS || errno == ENOTCONN)
<< ErrnoError().message;
continue;
}
break;
} while (waited < Seconds(5));
EXPECT_LE(waited, Seconds(5));
ASSERT_TRUE(exitedPid.isPending());
// Call `link` again with the "relink" semantics.
process.relink();
// Now try to send a message over the new link.
process.ping_linkee();
// The message should trigger a suicide on the receiving end.
// The linkee should suicide with a successful exit code.
reap_linkee();
AWAIT_ASSERT_READY(linkee->status());
ASSERT_SOME_EQ(EXIT_SUCCESS, linkee->status().get());
// We should also get the associated `ExitedEvent`.
AWAIT_ASSERT_EQ(pid, exitedPid);
terminate(process);
wait(process);
}
#endif // __WINDOWS__
class SettleProcess : public Process<SettleProcess>
{
public:
SettleProcess() : calledDispatch(false) {}
void initialize() override
{
os::sleep(Milliseconds(10));
delay(Seconds(0), self(), &SettleProcess::afterDelay);
}
void afterDelay()
{
dispatch(self(), &SettleProcess::afterDispatch);
os::sleep(Milliseconds(10));
TimeoutProcess timeoutProcess;
spawn(timeoutProcess);
terminate(timeoutProcess);
wait(timeoutProcess);
}
void afterDispatch()
{
os::sleep(Milliseconds(10));
calledDispatch.store(true);
}
std::atomic_bool calledDispatch;
};
TEST_F(ProcessTest, Settle)
{
Clock::pause();
SettleProcess process;
spawn(process);
Clock::settle();
ASSERT_TRUE(process.calledDispatch.load());
terminate(process);
wait(process);
Clock::resume();
}
TEST_F(ProcessTest, Pid)
{
TimeoutProcess process;
PID<TimeoutProcess> pid = process;
}
class Listener1 : public Process<Listener1>
{
public:
virtual void event1() = 0;
};
class Listener2 : public Process<Listener2>
{
public:
virtual void event2() = 0;
};
class MultipleListenerProcess
: public Process<MultipleListenerProcess>,
public Listener1,
public Listener2
{
public:
MOCK_METHOD0(event1, void());
MOCK_METHOD0(event2, void());
};
TEST_F(ProcessTest, Listener)
{
MultipleListenerProcess process;
EXPECT_CALL(process, event1());
EXPECT_CALL(process, event2());
spawn(process);
dispatch(PID<Listener1>(process), &Listener1::event1);
dispatch(PID<Listener2>(process), &Listener2::event2);
terminate(process, false);
wait(process);
}
class EventReceiver
{
public:
MOCK_METHOD1(event1, void(int));
MOCK_METHOD1(event2, void(const string&));
};
TEST_F(ProcessTest, Executor_Defer)
{
EventReceiver receiver;
Executor executor;
CountDownLatch event1Called;
EXPECT_CALL(receiver, event1(42))
.WillOnce(InvokeWithoutArgs([&]() {
event1Called.decrement();
}));
Deferred<void(int)> event1 =
executor.defer([&receiver](int i) {
return receiver.event1(i);
});
event1(42);
AWAIT_READY(event1Called.triggered());
CountDownLatch event2Called;
EXPECT_CALL(receiver, event2("event2"))
.WillOnce(InvokeWithoutArgs([&]() {
event2Called.decrement();
}));
Deferred<void(const string&)> event2 =
executor.defer([&receiver](const string& s) {
return receiver.event2(s);
});
event2("event2");
AWAIT_READY(event2Called.triggered());
}
TEST_F(ProcessTest, Executor_Execute)
{
Executor executor;
// A void immutable lambda.
CountDownLatch f1Result;
auto f1 = [&f1Result] {
f1Result.decrement();
};
// Specify the return type explicitly for type checking. Same below.
Future<Nothing> f1Called = executor.execute(f1);
AWAIT_READY(f1Called);
AWAIT_READY(f1Result.triggered());
// A void mutable bind.
CountDownLatch f2Result;
int f2State = 0;
auto f2 = [&f2Result, f2State](int) mutable -> void {
f2State++;
f2Result.decrement();
};
Future<Nothing> f2Called = executor.execute(std::bind(f2, 42));
AWAIT_READY(f2Called);
AWAIT_READY(f2Result.triggered());
// A non-void immutable lambda.
// NOTE: It appears that g++ throws away the cv-qualifiers when doing
// the lvalue-to-rvalue conversion for the returned string but clang
// does not, so `f3` should return a non-constant string.
string f3Result = "f3";
auto f3 = [&f3Result] {
return f3Result;
};
Future<string> f3Called = executor.execute(f3);
AWAIT_EXPECT_EQ(f3Result, f3Called);
// A mutable bind returning a future.
const string f4Result = "f4";
int f4State = 0;
auto f4 = [&f4Result, f4State](int) mutable -> Future<string> {
f4State++;
return f4Result;
};
Future<string> f4Called = executor.execute(std::bind(f4, 42));
AWAIT_EXPECT_EQ(f4Result, f4Called);
}
class RemoteProcess : public Process<RemoteProcess>
{
public:
RemoteProcess() : ProcessBase(process::ID::generate("remote"))
{
install("handler", &RemoteProcess::handler);
}
MOCK_METHOD2(handler, void(const UPID&, const string&));
};
TEST_F(ProcessTest, Remote)
{
RemoteProcess process;
spawn(process);
Future<UPID> pid;
Future<string> body;
EXPECT_CALL(process, handler(_, _))
.WillOnce(DoAll(FutureArg<0>(&pid),
FutureArg<1>(&body)));
Try<Socket> create = Socket::create();
ASSERT_SOME(create);
Socket socket = create.get();
AWAIT_READY(socket.connect(process.self().address));
Try<Address> sender = socket.address();
ASSERT_SOME(sender);
Message message;
message.name = "handler";
message.from = UPID("sender", sender.get());
message.to = process.self();
message.body = "hello world";
const string data = MessageEncoder::encode(message);
AWAIT_READY(socket.send(data));
AWAIT_READY(body);
ASSERT_EQ("hello world", body.get());
AWAIT_READY(pid);
ASSERT_EQ(message.from, pid.get());
terminate(process);
wait(process);
}
// Like the 'remote' test but uses http::connect.
TEST_F(ProcessTest, Http1)
{
RemoteProcess process;
spawn(process);
http::URL url = http::URL(
"http",
process.self().address.ip,
process.self().address.port,
process.self().id + "/handler");
Future<http::Connection> connect = http::connect(url);
AWAIT_READY(connect);
http::Connection connection = connect.get();
Try<process::network::Address> address = connection.localAddress;
ASSERT_SOME(address);
UPID from("sender", process::network::convert<Address>(address.get()).get());
Future<UPID> pid;
Future<string> body;
EXPECT_CALL(process, handler(_, _))
.WillOnce(DoAll(FutureArg<0>(&pid),
FutureArg<1>(&body)));
http::Request request;
request.method = "POST";
request.url = url;
request.headers["User-Agent"] = "libprocess/" + stringify(from);
request.keepAlive = true;
request.body = "hello world";
Future<http::Response> response = connection.send(request);
AWAIT_READY(body);
ASSERT_EQ("hello world", body.get());
AWAIT_READY(pid);
ASSERT_EQ(from, pid.get());
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Accepted().status, response);
AWAIT_READY(connection.disconnect());
terminate(process);
wait(process);
}
// Like 'http1' but uses the 'Libprocess-From' header. We can
// also use http::post here since we expect a 202 response.
TEST_F(ProcessTest, Http2)
{
RemoteProcess process;
spawn(process);
// Create a receiving socket so we can get messages back.
Try<Socket> create = Socket::create();
ASSERT_SOME(create);
Socket socket = create.get();
ASSERT_SOME(socket.bind(inet4::Address::ANY_ANY()));
// Create a UPID for 'Libprocess-From' based on the IP and port we
// got assigned.
Try<Address> address = socket.address();
ASSERT_SOME(address);
UPID from("", process.self().address.ip, address->port);
ASSERT_SOME(socket.listen(1));
Future<UPID> pid;
Future<string> body;
EXPECT_CALL(process, handler(_, _))
.WillOnce(DoAll(FutureArg<0>(&pid),
FutureArg<1>(&body)));
http::Headers headers;
headers["Libprocess-From"] = stringify(from);
Future<http::Response> response =
http::post(process.self(), "handler", headers, "hello world");
AWAIT_READY(response);
ASSERT_EQ(http::Status::ACCEPTED, response->code);
ASSERT_EQ(http::Status::string(http::Status::ACCEPTED),
response->status);
AWAIT_READY(body);
ASSERT_EQ("hello world", body.get());
AWAIT_READY(pid);
ASSERT_EQ(from, pid.get());
// Now post a message as though it came from the process.
const string name = "reply";
post(process.self(), from, name);
// Accept the incoming connection.
Future<Socket> accept = socket.accept();
AWAIT_READY(accept);
Socket client = accept.get();
const string data = "POST /" + name + " HTTP/1.1";
AWAIT_EXPECT_EQ(data, client.recv(data.size()));
terminate(process);
wait(process);
}
static int foo()
{
return 1;
}
static int foo1(int a)
{
return a;
}
static int foo2(int a, int b)
{
return a + b;
}
static int foo3(int a, int b, int c)
{
return a + b + c;
}
static int foo4(int a, int b, int c, int d)
{
return a + b + c + d;
}
static Future<string> itoa1(int* const& i)
{
std::ostringstream out;
out << *i;
return out.str();
}
static string itoa2(int* const& i)
{
std::ostringstream out;
out << *i;
return out.str();
}
TEST_F(ProcessTest, Async)
{
// Non-void functions with different no.of args.
EXPECT_EQ(1, async(&foo).get());
EXPECT_EQ(10, async(&foo1, 10).get());
EXPECT_EQ(30, async(&foo2, 10, 20).get());
EXPECT_EQ(60, async(&foo3, 10, 20, 30).get());
EXPECT_EQ(100, async(&foo4, 10, 20, 30, 40).get());
// Non-void function with a complex arg.
int i = 42;
EXPECT_EQ("42", async(&itoa2, &i).get());
// Non-void function that returns a future.
EXPECT_EQ("42", async(&itoa1, &i)->get());
}
static int baz(string s) { return 42; }
static Future<int> bam(string s) { return 42; }
TEST_F(ProcessTest, Defers)
{
{
std::function<Future<int>(string)> f =
defer(std::bind(baz, std::placeholders::_1));
Deferred<Future<int>(string)> d =
defer(std::bind(baz, std::placeholders::_1));
Future<int> future = Future<string>().then(
defer(std::bind(baz, std::placeholders::_1)));
Future<int> future3 = Future<string>().then(
std::bind(baz, std::placeholders::_1));
Future<string>().then(std::function<int(string)>());
Future<string>().then(std::function<int()>());
Future<int> future11 = Future<string>().then(
defer(std::bind(bam, std::placeholders::_1)));
Future<int> future12 = Future<string>().then(
std::bind(bam, std::placeholders::_1));
std::function<Future<int>(string)> f2 =
defer([](string s) { return baz(s); });
Deferred<Future<int>(string)> d2 =
defer([](string s) { return baz(s); });
Future<int> future2 = Future<string>().then(
defer([](string s) { return baz(s); }));
Future<int> future4 = Future<string>().then(
[](string s) { return baz(s); });
Future<int> future5 = Future<string>().then(
defer([](string s) -> Future<int> { return baz(s); }));
Future<int> future6 = Future<string>().then(
defer([](string s) { return Future<int>(baz(s)); }));
Future<int> future7 = Future<string>().then(
defer([](string s) { return bam(s); }));
Future<int> future8 = Future<string>().then(
[](string s) { return Future<int>(baz(s)); });
Future<int> future9 = Future<string>().then(
[](string s) -> Future<int> { return baz(s); });
Future<int> future10 = Future<string>().then(
[](string s) { return bam(s); });
}
// {
// // CANNOT DO IN CLANG!
// std::function<void(string)> f =
// defer(std::bind(baz, std::placeholders::_1));
// std::function<int(string)> blah;
// std::function<void(string)> blam = blah;
// std::function<void(string)> f2 =
// defer([](string s) { return baz(s); });
// }
// {
// // CANNOT DO WITH GCC OR CLANG!
// std::function<int(int)> f =
// defer(std::bind(baz, std::placeholders::_1));
// }
{
std::function<Future<int>()> f =
defer(std::bind(baz, "42"));
std::function<Future<int>()> f2 =
defer([]() { return baz("42"); });
}
{
std::function<Future<int>(int)> f =
defer(std::bind(baz, "42"));
std::function<Future<int>(int)> f2 =
defer([](int i) { return baz("42"); });
}
// Don't care about value passed from Future::then.
{
Future<int> future = Future<string>().then(
defer(std::bind(baz, "42")));
Future<int> future3 = Future<string>().then(
std::bind(baz, "42"));
Future<int> future11 = Future<string>().then(
defer(std::bind(bam, "42")));
Future<int> future12 = Future<string>().then(
std::bind(bam, "42"));
Future<int> future2 = Future<string>().then(
defer([]() { return baz("42"); }));
Future<int> future4 = Future<string>().then(
[]() { return baz("42"); });
Future<int> future5 = Future<string>().then(
defer([]() -> Future<int> { return baz("42"); }));
Future<int> future6 = Future<string>().then(
defer([]() { return Future<int>(baz("42")); }));
Future<int> future7 = Future<string>().then(
defer([]() { return bam("42"); }));
Future<int> future8 = Future<string>().then(
[]() { return Future<int>(baz("42")); });
Future<int> future9 = Future<string>().then(
[]() -> Future<int> { return baz("42"); });
Future<int> future10 = Future<string>().then(
[]() { return bam("42"); });
}
struct Functor
{
int operator()(string) const { return 42; }
int operator()() const { return 42; }
} functor;
Future<int> future13 = Future<string>().then(
defer(functor));
}
class PercentEncodedIDProcess : public Process<PercentEncodedIDProcess>
{
public:
PercentEncodedIDProcess()
: ProcessBase("id(42)") {}
void initialize() override
{
install("handler1", &Self::handler1);
route("/handler2", None(), &Self::handler2);
}
MOCK_METHOD2(handler1, void(const UPID&, const string&));
MOCK_METHOD1(handler2, Future<http::Response>(const http::Request&));
};
TEST_F(ProcessTest, PercentEncodedURLs)
{
PercentEncodedIDProcess process;
spawn(process);
// Construct the PID using percent-encoding.
http::URL url = http::URL(
"http",
process.self().address.ip,
process.self().address.port,
http::encode(process.self().id) + "/handler1");
Future<http::Connection> connect = http::connect(url);
AWAIT_READY(connect);
http::Connection connection = connect.get();
// Mimic a libprocess message sent to an installed handler.
Future<Nothing> handler1;
EXPECT_CALL(process, handler1(_, _))
.WillOnce(FutureSatisfy(&handler1));
UPID from("sender", process.self().address.ip, 99);
http::Request request;
request.method = "POST";
request.url = url;
request.headers["User-Agent"] = "libprocess/" + stringify(from);
request.keepAlive = true;
Future<http::Response> response = connection.send(request);
AWAIT_READY(handler1);
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::Accepted().status, response);
AWAIT_READY(connection.disconnect());
// Now an HTTP request.
EXPECT_CALL(process, handler2(_))
.WillOnce(Return(http::OK()));
// Construct the PID using percent-encoding.
UPID pid(http::encode(process.self().id), process.self().address);
response = http::get(pid, "handler2");
AWAIT_EXPECT_RESPONSE_STATUS_EQ(http::OK().status, response);
terminate(process);
wait(process);
}
class HTTPEndpointProcess : public Process<HTTPEndpointProcess>
{
public:
explicit HTTPEndpointProcess(const string& id)
: ProcessBase(id) {}
void initialize() override
{
route(
"/handler1",
None(),
&HTTPEndpointProcess::handler1);
route(
"/handler2",
None(),
&HTTPEndpointProcess::handler2);
route(
"/handler3",
None(),
&HTTPEndpointProcess::handler3);
}
MOCK_METHOD1(handler1, Future<http::Response>(const http::Request&));
MOCK_METHOD1(handler2, Future<http::Response>(const http::Request&));
MOCK_METHOD1(handler3, Future<http::Response>(const http::Request&));
};
// Sets firewall rules which disable endpoints on a process and then
// attempts to connect to those endpoints.
TEST_F(ProcessTest, FirewallDisablePaths)
{
const string id = "testprocess";
hashset<string> endpoints = {
strings::join("/", "", id, "handler1"),
strings::join("/", "", id, "handler2", "nested"),
// Patterns are not supported, so this should do nothing.
strings::join("/", "", id, "handler3", "*")};
process::firewall::install(
{Owned<FirewallRule>(new DisabledEndpointsFirewallRule(endpoints))});
HTTPEndpointProcess process(id);
PID<HTTPEndpointProcess> pid = spawn(process);
// Test call to a disabled endpoint.
Future<http::Response> response = http::get(pid, "handler1");
AWAIT_READY(response);
EXPECT_EQ(http::Status::FORBIDDEN, response->code);
EXPECT_EQ(http::Status::string(http::Status::FORBIDDEN),
response->status);
// Test call to a non disabled endpoint.
// Substrings should not match.
EXPECT_CALL(process, handler2(_))
.WillOnce(Return(http::OK()));
response = http::get(pid, "handler2");
AWAIT_READY(response);
EXPECT_EQ(http::Status::OK, response->code);
EXPECT_EQ(http::Status::string(http::Status::OK),
response->status);
// Test nested endpoints. Full paths needed for match.
response = http::get(pid, "handler2/nested");
AWAIT_READY(response);
EXPECT_EQ(http::Status::FORBIDDEN, response->code);
EXPECT_EQ(http::Status::string(http::Status::FORBIDDEN),
response->status);
EXPECT_CALL(process, handler2(_))
.WillOnce(Return(http::OK()));
response = http::get(pid, "handler2/nested/path");
AWAIT_READY(response);
EXPECT_EQ(http::Status::OK, response->code);
EXPECT_EQ(http::Status::string(http::Status::OK),
response->status);
EXPECT_CALL(process, handler3(_))
.WillOnce(Return(http::OK()));
// Test a wildcard rule. Since they are not supported, it must have
// no effect at all.
response = http::get(pid, "handler3");
AWAIT_READY(response);
EXPECT_EQ(http::Status::OK, response->code);
EXPECT_EQ(http::Status::string(http::Status::OK),
response->status);
EXPECT_CALL(process, handler3(_))
.WillOnce(Return(http::OK()));
response = http::get(pid, "handler3/nested");
AWAIT_READY(response);
EXPECT_EQ(http::Status::OK, response->code);
EXPECT_EQ(http::Status::string(http::Status::OK),
response->status);
terminate(process);
wait(process);
}
// Test that firewall rules can be changed by changing the vector.
// An empty vector should allow all paths.
TEST_F(ProcessTest, FirewallUninstall)
{
const string id = "testprocess";
hashset<string> endpoints = {strings::join("/", "", id, "handler1"),
strings::join("/", "", id, "handler2")};
process::firewall::install(
{Owned<FirewallRule>(new DisabledEndpointsFirewallRule(endpoints))});
HTTPEndpointProcess process(id);
PID<HTTPEndpointProcess> pid = spawn(process);
Future<http::Response> response = http::get(pid, "handler1");
AWAIT_READY(response);
EXPECT_EQ(http::Status::FORBIDDEN, response->code);
EXPECT_EQ(http::Status::string(http::Status::FORBIDDEN),
response->status);
response = http::get(pid, "handler2");
AWAIT_READY(response);
EXPECT_EQ(http::Status::FORBIDDEN, response->code);
EXPECT_EQ(http::Status::string(http::Status::FORBIDDEN),
response->status);
process::firewall::install({});
EXPECT_CALL(process, handler1(_))
.WillOnce(Return(http::OK()));
response = http::get(pid, "handler1");
AWAIT_READY(response);
EXPECT_EQ(http::Status::OK, response->code);
EXPECT_EQ(http::Status::string(http::Status::OK),
response->status);
EXPECT_CALL(process, handler2(_))
.WillOnce(Return(http::OK()));
response = http::get(pid, "handler2");
AWAIT_READY(response);
EXPECT_EQ(http::Status::OK, response->code);
EXPECT_EQ(http::Status::string(http::Status::OK),
response->status);
terminate(process);
wait(process);
}
// This ensures that the `/__processes__` endpoint does not hang
// if one of the dispatches is abandoned. This can occur if a
// process is terminated with `inject == true` after the
// `/__processes__` endpoint handler has dispatched to it.
TEST_F(ProcessTest, ProcessesEndpointNoHang)
{
// This process will hold itself in a dispatch handler
// until both are present in its event queue:
// (1) an injected terminate event, and
// (2) a dispatch event from the `__processes__` endpoint.
//
// At that point, we know that the future for (2) will get
// abandoned.
class TestProcess : public Process<TestProcess>
{
public:
Future<Nothing> wait_for_terminate(Promise<Nothing>&& p)
{
p.set(Nothing()); // Notify that we're inside the function.
Time start = Clock::now();
while (Clock::now() - start < process::TEST_AWAIT_TIMEOUT) {
if (eventCount<TerminateEvent>() == 1 &&
eventCount<DispatchEvent>() == 1) {
return Nothing();
}
os::sleep(Milliseconds(1));
}
return Failure("Timed out waiting for terminate and dispatch");
}
};
PID<TestProcess> process = spawn(new TestProcess(), true);
Promise<Nothing> promise;
Future<Nothing> inside = promise.future();
Future<Nothing> waited =
dispatch(process, &TestProcess::wait_for_terminate, std::move(promise));
AWAIT_READY(inside);
http::URL url = http::URL(
"http",
process::address().ip,
process::address().port,
"/__processes__");
Future<http::Response> response = http::get(url);
terminate(process, true);
AWAIT_READY(waited);
AWAIT_READY(response);
EXPECT_EQ(http::Status::OK, response->code);
}
// Test for a bug where timers wouldn't be handled after libprocess was
// reinitialized.
TEST_F(ProcessTest, TimerAfterReinitialize)
{
// Schedule a timer, which won't have time to expire before...
process::Timer timer_before = Clock::timer(Milliseconds(10), []() {});
// We reinitialize libprocess.
process::reinitialize(
None(),
process::READWRITE_HTTP_AUTHENTICATION_REALM,
process::READONLY_HTTP_AUTHENTICATION_REALM);
// Now, schedule a new timer which is supposed to fire later than the one
// above.
Promise<Nothing> promise;
Future<Nothing> future = promise.future();
process::Timer timer_after = Clock::timer(Milliseconds(10),
[&]() { promise.set(Nothing()); });
// Wait until it fires.
AWAIT_READY(future);
}