| /** |
| * @file Site2SiteProtocol.cpp |
| * Site2SiteProtocol class implementation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include <sys/time.h> |
| #include <stdio.h> |
| #include <time.h> |
| #include <chrono> |
| #include <thread> |
| #include <random> |
| #include <netinet/tcp.h> |
| #include <iostream> |
| #include "Site2SitePeer.h" |
| #include "Site2SiteClientProtocol.h" |
| |
| bool Site2SiteClientProtocol::establish() |
| { |
| if (_peerState != IDLE) |
| { |
| _logger->log_error("Site2Site peer state is not idle while try to establish"); |
| return false; |
| } |
| |
| bool ret = _peer->Open(); |
| |
| if (!ret) |
| { |
| _logger->log_error("Site2Site peer socket open failed"); |
| return false; |
| } |
| |
| // Negotiate the version |
| ret = initiateResourceNegotiation(); |
| |
| if (!ret) |
| { |
| _logger->log_error("Site2Site Protocol Version Negotiation failed"); |
| /* |
| _peer->yield(); |
| tearDown(); */ |
| return false; |
| } |
| |
| _logger->log_info("Site2Site socket established"); |
| _peerState = ESTABLISHED; |
| |
| return true; |
| } |
| |
| bool Site2SiteClientProtocol::initiateResourceNegotiation() |
| { |
| // Negotiate the version |
| if (_peerState != IDLE) |
| { |
| _logger->log_error("Site2Site peer state is not idle while initiateResourceNegotiation"); |
| return false; |
| } |
| |
| _logger->log_info("Negotiate protocol version with destination port %s current version %d", _portIdStr.c_str(), _currentVersion); |
| |
| int ret = _peer->writeUTF(this->getResourceName()); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| ret = _peer->write(_currentVersion); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| uint8_t statusCode; |
| ret = _peer->read(statusCode); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| switch (statusCode) |
| { |
| case RESOURCE_OK: |
| _logger->log_info("Site2Site Protocol Negotiate protocol version OK"); |
| return true; |
| case DIFFERENT_RESOURCE_VERSION: |
| uint32_t serverVersion; |
| ret = _peer->read(serverVersion); |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| _logger->log_info("Site2Site Server Response asked for a different protocol version %d", serverVersion); |
| for (unsigned int i = (_currentVersionIndex + 1); i < sizeof(_supportedVersion)/sizeof(uint32_t); i++) |
| { |
| if (serverVersion >= _supportedVersion[i]) |
| { |
| _currentVersion = _supportedVersion[i]; |
| _currentVersionIndex = i; |
| return initiateResourceNegotiation(); |
| } |
| } |
| ret = -1; |
| // tearDown(); |
| return false; |
| case NEGOTIATED_ABORT: |
| _logger->log_info("Site2Site Negotiate protocol response ABORT"); |
| ret = -1; |
| // tearDown(); |
| return false; |
| default: |
| _logger->log_info("Negotiate protocol response unknown code %d", statusCode); |
| return true; |
| } |
| |
| return true; |
| } |
| |
| bool Site2SiteClientProtocol::initiateCodecResourceNegotiation() |
| { |
| // Negotiate the version |
| if (_peerState != HANDSHAKED) |
| { |
| _logger->log_error("Site2Site peer state is not handshaked while initiateCodecResourceNegotiation"); |
| return false; |
| } |
| |
| _logger->log_info("Negotiate Codec version with destination port %s current version %d", _portIdStr.c_str(), _currentCodecVersion); |
| |
| int ret = _peer->writeUTF(this->getCodecResourceName()); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| ret = _peer->write(_currentCodecVersion); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| uint8_t statusCode; |
| ret = _peer->read(statusCode); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| switch (statusCode) |
| { |
| case RESOURCE_OK: |
| _logger->log_info("Site2Site Codec Negotiate version OK"); |
| return true; |
| case DIFFERENT_RESOURCE_VERSION: |
| uint32_t serverVersion; |
| ret = _peer->read(serverVersion); |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| _logger->log_info("Site2Site Server Response asked for a different codec version %d", serverVersion); |
| for (unsigned int i = (_currentCodecVersionIndex + 1); i < sizeof(_supportedCodecVersion)/sizeof(uint32_t); i++) |
| { |
| if (serverVersion >= _supportedCodecVersion[i]) |
| { |
| _currentCodecVersion = _supportedCodecVersion[i]; |
| _currentCodecVersionIndex = i; |
| return initiateCodecResourceNegotiation(); |
| } |
| } |
| ret = -1; |
| // tearDown(); |
| return false; |
| case NEGOTIATED_ABORT: |
| _logger->log_info("Site2Site Codec Negotiate response ABORT"); |
| ret = -1; |
| // tearDown(); |
| return false; |
| default: |
| _logger->log_info("Negotiate Codec response unknown code %d", statusCode); |
| return true; |
| } |
| |
| return true; |
| } |
| |
| bool Site2SiteClientProtocol::handShake() |
| { |
| if (_peerState != ESTABLISHED) |
| { |
| _logger->log_error("Site2Site peer state is not established while handshake"); |
| return false; |
| } |
| _logger->log_info("Site2Site Protocol Perform hand shake with destination port %s", _portIdStr.c_str()); |
| uuid_t uuid; |
| // Generate the global UUID for the com identify |
| uuid_generate(uuid); |
| char uuidStr[37]; |
| uuid_unparse(uuid, uuidStr); |
| _commsIdentifier = uuidStr; |
| |
| int ret = _peer->writeUTF(_commsIdentifier); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| std::map<std::string, std::string> properties; |
| properties[HandShakePropertyStr[GZIP]] = "false"; |
| properties[HandShakePropertyStr[PORT_IDENTIFIER]] = _portIdStr; |
| properties[HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]] = std::to_string(this->_timeOut); |
| if (this->_currentVersion >= 5) |
| { |
| if (this->_batchCount > 0) |
| properties[HandShakePropertyStr[BATCH_COUNT]] = std::to_string(this->_batchCount); |
| if (this->_batchSize > 0) |
| properties[HandShakePropertyStr[BATCH_SIZE]] = std::to_string(this->_batchSize); |
| if (this->_batchDuration > 0) |
| properties[HandShakePropertyStr[BATCH_DURATION]] = std::to_string(this->_batchDuration); |
| } |
| |
| if (_currentVersion >= 3) |
| { |
| ret = _peer->writeUTF(_peer->getURL()); |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| } |
| |
| uint32_t size = properties.size(); |
| ret = _peer->write(size); |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| std::map<std::string, std::string>::iterator it; |
| for (it = properties.begin(); it!= properties.end(); it++) |
| { |
| ret = _peer->writeUTF(it->first); |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| ret = _peer->writeUTF(it->second); |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| _logger->log_info("Site2Site Protocol Send handshake properties %s %s", it->first.c_str(), it->second.c_str()); |
| } |
| |
| RespondCode code; |
| std::string message; |
| |
| ret = this->readRespond(code, message); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| switch (code) |
| { |
| case PROPERTIES_OK: |
| _logger->log_info("Site2Site HandShake Completed"); |
| _peerState = HANDSHAKED; |
| return true; |
| case PORT_NOT_IN_VALID_STATE: |
| case UNKNOWN_PORT: |
| case PORTS_DESTINATION_FULL: |
| _logger->log_error("Site2Site HandShake Failed because destination port is either invalid or full"); |
| ret = -1; |
| /* |
| _peer->yield(); |
| tearDown(); */ |
| return false; |
| default: |
| _logger->log_info("HandShake Failed because of unknown respond code %d", code); |
| ret = -1; |
| /* |
| _peer->yield(); |
| tearDown(); */ |
| return false; |
| } |
| |
| return false; |
| } |
| |
| void Site2SiteClientProtocol::tearDown() |
| { |
| if (_peerState >= ESTABLISHED) |
| { |
| _logger->log_info("Site2Site Protocol tearDown"); |
| // need to write shutdown request |
| writeRequestType(SHUTDOWN); |
| } |
| |
| std::map<std::string, Transaction *>::iterator it; |
| for (it = _transactionMap.begin(); it!= _transactionMap.end(); it++) |
| { |
| delete it->second; |
| } |
| _transactionMap.clear(); |
| _peer->Close(); |
| _peerState = IDLE; |
| } |
| |
| int Site2SiteClientProtocol::writeRequestType(RequestType type) |
| { |
| if (type >= MAX_REQUEST_TYPE) |
| return -1; |
| |
| return _peer->writeUTF(RequestTypeStr[type]); |
| } |
| |
| int Site2SiteClientProtocol::readRequestType(RequestType &type) |
| { |
| std::string requestTypeStr; |
| |
| int ret = _peer->readUTF(requestTypeStr); |
| |
| if (ret <= 0) |
| return ret; |
| |
| for (int i = (int) NEGOTIATE_FLOWFILE_CODEC; i <= (int) SHUTDOWN; i++) |
| { |
| if (RequestTypeStr[i] == requestTypeStr) |
| { |
| type = (RequestType) i; |
| return ret; |
| } |
| } |
| |
| return -1; |
| } |
| |
| int Site2SiteClientProtocol::readRespond(RespondCode &code, std::string &message) |
| { |
| uint8_t firstByte; |
| |
| int ret = _peer->read(firstByte); |
| |
| if (ret <= 0 || firstByte != CODE_SEQUENCE_VALUE_1) |
| return -1; |
| |
| uint8_t secondByte; |
| |
| ret = _peer->read(secondByte); |
| |
| if (ret <= 0 || secondByte != CODE_SEQUENCE_VALUE_2) |
| return -1; |
| |
| uint8_t thirdByte; |
| |
| ret = _peer->read(thirdByte); |
| |
| if (ret <= 0) |
| return ret; |
| |
| code = (RespondCode) thirdByte; |
| |
| RespondCodeContext *resCode = this->getRespondCodeContext(code); |
| |
| if ( resCode == NULL) |
| { |
| // Not a valid respond code |
| return -1; |
| } |
| if (resCode->hasDescription) |
| { |
| ret = _peer->readUTF(message); |
| if (ret <= 0) |
| return -1; |
| } |
| return 3 + message.size(); |
| } |
| |
| int Site2SiteClientProtocol::writeRespond(RespondCode code, std::string message) |
| { |
| RespondCodeContext *resCode = this->getRespondCodeContext(code); |
| |
| if (resCode == NULL) |
| { |
| // Not a valid respond code |
| return -1; |
| } |
| |
| uint8_t codeSeq[3]; |
| codeSeq[0] = CODE_SEQUENCE_VALUE_1; |
| codeSeq[1] = CODE_SEQUENCE_VALUE_2; |
| codeSeq[2] = (uint8_t) code; |
| |
| int ret = _peer->write(codeSeq, 3); |
| |
| if (ret != 3) |
| return -1; |
| |
| if (resCode->hasDescription) |
| { |
| ret = _peer->writeUTF(message); |
| if (ret > 0) |
| return (3 + ret); |
| else |
| return ret; |
| } |
| else |
| return 3; |
| } |
| |
| bool Site2SiteClientProtocol::negotiateCodec() |
| { |
| if (_peerState != HANDSHAKED) |
| { |
| _logger->log_error("Site2Site peer state is not handshaked while negotiate codec"); |
| return false; |
| } |
| |
| _logger->log_info("Site2Site Protocol Negotiate Codec with destination port %s", _portIdStr.c_str()); |
| |
| int status = this->writeRequestType(NEGOTIATE_FLOWFILE_CODEC); |
| |
| if (status <= 0) |
| { |
| // tearDown(); |
| return false; |
| } |
| |
| // Negotiate the codec version |
| bool ret = initiateCodecResourceNegotiation(); |
| |
| if (!ret) |
| { |
| _logger->log_error("Site2Site Codec Version Negotiation failed"); |
| /* |
| _peer->yield(); |
| tearDown(); */ |
| return false; |
| } |
| |
| _logger->log_info("Site2Site Codec Completed and move to READY state for data transfer"); |
| _peerState = READY; |
| |
| return true; |
| } |
| |
| bool Site2SiteClientProtocol::bootstrap() |
| { |
| if (_peerState == READY) |
| return true; |
| |
| tearDown(); |
| |
| if (establish() && handShake() && negotiateCodec()) |
| { |
| _logger->log_info("Site2Site Ready For data transaction"); |
| return true; |
| } |
| else |
| { |
| _peer->yield(); |
| tearDown(); |
| return false; |
| } |
| } |
| |
| Transaction* Site2SiteClientProtocol::createTransaction(std::string &transactionID, TransferDirection direction) |
| { |
| int ret; |
| bool dataAvailable; |
| Transaction *transaction = NULL; |
| |
| if (_peerState != READY) |
| { |
| bootstrap(); |
| } |
| |
| if (_peerState != READY) |
| { |
| return NULL; |
| } |
| |
| if (direction == RECEIVE) |
| { |
| ret = writeRequestType(RECEIVE_FLOWFILES); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return NULL; |
| } |
| |
| RespondCode code; |
| std::string message; |
| |
| ret = readRespond(code, message); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return NULL; |
| } |
| |
| switch (code) |
| { |
| case MORE_DATA: |
| dataAvailable = true; |
| _logger->log_info("Site2Site peer indicates that data is available"); |
| transaction = new Transaction(direction); |
| _transactionMap[transaction->getUUIDStr()] = transaction; |
| transactionID = transaction->getUUIDStr(); |
| transaction->setDataAvailable(dataAvailable); |
| _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); |
| return transaction; |
| case NO_MORE_DATA: |
| dataAvailable = false; |
| _logger->log_info("Site2Site peer indicates that no data is available"); |
| transaction = new Transaction(direction); |
| _transactionMap[transaction->getUUIDStr()] = transaction; |
| transactionID = transaction->getUUIDStr(); |
| transaction->setDataAvailable(dataAvailable); |
| _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); |
| return transaction; |
| default: |
| _logger->log_info("Site2Site got unexpected response %d when asking for data", code); |
| // tearDown(); |
| return NULL; |
| } |
| } |
| else |
| { |
| ret = writeRequestType(SEND_FLOWFILES); |
| |
| if (ret <= 0) |
| { |
| // tearDown(); |
| return NULL; |
| } |
| else |
| { |
| transaction = new Transaction(direction); |
| _transactionMap[transaction->getUUIDStr()] = transaction; |
| transactionID = transaction->getUUIDStr(); |
| _logger->log_info("Site2Site create transaction %s", transaction->getUUIDStr().c_str()); |
| return transaction; |
| } |
| } |
| } |
| |
| bool Site2SiteClientProtocol::receive(std::string transactionID, DataPacket *packet, bool &eof) |
| { |
| int ret; |
| Transaction *transaction = NULL; |
| |
| if (_peerState != READY) |
| { |
| bootstrap(); |
| } |
| |
| if (_peerState != READY) |
| { |
| return false; |
| } |
| |
| std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); |
| |
| if (it == _transactionMap.end()) |
| { |
| return false; |
| } |
| else |
| { |
| transaction = it->second; |
| } |
| |
| if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) |
| { |
| _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); |
| return false; |
| } |
| |
| if (transaction->getDirection() != RECEIVE) |
| { |
| _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); |
| return false; |
| } |
| |
| if (!transaction->isDataAvailable()) |
| { |
| eof = true; |
| return true; |
| } |
| |
| if (transaction->_transfers > 0) |
| { |
| // if we already has transfer before, check to see whether another one is available |
| RespondCode code; |
| std::string message; |
| |
| ret = readRespond(code, message); |
| |
| if (ret <= 0) |
| { |
| return false; |
| } |
| if (code == CONTINUE_TRANSACTION) |
| { |
| _logger->log_info("Site2Site transaction %s peer indicate continue transaction", transactionID.c_str()); |
| transaction->_dataAvailable = true; |
| } |
| else if (code == FINISH_TRANSACTION) |
| { |
| _logger->log_info("Site2Site transaction %s peer indicate finish transaction", transactionID.c_str()); |
| transaction->_dataAvailable = false; |
| } |
| else |
| { |
| _logger->log_info("Site2Site transaction %s peer indicate wrong respond code %d", transactionID.c_str(), code); |
| return false; |
| } |
| } |
| |
| if (!transaction->isDataAvailable()) |
| { |
| eof = true; |
| return true; |
| } |
| |
| // start to read the packet |
| uint32_t numAttributes; |
| ret = _peer->read(numAttributes, &transaction->_crc); |
| if (ret <= 0 || numAttributes > MAX_NUM_ATTRIBUTES) |
| { |
| return false; |
| } |
| |
| // read the attributes |
| for (unsigned int i = 0; i < numAttributes; i++) |
| { |
| std::string key; |
| std::string value; |
| ret = _peer->readUTF(key, true, &transaction->_crc); |
| if (ret <= 0) |
| { |
| return false; |
| } |
| ret = _peer->readUTF(value, true, &transaction->_crc); |
| if (ret <= 0) |
| { |
| return false; |
| } |
| packet->_attributes[key] = value; |
| _logger->log_info("Site2Site transaction %s receives attribute key %s value %s", transactionID.c_str(), key.c_str(), value.c_str()); |
| } |
| |
| uint64_t len; |
| ret = _peer->read(len, &transaction->_crc); |
| if (ret <= 0) |
| { |
| return false; |
| } |
| |
| packet->_size = len; |
| transaction->_transfers++; |
| transaction->_state = DATA_EXCHANGED; |
| transaction->_bytes += len; |
| _logger->log_info("Site2Site transaction %s receives flow record %d, total length %d", transactionID.c_str(), |
| transaction->_transfers, transaction->_bytes); |
| |
| return true; |
| } |
| |
| bool Site2SiteClientProtocol::send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session) |
| { |
| int ret; |
| Transaction *transaction = NULL; |
| |
| if (_peerState != READY) |
| { |
| bootstrap(); |
| } |
| |
| if (_peerState != READY) |
| { |
| return false; |
| } |
| |
| std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); |
| |
| if (it == _transactionMap.end()) |
| { |
| return false; |
| } |
| else |
| { |
| transaction = it->second; |
| } |
| |
| if (transaction->getState() != TRANSACTION_STARTED && transaction->getState() != DATA_EXCHANGED) |
| { |
| _logger->log_info("Site2Site transaction %s is not at started or exchanged state", transactionID.c_str()); |
| return false; |
| } |
| |
| if (transaction->getDirection() != SEND) |
| { |
| _logger->log_info("Site2Site transaction %s direction is wrong", transactionID.c_str()); |
| return false; |
| } |
| |
| if (transaction->_transfers > 0) |
| { |
| ret = writeRespond(CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); |
| if (ret <= 0) |
| { |
| return false; |
| } |
| } |
| |
| // start to read the packet |
| uint32_t numAttributes = packet->_attributes.size(); |
| ret = _peer->write(numAttributes, &transaction->_crc); |
| if (ret != 4) |
| { |
| return false; |
| } |
| |
| std::map<std::string, std::string>::iterator itAttribute; |
| for (itAttribute = packet->_attributes.begin(); itAttribute!= packet->_attributes.end(); itAttribute++) |
| { |
| ret = _peer->writeUTF(itAttribute->first, true, &transaction->_crc); |
| if (ret <= 0) |
| { |
| return false; |
| } |
| ret = _peer->writeUTF(itAttribute->second, true, &transaction->_crc); |
| if (ret <= 0) |
| { |
| return false; |
| } |
| _logger->log_info("Site2Site transaction %s send attribute key %s value %s", transactionID.c_str(), |
| itAttribute->first.c_str(), itAttribute->second.c_str()); |
| } |
| |
| uint64_t len = flowFile->getSize() ; |
| ret = _peer->write(len, &transaction->_crc); |
| if (ret != 8) |
| { |
| return false; |
| } |
| |
| if (flowFile->getSize()) |
| { |
| Site2SiteClientProtocol::ReadCallback callback(packet); |
| session->read(flowFile, &callback); |
| if (flowFile->getSize() != packet->_size) |
| { |
| return false; |
| } |
| } |
| |
| transaction->_transfers++; |
| transaction->_state = DATA_EXCHANGED; |
| transaction->_bytes += len; |
| _logger->log_info("Site2Site transaction %s send flow record %d, total length %d", transactionID.c_str(), |
| transaction->_transfers, transaction->_bytes); |
| |
| return true; |
| } |
| |
| void Site2SiteClientProtocol::receiveFlowFiles(ProcessContext *context, ProcessSession *session) |
| { |
| uint64_t bytes = 0; |
| int transfers = 0; |
| Transaction *transaction = NULL; |
| |
| if (_peerState != READY) |
| { |
| bootstrap(); |
| } |
| |
| if (_peerState != READY) |
| { |
| context->yield(); |
| tearDown(); |
| throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); |
| return; |
| } |
| |
| // Create the transaction |
| std::string transactionID; |
| transaction = createTransaction(transactionID, RECEIVE); |
| |
| if (transaction == NULL) |
| { |
| context->yield(); |
| tearDown(); |
| throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); |
| return; |
| } |
| |
| try |
| { |
| while (true) |
| { |
| std::map<std::string, std::string> empty; |
| DataPacket packet(this, transaction, empty); |
| bool eof = false; |
| |
| if (!receive(transactionID, &packet, eof)) |
| { |
| throw Exception(SITE2SITE_EXCEPTION, "Receive Failed"); |
| return; |
| } |
| if (eof) |
| { |
| // transaction done |
| break; |
| } |
| FlowFileRecord *flowFile = session->create(); |
| if (!flowFile) |
| { |
| throw Exception(SITE2SITE_EXCEPTION, "Flow File Creation Failed"); |
| return; |
| } |
| std::map<std::string, std::string>::iterator it; |
| for (it = packet._attributes.begin(); it!= packet._attributes.end(); it++) |
| { |
| flowFile->addAttribute(it->first, it->second); |
| } |
| |
| if (packet._size > 0) |
| { |
| Site2SiteClientProtocol::WriteCallback callback(&packet); |
| session->write(flowFile, &callback); |
| if (flowFile->getSize() != packet._size) |
| { |
| throw Exception(SITE2SITE_EXCEPTION, "Receive Size Not Right"); |
| return; |
| } |
| } |
| Relationship relation; // undefined relationship |
| session->transfer(flowFile, relation); |
| // receive the transfer for the flow record |
| bytes += packet._size; |
| transfers++; |
| } // while true |
| |
| if (!confirm(transactionID)) |
| { |
| throw Exception(SITE2SITE_EXCEPTION, "Confirm Transaction Failed"); |
| return; |
| } |
| if (!complete(transactionID)) |
| { |
| throw Exception(SITE2SITE_EXCEPTION, "Complete Transaction Failed"); |
| return; |
| } |
| _logger->log_info("Site2Site transaction %s successfully receive flow record %d, content bytes %d", |
| transactionID.c_str(), transfers, bytes); |
| // we yield the receive if we did not get anything |
| if (transfers == 0) |
| context->yield(); |
| } |
| catch (std::exception &exception) |
| { |
| if (transaction) |
| deleteTransaction(transactionID); |
| context->yield(); |
| tearDown(); |
| _logger->log_debug("Caught Exception %s", exception.what()); |
| throw; |
| } |
| catch (...) |
| { |
| if (transaction) |
| deleteTransaction(transactionID); |
| context->yield(); |
| tearDown(); |
| _logger->log_debug("Caught Exception during Site2SiteClientProtocol::receiveFlowFiles"); |
| throw; |
| } |
| |
| deleteTransaction(transactionID); |
| |
| return; |
| } |
| |
| bool Site2SiteClientProtocol::confirm(std::string transactionID) |
| { |
| int ret; |
| Transaction *transaction = NULL; |
| |
| if (_peerState != READY) |
| { |
| bootstrap(); |
| } |
| |
| if (_peerState != READY) |
| { |
| return false; |
| } |
| |
| std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); |
| |
| if (it == _transactionMap.end()) |
| { |
| return false; |
| } |
| else |
| { |
| transaction = it->second; |
| } |
| |
| if (transaction->getState() == TRANSACTION_STARTED && !transaction->isDataAvailable() && |
| transaction->getDirection() == RECEIVE) |
| { |
| transaction->_state = TRANSACTION_CONFIRMED; |
| return true; |
| } |
| |
| if (transaction->getState() != DATA_EXCHANGED) |
| return false; |
| |
| if (transaction->getDirection() == RECEIVE) |
| { |
| if (transaction->isDataAvailable()) |
| return false; |
| // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message |
| // to peer so that we can verify that the connection is still open. This is a two-phase commit, |
| // which helps to prevent the chances of data duplication. Without doing this, we may commit the |
| // session and then when we send the response back to the peer, the peer may have timed out and may not |
| // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the |
| // Critical Section involved in this transaction so that rather than the Critical Section being the |
| // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. |
| long crcValue = transaction->getCRC(); |
| std::string crc = std::to_string(crcValue); |
| _logger->log_info("Site2Site Send confirm with CRC %d to transaction %s", transaction->getCRC(), |
| transactionID.c_str()); |
| ret = writeRespond(CONFIRM_TRANSACTION, crc); |
| if (ret <= 0) |
| return false; |
| RespondCode code; |
| std::string message; |
| readRespond(code, message); |
| if (ret <= 0) |
| return false; |
| |
| if (code == CONFIRM_TRANSACTION) |
| { |
| _logger->log_info("Site2Site transaction %s peer confirm transaction", transactionID.c_str()); |
| transaction->_state = TRANSACTION_CONFIRMED; |
| return true; |
| } |
| else if (code == BAD_CHECKSUM) |
| { |
| _logger->log_info("Site2Site transaction %s peer indicate bad checksum", transactionID.c_str()); |
| /* |
| transaction->_state = TRANSACTION_CONFIRMED; |
| return true; */ |
| return false; |
| } |
| else |
| { |
| _logger->log_info("Site2Site transaction %s peer unknown respond code %d", |
| transactionID.c_str(), code); |
| return false; |
| } |
| } |
| else |
| { |
| _logger->log_info("Site2Site Send FINISH TRANSACTION for transaction %s", |
| transactionID.c_str()); |
| ret = writeRespond(FINISH_TRANSACTION, "FINISH_TRANSACTION"); |
| if (ret <= 0) |
| return false; |
| RespondCode code; |
| std::string message; |
| readRespond(code, message); |
| if (ret <= 0) |
| return false; |
| |
| // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response |
| if (code == CONFIRM_TRANSACTION) |
| { |
| _logger->log_info("Site2Site transaction %s peer confirm transaction with CRC %s", transactionID.c_str(), message.c_str()); |
| if (this->_currentVersion > 3) |
| { |
| long crcValue = transaction->getCRC(); |
| std::string crc = std::to_string(crcValue); |
| if (message == crc) |
| { |
| _logger->log_info("Site2Site transaction %s CRC matched", transactionID.c_str()); |
| ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); |
| if (ret <= 0) |
| return false; |
| transaction->_state = TRANSACTION_CONFIRMED; |
| return true; |
| } |
| else |
| { |
| _logger->log_info("Site2Site transaction %s CRC not matched %s", transactionID.c_str(), crc.c_str()); |
| ret = writeRespond(BAD_CHECKSUM, "BAD_CHECKSUM"); |
| /* |
| ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); |
| if (ret <= 0) |
| return false; |
| transaction->_state = TRANSACTION_CONFIRMED; |
| return true; */ |
| return false; |
| } |
| } |
| ret = writeRespond(CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION"); |
| if (ret <= 0) |
| return false; |
| transaction->_state = TRANSACTION_CONFIRMED; |
| return true; |
| } |
| else |
| { |
| _logger->log_info("Site2Site transaction %s peer unknown respond code %d", |
| transactionID.c_str(), code); |
| return false; |
| } |
| return false; |
| } |
| } |
| |
| void Site2SiteClientProtocol::cancel(std::string transactionID) |
| { |
| Transaction *transaction = NULL; |
| |
| if (_peerState != READY) |
| { |
| return; |
| } |
| |
| std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); |
| |
| if (it == _transactionMap.end()) |
| { |
| return; |
| } |
| else |
| { |
| transaction = it->second; |
| } |
| |
| if (transaction->getState() == TRANSACTION_CANCELED || transaction->getState() == TRANSACTION_COMPLETED |
| || transaction->getState() == TRANSACTION_ERROR) |
| { |
| return; |
| } |
| |
| this->writeRespond(CANCEL_TRANSACTION, "Cancel"); |
| transaction->_state = TRANSACTION_CANCELED; |
| |
| tearDown(); |
| return; |
| } |
| |
| void Site2SiteClientProtocol::deleteTransaction(std::string transactionID) |
| { |
| Transaction *transaction = NULL; |
| |
| std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); |
| |
| if (it == _transactionMap.end()) |
| { |
| return; |
| } |
| else |
| { |
| transaction = it->second; |
| } |
| |
| _logger->log_info("Site2Site delete transaction %s", transaction->getUUIDStr().c_str()); |
| delete transaction; |
| _transactionMap.erase(transactionID); |
| } |
| |
| void Site2SiteClientProtocol::error(std::string transactionID) |
| { |
| Transaction *transaction = NULL; |
| |
| std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); |
| |
| if (it == _transactionMap.end()) |
| { |
| return; |
| } |
| else |
| { |
| transaction = it->second; |
| } |
| |
| transaction->_state = TRANSACTION_ERROR; |
| tearDown(); |
| return; |
| } |
| |
| //! Complete the transaction |
| bool Site2SiteClientProtocol::complete(std::string transactionID) |
| { |
| int ret; |
| Transaction *transaction = NULL; |
| |
| if (_peerState != READY) |
| { |
| bootstrap(); |
| } |
| |
| if (_peerState != READY) |
| { |
| return false; |
| } |
| |
| std::map<std::string, Transaction *>::iterator it = this->_transactionMap.find(transactionID); |
| |
| if (it == _transactionMap.end()) |
| { |
| return false; |
| } |
| else |
| { |
| transaction = it->second; |
| } |
| |
| if (transaction->getState() != TRANSACTION_CONFIRMED) |
| { |
| return false; |
| } |
| |
| if (transaction->getDirection() == RECEIVE) |
| { |
| if (transaction->_transfers == 0) |
| { |
| transaction->_state = TRANSACTION_COMPLETED; |
| return true; |
| } |
| else |
| { |
| _logger->log_info("Site2Site transaction %s send finished", transactionID.c_str()); |
| ret = this->writeRespond(TRANSACTION_FINISHED, "Finished"); |
| if (ret <= 0) |
| return false; |
| else |
| { |
| transaction->_state = TRANSACTION_COMPLETED; |
| return true; |
| } |
| } |
| } |
| else |
| { |
| RespondCode code; |
| std::string message; |
| int ret; |
| |
| ret = readRespond(code, message); |
| |
| if (ret <= 0) |
| return false; |
| |
| if (code == TRANSACTION_FINISHED) |
| { |
| _logger->log_info("Site2Site transaction %s peer finished transaction", transactionID.c_str()); |
| transaction->_state = TRANSACTION_COMPLETED; |
| return true; |
| } |
| else |
| { |
| _logger->log_info("Site2Site transaction %s peer unknown respond code %d", |
| transactionID.c_str(), code); |
| return false; |
| } |
| } |
| } |
| |
| void Site2SiteClientProtocol::transferFlowFiles(ProcessContext *context, ProcessSession *session) |
| { |
| FlowFileRecord *flow = session->get(); |
| Transaction *transaction = NULL; |
| |
| if (!flow) |
| return; |
| |
| if (_peerState != READY) |
| { |
| bootstrap(); |
| } |
| |
| if (_peerState != READY) |
| { |
| context->yield(); |
| tearDown(); |
| throw Exception(SITE2SITE_EXCEPTION, "Can not establish handshake with peer"); |
| return; |
| } |
| |
| // Create the transaction |
| std::string transactionID; |
| transaction = createTransaction(transactionID, SEND); |
| |
| if (transaction == NULL) |
| { |
| context->yield(); |
| tearDown(); |
| throw Exception(SITE2SITE_EXCEPTION, "Can not create transaction"); |
| return; |
| } |
| |
| bool continueTransaction = true; |
| uint64_t startSendingNanos = getTimeNano(); |
| |
| try |
| { |
| while (continueTransaction) |
| { |
| DataPacket packet(this, transaction, flow->getAttributes()); |
| |
| if (!send(transactionID, &packet, flow, session)) |
| { |
| throw Exception(SITE2SITE_EXCEPTION, "Send Failed"); |
| return; |
| } |
| _logger->log_info("Site2Site transaction %s send flow record %s", |
| transactionID.c_str(), flow->getUUIDStr().c_str()); |
| session->remove(flow); |
| |
| uint64_t transferNanos = getTimeNano() - startSendingNanos; |
| if (transferNanos > _batchSendNanos) |
| break; |
| |
| flow = session->get(); |
| if (!flow) |
| { |
| continueTransaction = false; |
| } |
| } // while true |
| |
| if (!confirm(transactionID)) |
| { |
| throw Exception(SITE2SITE_EXCEPTION, "Confirm Failed"); |
| return; |
| } |
| if (!complete(transactionID)) |
| { |
| throw Exception(SITE2SITE_EXCEPTION, "Complete Failed"); |
| return; |
| } |
| _logger->log_info("Site2Site transaction %s successfully send flow record %d, content bytes %d", |
| transactionID.c_str(), transaction->_transfers, transaction->_bytes); |
| } |
| catch (std::exception &exception) |
| { |
| if (transaction) |
| deleteTransaction(transactionID); |
| context->yield(); |
| tearDown(); |
| _logger->log_debug("Caught Exception %s", exception.what()); |
| throw; |
| } |
| catch (...) |
| { |
| if (transaction) |
| deleteTransaction(transactionID); |
| context->yield(); |
| tearDown(); |
| _logger->log_debug("Caught Exception during Site2SiteClientProtocol::transferFlowFiles"); |
| throw; |
| } |
| |
| deleteTransaction(transactionID); |
| |
| return; |
| } |