blob: 5f1b55c9dab7aa1b7d312e69a5925698c66ae5bf [file] [log] [blame]
/**
* Site2SiteProtocol class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <time.h>
#include <chrono>
#include <utility>
#include <map>
#include <string>
#include <memory>
#include <thread>
#include <random>
#include <iostream>
#include <vector>
#include "sitetosite/RawSocketProtocol.h"
#include "io/CRCStream.h"
#include "sitetosite/Peer.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace sitetosite {
// Identifier generator of the raw site-to-site client; handShake() uses it to create the
// communications identifier that is sent to the peer.
std::shared_ptr<utils::IdGenerator> RawSiteToSiteClient::id_generator_ = utils::IdGenerator::getIdGenerator();
// Identifier generator of Transaction, obtained from the same utils::IdGenerator::getIdGenerator() accessor.
// NOTE(review): presumably used to assign transaction identifiers - confirm in Transaction.
std::shared_ptr<utils::IdGenerator> Transaction::id_generator_ = utils::IdGenerator::getIdGenerator();
/**
 * Names of the handshake properties as they are written to the peer.
 *
 * handShake() indexes this table with the GZIP, PORT_IDENTIFIER, REQUEST_EXPIRATION_MILLIS,
 * BATCH_COUNT, BATCH_SIZE and BATCH_DURATION enumerators, so the order of the entries below
 * has to stay in sync with the values of those enumerators.
 */
const char *RawSiteToSiteClient::HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = {
/**
* Boolean value indicating whether or not the contents of a FlowFile should
* be GZipped when transferred.
*/
"GZIP",
/**
* The unique identifier of the port to communicate with
*/
"PORT_IDENTIFIER",
/**
* Indicates the number of milliseconds after the request was made that the
* client will wait for a response. If no response has been received by the
* time this value expires, the server can move on without attempting to
* service the request because the client will have already disconnected.
*/
"REQUEST_EXPIRATION_MILLIS",
/**
* The preferred number of FlowFiles that the server should send to the
* client when pulling data. This property was introduced in version 5 of
* the protocol.
*/
"BATCH_COUNT",
/**
* The preferred number of bytes that the server should send to the client
* when pulling data. This property was introduced in version 5 of the
* protocol.
*/
"BATCH_SIZE",
/**
* The preferred amount of time that the server should send data to the
* client when pulling data. This property was introduced in version 5 of
* the protocol. Value is in milliseconds.
*/
"BATCH_DURATION" };
/**
 * Opens the peer connection and negotiates the protocol version on it.
 *
 * The call is only accepted while the client is IDLE; on success the state
 * becomes ESTABLISHED.
 *
 * @return true when the peer was opened and the version negotiation succeeded,
 *         false otherwise (the reason is logged).
 */
bool RawSiteToSiteClient::establish() {
  const bool is_idle = (peer_state_ == IDLE);
  if (!is_idle) {
    logger_->log_error("Site2Site peer state is not idle while try to establish");
    return false;
  }

  if (!peer_->Open()) {
    logger_->log_error("Site2Site peer socket open failed");
    return false;
  }

  // The protocol version has to be agreed on before anything else is exchanged.
  const bool version_agreed = initiateResourceNegotiation();
  if (!version_agreed) {
    logger_->log_error("Site2Site Protocol Version Negotiation failed");
    return false;
  }

  logger_->log_debug("Site2Site socket established");
  peer_state_ = ESTABLISHED;
  return true;
}
/**
 * Negotiates the site-to-site protocol version with the peer.
 *
 * Writes the resource name followed by the currently selected version and then
 * reads a one byte status code. If the peer answers DIFFERENT_RESOURCE_VERSION,
 * the version it reports is read, the next entry of _supportedVersion (after the
 * current index) that does not exceed it is selected, and the negotiation is
 * retried recursively with that version.
 *
 * Must be called while the client is IDLE.
 *
 * @return true when the peer answered RESOURCE_OK; false on any read/write
 *         failure, on NEGOTIATED_ABORT, on an unknown status code, or when no
 *         remaining supported version is acceptable for the peer.
 */
