blob: 23f2d4993a5b101f0e0dac7aeda8b4117cb6a1d4 [file] [log] [blame]
/**
* @file FlowControlProtocol.h
* FlowControlProtocol class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __FLOW_CONTROL_PROTOCOL_H__
#define __FLOW_CONTROL_PROTOCOL_H__
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <string>
#include <errno.h>
#include <chrono>
#include <thread>
#include "Logger.h"
#include "Configure.h"
#include "Property.h"
//! Forwarder declaration
class FlowController;
#define DEFAULT_NIFI_SERVER_PORT 9000
#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
#define MAX_READ_TIMEOUT 30000 // 30 seconds
//! FlowControl Protocol Msg Type
typedef enum {
REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version
REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval
REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info
REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property
MAX_FLOW_CONTROL_MSG_TYPE
} FlowControlMsgType;
//! FlowControl Protocol Msg Type String
static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
{
"REGISTER_REQ",
"REGISTER_RESP",
"REPORT_REQ",
"REPORT_RESP"
};
//! Flow Control Msg Type to String
inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
{
if (type < MAX_FLOW_CONTROL_MSG_TYPE)
return FlowControlMsgTypeStr[type];
else
return NULL;
}
//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV)
typedef enum {
//Fix length 8 bytes: client to server in register request, required field
FLOW_SERIAL_NUMBER,
// Flow XML name TLV: client to server in register request and report request, required field
FLOW_XML_NAME,
// Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server
FLOW_XML_CONTENT,
// Fix length, 4 bytes Report interval in msec: server to client in register respond, option field
REPORT_INTERVAL,
// Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
PROCESSOR_NAME,
// Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
PROPERTY_NAME,
// Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property
PROPERTY_VALUE,
// Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server
REPORT_BLOB,
MAX_FLOW_MSG_ID
} FlowControlMsgID;
//! FlowControl Protocol Msg ID String
static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
{
"FLOW_SERIAL_NUMBER",
"FLOW_XML_NAME",
"FLOW_XML_CONTENT",
"REPORT_INTERVAL",
"PROCESSOR_NAME"
"PROPERTY_NAME",
"PROPERTY_VALUE",
"REPORT_BLOB"
};
#define TYPE_HDR_LEN 4 // Fix Hdr Type
#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
//! FlowControl Protocol Msg Len
inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
{
if (id == FLOW_SERIAL_NUMBER)
return (TYPE_HDR_LEN + 8);
else if (id == REPORT_INTERVAL)
return (TYPE_HDR_LEN + 4);
else if (id < MAX_FLOW_MSG_ID)
return (TLV_HDR_LEN + payLoadLen);
else
return -1;
}
//! Flow Control Msg Id to String
inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
{
if (id < MAX_FLOW_MSG_ID)
return FlowControlMsgIDStr[id];
else
return NULL;
}
//! Flow Control Respond status code
typedef enum {
RESP_SUCCESS,
RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
RESP_FAILURE,
MAX_RESP_CODE
} FlowControlRespCode;
//! FlowControl Resp Code str
static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
{
"RESP_SUCCESS",
"RESP_TRIGGER_REGISTER",
"RESP_START_FLOW_CONTROLLER",
"RESP_STOP_FLOW_CONTROLLER",
"RESP_FAILURE"
};
//! Flow Control Resp Code to String
inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
{
if (code < MAX_RESP_CODE)
return FlowControlRespCodeStr[code];
else
return NULL;
}
//! Common FlowControlProtocol Header
typedef struct {
uint32_t msgType; //! Msg Type
uint32_t seqNumber; //! Seq Number to match Req with Resp
uint32_t status; //! Resp Code, see FlowControlRespCode
uint32_t payloadLen; //! Msg Payload length
} FlowControlProtocolHeader;
//! FlowControlProtocol Class
class FlowControlProtocol
{
public:
//! Constructor
/*!
* Create a new control protocol
*/
FlowControlProtocol(FlowController *controller) {
_controller = controller;
_logger = Logger::getLogger();
_configure = Configure::getConfigure();
_socket = 0;
_serverName = "localhost";
_serverPort = DEFAULT_NIFI_SERVER_PORT;
_registered = false;
_seqNumber = 0;
_reportBlob = NULL;
_reportBlobLen = 0;
_reportInterval = DEFAULT_REPORT_INTERVAL;
_running = false;
std::string value;
if (_configure->get(Configure::nifi_server_name, value))
{
_serverName = value;
_logger->log_info("NiFi Server Name %s", _serverName.c_str());
}
if (_configure->get(Configure::nifi_server_port, value) && Property::StringToInt(value, _serverPort))
{
_logger->log_info("NiFi Server Port: [%d]", _serverPort);
}
if (_configure->get(Configure::nifi_server_report_interval, value))
{
TimeUnit unit;
if (Property::StringToTime(value, _reportInterval, unit) &&
Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval))
{
_logger->log_info("NiFi server report interval: [%d] ms", _reportInterval);
}
}
}
//! Destructor
virtual ~FlowControlProtocol()
{
stop();
if (_socket)
close(_socket);
if (_reportBlob)
delete [] _reportBlob;
if (this->_thread)
delete this->_thread;
}
public:
//! SendRegisterRequest and Process Register Respond, return 0 for success
int sendRegisterReq();
//! SendReportReq and Process Report Respond, return 0 for success
int sendReportReq();
//! Start the flow control protocol
void start();
//! Stop the flow control protocol
void stop();
//! Set Report BLOB for periodically report
void setReportBlob(char *blob, int len)
{
std::lock_guard<std::mutex> lock(_mtx);
if (_reportBlob && _reportBlobLen >= len)
{
memcpy(_reportBlob, blob, len);
_reportBlobLen = len;
}
else
{
if (_reportBlob)
delete[] _reportBlob;
_reportBlob = new char[len];
_reportBlobLen = len;
}
}
//! Run function for the thread
static void run(FlowControlProtocol *protocol);
//! set 8 bytes SerialNumber
void setSerialNumber(uint8_t *number)
{
memcpy(_serialNumber, number, 8);
}
protected:
private:
//! Connect to the socket, return sock descriptor if success, 0 for failure
int connectServer(const char *host, uint16_t port);
//! Send Data via the socket, return -1 for failure
int sendData(uint8_t *buf, int buflen);
//! Read length into buf, return -1 for failure and 0 for EOF
int readData(uint8_t *buf, int buflen);
//! Select on the socket
int selectClient(int msec);
//! Read the header
int readHdr(FlowControlProtocolHeader *hdr);
//! encode uint32_t
uint8_t *encode(uint8_t *buf, uint32_t value)
{
*buf++ = (value & 0xFF000000) >> 24;
*buf++ = (value & 0x00FF0000) >> 16;
*buf++ = (value & 0x0000FF00) >> 8;
*buf++ = (value & 0x000000FF);
return buf;
}
//! encode uint32_t
uint8_t *decode(uint8_t *buf, uint32_t &value)
{
value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3]));
return (buf + 4);
}
//! encode byte array
uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
{
memcpy(buf, bufArray, size);
buf += size;
return buf;
}
//! encode std::string
uint8_t *encode(uint8_t *buf, std::string value)
{
// add the \0 for size
buf = encode(buf, value.size()+1);
buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1);
return buf;
}
//! Mutex for protection
std::mutex _mtx;
//! Logger
Logger *_logger;
//! Configure
Configure *_configure;
//! NiFi server Name
std::string _serverName;
//! NiFi server port
int64_t _serverPort;
//! Serial Number
uint8_t _serialNumber[8];
//! socket to server
int _socket;
//! report interal in msec
int64_t _reportInterval;
//! whether it was registered to the NiFi server
bool _registered;
//! seq number
uint32_t _seqNumber;
//! FlowController
FlowController *_controller;
//! report Blob
char *_reportBlob;
//! report Blob len;
int _reportBlobLen;
//! thread
std::thread *_thread;
//! whether it is running
bool _running;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
FlowControlProtocol(const FlowControlProtocol &parent);
FlowControlProtocol &operator=(const FlowControlProtocol &parent);
};
#endif