blob: 2a517d77c7a2d6486715659d1ff9137bbd380eb0 [file] [log] [blame]
/**
* @file Site2SiteClientProtocol.h
* Site2SiteClientProtocol class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __SITE2SITE_CLIENT_PROTOCOL_H__
#define __SITE2SITE_CLIENT_PROTOCOL_H__
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <errno.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <fstream>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <uuid/uuid.h>
#include "Logger.h"
#include "Configure.h"
#include "Property.h"
#include "Site2SitePeer.h"
#include "FlowFileRecord.h"
#include "ProcessContext.h"
#include "ProcessSession.h"
//! Resource Negotiated Status Code
//! NOTE(review): presumably the status value returned by the peer while a
//! resource (protocol or codec) version is being negotiated - confirm against
//! the negotiation code in the implementation file.
#define RESOURCE_OK 20
#define DIFFERENT_RESOURCE_VERSION 21
#define NEGOTIATED_ABORT 255
//! Max attributes
//! NOTE(review): presumably the upper bound on the number of attributes
//! accepted for a single flow file - confirm at the use site.
#define MAX_NUM_ATTRIBUTES 25000
/**
 * Direction in which data moves between this client and a remote NiFi
 * instance, expressed from the client's point of view.
 */
enum TransferDirection {
  //! The client sends data to the remote instance.
  SEND,
  //! The client receives data from the remote instance.
  RECEIVE
};
//! Peer State
/*!
 * Stages of the connection to the peer, listed in increasing order of
 * readiness.
 */
enum PeerState {
  //! Idle (value 0)
  IDLE = 0,
  //! Socket established
  ESTABLISHED,
  //! Handshake done
  HANDSHAKED,
  //! After codec negotiation completion
  READY
};
//! Transaction State
/*!
 * Life-cycle states of a Transaction.
 */
enum TransactionState {
  //! Started, but no data has been sent or received yet.
  TRANSACTION_STARTED,
  //! Started, and data has been sent or received.
  DATA_EXCHANGED,
  //! The transferred data has been confirmed via its CRC; the transaction
  //! is ready to be completed.
  TRANSACTION_CONFIRMED,
  //! Successfully completed.
  TRANSACTION_COMPLETED,
  //! Canceled.
  TRANSACTION_CANCELED,
  //! Ended in an error.
  TRANSACTION_ERROR
};
//! Request Type
/*!
 * Kinds of request. The enumerators are consecutive starting at 0, and
 * MAX_REQUEST_TYPE is a count sentinel rather than a real request.
 */
enum RequestType {
  NEGOTIATE_FLOWFILE_CODEC = 0,
  REQUEST_PEER_LIST,
  SEND_FLOWFILES,
  RECEIVE_FLOWFILES,
  SHUTDOWN,
  //! Number of real request types; keep this entry last.
  MAX_REQUEST_TYPE
};
//! Request Type Str
/*!
 * Textual name of every RequestType, indexed by the enumerator value; the
 * entries are listed in the same order as the enumerators.
 * Both the characters and the pointers are const, so the table cannot be
 * modified through this name.
 */
static const char *const RequestTypeStr[MAX_REQUEST_TYPE] =
{
  "NEGOTIATE_FLOWFILE_CODEC",
  "REQUEST_PEER_LIST",
  "SEND_FLOWFILES",
  "RECEIVE_FLOWFILES",
  "SHUTDOWN"
};
//! Respond Code
/*!
 * Numeric respond codes, grouped by purpose. Every enumerator carries an
 * explicit value.
 */
enum RespondCode {
  //! Reserved. NOTE(review): the original remark suggests 0 is kept free so
  //! that "a 0 followed by some other bytes" can be indicated - confirm.
  RESERVED = 0,

  // ---- handshaking properties ----
  PROPERTIES_OK = 1,
  UNKNOWN_PROPERTY_NAME = 230,
  ILLEGAL_PROPERTY_VALUE = 231,
  MISSING_PROPERTY = 232,

