blob: a235b6b9d90f853cae14a99f8d2f94f67894cf7e [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "HTTPTransaction.h"
#include "sitetosite/SiteToSite.h"
#include "sitetosite/SiteToSiteClient.h"
#include "core/logging/LoggerConfiguration.h"
#include "minifi-cpp/core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "sitetosite/Peer.h"
#include "utils/Id.h"
namespace org::apache::nifi::minifi::sitetosite {
class HttpSiteToSiteClient final : public SiteToSiteClient {
public:
static constexpr char const* PROTOCOL_VERSION_HEADER = "x-nifi-site-to-site-protocol-version";
static constexpr char const* HANDSHAKE_PROPERTY_USE_COMPRESSION = "x-nifi-site-to-site-use-compression";
static constexpr char const* HANDSHAKE_PROPERTY_REQUEST_EXPIRATION = "x-nifi-site-to-site-request-expiration";
static constexpr char const* HANDSHAKE_PROPERTY_BATCH_COUNT = "x-nifi-site-to-site-batch-count";
static constexpr char const* HANDSHAKE_PROPERTY_BATCH_SIZE = "x-nifi-site-to-site-batch-size";
static constexpr char const* HANDSHAKE_PROPERTY_BATCH_DURATION = "x-nifi-site-to-site-batch-duration";
explicit HttpSiteToSiteClient(gsl::not_null<std::unique_ptr<SiteToSitePeer>> peer)
: SiteToSiteClient(std::move(peer)),
current_code_(ResponseCode::UNRECOGNIZED_RESPONSE_CODE) {
peer_state_ = PeerState::READY;
}
HttpSiteToSiteClient(const HttpSiteToSiteClient&) = delete;
HttpSiteToSiteClient(HttpSiteToSiteClient&&) = delete;
HttpSiteToSiteClient& operator=(const HttpSiteToSiteClient&) = delete;
HttpSiteToSiteClient& operator=(HttpSiteToSiteClient&&) = delete;
~HttpSiteToSiteClient() override = default;
MINIFIAPI static constexpr auto Properties = std::array<core::PropertyReference, 0>{};
MINIFIAPI static constexpr bool SupportsDynamicProperties = false;
MINIFIAPI static constexpr bool SupportsDynamicRelationships = false;
std::optional<std::vector<PeerStatus>> getPeerList() override;
bool transmitPayload(core::ProcessContext& context, const std::string &payload, const std::map<std::string, std::string>& attributes) override;
protected:
bool bootstrap() override {
peer_state_ = PeerState::READY;
return true;
}
bool establish() override {
return true;
}
std::optional<SiteToSiteResponse> readResponse(const std::shared_ptr<Transaction> &transaction) override;
bool writeResponse(const std::shared_ptr<Transaction> &transaction, const SiteToSiteResponse& response) override;
std::shared_ptr<Transaction> createTransaction(TransferDirection direction) override;
void deleteTransaction(const utils::Identifier& transaction_id) override;
void tearDown() override;
protected:
std::pair<uint64_t, uint64_t> readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) override;
private:
void setSiteToSiteHeaders(minifi::http::HTTPClient& client);
void closeTransaction(const utils::Identifier &transaction_id);
std::shared_ptr<minifi::http::HTTPClient> openConnectionForSending(const std::shared_ptr<HttpTransaction> &transaction);
std::shared_ptr<minifi::http::HTTPClient> openConnectionForReceive(const std::shared_ptr<HttpTransaction> &transaction);
std::unique_ptr<minifi::http::HTTPClient> createHttpClient(const std::string &uri, http::HttpRequestMethod method);
[[nodiscard]] std::string getBaseURI();
std::optional<SiteToSiteResponse> readResponseForReceiveTransfer(const std::shared_ptr<Transaction>& transaction);
std::optional<SiteToSiteResponse> readResponseForSendTransfer(const std::shared_ptr<Transaction>& transaction);
ResponseCode current_code_;
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<HttpSiteToSiteClient>::getLogger();
};
} // namespace org::apache::nifi::minifi::sitetosite