blob: 85dfad6b506f2d81caad2387b2b5da17cf92f574 [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <CivetServer.h>
#include "integration/CivetLibrary.h"
#include "minifi-cpp/core/logging/Logger.h"
#include "core/logging/LoggerFactory.h"
#include "rapidjson/document.h"
#include "rapidjson/writer.h"
#include "rapidjson/stringbuffer.h"
namespace org::apache::nifi::minifi::extensions::elasticsearch::test {
/**
 * Authorization handler for the mock Elasticsearch server.
 *
 * A request is authorized when its "Authorization" header is exactly either
 * the HTTP Basic credentials for USERNAME / PASSWORD or "ApiKey " followed by API_KEY.
 */
class MockElasticAuthHandler : public CivetAuthHandler {
 public:
  static constexpr const char* API_KEY = "VnVhQ2ZHY0JDZGJrUW0tZTVhT3g6dWkybHAyYXhUTm1zeWFrdzl0dk5udw";
  static constexpr const char* USERNAME = "elastic";
  static constexpr const char* PASSWORD = "elastic_password";

 private:
  // Expected Basic header: the token is the base64 encoding of "elastic:elastic_password" (USERNAME:PASSWORD).
  static constexpr const char* BASIC_AUTH_HEADER = "Basic ZWxhc3RpYzplbGFzdGljX3Bhc3N3b3Jk";

  /**
   * Checks the "Authorization" header of the incoming request.
   *
   * @param conn connection whose request headers are inspected
   * @return true if the header matches one of the accepted credentials,
   *         false if the header is missing or does not match
   */
  bool authorize(CivetServer*, struct mg_connection* conn) override {
    const char* auth_header = mg_get_header(conn, "Authorization");
    if (auth_header == nullptr) {
      return false;
    }
    // std::string comparison avoids relying on strcmp being pulled in transitively,
    // and building the ApiKey value from API_KEY keeps the check and the public constant in sync.
    const std::string header_value(auth_header);
    return header_value == BASIC_AUTH_HEADER || header_value == std::string("ApiKey ") + API_KEY;
  }
};
class BulkElasticHandler : public CivetHandler {
public:
void returnErrors(bool ret_errors) {
ret_error_ = ret_errors;
}
private:
rapidjson::Value addIndexSuccess(rapidjson::Document::AllocatorType& alloc) {
rapidjson::Value item{rapidjson::kObjectType};
rapidjson::Value operation{rapidjson::kObjectType};
operation.AddMember("_index", "test", alloc);
operation.AddMember("_id", "1", alloc);
operation.AddMember("result", "created", alloc);
item.AddMember("index", operation, alloc);
return item;
}
rapidjson::Value addUpdateSuccess(rapidjson::Document::AllocatorType& alloc) {
rapidjson::Value item{rapidjson::kObjectType};
rapidjson::Value operation{rapidjson::kObjectType};
operation.AddMember("_index", "test", alloc);
operation.AddMember("_id", "1", alloc);
operation.AddMember("result", "updated", alloc);
item.AddMember("update", operation, alloc);
return item;
}
rapidjson::Value addUpdateError(rapidjson::Document::AllocatorType& alloc) {
rapidjson::Value item{rapidjson::kObjectType};
rapidjson::Value operation{rapidjson::kObjectType};
operation.AddMember("_index", "test", alloc);
operation.AddMember("_id", "1", alloc);
rapidjson::Value error{rapidjson::kObjectType};
error.AddMember("type", "document_missing_exception", alloc);
error.AddMember("reason", "[6]: document missing", alloc);
error.AddMember("index_uuid", "aAsFqTI0Tc2W0LCWgPNrOA", alloc);
error.AddMember("shard", "0", alloc);
error.AddMember("index", "index", alloc);
operation.AddMember("error", error, alloc);
item.AddMember("update", operation, alloc);
return item;
}
bool handlePost(CivetServer*, struct mg_connection* conn) override {
char request[2048];
size_t chars_read = mg_read(conn, request, 2048);
std::vector<std::string> lines = utils::string::splitRemovingEmpty({request, chars_read}, "\n");
rapidjson::Document response{rapidjson::kObjectType};
response.AddMember("took", 30, response.GetAllocator());
response.AddMember("errors", ret_error_, response.GetAllocator());
response.AddMember("items", rapidjson::kArrayType, response.GetAllocator());
auto& items = response["items"];
for (const auto& line : lines) {
rapidjson::Document line_json;
line_json.Parse<rapidjson::kParseStopWhenDoneFlag>(line.data());
if (!line_json.HasMember("index") && !line_json.HasMember("create") && !line_json.HasMember("update") && !line_json.HasMember("delete"))
continue;
rapidjson::Value item{rapidjson::kObjectType};
rapidjson::Value operation{rapidjson::kObjectType};
if (ret_error_) {
items.PushBack(addUpdateError(response.GetAllocator()), response.GetAllocator());
} else {
if (line_json.HasMember("update"))
items.PushBack(addUpdateSuccess(response.GetAllocator()), response.GetAllocator());
else
items.PushBack(addIndexSuccess(response.GetAllocator()), response.GetAllocator());
}
}
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
response.Accept(writer);
mg_printf(conn, "HTTP/1.1 200 OK\r\n");
mg_printf(conn, "Content-length: %lu", buffer.GetSize());
mg_printf(conn, "\r\n\r\n");
mg_printf(conn, "%s", buffer.GetString());
return true;
}
bool ret_error_ = false;
};
/**
 * Mock Elasticsearch server: a CivetServer listening on the given port with a
 * BulkElasticHandler and a MockElasticAuthHandler registered on "/_bulk".
 */
class MockElastic {
 public:
  /**
   * Starts the server and registers the bulk and authorization handlers.
   *
   * @param port value passed to the server as its "listening_ports" option
   */
  explicit MockElastic(std::string port) : port_(std::move(port)) {
    // Create the handlers before the server starts so they exist for the server's whole lifetime.
    bulk_handler_ = std::make_unique<BulkElasticHandler>();
    auth_handler_ = std::make_unique<MockElasticAuthHandler>();

    std::vector<std::string> options;
    options.emplace_back("listening_ports");
    options.emplace_back(port_);
    server_ = std::make_unique<CivetServer>(options, &callbacks_, &logger_);
    server_->addHandler("/_bulk", *bulk_handler_);
    server_->addAuthHandler("/_bulk", *auth_handler_);
  }

  /** @return the port string this mock was constructed with */
  [[nodiscard]] const std::string& getPort() const {
    return port_;
  }

  /**
   * Forwards to BulkElasticHandler::returnErrors.
   *
   * @param ret_errors true to make bulk responses report errors, false to report successes
   */
  void returnErrors(bool ret_errors) {
    bulk_handler_->returnErrors(ret_errors);
  }

 private:
  // Members are destroyed in reverse declaration order. server_ is declared last so that it is
  // destroyed first, while the handlers, callbacks and logger handed to it are still alive.
  CivetLibrary lib_;
  std::string port_;
  CivetCallbacks callbacks_;
  std::shared_ptr<org::apache::nifi::minifi::core::logging::Logger> logger_ = org::apache::nifi::minifi::core::logging::LoggerFactory<MockElastic>::getLogger();
  std::unique_ptr<BulkElasticHandler> bulk_handler_;
  std::unique_ptr<MockElasticAuthHandler> auth_handler_;
  std::unique_ptr<CivetServer> server_;
};
} // namespace org::apache::nifi::minifi::extensions::elasticsearch::test