bool RawSiteToSiteClient::initiateResourceNegotiation() {
  // Negotiate the version
  if (peer_state_ != IDLE) {
    logger_->log_error("Site2Site peer state is not idle while initiateResourceNegotiation");
    return false;
  }

  logger_->log_debug("Negotiate protocol version with destination port %s current version %d", port_id_.to_string(), _currentVersion);

  int ret = peer_->write(getResourceName());
  logger_->log_trace("result of writing resource name is %i", ret);
  if (ret <= 0) {
    logger_->log_debug("result of writing resource name is %i", ret);
    // tearDown();
    return false;
  }

  ret = peer_->write(_currentVersion);
  if (ret <= 0) {
    logger_->log_debug("result of writing version is %i", ret);
    return false;
  }

  uint8_t statusCode = 0;
  ret = peer_->read(statusCode);
  if (ret <= 0) {
    // This is a failed read of the peer's answer, not a failed write.
    logger_->log_debug("result of reading version status code %i", ret);
    return false;
  }
  logger_->log_debug("status code is %i", statusCode);

  switch (statusCode) {
    case RESOURCE_OK:
      logger_->log_debug("Site2Site Protocol Negotiate protocol version OK");
      return true;
    case DIFFERENT_RESOURCE_VERSION: {
      // Braces keep serverVersion local to this case instead of leaking into the later labels.
      uint32_t serverVersion = 0;
      ret = peer_->read(serverVersion);
      if (ret <= 0) {
        logger_->log_debug("result of reading server version is %i", ret);
        return false;
      }
      logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << serverVersion;
      // Only versions after the current index are candidates; take the first one the server can handle.
      for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion) / sizeof(uint32_t); i++) {
        if (serverVersion >= _supportedVersion[i]) {
          _currentVersion = _supportedVersion[i];
          _currentVersionIndex = i;
          return initiateResourceNegotiation();
        }
      }
      logger_->log_error("Site2Site Negotiate protocol failed to find a common version with server");
      return false;
    }
    case NEGOTIATED_ABORT:
      logger_->log_error("Site2Site Negotiate protocol response ABORT");
      return false;
    default:
      logger_->log_error("Negotiate protocol response unknown code %d", statusCode);
      return false;
  }
}
/**
 * Negotiates the flow file codec version with the peer.
 *
 * Sends the codec resource name and the currently selected codec version, then
 * evaluates the one byte status code of the answer. When the peer reports a
 * different version, the next acceptable entry of _supportedCodecVersion is
 * chosen and the negotiation is repeated recursively.
 *
 * Requires the HANDSHAKED state.
 *
 * @return true once the peer answered RESOURCE_OK, false in every other case.
 */
bool RawSiteToSiteClient::initiateCodecResourceNegotiation() {
  if (peer_state_ != HANDSHAKED) {
    logger_->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation");
    return false;
  }

  logger_->log_trace("Negotiate Codec version with destination port %s current version %d", port_id_.to_string(), _currentCodecVersion);

  int io_result = peer_->write(getCodecResourceName());
  if (io_result <= 0) {
    logger_->log_debug("result of getCodecResourceName is %i", io_result);
    return false;
  }

  io_result = peer_->write(_currentCodecVersion);
  if (io_result <= 0) {
    logger_->log_debug("result of _currentCodecVersion is %i", io_result);
    return false;
  }

  uint8_t response_code;
  io_result = peer_->read(response_code);
  if (io_result <= 0) {
    return false;
  }

  if (response_code == RESOURCE_OK) {
    logger_->log_trace("Site2Site Codec Negotiate version OK");
    return true;
  }

  if (response_code == NEGOTIATED_ABORT) {
    logger_->log_error("Site2Site Codec Negotiate response ABORT");
    return false;
  }

  if (response_code != DIFFERENT_RESOURCE_VERSION) {
    logger_->log_error("Negotiate Codec response unknown code %d", response_code);
    return false;
  }

  // The peer wants another version: read it and look for a later supported entry that fits.
  uint32_t peer_version;
  io_result = peer_->read(peer_version);
  if (io_result <= 0) {
    return false;
  }
  logging::LOG_INFO(logger_) << "Site2Site Server Response asked for a different protocol version " << peer_version;

  for (unsigned int candidate = (_currentCodecVersionIndex + 1); candidate < sizeof(_supportedCodecVersion) / sizeof(uint32_t); candidate++) {
    if (_supportedCodecVersion[candidate] > peer_version) {
      continue;
    }
    _currentCodecVersion = _supportedCodecVersion[candidate];
    _currentCodecVersionIndex = candidate;
    return initiateCodecResourceNegotiation();
  }

  logger_->log_error("Site2Site Negotiate codec failed to find a common version with server");
  return false;
}
/**
 * Performs the site-to-site handshake on an ESTABLISHED connection.
 *
 * Sends a freshly generated communications identifier, the peer URL (protocol
 * version 3 and later), and the handshake properties (the batch settings are only
 * included for protocol version 5 and later, and only when they are positive).
 * The response code of the peer decides the outcome; PROPERTIES_OK moves the
 * client to HANDSHAKED.
 *
 * @return true when the peer accepted the properties, false on a wrong state,
 *         any failed read/write, or a rejecting/unknown response code.
 */
