* @file FlowControlProtocol.h
* FlowControlProtocol class declaration
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <string>
#include <errno.h>
#include <chrono>
#include <thread>
#include "Logger.h"
#include "Configure.h"
#include "Property.h"
//! Forwarder declaration
class FlowController;
#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
#define MAX_READ_TIMEOUT 30000 // 30 seconds
//! FlowControl Protocol Msg Type
typedef enum {
REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version
REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval
REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info
REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property
} FlowControlMsgType;
//! FlowControl Protocol Msg Type String
static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
//! Flow Control Msg Type to String
inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
return FlowControlMsgTypeStr[type];
return NULL;
//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV)
typedef enum {
//Fix length 8 bytes: client to server in register request, required field
// Flow XML name TLV: client to server in register request and report request, required field
// Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server
// Fix length, 4 bytes Report interval in msec: server to client in register respond, option field
// Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
// Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
// Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property
// Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server
} FlowControlMsgID;
//! FlowControl Protocol Msg ID String
static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
#define TYPE_HDR_LEN 4 // Fix Hdr Type
#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
//! FlowControl Protocol Msg Len
inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
return (TYPE_HDR_LEN + 8);
else if (id == REPORT_INTERVAL)
return (TYPE_HDR_LEN + 4);
else if (id < MAX_FLOW_MSG_ID)
return (TLV_HDR_LEN + payLoadLen);
return -1;
//! Flow Control Msg Id to String
inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
if (id < MAX_FLOW_MSG_ID)
return FlowControlMsgIDStr[id];
return NULL;
//! Flow Control Respond status code
typedef enum {
RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
} FlowControlRespCode;
//! FlowControl Resp Code str
static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
//! Flow Control Resp Code to String
inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
if (code < MAX_RESP_CODE)
return FlowControlRespCodeStr[code];
return NULL;
//! Common FlowControlProtocol Header
typedef struct {
uint32_t msgType; //! Msg Type
uint32_t seqNumber; //! Seq Number to match Req with Resp
uint32_t status; //! Resp Code, see FlowControlRespCode
uint32_t payloadLen; //! Msg Payload length
} FlowControlProtocolHeader;
//! FlowControlProtocol Class
class FlowControlProtocol
//! Constructor
* Create a new control protocol
FlowControlProtocol(FlowController *controller) {
_controller = controller;
_logger = Logger::getLogger();
_configure = Configure::getConfigure();
_socket = 0;
_serverName = "localhost";
_registered = false;
_seqNumber = 0;
_reportBlob = NULL;
_reportBlobLen = 0;
_running = false;
std::string value;
if (_configure->get(Configure::nifi_server_name, value))
_serverName = value;
_logger->log_info("NiFi Server Name %s", _serverName.c_str());
if (_configure->get(Configure::nifi_server_port, value) && Property::StringToInt(value, _serverPort))
_logger->log_info("NiFi Server Port: [%d]", _serverPort);
if (_configure->get(Configure::nifi_server_report_interval, value))
TimeUnit unit;
if (Property::StringToTime(value, _reportInterval, unit) &&
Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval))
_logger->log_info("NiFi server report interval: [%d] ms", _reportInterval);
_reportInterval = 0;
//! Destructor
virtual ~FlowControlProtocol()
if (_socket)
if (_reportBlob)
delete [] _reportBlob;
if (this->_thread)
delete this->_thread;
//! SendRegisterRequest and Process Register Respond, return 0 for success
int sendRegisterReq();
//! SendReportReq and Process Report Respond, return 0 for success
int sendReportReq();
//! Start the flow control protocol
void start();
//! Stop the flow control protocol
void stop();
//! Set Report BLOB for periodically report
void setReportBlob(char *blob, int len)
std::lock_guard<std::mutex> lock(_mtx);
if (_reportBlob && _reportBlobLen >= len)
memcpy(_reportBlob, blob, len);
_reportBlobLen = len;
if (_reportBlob)
delete[] _reportBlob;
_reportBlob = new char[len];
_reportBlobLen = len;
//! Run function for the thread
static void run(FlowControlProtocol *protocol);
//! set 8 bytes SerialNumber
void setSerialNumber(uint8_t *number)
memcpy(_serialNumber, number, 8);
//! Connect to the socket, return sock descriptor if success, 0 for failure
int connectServer(const char *host, uint16_t port);
//! Send Data via the socket, return -1 for failure
int sendData(uint8_t *buf, int buflen);
//! Read length into buf, return -1 for failure and 0 for EOF
int readData(uint8_t *buf, int buflen);
//! Select on the socket
int selectClient(int msec);
//! Read the header
int readHdr(FlowControlProtocolHeader *hdr);
//! encode uint32_t
uint8_t *encode(uint8_t *buf, uint32_t value)
*buf++ = (value & 0xFF000000) >> 24;
*buf++ = (value & 0x00FF0000) >> 16;
*buf++ = (value & 0x0000FF00) >> 8;
*buf++ = (value & 0x000000FF);
return buf;
//! encode uint32_t
uint8_t *decode(uint8_t *buf, uint32_t &value)
value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3]));
return (buf + 4);
//! encode byte array
uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
memcpy(buf, bufArray, size);
buf += size;
return buf;
//! encode std::string
uint8_t *encode(uint8_t *buf, std::string value)
// add the \0 for size
buf = encode(buf, value.size()+1);
buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1);
return buf;
//! Mutex for protection
std::mutex _mtx;
//! Logger
Logger *_logger;
//! Configure
Configure *_configure;
//! NiFi server Name
std::string _serverName;
//! NiFi server port
int64_t _serverPort;
//! Serial Number
uint8_t _serialNumber[8];
//! socket to server
int _socket;
//! report interal in msec
int64_t _reportInterval;
//! whether it was registered to the NiFi server
bool _registered;
//! seq number
uint32_t _seqNumber;
//! FlowController
FlowController *_controller;
//! report Blob
char *_reportBlob;
//! report Blob len;
int _reportBlobLen;
//! thread
std::thread *_thread;
//! whether it is running
bool _running;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
FlowControlProtocol(const FlowControlProtocol &parent);
FlowControlProtocol &operator=(const FlowControlProtocol &parent);