  // ---- transaction indicators ----
  CONTINUE_TRANSACTION = 10,
  FINISH_TRANSACTION = 11,
  //! The "explanation" of this code is the checksum.
  CONFIRM_TRANSACTION = 12,
  TRANSACTION_FINISHED = 13,
  TRANSACTION_FINISHED_BUT_DESTINATION_FULL = 14,
  CANCEL_TRANSACTION = 15,
  BAD_CHECKSUM = 19,

  // ---- data availability indicators ----
  MORE_DATA = 20,
  NO_MORE_DATA = 21,

  // ---- port state indicators ----
  UNKNOWN_PORT = 200,
  PORT_NOT_IN_VALID_STATE = 201,
  PORTS_DESTINATION_FULL = 202,

  // ---- authorization ----
  UNAUTHORIZED = 240,

  // ---- error indicators ----
  ABORT = 250,
  UNRECOGNIZED_RESPONSE_CODE = 254,
  END_OF_STREAM = 255
};
//! Respond Code Class
/*!
 * One row of respond-code metadata: the code, a human-readable description
 * and a flag.
 * NOTE(review): hasDescription presumably tells whether a textual message
 * accompanies the code - confirm in the respond read/write implementation.
 */
struct RespondCodeContext {
  //! The respond code this row describes
  RespondCode code;
  //! Human-readable description of the code
  const char *description;
  //! See the note above
  bool hasDescription;
};
//! Respond Code Context
/*!
 * Metadata table with one row per RespondCode: {code, description,
 * hasDescription}. It is searched linearly by code, so row order carries no
 * meaning.
 */
static RespondCodeContext respondCodeContext[] =
{
  { RESERVED,                                  "Reserved for Future Use",                      false },
  // handshaking properties
  { PROPERTIES_OK,                             "Properties OK",                                false },
  { UNKNOWN_PROPERTY_NAME,                     "Unknown Property Name",                        true  },
  { ILLEGAL_PROPERTY_VALUE,                    "Illegal Property Value",                       true  },
  { MISSING_PROPERTY,                          "Missing Property",                             true  },
  // transaction indicators
  { CONTINUE_TRANSACTION,                      "Continue Transaction",                         false },
  { FINISH_TRANSACTION,                        "Finish Transaction",                           false },
  { CONFIRM_TRANSACTION,                       "Confirm Transaction",                          true  },
  { TRANSACTION_FINISHED,                      "Transaction Finished",                         false },
  { TRANSACTION_FINISHED_BUT_DESTINATION_FULL, "Transaction Finished But Destination is Full", false },
  { CANCEL_TRANSACTION,                        "Cancel Transaction",                           true  },
  { BAD_CHECKSUM,                              "Bad Checksum",                                 false },
  // data availability indicators
  { MORE_DATA,                                 "More Data Exists",                             false },
  { NO_MORE_DATA,                              "No More Data Exists",                          false },
  // port state indicators
  { UNKNOWN_PORT,                              "Unknown Port",                                 false },
  { PORT_NOT_IN_VALID_STATE,                   "Port Not in a Valid State",                    true  },
  { PORTS_DESTINATION_FULL,                    "Port's Destination is Full",                   false },
  // authorization
  { UNAUTHORIZED,                              "User Not Authorized",                          true  },
  // error indicators
  { ABORT,                                     "Abort",                                        true  },
  { UNRECOGNIZED_RESPONSE_CODE,                "Unrecognized Response Code",                   false },
  { END_OF_STREAM,                             "End of Stream",                                false }
};
//! Respond Code Sequence Pattern
//! The two byte values 'R' and 'C'.
//! NOTE(review): presumably the marker bytes that precede a respond code on
//! the stream - confirm in the respond read/write implementation.
static const uint8_t CODE_SEQUENCE_VALUE_1 = static_cast<uint8_t>('R');
static const uint8_t CODE_SEQUENCE_VALUE_2 = static_cast<uint8_t>('C');
/**
 * Properties that can be used for the Site-to-Site Socket Protocol.
 * The enumerators are consecutive starting at 0, and MAX_HANDSHAKE_PROPERTY
 * is a count sentinel rather than a real property.
 */
