blob: 504822b2f03b9fa42ef81e005b1e863df3886c3f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <memory>
#include <utility>
#include <string>
#include "../TestBase.h"
#include "io/ClientSocket.h"
#include "core/Processor.h"
#include "../../controller/Controller.h"
#include "c2/ControllerSocketProtocol.h"
#include "state/UpdateController.h"
class TestStateController : public minifi::state::StateController {
public:
TestStateController()
: is_running(false) {
}
virtual ~TestStateController() = default;
virtual std::string getComponentName() const {
return "TestStateController";
}
virtual utils::Identifier getComponentUUID() const {
static auto dummyUUID = utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
return dummyUUID;
}
/**
* Start the client
*/
virtual int16_t start() {
is_running = true;
return 0;
}
/**
* Stop the client
*/
virtual int16_t stop() {
is_running = false;
return 0;
}
virtual bool isRunning() {
return is_running;
}
virtual int16_t pause() {
return 0;
}
std::atomic<bool> is_running;
};
class TestUpdateSink : public minifi::state::StateMonitor {
public:
explicit TestUpdateSink(std::shared_ptr<StateController> controller)
: is_running(true),
clear_calls(0),
controller(controller),
update_calls(0) {
}
virtual std::vector<std::shared_ptr<StateController>> getComponents(const std::string &name) {
std::vector<std::shared_ptr<StateController>> vec;
vec.push_back(controller);
return vec;
}
virtual std::vector<std::shared_ptr<StateController>> getAllComponents() {
std::vector<std::shared_ptr<StateController>> vec;
vec.push_back(controller);
return vec;
}
virtual std::string getComponentName() const {
return "TestUpdateSink";
}
virtual utils::Identifier getComponentUUID() const {
static auto dummyUUID = utils::Identifier::parse("12345678-1234-1234-1234-123456789abc").value();
return dummyUUID;
}
/**
* Start the client
*/
virtual int16_t start() {
is_running = true;
return 0;
}
/**
* Stop the client
*/
virtual int16_t stop() {
is_running = false;
return 0;
}
virtual bool isRunning() {
return is_running;
}
virtual int16_t pause() {
return 0;
}
virtual std::vector<BackTrace> getTraces() {
std::vector<BackTrace> traces;
return traces;
}
/**
* Operational controllers
*/
/**
* Drain repositories
*/
virtual int16_t drainRepositories() {
return 0;
}
/**
* Clear connection for the agent.
*/
virtual int16_t clearConnection(const std::string &connection) {
clear_calls++;
return 0;
}
/**
* Apply an update with the provided string.
*
* < 0 is an error code
* 0 is success
*/
virtual int16_t applyUpdate(const std::string &source, const std::string &configuration) {
update_calls++;
return 0;
}
/**
* Apply an update that the agent must decode. This is useful for certain operations
* that can't be encapsulated within these definitions.
*/
virtual int16_t applyUpdate(const std::string &source, const std::shared_ptr<minifi::state::Update> &updateController) {
return 0;
}
/**
* Returns uptime for this module.
* @return uptime for the current state monitor.
*/
virtual uint64_t getUptime() {
return 8765309;
}
std::atomic<bool> is_running;
std::atomic<uint32_t> clear_calls;
std::shared_ptr<StateController> controller;
std::atomic<uint32_t> update_calls;
};
#ifdef WIN32
TEST_CASE("TestWindows", "[test1]") {
std::cout << "Controller Tests are not supported on windows";
}
#else
TEST_CASE("TestGet", "[test1]") {
auto controller = std::make_shared<TestStateController>();
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
configuration->set("controller.socket.host", "localhost");
configuration->set("controller.socket.port", "9997");
auto ptr = std::make_shared<TestUpdateSink>(controller);
minifi::c2::ControllerSocketProtocol protocol("testprotocol");
protocol.initialize(nullptr, ptr, configuration);
auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
auto socket = stream_factory->createSocket("localhost", 9997);
startComponent(std::move(socket), "TestStateController");
std::this_thread::sleep_for(std::chrono::milliseconds(500));
REQUIRE(controller->isRunning() == true);
socket = stream_factory->createSocket("localhost", 9997);
stopComponent(std::move(socket), "TestStateController");
std::this_thread::sleep_for(std::chrono::milliseconds(500));
REQUIRE(controller->isRunning() == false);
socket = stream_factory->createSocket("localhost", 9997);
std::stringstream ss;
listComponents(std::move(socket), ss);
REQUIRE(ss.str().find("TestStateController") != std::string::npos);
}
TEST_CASE("TestClear", "[test1]") {
auto controller = std::make_shared<TestStateController>();
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
configuration->set("controller.socket.host", "localhost");
configuration->set("controller.socket.port", "9997");
auto ptr = std::make_shared<TestUpdateSink>(controller);
minifi::c2::ControllerSocketProtocol protocol("testprotocol");
protocol.initialize(nullptr, ptr, configuration);
auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
auto socket = stream_factory->createSocket("localhost", 9997);
startComponent(std::move(socket), "TestStateController");
std::this_thread::sleep_for(std::chrono::milliseconds(500));
REQUIRE(controller->isRunning() == true);
socket = stream_factory->createSocket("localhost", 9997);
clearConnection(std::move(socket), "connection");
socket = stream_factory->createSocket("localhost", 9997);
clearConnection(std::move(socket), "connection");
socket = stream_factory->createSocket("localhost", 9997);
clearConnection(std::move(socket), "connection");
std::this_thread::sleep_for(std::chrono::milliseconds(500));
REQUIRE(3 == ptr->clear_calls);
}
TEST_CASE("TestUpdate", "[test1]") {
auto controller = std::make_shared<TestStateController>();
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
configuration->set("controller.socket.host", "localhost");
configuration->set("controller.socket.port", "9997");
auto ptr = std::make_shared<TestUpdateSink>(controller);
minifi::c2::ControllerSocketProtocol protocol("testprotocol");
protocol.initialize(nullptr, ptr, configuration);
auto stream_factory = minifi::io::StreamFactory::getInstance(configuration);
auto socket = stream_factory->createSocket("localhost", 9997);
startComponent(std::move(socket), "TestStateController");
std::this_thread::sleep_for(std::chrono::milliseconds(500));
REQUIRE(controller->isRunning() == true);
std::stringstream ss;
socket = stream_factory->createSocket("localhost", 9997);
updateFlow(std::move(socket), ss, "connection");
std::this_thread::sleep_for(std::chrono::milliseconds(500));
REQUIRE(1 == ptr->update_calls);
REQUIRE(0 == ptr->clear_calls);
}
#endif