| /** |
| * @file FlowControlProtocol.cpp |
| * FlowControlProtocol class implementation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "FlowControlProtocol.h" |
| |
| #include <chrono> |
| #include <cstdio> |
| #include <ctime> |
| #include <thread> |
| #include <string> |
| #include <random> |
| #include <iostream> |
| #include <cinttypes> |
| |
| #include "FlowController.h" |
| #include "core/Core.h" |
| #include "utils/gsl.h" |
| |
| namespace org { |
| namespace apache { |
| namespace nifi { |
| namespace minifi { |
| |
| int FlowControlProtocol::connectServer(const char *host, uint16_t port) { |
| return 0; |
| } |
| |
| int FlowControlProtocol::sendData(uint8_t *buf, int buflen) { |
| return 0; |
| } |
| |
| int FlowControlProtocol::selectClient(int msec) { |
| fd_set fds; |
| struct timeval tv{}; |
| int retval; |
| int fd = _socket; |
| |
| FD_ZERO(&fds); |
| FD_SET(fd, &fds); |
| |
| tv.tv_sec = msec / 1000; |
| tv.tv_usec = (msec % 1000) * 1000; |
| |
| if (msec > 0) |
| retval = select(fd + 1, &fds, nullptr, nullptr, &tv); |
| else |
| retval = select(fd + 1, &fds, nullptr, nullptr, nullptr); |
| |
| if (retval <= 0) |
| return retval; |
| if (FD_ISSET(fd, &fds)) |
| return retval; |
| else |
| return 0; |
| } |
| |
| int FlowControlProtocol::readData(uint8_t *buf, int buflen) { |
| int sendSize = buflen; |
| |
| while (buflen) { |
| int status; |
| status = selectClient(MAX_READ_TIMEOUT); |
| if (status <= 0) { |
| return status; |
| } |
| #ifndef __MACH__ |
| status = read(_socket, buf, buflen); |
| #else |
| status = recv(_socket, buf, buflen, 0); |
| #endif |
| if (status <= 0) { |
| return status; |
| } |
| buflen -= status; |
| buf += status; |
| } |
| |
| return sendSize; |
| } |
| |
| int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr) { |
| uint8_t buffer[sizeof(FlowControlProtocolHeader)]; |
| |
| uint8_t *data = buffer; |
| |
| int status = readData(buffer, sizeof(FlowControlProtocolHeader)); |
| if (status <= 0) |
| return status; |
| |
| uint32_t value; |
| data = this->decode(data, value); |
| hdr->msgType = value; |
| |
| data = this->decode(data, value); |
| hdr->seqNumber = value; |
| |
| data = this->decode(data, value); |
| hdr->status = value; |
| |
| data = this->decode(data, value); |
| hdr->payloadLen = value; |
| |
| return sizeof(FlowControlProtocolHeader); |
| } |
| |
| void FlowControlProtocol::start() { |
| if (_reportInterval <= 0) |
| return; |
| if (running_) |
| return; |
| running_ = true; |
| logger_->log_trace("FlowControl Protocol Start"); |
| _thread = std::thread(run, this); |
| _thread.detach(); |
| } |
| |
| void FlowControlProtocol::stop() { |
| if (!running_) |
| return; |
| running_ = false; |
| logger_->log_info("FlowControl Protocol Stop"); |
| } |
| |
| void FlowControlProtocol::run(FlowControlProtocol *protocol) { |
| while (protocol->running_) { |
| std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval)); |
| if (!protocol->_registered) { |
| // if it is not register yet |
| protocol->sendRegisterReq(); |
| } else { |
| protocol->sendReportReq(); |
| } |
| } |
| } |
| |
| int FlowControlProtocol::sendRegisterReq() { |
| if (_registered) { |
| logger_->log_debug("Already registered"); |
| return -1; |
| } |
| |
| uint16_t port = this->_serverPort; |
| |
| if (this->_socket <= 0) |
| this->_socket = connectServer(_serverName.c_str(), port); |
| |
| if (this->_socket <= 0) |
| return -1; |
| |
| // Calculate the total payload msg size |
| const auto payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) + FlowControlMsgIDEncodingLen(FLOW_YML_NAME, gsl::narrow<int>(this->_controller->getName().size() + 1)); |
| const size_t size = sizeof(FlowControlProtocolHeader) + payloadSize; |
| |
| std::vector<uint8_t> buffer; |
| buffer.resize(size); |
| uint8_t *data = buffer.data(); |
| |
| // encode the HDR |
| FlowControlProtocolHeader hdr; |
| hdr.msgType = REGISTER_REQ; |
| hdr.payloadLen = payloadSize; |
| hdr.seqNumber = this->_seqNumber; |
| hdr.status = RESP_SUCCESS; |
| data = this->encode(data, hdr.msgType); |
| data = this->encode(data, hdr.seqNumber); |
| data = this->encode(data, hdr.status); |
| data = this->encode(data, hdr.payloadLen); |
| |
| // encode the serial number |
| data = this->encode(data, FLOW_SERIAL_NUMBER); |
| data = this->encode(data, this->_serialNumber, 8); |
| |
| // encode the YAML name |
| data = this->encode(data, FLOW_YML_NAME); |
| data = this->encode(data, this->_controller->getName()); |
| |
| // send it |
| int status = sendData(buffer.data(), size); |
| buffer.clear(); |
| if (status <= 0) { |
| close(_socket); |
| _socket = 0; |
| logger_->log_error("Flow Control Protocol Send Register Req failed"); |
| return -1; |
| } |
| |
| // Looking for register respond |
| status = readHdr(&hdr); |
| |
| if (status <= 0) { |
| close(_socket); |
| _socket = 0; |
| logger_->log_error("Flow Control Protocol Read Register Resp header failed"); |
| return -1; |
| } |
| logger_->log_debug("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); |
| logger_->log_debug("Flow Control Protocol receive Seq Num %" PRIu32, hdr.seqNumber); |
| logger_->log_debug("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); |
| logger_->log_debug("Flow Control Protocol receive Payload len %" PRIu32, hdr.payloadLen); |
| |
| if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) { |
| this->_registered = true; |
| this->_seqNumber++; |
| logger_->log_trace("Flow Control Protocol Register success"); |
| std::vector<uint8_t> payload; |
| payload.resize(hdr.payloadLen); |
| uint8_t *payloadPtr = payload.data(); |
| status = readData(payload.data(), hdr.payloadLen); |
| if (status <= 0) { |
| logger_->log_warn("Flow Control Protocol Register Read Payload fail"); |
| close(_socket); |
| _socket = 0; |
| return -1; |
| } |
| while (payloadPtr < (payload.data() + hdr.payloadLen)) { |
| uint32_t msgID; |
| payloadPtr = this->decode(payloadPtr, msgID); |
| if (((FlowControlMsgID) msgID) == REPORT_INTERVAL) { |
| // Fixed 4 bytes |
| uint32_t reportInterval; |
| payloadPtr = this->decode(payloadPtr, reportInterval); |
| logger_->log_debug("Flow Control Protocol receive report interval %" PRIu32 " ms", reportInterval); |
| this->_reportInterval = reportInterval; |
| } else { |
| break; |
| } |
| } |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } else { |
| logger_->log_warn("Flow Control Protocol Register fail"); |
| close(_socket); |
| _socket = 0; |
| return -1; |
| } |
| } |
| |
| int FlowControlProtocol::sendReportReq() { |
| uint16_t port = this->_serverPort; |
| |
| if (this->_socket <= 0) |
| this->_socket = connectServer(_serverName.c_str(), port); |
| |
| if (this->_socket <= 0) |
| return -1; |
| |
| // Calculate the total payload msg size |
| uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_YML_NAME, gsl::narrow<int>(this->_controller->getName().size() + 1)); |
| const size_t size = sizeof(FlowControlProtocolHeader) + payloadSize; |
| |
| std::vector<uint8_t> buffer; |
| buffer.resize(size); |
| auto* data = buffer.data(); |
| |
| // encode the HDR |
| FlowControlProtocolHeader hdr; |
| hdr.msgType = REPORT_REQ; |
| hdr.payloadLen = payloadSize; |
| hdr.seqNumber = this->_seqNumber; |
| hdr.status = RESP_SUCCESS; |
| data = this->encode(data, hdr.msgType); |
| data = this->encode(data, hdr.seqNumber); |
| data = this->encode(data, hdr.status); |
| data = this->encode(data, hdr.payloadLen); |
| |
| // encode the YAML name |
| data = this->encode(data, FLOW_YML_NAME); |
| data = this->encode(data, this->_controller->getName()); |
| |
| // send it |
| int status = sendData(buffer.data(), size); |
| buffer.clear(); |
| if (status <= 0) { |
| close(_socket); |
| _socket = 0; |
| logger_->log_error("Flow Control Protocol Send Report Req failed"); |
| return -1; |
| } |
| |
| // Looking for report respond |
| status = readHdr(&hdr); |
| |
| if (status <= 0) { |
| close(_socket); |
| _socket = 0; |
| logger_->log_error("Flow Control Protocol Read Report Resp header failed"); |
| return -1; |
| } |
| logger_->log_debug("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); |
| logger_->log_debug("Flow Control Protocol receive Seq Num %" PRIu32, hdr.seqNumber); |
| logger_->log_debug("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); |
| logger_->log_debug("Flow Control Protocol receive Payload len %" PRIu32, hdr.payloadLen); |
| |
| if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) { |
| this->_seqNumber++; |
| std::vector<uint8_t> payload; |
| payload.resize(hdr.payloadLen); |
| uint8_t *payloadPtr = payload.data(); |
| status = readData(payload.data(), hdr.payloadLen); |
| if (status <= 0) { |
| logger_->log_warn("Flow Control Protocol Report Resp Read Payload fail"); |
| close(_socket); |
| _socket = 0; |
| return -1; |
| } |
| std::string processor; |
| std::string propertyName; |
| std::string propertyValue; |
| while (payloadPtr < (payload.data() + hdr.payloadLen)) { |
| uint32_t msgID; |
| payloadPtr = this->decode(payloadPtr, msgID); |
| if (((FlowControlMsgID) msgID) == PROCESSOR_NAME) { |
| uint32_t len; |
| payloadPtr = this->decode(payloadPtr, len); |
| processor = (const char *) payloadPtr; |
| payloadPtr += len; |
| logger_->log_debug("Flow Control Protocol receive report resp processor %s", processor); |
| } else if (((FlowControlMsgID) msgID) == PROPERTY_NAME) { |
| uint32_t len; |
| payloadPtr = this->decode(payloadPtr, len); |
| propertyName = (const char *) payloadPtr; |
| payloadPtr += len; |
| logger_->log_debug("Flow Control Protocol receive report resp property name %s", propertyName); |
| } else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE) { |
| uint32_t len; |
| payloadPtr = this->decode(payloadPtr, len); |
| propertyValue = (const char *) payloadPtr; |
| payloadPtr += len; |
| logger_->log_debug("Flow Control Protocol receive report resp property value %s", propertyValue); |
| this->_controller->updatePropertyValue(processor, propertyName, propertyValue); |
| } else { |
| break; |
| } |
| } |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber) { |
| logger_->log_trace("Flow Control Protocol trigger reregister"); |
| this->_registered = false; |
| this->_seqNumber++; |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) { |
| logger_->log_trace("Flow Control Protocol stop flow controller"); |
| this->_controller->stop(true); |
| this->_seqNumber++; |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) { |
| logger_->log_trace("Flow Control Protocol start flow controller"); |
| this->_controller->start(); |
| this->_seqNumber++; |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } else { |
| logger_->log_trace("Flow Control Protocol Report fail"); |
| close(_socket); |
| _socket = 0; |
| return -1; |
| } |
| } |
| |
| } // namespace minifi |
| } // namespace nifi |
| } // namespace apache |
| } // namespace org |