enum HandshakeProperty {
  //! Boolean: whether the contents of a FlowFile should be GZipped when
  //! transferred.
  GZIP,
  //! Unique identifier of the port to communicate with.
  PORT_IDENTIFIER,
  //! Number of milliseconds, counted from when the request was made, that
  //! the client will wait for a response. Once it has expired without a
  //! response the server can move on without servicing the request,
  //! because the client will already have disconnected.
  REQUEST_EXPIRATION_MILLIS,
  //! Preferred number of FlowFiles the server should send to the client
  //! when pulling data. Introduced in version 5 of the protocol.
  BATCH_COUNT,
  //! Preferred number of bytes the server should send to the client when
  //! pulling data. Introduced in version 5 of the protocol.
  BATCH_SIZE,
  //! Preferred amount of time, in milliseconds, that the server should
  //! send data to the client when pulling data. Introduced in version 5
  //! of the protocol.
  BATCH_DURATION,
  //! Number of real properties; keep this entry last.
  MAX_HANDSHAKE_PROPERTY
};
//! HandShakeProperty Str
/*!
 * Textual name of every HandshakeProperty, indexed by the enumerator value;
 * the entries are listed in the same order as the enumerators.
 * Both the characters and the pointers are const, so the table cannot be
 * modified through this name.
 */
static const char *const HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] =
{
  //! Boolean: whether the contents of a FlowFile should be GZipped when
  //! transferred.
  "GZIP",
  //! Unique identifier of the port to communicate with.
  "PORT_IDENTIFIER",
  //! Milliseconds, counted from when the request was made, that the client
  //! will wait for a response before it disconnects.
  "REQUEST_EXPIRATION_MILLIS",
  //! Preferred number of FlowFiles the server should send when the client
  //! pulls data (protocol version 5 and later).
  "BATCH_COUNT",
  //! Preferred number of bytes the server should send when the client pulls
  //! data (protocol version 5 and later).
  "BATCH_SIZE",
  //! Preferred amount of time, in milliseconds, the server should send data
  //! when the client pulls data (protocol version 5 and later).
  "BATCH_DURATION"
};
class Site2SiteClientProtocol;
//! Transaction Class
/*!
 * Book-keeping for one transaction: its state, its direction, a freshly
 * generated UUID (binary and textual), transfer/byte counters and a running
 * CRC32. Site2SiteClientProtocol is a friend and accesses the private
 * members directly.
 */
class Transaction
{
  friend class Site2SiteClientProtocol;

public:
  //! Constructor
  /*!
   * Create a new transaction in the TRANSACTION_STARTED state with zeroed
   * counters, no data available, and a newly generated UUID.
   * @param direction direction of the transfer
   */
  Transaction(TransferDirection direction)
      : _state(TRANSACTION_STARTED),
        _direction(direction),
        _dataAvailable(false),
        _transfers(0),
        _bytes(0)
  {
    // Generate the global UUID for the transaction and keep its textual
    // form: 36 characters plus the terminating NUL.
    char printable[37];
    uuid_generate(_uuid);
    uuid_unparse(_uuid, printable);
    _uuidStr = printable;
  }

  //! Destructor
  virtual ~Transaction() {}

  //! getUUIDStr: textual form of the transaction UUID (returned by value)
  std::string getUUIDStr() { return _uuidStr; }

  //! getState: current transaction state
  TransactionState getState() { return _state; }

  //! isDataAvailable: current value of the data-available flag
  bool isDataAvailable() { return _dataAvailable; }

  //! setDataAvailable: overwrite the data-available flag
  void setDataAvailable(bool value) { _dataAvailable = value; }

  //! getDirection: direction given at construction
  TransferDirection getDirection() { return _direction; }

