blob: feccdf9aac7d3bf7049eb0fcb2f9a5ec149f7087 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#undef NDEBUG
#include "TestBase.h"
#include "c2/C2Agent.h"
#include "c2/HeartbeatLogger.h"
#include "protocols/RESTProtocol.h"
#include "protocols/RESTSender.h"
#include "HTTPIntegrationBase.h"
#include "HTTPHandlers.h"
#include "range/v3/action/sort.hpp"
#include "range/v3/action/unique.hpp"
#include "range/v3/range/conversion.hpp"
#include "range/v3/view/filter.hpp"
#include "range/v3/view/split.hpp"
#include "range/v3/view/transform.hpp"
#include "utils/IntegrationTestUtils.h"
#include "utils/StringUtils.h"
#include "utils/span.h"
#include "properties/Configuration.h"
#include "io/ZlibStream.h"
class CompressedHeartbeatHandler : public HeartbeatHandler {
protected:
std::string readPayload(struct mg_connection* conn) override {
auto payload = HeartbeatHandler::readPayload(conn);
const char* encoding = mg_get_header(conn, "content-encoding");
if (!encoding || std::string_view(encoding).find("gzip") == std::string_view::npos) {
return payload;
}
received_compressed_ = true;
minifi::io::BufferStream output;
{
minifi::io::ZlibDecompressStream decompressor(gsl::make_not_null(&output));
auto ret = decompressor.write(as_bytes(std::span(payload)));
assert(ret == payload.size());
}
auto str_span = utils::as_span<const char>(output.getBuffer());
return {str_span.data(), str_span.size()};
}
public:
using HeartbeatHandler::HeartbeatHandler;
std::atomic_bool received_compressed_{false};
};
class VerifyCompressedHeartbeat : public VerifyC2Base {
public:
void testSetup() override {
LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
VerifyC2Base::testSetup();
}
void runAssertions() override {
assert(utils::verifyEventHappenedInPollTime(std::chrono::seconds(10), verify_));
}
void configureC2() override {
VerifyC2Base::configureC2();
configuration->set(org::apache::nifi::minifi::Configuration::nifi_c2_agent_heartbeat_period, "100");
configuration->set(org::apache::nifi::minifi::Configuration::nifi_c2_rest_request_encoding, "gzip");
}
void setVerifier(std::function<bool()> verify) {
verify_ = std::move(verify);
}
private:
std::function<bool()> verify_;
};
int main() {
VerifyCompressedHeartbeat harness;
CompressedHeartbeatHandler responder(harness.getConfiguration());
harness.setVerifier([&] () -> bool {
return responder.received_compressed_;
});
harness.setUrl("https://localhost:0/heartbeat", &responder);
harness.run();
}