| /** |
| * @file FlowControlProtocol.h |
| * FlowControlProtocol class declaration |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #ifndef __FLOW_CONTROL_PROTOCOL_H__ |
| #define __FLOW_CONTROL_PROTOCOL_H__ |
| |
| #include <stdio.h> |
| #include <unistd.h> |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <netinet/in.h> |
| #include <arpa/inet.h> |
| #include <fcntl.h> |
| #include <netdb.h> |
| #include <string> |
| #include <errno.h> |
| #include <chrono> |
| #include <thread> |
| #include "Logger.h" |
| #include "Configure.h" |
| #include "Property.h" |
| |
| //! Forwarder declaration |
| class FlowController; |
| |
| #define DEFAULT_NIFI_SERVER_PORT 9000 |
| #define DEFAULT_REPORT_INTERVAL 1000 // 1 sec |
| #define MAX_READ_TIMEOUT 30000 // 30 seconds |
| |
| //! FlowControl Protocol Msg Type |
| typedef enum { |
| REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version |
| REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval |
| REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info |
| REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property |
| MAX_FLOW_CONTROL_MSG_TYPE |
| } FlowControlMsgType; |
| |
| //! FlowControl Protocol Msg Type String |
| static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = |
| { |
| "REGISTER_REQ", |
| "REGISTER_RESP", |
| "REPORT_REQ", |
| "REPORT_RESP" |
| }; |
| |
| //! Flow Control Msg Type to String |
| inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) |
| { |
| if (type < MAX_FLOW_CONTROL_MSG_TYPE) |
| return FlowControlMsgTypeStr[type]; |
| else |
| return NULL; |
| } |
| |
| //! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) |
| typedef enum { |
| //Fix length 8 bytes: client to server in register request, required field |
| FLOW_SERIAL_NUMBER, |
| // Flow XML name TLV: client to server in register request and report request, required field |
| FLOW_XML_NAME, |
| // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server |
| FLOW_XML_CONTENT, |
| // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field |
| REPORT_INTERVAL, |
| // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property |
| PROCESSOR_NAME, |
| // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property |
| PROPERTY_NAME, |
| // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property |
| PROPERTY_VALUE, |
| // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server |
| REPORT_BLOB, |
| MAX_FLOW_MSG_ID |
| } FlowControlMsgID; |
| |
| //! FlowControl Protocol Msg ID String |
| static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = |
| { |
| "FLOW_SERIAL_NUMBER", |
| "FLOW_XML_NAME", |
| "FLOW_XML_CONTENT", |
| "REPORT_INTERVAL", |
| "PROCESSOR_NAME" |
| "PROPERTY_NAME", |
| "PROPERTY_VALUE", |
| "REPORT_BLOB" |
| }; |
| |
| #define TYPE_HDR_LEN 4 // Fix Hdr Type |
| #define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes |
| |
| //! FlowControl Protocol Msg Len |
| inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) |
| { |
| if (id == FLOW_SERIAL_NUMBER) |
| return (TYPE_HDR_LEN + 8); |
| else if (id == REPORT_INTERVAL) |
| return (TYPE_HDR_LEN + 4); |
| else if (id < MAX_FLOW_MSG_ID) |
| return (TLV_HDR_LEN + payLoadLen); |
| else |
| return -1; |
| } |
| |
| //! Flow Control Msg Id to String |
| inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) |
| { |
| if (id < MAX_FLOW_MSG_ID) |
| return FlowControlMsgIDStr[id]; |
| else |
| return NULL; |
| } |
| |
| //! Flow Control Respond status code |
| typedef enum { |
| RESP_SUCCESS, |
| RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register |
| RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller |
| RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller |
| RESP_FAILURE, |
| MAX_RESP_CODE |
| } FlowControlRespCode; |
| |
| //! FlowControl Resp Code str |
| static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = |
| { |
| "RESP_SUCCESS", |
| "RESP_TRIGGER_REGISTER", |
| "RESP_START_FLOW_CONTROLLER", |
| "RESP_STOP_FLOW_CONTROLLER", |
| "RESP_FAILURE" |
| }; |
| |
| //! Flow Control Resp Code to String |
| inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) |
| { |
| if (code < MAX_RESP_CODE) |
| return FlowControlRespCodeStr[code]; |
| else |
| return NULL; |
| } |
| |
| //! Common FlowControlProtocol Header |
| typedef struct { |
| uint32_t msgType; //! Msg Type |
| uint32_t seqNumber; //! Seq Number to match Req with Resp |
| uint32_t status; //! Resp Code, see FlowControlRespCode |
| uint32_t payloadLen; //! Msg Payload length |
| } FlowControlProtocolHeader; |
| |
| //! FlowControlProtocol Class |
| class FlowControlProtocol |
| { |
| public: |
| //! Constructor |
| /*! |
| * Create a new control protocol |
| */ |
| FlowControlProtocol(FlowController *controller) { |
| _controller = controller; |
| _logger = Logger::getLogger(); |
| _configure = Configure::getConfigure(); |
| _socket = 0; |
| _serverName = "localhost"; |
| _serverPort = DEFAULT_NIFI_SERVER_PORT; |
| _registered = false; |
| _seqNumber = 0; |
| _reportBlob = NULL; |
| _reportBlobLen = 0; |
| _reportInterval = DEFAULT_REPORT_INTERVAL; |
| _running = false; |
| |
| std::string value; |
| |
| if (_configure->get(Configure::nifi_server_name, value)) |
| { |
| _serverName = value; |
| _logger->log_info("NiFi Server Name %s", _serverName.c_str()); |
| } |
| if (_configure->get(Configure::nifi_server_port, value) && Property::StringToInt(value, _serverPort)) |
| { |
| _logger->log_info("NiFi Server Port: [%d]", _serverPort); |
| } |
| if (_configure->get(Configure::nifi_server_report_interval, value)) |
| { |
| TimeUnit unit; |
| if (Property::StringToTime(value, _reportInterval, unit) && |
| Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval)) |
| { |
| _logger->log_info("NiFi server report interval: [%d] ms", _reportInterval); |
| } |
| } |
| } |
| //! Destructor |
| virtual ~FlowControlProtocol() |
| { |
| stop(); |
| if (_socket) |
| close(_socket); |
| if (_reportBlob) |
| delete [] _reportBlob; |
| if (this->_thread) |
| delete this->_thread; |
| } |
| |
| public: |
| |
| //! SendRegisterRequest and Process Register Respond, return 0 for success |
| int sendRegisterReq(); |
| //! SendReportReq and Process Report Respond, return 0 for success |
| int sendReportReq(); |
| //! Start the flow control protocol |
| void start(); |
| //! Stop the flow control protocol |
| void stop(); |
| //! Set Report BLOB for periodically report |
| void setReportBlob(char *blob, int len) |
| { |
| std::lock_guard<std::mutex> lock(_mtx); |
| if (_reportBlob && _reportBlobLen >= len) |
| { |
| memcpy(_reportBlob, blob, len); |
| _reportBlobLen = len; |
| } |
| else |
| { |
| if (_reportBlob) |
| delete[] _reportBlob; |
| _reportBlob = new char[len]; |
| _reportBlobLen = len; |
| } |
| } |
| //! Run function for the thread |
| static void run(FlowControlProtocol *protocol); |
| //! set 8 bytes SerialNumber |
| void setSerialNumber(uint8_t *number) |
| { |
| memcpy(_serialNumber, number, 8); |
| } |
| |
| protected: |
| |
| private: |
| //! Connect to the socket, return sock descriptor if success, 0 for failure |
| int connectServer(const char *host, uint16_t port); |
| //! Send Data via the socket, return -1 for failure |
| int sendData(uint8_t *buf, int buflen); |
| //! Read length into buf, return -1 for failure and 0 for EOF |
| int readData(uint8_t *buf, int buflen); |
| //! Select on the socket |
| int selectClient(int msec); |
| //! Read the header |
| int readHdr(FlowControlProtocolHeader *hdr); |
| //! encode uint32_t |
| uint8_t *encode(uint8_t *buf, uint32_t value) |
| { |
| *buf++ = (value & 0xFF000000) >> 24; |
| *buf++ = (value & 0x00FF0000) >> 16; |
| *buf++ = (value & 0x0000FF00) >> 8; |
| *buf++ = (value & 0x000000FF); |
| return buf; |
| } |
| //! encode uint32_t |
| uint8_t *decode(uint8_t *buf, uint32_t &value) |
| { |
| value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3])); |
| return (buf + 4); |
| } |
| //! encode byte array |
| uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) |
| { |
| memcpy(buf, bufArray, size); |
| buf += size; |
| return buf; |
| } |
| //! encode std::string |
| uint8_t *encode(uint8_t *buf, std::string value) |
| { |
| // add the \0 for size |
| buf = encode(buf, value.size()+1); |
| buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1); |
| return buf; |
| } |
| //! Mutex for protection |
| std::mutex _mtx; |
| //! Logger |
| Logger *_logger; |
| //! Configure |
| Configure *_configure; |
| //! NiFi server Name |
| std::string _serverName; |
| //! NiFi server port |
| int64_t _serverPort; |
| //! Serial Number |
| uint8_t _serialNumber[8]; |
| //! socket to server |
| int _socket; |
| //! report interal in msec |
| int64_t _reportInterval; |
| //! whether it was registered to the NiFi server |
| bool _registered; |
| //! seq number |
| uint32_t _seqNumber; |
| //! FlowController |
| FlowController *_controller; |
| //! report Blob |
| char *_reportBlob; |
| //! report Blob len; |
| int _reportBlobLen; |
| //! thread |
| std::thread *_thread; |
| //! whether it is running |
| bool _running; |
| // Prevent default copy constructor and assignment operation |
| // Only support pass by reference or pointer |
| FlowControlProtocol(const FlowControlProtocol &parent); |
| FlowControlProtocol &operator=(const FlowControlProtocol &parent); |
| |
| }; |
| |
| #endif |