  //! getCRC: current value reported by the running CRC32
  long getCRC() { return _crc.getCRC(); }

  //! updateCRC: feed length bytes starting at buffer into the running CRC32
  void updateCRC(uint8_t *buffer, uint32_t length) { _crc.update(buffer, length); }

private:
  //! Transaction State
  TransactionState _state;
  //! Transaction Direction
  TransferDirection _direction;
  //! Whether received data is available
  bool _dataAvailable;
  //! A global unique identifier
  uuid_t _uuid;
  //! UUID string
  std::string _uuidStr;
  //! Number of transfer
  int _transfers;
  //! Number of content bytes
  uint64_t _bytes;
  //! CRC32
  CRC32 _crc;

  // Copy construction and assignment are declared but intentionally left
  // unavailable: a Transaction is only passed by reference or pointer.
  Transaction(const Transaction &parent);
  Transaction &operator=(const Transaction &parent);
};
/**
 * Represents a piece of data that is to be sent to or that was received from a
 * NiFi instance.
 *
 * The packet stores the protocol and transaction pointers it is given without
 * taking ownership of them (there is no destructor), together with its own
 * copy of the attribute map. _size starts at 0.
 */
class DataPacket
{
public:
  /**
   * @param protocol    protocol instance this packet belongs to (stored as-is)
   * @param transaction transaction this packet belongs to (stored as-is)
   * @param attributes  attribute name/value pairs; taken by value and moved
   *                    into the packet, so the map is not copied a second
   *                    time inside the constructor
   */
  DataPacket(Site2SiteClientProtocol *protocol, Transaction *transaction,
             std::map<std::string, std::string> attributes)
      : _attributes(std::move(attributes)),
        _size(0),
        _protocol(protocol),
        _transaction(transaction)
  {
  }

  //! Attributes carried with the payload
  std::map<std::string, std::string> _attributes;
  //! Payload size in bytes; 0 after construction
  uint64_t _size;
  //! Protocol instance used for the transfer
  Site2SiteClientProtocol *_protocol;
  //! Transaction this packet is part of
  Transaction *_transaction;
};
//! Site2SiteClientProtocol Class
/*!
 * Client-side protocol state for one Site2SitePeer: batch and timeout
 * settings, the target port id, the peer state, the supported and current
 * protocol/codec versions, and a map of transactions keyed by their UUID
 * string. Most operations are implemented out of line; only trivial
 * accessors and the two stream callbacks are defined here.
 */
class Site2SiteClientProtocol
{
public:
  //! Constructor
  /*!
   * Create a new control protocol
   * @param peer peer connection; stored as a raw pointer. The destructor of
   *             this class has an empty body and therefore does not release
   *             it.
   */
  Site2SiteClientProtocol(Site2SitePeer *peer)
      : _logger(Logger::getLogger()),
        _configure(Configure::getConfigure()),
        _batchCount(0),
        _batchSize(0),
        _batchDuration(0),
        _timeOut(30000),              // 30 seconds
        _peer(peer),
        _portId(),                    // zero-filled until setPortId() is called
        _batchSendNanos(5000000000),  // 5 seconds
        _peerState(IDLE),
        _currentVersionIndex(0),
        _currentCodecVersionIndex(0)
  {
    // Supported protocol versions; entry 0 is the initial current version.
    _supportedVersion[0] = 5;
    _supportedVersion[1] = 4;
    _supportedVersion[2] = 3;
    _supportedVersion[3] = 2;
    _supportedVersion[4] = 1;
    _currentVersion = _supportedVersion[0];

    // Supported codec versions; entry 0 is the initial current version.
    _supportedCodecVersion[0] = 1;
    _currentCodecVersion = _supportedCodecVersion[0];
  }

  //! Destructor
  //! NOTE(review): the body is empty, so neither _peer nor the Transaction
  //! pointers held in _transactionMap are released here - confirm that
  //! tearDown()/deleteTransaction() take care of them.
  virtual ~Site2SiteClientProtocol()
  {
  }

public:
  //! setBatchSize
  void setBatchSize(uint64_t size)
  {
    _batchSize = size;
  }