bool RawSiteToSiteClient::handShake() {
  if (peer_state_ != ESTABLISHED) {
    logger_->log_error("Site2Site peer state is not established while handshake");
    return false;
  }

  logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", port_id_.to_string());

  _commsIdentifier = id_generator_->generate();
  int io_result = peer_->write(_commsIdentifier);
  if (io_result <= 0) {
    return false;
  }

  // Collect everything that has to be announced to the peer.
  std::map<std::string, std::string> handshake_properties;
  handshake_properties[HandShakePropertyStr[GZIP]] = "false";
  handshake_properties[HandShakePropertyStr[PORT_IDENTIFIER]] = port_id_.to_string();
  handshake_properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(_timeOut);
  if (_currentVersion >= 5) {
    if (_batchCount > 0) {
      handshake_properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(_batchCount);
    }
    if (_batchSize > 0) {
      handshake_properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(_batchSize);
    }
    if (_batchDuration > 0) {
      handshake_properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(_batchDuration);
    }
  }

  if (_currentVersion >= 3) {
    io_result = peer_->write(peer_->getURL());
    if (io_result <= 0) {
      return false;
    }
  }

  // The property count goes first, followed by each key/value pair.
  uint32_t property_count = handshake_properties.size();
  io_result = peer_->write(property_count);
  if (io_result <= 0) {
    return false;
  }

  for (const auto &property : handshake_properties) {
    io_result = peer_->write(property.first);
    if (io_result <= 0) {
      return false;
    }
    io_result = peer_->write(property.second);
    if (io_result <= 0) {
      return false;
    }
    logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", property.first, property.second);
  }

  RespondCode code;
  std::string message;
  io_result = readRespond(nullptr, code, message);
  if (io_result <= 0) {
    return false;
  }

  if (code == PROPERTIES_OK) {
    logger_->log_debug("Site2Site HandShake Completed");
    peer_state_ = HANDSHAKED;
    return true;
  }

  // Translate the known rejections into a human readable reason.
  std::string reason;
  switch (code) {
    case PORT_NOT_IN_VALID_STATE:
      reason = "in invalid state";
      break;
    case UNKNOWN_PORT:
      reason = "an unknown port";
      break;
    case PORTS_DESTINATION_FULL:
      reason = "full";
      break;
    default:
      logger_->log_error("HandShake Failed because of unknown respond code %d", code);
      return false;
  }

  logger_->log_error("Site2Site HandShake Failed because destination port, %s, is %s", port_id_.to_string(), reason);
  return false;
}
/**
 * Shuts the connection down and resets the client to IDLE.
 *
 * When the connection got at least as far as ESTABLISHED, a SHUTDOWN request is
 * written first; its result is intentionally not checked. Afterwards all known
 * transactions are dropped and the peer is closed.
 */
void RawSiteToSiteClient::tearDown() {
  const bool connection_was_established = (peer_state_ >= ESTABLISHED);
  if (connection_was_established) {
    logger_->log_trace("Site2Site Protocol tearDown");
    // Tell the peer that we are going away before the connection is closed.
    writeRequestType(SHUTDOWN);
  }

  known_transactions_.clear();
  peer_->Close();
  peer_state_ = IDLE;
}
bool RawSiteToSiteClient::getPeerList(std::vector<PeerStatus> &peers) {
if (establish() && handShake()) {
int status = writeRequestType(REQUEST_PEER_LIST);
if (status <= 0) {
tearDown();
return false;
}
uint32_t number;
status = peer_->read(number);
if (status <= 0) {
tearDown();
return false;
}
for (uint32_t i = 0; i < number; i++) {
std::string host;
status = peer_->read(host);
if (status <= 0) {
tearDown();
return false;
}
uint32_t port;
status = peer_->read(port);
if (status <= 0) {
tearDown();
return false;
}
uint8_t secure;
status = peer_->read(secure);
if (status <= 0) {
tearDown();
return false;
}
uint32_t count;
status = peer_->read(count);
if (status <= 0) {
tearDown();
return false;
}
PeerStatus status(std::make_shared<Peer>(port_id_, host, gsl::narrow<uint16_t>(port), secure != 0), count, true);
peers.push_back(std::move(status));
logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << std::to_string(secure);
}
tearDown();
return true;
} else {
tearDown();
return false;
}
}
/**
 * Writes the textual name of a request type to the peer.
 *
 * @param type request to announce; values at or beyond MAX_REQUEST_TYPE are rejected
 * @return the result of the peer write, or -1 for an out-of-range type
 */
