blob: dfb01976148b5e2f4e97311f66eef1fd87543ec6 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#undef NDEBUG
#include "TestBase.h"
#include "c2/C2Agent.h"
#include "protocols/RESTProtocol.h"
#include "protocols/RESTSender.h"
#include "HTTPIntegrationBase.h"
#include "HTTPHandlers.h"
#include "io/ArchiveStream.h"
#include "EmptyFlow.h"
#include "properties/Configuration.h"
using namespace std::literals::chrono_literals;
class VerifyDebugInfo : public VerifyC2Base {
public:
explicit VerifyDebugInfo(std::function<bool()> verify): verify_(std::move(verify)) {}
void testSetup() override {
LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
VerifyC2Base::testSetup();
}
void runAssertions() override {
assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(300), verify_));
}
void configureC2() override {
VerifyC2Base::configureC2();
configuration->set(org::apache::nifi::minifi::Configuration::nifi_c2_agent_heartbeat_period, "100");
}
std::function<bool()> verify_;
};
class C2DebugBundleHandler : public ServerAwareHandler {
public:
bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override {
std::optional<std::string> file_content;
mg_form_data_handler form_handler{};
form_handler.field_found = field_found;
form_handler.field_get = field_get;
form_handler.user_data = &file_content;
mg_handle_form_request(conn, &form_handler);
assert(file_content);
{
std::lock_guard<std::mutex> lock(mtx_);
bundles_.push_back(std::move(*file_content));
}
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
return true;
}
std::vector<std::string> getBundles() {
std::lock_guard<std::mutex> lock(mtx_);
return bundles_;
}
private:
static int field_found(const char* key, const char* filename, char* /*path*/, size_t /*pathlen*/, void* user_data) {
auto& file_content = *static_cast<std::optional<std::string>*>(user_data);
if (!filename || std::string(filename) != "debug.tar.gz") {
throw std::runtime_error("Unknown form entry: " + std::string{key});
}
if (file_content) {
throw std::runtime_error("Debug archive has already been extracted: " + std::string{key});
}
return MG_FORM_FIELD_STORAGE_GET;
}
static int field_get(const char* /*key*/, const char* value, size_t valuelen, void* user_data) {
auto& file_content = *static_cast<std::optional<std::string>*>(user_data);
if (!file_content) {
file_content = "";
}
(*file_content) += std::string(value, valuelen);
return MG_FORM_FIELD_HANDLE_GET;
}
std::mutex mtx_;
std::vector<std::string> bundles_;
};
class C2HeartbeatHandler : public ServerAwareHandler {
public:
bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override {
if (response_) {
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
response_->length());
mg_printf(conn, "%s", response_->c_str());
response_.reset();
} else {
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
}
return true;
}
void setC2RestResponse(const std::string& url) {
response_ =
R"({
"operation" : "heartbeat",
"requested_operations": [{
"operation" : "transfer",
"operationid" : "79",
"name": "debug",
"args": {"target": ")" + url + R"("}
}]
})";
}
private:
std::optional<std::string> response_;
};
static std::string properties_file = "some.dummy.content = here\n";
static std::string flow_config_file = empty_flow;
int main() {
TestController controller;
C2HeartbeatHandler heartbeat_handler;
C2AcknowledgeHandler ack_handler;
C2DebugBundleHandler bundle_handler;
std::filesystem::path home_dir = controller.createTempDirectory();
utils::file::PathUtils::create_dir(home_dir / "conf");
std::ofstream{home_dir / "conf/minifi.properties", std::ios::binary} << properties_file;
std::ofstream{home_dir / "conf/config.yml", std::ios::binary} << flow_config_file;
VerifyDebugInfo harness([&]() -> bool {
if (!ack_handler.isAcknowledged("79")) {
return false;
}
auto bundles = bundle_handler.getBundles();
assert(bundles.size() == 1);
// verify the bundle
auto bundle_stream = std::make_shared<minifi::io::BufferStream>();
bundle_stream->write(reinterpret_cast<const uint8_t*>(bundles[0].data()), bundles[0].length());
auto archive_provider = core::ClassLoader::getDefaultClassLoader().instantiate<minifi::io::ArchiveStreamProvider>(
"ArchiveStreamProvider", "ArchiveStreamProvider");
assert(archive_provider);
auto decompressor = archive_provider->createReadStream(bundle_stream);
assert(decompressor);
std::map<std::string, std::string> archive_content;
while (auto info = decompressor->nextEntry()) {
std::string file_content;
file_content.resize(info->size);
assert(decompressor->read(as_writable_bytes(std::span(file_content))) ==
file_content.length());
archive_content[info->filename] = std::move(file_content);
}
assert(archive_content["minifi.properties"] == properties_file);
assert(archive_content["config.yml"] == flow_config_file);
auto log_gz = archive_content["minifi.log.gz"];
auto log_stream = std::make_shared<minifi::io::BufferStream>();
{
minifi::io::ZlibDecompressStream log_decompressor(gsl::make_not_null(log_stream.get()));
log_decompressor.write(reinterpret_cast<const uint8_t*>(log_gz.data()), log_gz.length());
}
std::string log_text;
log_text.resize(log_stream->size());
log_stream->read(as_writable_bytes(std::span(log_text)));
assert(log_text.find("Tis but a scratch") != std::string::npos);
assert(archive_content["manifest.json"].find("minifi-archive-extensions") != std::string::npos);
return true;
});
harness.getConfiguration()->setHome(home_dir.string());
harness.getConfiguration()->loadConfigureFile("conf/minifi.properties");
harness.setUrl("http://localhost:0/heartbeat", &heartbeat_handler);
harness.setUrl("http://localhost:0/acknowledge", &ack_handler);
harness.setUrl("http://localhost:0/debug_bundle", &bundle_handler);
harness.setC2Url("/heartbeat", "/acknowledge");
heartbeat_handler.setC2RestResponse("http://localhost:" + harness.getWebPort() + "/debug_bundle");
logging::LoggerFactory<C2HeartbeatHandler>::getLogger()->log_error("Tis but a scratch");
harness.run((home_dir / "conf/config.yml").string());
}