  //! setBatchCount
  void setBatchCount(uint64_t count)
  {
    _batchCount = count;
  }

  //! setBatchDuration (the member is documented as being in msec)
  void setBatchDuration(uint64_t duration)
  {
    _batchDuration = duration;
  }

  //! setTimeOut: store the timeout (the member is documented as being in
  //! msec) and forward the same value to the peer when one is set
  void setTimeOut(uint64_t time)
  {
    _timeOut = time;
    if (_peer)
      _peer->setTimeOut(time);
  }

  //! setPortId: remember the port id in binary form and as text
  void setPortId(uuid_t id)
  {
    uuid_copy(_portId, id);
    // 36 characters of textual UUID plus the terminating NUL.
    char idStr[37];
    uuid_unparse(id, idStr);
    _portIdStr = idStr;
  }

  //! getResourceName
  std::string getResourceName()
  {
    return "SocketFlowFileProtocol";
  }

  //! getCodecResourceName
  std::string getCodecResourceName()
  {
    return "StandardFlowFileCodec";
  }

  //! bootstrap the protocol to the ready for transaction state by going through the state machine
  bool bootstrap();
  //! establish
  bool establish();
  //! handShake
  bool handShake();
  //! negotiateCodec
  bool negotiateCodec();
  //! initiateResourceNegotiation
  bool initiateResourceNegotiation();
  //! initiateCodecResourceNegotiation
  bool initiateCodecResourceNegotiation();
  //! tearDown
  void tearDown();
  //! write Request Type
  int writeRequestType(RequestType type);
  //! read Request Type
  int readRequestType(RequestType &type);
  //! read Respond
  int readRespond(RespondCode &code, std::string &message);
  //! write respond
  int writeRespond(RespondCode code, std::string message);

  //! getRespondCodeContext
  /*!
   * Look up the metadata row for a respond code.
   * @param code code to search for
   * @return pointer to the matching row of the respondCodeContext table, or
   *         a null pointer when the table has no row for the code
   */
  RespondCodeContext *getRespondCodeContext(RespondCode code)
  {
    for (RespondCodeContext &entry : respondCodeContext)
    {
      if (entry.code == code)
        return &entry;
    }
    return nullptr;
  }

  //! getPeer
  Site2SitePeer *getPeer()
  {
    return _peer;
  }

  //! Creation of a new transaction, return the transaction ID if success,
  //! Return NULL when any error occurs
  Transaction *createTransaction(std::string &transactionID, TransferDirection direction);
  //! Receive the data packet from the transaction
  //! Return false when any error occurs
  bool receive(std::string transactionID, DataPacket *packet, bool &eof);
  //! Send the data packet from the transaction
  //! Return false when any error occurs
  bool send(std::string transactionID, DataPacket *packet, FlowFileRecord *flowFile, ProcessSession *session);
  //! Confirm the data that was sent or received by comparing CRC32's of the data sent and the data received.
  bool confirm(std::string transactionID);
  //! Cancel the transaction
  void cancel(std::string transactionID);
  //! Complete the transaction
  bool complete(std::string transactionID);
  //! Error the transaction
  void error(std::string transactionID);
  //! Receive flow files for the process session
  void receiveFlowFiles(ProcessContext *context, ProcessSession *session);
  //! Transfer flow files for the process session
  void transferFlowFiles(ProcessContext *context, ProcessSession *session);
  //! deleteTransaction
  void deleteTransaction(std::string transactionID);

  //! Nest Callback Class for write stream
  /*!
   * Reads _packet->_size bytes from the peer in chunks of at most 8192 bytes
   * and writes them to the output stream. The transaction CRC is passed to
   * every peer read.
   */
  class WriteCallback : public OutputStreamCallback
  {
  public:
    WriteCallback(DataPacket *packet)
        : _packet(packet) {}
    DataPacket *_packet;
    void process(std::ofstream *stream) {
      uint8_t buffer[8192];
      // Keep the outstanding byte count in 64 bits: _size is uint64_t, and
      // narrowing it to int would truncate payloads of 2 GiB or more.
      uint64_t remaining = _packet->_size;
      while (remaining > 0)
      {
        // Never larger than sizeof(buffer), so the narrowing is safe.
        const int chunk = static_cast<int>(std::min<uint64_t>(remaining, sizeof(buffer)));
        const int ret = _packet->_protocol->_peer->readData(buffer, chunk, &_packet->_transaction->_crc);
        if (ret != chunk)
        {
          // A short or failed read ends the copy; the error is only logged.
          _packet->_protocol->_logger->log_error("Site2Site Receive Flow Size %d Failed %d", chunk, ret);
          break;
        }
        stream->write(reinterpret_cast<const char *>(buffer), chunk);
        remaining -= static_cast<uint64_t>(chunk);
      }
    }
  };

  //! Nest Callback Class for read stream
  /*!
   * Reads the input stream in chunks of at most 8192 bytes and writes each
   * chunk to the peer, passing the transaction CRC to every peer write.
   * _packet->_size is reset to 0 first and then accumulates the number of
   * bytes successfully written.
   */
  class ReadCallback : public InputStreamCallback
  {
  public:
    ReadCallback(DataPacket *packet)
        : _packet(packet) {}
    DataPacket *_packet;
    void process(std::ifstream *stream) {
      _packet->_size = 0;
      uint8_t buffer[8192];
      while (stream->good())
      {
        // gcount() reports the bytes actually extracted, whether the read
        // filled the buffer or stopped early at end of file.
        stream->read(reinterpret_cast<char *>(buffer), sizeof(buffer));
        const int readSize = static_cast<int>(stream->gcount());
        if (readSize <= 0)
          break;  // end of file reached with nothing left to send
        const int ret = _packet->_protocol->_peer->write(buffer, readSize, &_packet->_transaction->_crc);
        if (ret != readSize)
        {
          // A short or failed write ends the copy; the error is only logged.
          _packet->_protocol->_logger->log_error("Site2Site Send Flow Size %d Failed %d", readSize, ret);
          break;
        }
        _packet->_size += readSize;
      }
    }
  };

protected:

private:
  //! Mutex for protection
  std::mutex _mtx;
  //! Logger
  Logger *_logger;
  //! Configure
  Configure *_configure;
  //! Batch Count
  std::atomic<uint64_t> _batchCount;
  //! Batch Size
  std::atomic<uint64_t> _batchSize;
  //! Batch Duration in msec
  std::atomic<uint64_t> _batchDuration;
  //! Timeout in msec
  std::atomic<uint64_t> _timeOut;
  //! Peer Connection
  Site2SitePeer *_peer;
  //! portId
  uuid_t _portId;
  //! portIDStr
  std::string _portIdStr;
  //! BATCH_SEND_NANOS
  uint64_t _batchSendNanos;
  //! Peer State
  PeerState _peerState;
  //! Supported protocol versions
  uint32_t _supportedVersion[5];
  //! Protocol version currently in use
  uint32_t _currentVersion;
  //! Index of _currentVersion within _supportedVersion
  int _currentVersionIndex;
  //! Supported codec versions
  uint32_t _supportedCodecVersion[1];
  //! Codec version currently in use
  uint32_t _currentCodecVersion;
  //! Index of _currentCodecVersion within _supportedCodecVersion
  int _currentCodecVersionIndex;
  //! commsIdentifier
  std::string _commsIdentifier;
  //! transaction map
  std::map<std::string, Transaction *> _transactionMap;

  // Prevent copy construction and assignment
  // Only support pass by reference or pointer
  Site2SiteClientProtocol(const Site2SiteClientProtocol &parent) = delete;
  Site2SiteClientProtocol &operator=(const Site2SiteClientProtocol &parent) = delete;
};
#endif