int RawSiteToSiteClient::writeRequestType(RequestType type) {
  if (type < MAX_REQUEST_TYPE) {
    return peer_->write(SiteToSiteRequest::RequestTypeStr[type]);
  }
  return -1;
}
/**
 * Reads a request type name from the peer and maps it back to its enumerator.
 *
 * @param type set to the matching request type; left untouched when the read
 *             fails or the name is not known
 * @return the result of the peer read when it failed or when the name matched,
 *         -1 when the received name matches none of the request types between
 *         NEGOTIATE_FLOWFILE_CODEC and SHUTDOWN
 */
int RawSiteToSiteClient::readRequestType(RequestType &type) {
  std::string received_name;
  const int read_result = peer_->read(received_name);
  if (read_result <= 0) {
    return read_result;
  }

  for (int candidate = NEGOTIATE_FLOWFILE_CODEC; candidate <= SHUTDOWN; candidate++) {
    if (SiteToSiteRequest::RequestTypeStr[candidate] == received_name) {
      type = static_cast<RequestType>(candidate);
      return read_result;
    }
  }

  return -1;
}
/**
 * Reads a response from the peer by forwarding to readResponse().
 *
 * @param transaction transaction the response belongs to (callers in this file also pass nullptr)
 * @param code        receives the response code
 * @param message     receives the response message
 * @return whatever readResponse() returns
 */
int RawSiteToSiteClient::readRespond(const std::shared_ptr<Transaction> &transaction, RespondCode &code, std::string &message) {
  const int outcome = readResponse(transaction, code, message);
  return outcome;
}
/**
 * Writes a response to the peer by forwarding to writeResponse().
 *
 * @param transaction transaction the response belongs to
 * @param code        response code to send
 * @param message     message to send along with the code
 * @return whatever writeResponse() returns
 */
int RawSiteToSiteClient::writeRespond(const std::shared_ptr<Transaction> &transaction, RespondCode code, std::string message) {
  const int outcome = writeResponse(transaction, code, message);
  return outcome;
}
/**
 * Requests and negotiates the flow file codec on a HANDSHAKED connection.
 *
 * Writes the NEGOTIATE_FLOWFILE_CODEC request and then runs the codec version
 * negotiation. On success the client becomes READY.
 *
 * @return true when the codec was negotiated, false on a wrong state, a failed
 *         request write, or a failed version negotiation
 */
bool RawSiteToSiteClient::negotiateCodec() {
  if (peer_state_ != HANDSHAKED) {
    logger_->log_error("Site2Site peer state is not handshaked while negotiate codec");
    return false;
  }

  logger_->log_trace("Site2Site Protocol Negotiate Codec with destination port %s", port_id_.to_string());

  const int request_result = writeRequestType(NEGOTIATE_FLOWFILE_CODEC);
  if (request_result <= 0) {
    return false;
  }

  // Negotiate the codec version
  if (!initiateCodecResourceNegotiation()) {
    logger_->log_error("Site2Site Codec Version Negotiation failed");
    return false;
  }

  logger_->log_trace("Site2Site Codec Completed and move to READY state for data transfer");
  peer_state_ = READY;
  return true;
}
/**
 * Brings the client into the READY state.
 *
 * Nothing happens when the client is already READY. Otherwise the connection is
 * torn down and rebuilt through establish(), handShake() and negotiateCodec().
 * If any of these steps fails, the peer is yielded and the connection is torn
 * down again.
 *
 * @return true when the client is READY afterwards, false otherwise
 */
bool RawSiteToSiteClient::bootstrap() {
  if (peer_state_ == READY) {
    return true;
  }

  // Start from a clean slate before running the whole connection sequence.
  tearDown();

  const bool ready = establish() && handShake() && negotiateCodec();
  if (!ready) {
    peer_->yield();
    tearDown();
    return false;
  }

  logger_->log_debug("Site to Site ready for data transaction");
  return true;
}
/**
 * Creates and registers a transaction for the given direction.
 *
 * The client is bootstrapped first when it is not READY. For RECEIVE the
 * RECEIVE_FLOWFILES request is written and the response of the peer is read:
 * MORE_DATA / NO_MORE_DATA decide the "data available" flag of the new
 * transaction, any other code is rejected. For every other direction the
 * SEND_FLOWFILES request is written. The new transaction wraps the peer in a CRC
 * stream and is stored in known_transactions_ under its UUID.
 *
 * @param direction transfer direction of the transaction
 * @return the new transaction, or nullptr when the client could not become
 *         READY, a request/response exchange failed, or the peer answered with
 *         an unexpected code
 */
