blob: 1ad41c8ae825b2df3b60a7909a4751c633a3cfc7 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include "io/BaseStream.h"
#include "sitetosite/Peer.h"
#include "sitetosite/RawSocketProtocol.h"
#include "../TestBase.h"
#include "../unit/SiteToSiteHelper.h"
#define FMT_DEFAULT fmt_lower
TEST_CASE("TestSetPortId", "[S2S1]") {
std::unique_ptr<minifi::sitetosite::SiteToSitePeer> peer = std::unique_ptr<minifi::sitetosite::SiteToSitePeer>(
new minifi::sitetosite::SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::BufferStream>(new org::apache::nifi::minifi::io::BufferStream()), "fake_host", 65433, ""));
minifi::sitetosite::RawSiteToSiteClient protocol(std::move(peer));
utils::Identifier fakeUUID = utils::Identifier::parse("c56a4180-65aa-42ec-a945-5fd21dec0538").value();
protocol.setPortId(fakeUUID);
REQUIRE(fakeUUID == protocol.getPortId());
}
void sunny_path_bootstrap(SiteToSiteResponder *collector) {
char a = 0x14; // RESOURCE_OK
std::string resp_code;
resp_code.insert(resp_code.begin(), a);
collector->push_response(resp_code);
// Handshake respond code
resp_code = "R";
collector->push_response(resp_code);
resp_code = "C";
collector->push_response(resp_code);
char b = 0x1;
resp_code = b;
collector->push_response(resp_code);
// Codec Negotiation
resp_code = a;
collector->push_response(resp_code);
}
TEST_CASE("TestSiteToSiteVerifySend", "[S2S3]") {
SiteToSiteResponder *collector = new SiteToSiteResponder();
sunny_path_bootstrap(collector);
std::unique_ptr<minifi::sitetosite::SiteToSitePeer> peer = std::unique_ptr<minifi::sitetosite::SiteToSitePeer>(
new minifi::sitetosite::SiteToSitePeer(std::unique_ptr<minifi::io::BaseStream>(collector), "fake_host", 65433, ""));
minifi::sitetosite::RawSiteToSiteClient protocol(std::move(peer));
utils::Identifier fakeUUID = utils::Identifier::parse("C56A4180-65AA-42EC-A945-5FD21DEC0538").value();
protocol.setPortId(fakeUUID);
REQUIRE(true == protocol.bootstrap());
REQUIRE(collector->get_next_client_response() == "NiFi");
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "SocketFlowFileProtocol");
collector->get_next_client_response();
collector->get_next_client_response();
collector->get_next_client_response();
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "nifi://fake_host:65433");
collector->get_next_client_response();
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "GZIP");
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "false");
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "PORT_IDENTIFIER");
collector->get_next_client_response();
REQUIRE(utils::StringUtils::equalsIgnoreCase(collector->get_next_client_response(), "c56a4180-65aa-42ec-a945-5fd21dec0538"));
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "REQUEST_EXPIRATION_MILLIS");
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "30000");
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "NEGOTIATE_FLOWFILE_CODEC");
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "StandardFlowFileCodec");
collector->get_next_client_response(); // codec version
// start to send the stuff
// Create the transaction
std::string payload = "Test MiNiFi payload";
std::shared_ptr<minifi::sitetosite::Transaction> transaction;
transaction = protocol.createTransaction(minifi::sitetosite::SEND);
REQUIRE(transaction);
auto transactionID = transaction->getUUID();
collector->get_next_client_response();
REQUIRE(collector->get_next_client_response() == "SEND_FLOWFILES");
std::map<std::string, std::string> attributes;
std::shared_ptr<logging::Logger> logger = nullptr;
minifi::sitetosite::DataPacket packet(logger, transaction, attributes, payload);
REQUIRE(protocol.send(transactionID, &packet, nullptr, nullptr) == 0);
collector->get_next_client_response();
collector->get_next_client_response();
std::string rx_payload = collector->get_next_client_response();
REQUIRE(payload == rx_payload);
}
TEST_CASE("TestSiteToSiteVerifyNegotiationFail", "[S2S4]") {
SiteToSiteResponder *collector = new SiteToSiteResponder();
char a = '\xFF';
std::string resp_code;
resp_code.insert(resp_code.begin(), a);
collector->push_response(resp_code);
collector->push_response(resp_code);
std::unique_ptr<minifi::sitetosite::SiteToSitePeer> peer = std::unique_ptr<minifi::sitetosite::SiteToSitePeer>(
new minifi::sitetosite::SiteToSitePeer(std::unique_ptr<minifi::io::BaseStream>(collector), "fake_host", 65433, ""));
minifi::sitetosite::RawSiteToSiteClient protocol(std::move(peer));
std::string uuid_str = "C56A4180-65AA-42EC-A945-5FD21DEC0538";
utils::Identifier fakeUUID;
fakeUUID = uuid_str;
protocol.setPortId(fakeUUID);
REQUIRE(false == protocol.bootstrap());
}