std::shared_ptr<Transaction> RawSiteToSiteClient::createTransaction(TransferDirection direction) {
  if (peer_state_ != READY) {
    bootstrap();
  }
  if (peer_state_ != READY) {
    return nullptr;
  }

  const bool receiving = (direction == RECEIVE);
  // Only meaningful (and only applied) for RECEIVE transactions.
  bool dataAvailable = false;

  if (receiving) {
    if (writeRequestType(RECEIVE_FLOWFILES) <= 0) {
      return nullptr;
    }

    RespondCode code;
    std::string message;
    if (readRespond(nullptr, code, message) <= 0) {
      return nullptr;
    }

    switch (code) {
      case MORE_DATA:
        dataAvailable = true;
        logger_->log_trace("Site2Site peer indicates that data is available");
        break;
      case NO_MORE_DATA:
        dataAvailable = false;
        logger_->log_trace("Site2Site peer indicates that no data is available");
        break;
      default:
        logger_->log_warn("Site2Site got unexpected response %d when asking for data", code);
        return nullptr;
    }
  } else {
    if (writeRequestType(SEND_FLOWFILES) <= 0) {
      return nullptr;
    }
  }

  // Both directions create and register the transaction the same way.
  org::apache::nifi::minifi::io::CRCStream<SiteToSitePeer> crcstream(gsl::make_not_null(peer_.get()));
  std::shared_ptr<Transaction> transaction = std::make_shared<Transaction>(direction, std::move(crcstream));
  known_transactions_[transaction->getUUID()] = transaction;
  if (receiving) {
    transaction->setDataAvailable(dataAvailable);
  }
  logger_->log_trace("Site2Site create transaction %s", transaction->getUUIDStr());
  return transaction;
}
/**
 * Sends a single payload with its attributes to the peer in its own transaction.
 *
 * The client is bootstrapped when necessary, a SEND transaction is created, the
 * payload is sent as one data packet, and the transaction is confirmed and
 * completed. The transaction is removed again in every case.
 *
 * @param context    process context; yielded whenever the transfer cannot be carried out
 * @param session    process session handed on to send()
 * @param payload    bytes to transmit; an empty payload is rejected
 * @param attributes attributes transmitted together with the payload
 * @return true when the payload was sent, confirmed and completed; false when the
 *         payload is empty or bootstrapping failed
 * @throws Exception (SITE2SITE_EXCEPTION) when the peer is not READY after
 *         bootstrapping, the transaction cannot be created, or send/confirm/complete
 *         fails; exceptions raised by those steps are rethrown after cleanup
 */
bool RawSiteToSiteClient::transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload,
                                          std::map<std::string, std::string> attributes) {
  if (payload.empty()) {
    return false;
  }

  if (peer_state_ != READY) {
    if (!bootstrap()) {
      return false;
    }
  }

  if (peer_state_ != READY) {
    context->yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer");
  }

  // Create the transaction
  std::shared_ptr<Transaction> transaction = createTransaction(SEND);
  if (transaction == nullptr) {
    context->yield();
    tearDown();
    throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction");
  }

  utils::Identifier transactionID = transaction->getUUID();

  // Shared cleanup of both exception handlers: forget the transaction, back off, drop the connection.
  const auto abortTransaction = [&]() {
    deleteTransaction(transactionID);
    context->yield();
    tearDown();
  };

  try {
    DataPacket packet(getLogger(), transaction, attributes, payload);

    int16_t resp = send(transactionID, &packet, nullptr, session);
    if (resp == -1) {
      throw Exception(SITE2SITE_EXCEPTION, "Send Failed in transaction " + transactionID.to_string());
    }
    logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID.to_string() << " sent bytes length " << payload.length();

    if (!confirm(transactionID)) {
      throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed in transaction " + transactionID.to_string());
    }
    if (!complete(transactionID)) {
      throw Exception(SITE2SITE_EXCEPTION, "Complete Failed in transaction " + transactionID.to_string());
    }
    logging::LOG_INFO(logger_) << "Site2Site transaction " << transactionID.to_string()
                               << " successfully send flow record " << transaction->current_transfers_ << " content bytes " << transaction->_bytes;
  } catch (const std::exception &exception) {
    abortTransaction();
    logger_->log_debug("Caught Exception %s", exception.what());
    throw;
  } catch (...) {
    abortTransaction();
    logger_->log_debug("Caught Exception during RawSiteToSiteClient::transmitPayload");
    throw;
  }

  deleteTransaction(transactionID);
  return true;
}
} /* namespace sitetosite */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */