| /** |
| * @file FlowControlProtocol.cpp |
| * FlowControlProtocol class implementation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include <sys/time.h> |
| #include <stdio.h> |
| #include <time.h> |
| #include <chrono> |
| #include <thread> |
| #include <random> |
| #include <netinet/tcp.h> |
| #include <iostream> |
| #include "FlowController.h" |
| #include "FlowControlProtocol.h" |
| |
| int FlowControlProtocol::connectServer(const char *host, uint16_t port) |
| { |
| in_addr_t addr; |
| int sock = 0; |
| struct hostent *h; |
| #ifdef __MACH__ |
| h = gethostbyname(host); |
| #else |
| char buf[1024]; |
| struct hostent he; |
| int hh_errno; |
| gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); |
| #endif |
| memcpy((char *) &addr, h->h_addr_list[0], h->h_length); |
| sock = socket(AF_INET, SOCK_STREAM, 0); |
| if (sock < 0) |
| { |
| _logger->log_error("Could not create socket to hostName %s", host); |
| return 0; |
| } |
| |
| #ifndef __MACH__ |
| int opt = 1; |
| bool nagle_off = true; |
| |
| if (nagle_off) |
| { |
| if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) |
| { |
| _logger->log_error("setsockopt() TCP_NODELAY failed"); |
| close(sock); |
| return 0; |
| } |
| if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, |
| (char *)&opt, sizeof(opt)) < 0) |
| { |
| _logger->log_error("setsockopt() SO_REUSEADDR failed"); |
| close(sock); |
| return 0; |
| } |
| } |
| |
| int sndsize = 256*1024; |
| if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) |
| { |
| _logger->log_error("setsockopt() SO_SNDBUF failed"); |
| close(sock); |
| return 0; |
| } |
| #endif |
| |
| struct sockaddr_in sa; |
| socklen_t socklen; |
| int status; |
| |
| memset(&sa, 0, sizeof(sa)); |
| sa.sin_family = AF_INET; |
| sa.sin_addr.s_addr = htonl(INADDR_ANY); |
| sa.sin_port = htons(0); |
| socklen = sizeof(sa); |
| if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) |
| { |
| _logger->log_error("socket bind failed"); |
| close(sock); |
| return 0; |
| } |
| |
| memset(&sa, 0, sizeof(sa)); |
| sa.sin_family = AF_INET; |
| sa.sin_addr.s_addr = addr; |
| sa.sin_port = htons(port); |
| socklen = sizeof(sa); |
| |
| status = connect(sock, (struct sockaddr *)&sa, socklen); |
| |
| if (status < 0) |
| { |
| _logger->log_error("socket connect failed to %s %d", host, port); |
| close(sock); |
| return 0; |
| } |
| |
| _logger->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port); |
| |
| return sock; |
| } |
| |
| int FlowControlProtocol::sendData(uint8_t *buf, int buflen) |
| { |
| int ret = 0, bytes = 0; |
| |
| while (bytes < buflen) |
| { |
| ret = send(_socket, buf+bytes, buflen-bytes, 0); |
| //check for errors |
| if (ret == -1) |
| { |
| return ret; |
| } |
| bytes+=ret; |
| } |
| |
| return bytes; |
| } |
| |
| int FlowControlProtocol::selectClient(int msec) |
| { |
| fd_set fds; |
| struct timeval tv; |
| int retval; |
| int fd = _socket; |
| |
| FD_ZERO(&fds); |
| FD_SET(fd, &fds); |
| |
| tv.tv_sec = msec/1000; |
| tv.tv_usec = (msec % 1000) * 1000; |
| |
| if (msec > 0) |
| retval = select(fd+1, &fds, NULL, NULL, &tv); |
| else |
| retval = select(fd+1, &fds, NULL, NULL, NULL); |
| |
| if (retval <= 0) |
| return retval; |
| if (FD_ISSET(fd, &fds)) |
| return retval; |
| else |
| return 0; |
| } |
| |
| int FlowControlProtocol::readData(uint8_t *buf, int buflen) |
| { |
| int sendSize = buflen; |
| |
| while (buflen) |
| { |
| int status; |
| status = selectClient(MAX_READ_TIMEOUT); |
| if (status <= 0) |
| { |
| return status; |
| } |
| #ifndef __MACH__ |
| status = read(_socket, buf, buflen); |
| #else |
| status = recv(_socket, buf, buflen, 0); |
| #endif |
| if (status <= 0) |
| { |
| return status; |
| } |
| buflen -= status; |
| buf += status; |
| } |
| |
| return sendSize; |
| } |
| |
| int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr) |
| { |
| uint8_t buffer[sizeof(FlowControlProtocolHeader)]; |
| |
| uint8_t *data = buffer; |
| |
| int status = readData(buffer, sizeof(FlowControlProtocolHeader)); |
| if (status <= 0) |
| return status; |
| |
| uint32_t value; |
| data = this->decode(data, value); |
| hdr->msgType = value; |
| |
| data = this->decode(data, value); |
| hdr->seqNumber = value; |
| |
| data = this->decode(data, value); |
| hdr->status = value; |
| |
| data = this->decode(data, value); |
| hdr->payloadLen = value; |
| |
| return sizeof(FlowControlProtocolHeader); |
| } |
| |
| void FlowControlProtocol::start() |
| { |
| if (_reportInterval <= 0) |
| return; |
| if (_running) |
| return; |
| _running = true; |
| _logger->log_info("FlowControl Protocol Start"); |
| _thread = new std::thread(run, this); |
| _thread->detach(); |
| } |
| |
| void FlowControlProtocol::stop() |
| { |
| if (!_running) |
| return; |
| _running = false; |
| _logger->log_info("FlowControl Protocol Stop"); |
| } |
| |
| void FlowControlProtocol::run(FlowControlProtocol *protocol) |
| { |
| while (protocol->_running) |
| { |
| std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval)); |
| if (!protocol->_registered) |
| { |
| // if it is not register yet |
| protocol->sendRegisterReq(); |
| // protocol->_controller->reload("flow.xml"); |
| } |
| else |
| protocol->sendReportReq(); |
| } |
| return; |
| } |
| |
| int FlowControlProtocol::sendRegisterReq() |
| { |
| if (_registered) |
| { |
| _logger->log_info("Already registered"); |
| return -1; |
| } |
| |
| uint16_t port = this->_serverPort; |
| |
| if (this->_socket <= 0) |
| this->_socket = connectServer(_serverName.c_str(), port); |
| |
| if (this->_socket <= 0) |
| return -1; |
| |
| // Calculate the total payload msg size |
| uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) + |
| FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1); |
| uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; |
| |
| uint8_t *data = new uint8_t[size]; |
| uint8_t *start = data; |
| |
| // encode the HDR |
| FlowControlProtocolHeader hdr; |
| hdr.msgType = REGISTER_REQ; |
| hdr.payloadLen = payloadSize; |
| hdr.seqNumber = this->_seqNumber; |
| hdr.status = RESP_SUCCESS; |
| data = this->encode(data, hdr.msgType); |
| data = this->encode(data, hdr.seqNumber); |
| data = this->encode(data, hdr.status); |
| data = this->encode(data, hdr.payloadLen); |
| |
| // encode the serial number |
| data = this->encode(data, FLOW_SERIAL_NUMBER); |
| data = this->encode(data, this->_serialNumber, 8); |
| |
| // encode the XML name |
| data = this->encode(data, FLOW_XML_NAME); |
| data = this->encode(data, this->_controller->getName()); |
| |
| // send it |
| int status = sendData(start, size); |
| delete[] start; |
| if (status <= 0) |
| { |
| close(_socket); |
| _socket = 0; |
| _logger->log_error("Flow Control Protocol Send Register Req failed"); |
| return -1; |
| } |
| |
| // Looking for register respond |
| status = readHdr(&hdr); |
| |
| if (status <= 0) |
| { |
| close(_socket); |
| _socket = 0; |
| _logger->log_error("Flow Control Protocol Read Register Resp header failed"); |
| return -1; |
| } |
| _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); |
| _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); |
| _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); |
| _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); |
| |
| if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) |
| { |
| this->_registered = true; |
| this->_seqNumber++; |
| _logger->log_info("Flow Control Protocol Register success"); |
| uint8_t *payload = new uint8_t[hdr.payloadLen]; |
| uint8_t *payloadPtr = payload; |
| status = readData(payload, hdr.payloadLen); |
| if (status <= 0) |
| { |
| delete[] payload; |
| _logger->log_info("Flow Control Protocol Register Read Payload fail"); |
| close(_socket); |
| _socket = 0; |
| return -1; |
| } |
| while (payloadPtr < (payload + hdr.payloadLen)) |
| { |
| uint32_t msgID; |
| payloadPtr = this->decode(payloadPtr, msgID); |
| if (((FlowControlMsgID) msgID) == REPORT_INTERVAL) |
| { |
| // Fixed 4 bytes |
| uint32_t reportInterval; |
| payloadPtr = this->decode(payloadPtr, reportInterval); |
| _logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval); |
| this->_reportInterval = reportInterval; |
| } |
| else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT) |
| { |
| uint32_t xmlLen; |
| payloadPtr = this->decode(payloadPtr, xmlLen); |
| _logger->log_info("Flow Control Protocol receive XML content length %d", xmlLen); |
| time_t rawtime; |
| struct tm *timeinfo; |
| time(&rawtime); |
| timeinfo = localtime(&rawtime); |
| std::string xmlFileName = "flow."; |
| xmlFileName += asctime(timeinfo); |
| xmlFileName += ".xml"; |
| std::ofstream fs; |
| fs.open(xmlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); |
| if (fs.is_open()) |
| { |
| fs.write((const char *)payloadPtr, xmlLen); |
| fs.close(); |
| this->_controller->reload(xmlFileName.c_str()); |
| } |
| } |
| else |
| { |
| break; |
| } |
| } |
| delete[] payload; |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } |
| else |
| { |
| _logger->log_info("Flow Control Protocol Register fail"); |
| close(_socket); |
| _socket = 0; |
| return -1; |
| } |
| } |
| |
| |
| int FlowControlProtocol::sendReportReq() |
| { |
| uint16_t port = this->_serverPort; |
| |
| if (this->_socket <= 0) |
| this->_socket = connectServer(_serverName.c_str(), port); |
| |
| if (this->_socket <= 0) |
| return -1; |
| |
| // Calculate the total payload msg size |
| uint32_t payloadSize = |
| FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1); |
| uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; |
| |
| uint8_t *data = new uint8_t[size]; |
| uint8_t *start = data; |
| |
| // encode the HDR |
| FlowControlProtocolHeader hdr; |
| hdr.msgType = REPORT_REQ; |
| hdr.payloadLen = payloadSize; |
| hdr.seqNumber = this->_seqNumber; |
| hdr.status = RESP_SUCCESS; |
| data = this->encode(data, hdr.msgType); |
| data = this->encode(data, hdr.seqNumber); |
| data = this->encode(data, hdr.status); |
| data = this->encode(data, hdr.payloadLen); |
| |
| // encode the XML name |
| data = this->encode(data, FLOW_XML_NAME); |
| data = this->encode(data, this->_controller->getName()); |
| |
| // send it |
| int status = sendData(start, size); |
| delete[] start; |
| if (status <= 0) |
| { |
| close(_socket); |
| _socket = 0; |
| _logger->log_error("Flow Control Protocol Send Report Req failed"); |
| return -1; |
| } |
| |
| // Looking for report respond |
| status = readHdr(&hdr); |
| |
| if (status <= 0) |
| { |
| close(_socket); |
| _socket = 0; |
| _logger->log_error("Flow Control Protocol Read Report Resp header failed"); |
| return -1; |
| } |
| _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); |
| _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); |
| _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); |
| _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); |
| |
| if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) |
| { |
| this->_seqNumber++; |
| uint8_t *payload = new uint8_t[hdr.payloadLen]; |
| uint8_t *payloadPtr = payload; |
| status = readData(payload, hdr.payloadLen); |
| if (status <= 0) |
| { |
| delete[] payload; |
| _logger->log_info("Flow Control Protocol Report Resp Read Payload fail"); |
| close(_socket); |
| _socket = 0; |
| return -1; |
| } |
| std::string processor; |
| std::string propertyName; |
| std::string propertyValue; |
| while (payloadPtr < (payload + hdr.payloadLen)) |
| { |
| uint32_t msgID; |
| payloadPtr = this->decode(payloadPtr, msgID); |
| if (((FlowControlMsgID) msgID) == PROCESSOR_NAME) |
| { |
| uint32_t len; |
| payloadPtr = this->decode(payloadPtr, len); |
| processor = (const char *) payloadPtr; |
| payloadPtr += len; |
| _logger->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str()); |
| } |
| else if (((FlowControlMsgID) msgID) == PROPERTY_NAME) |
| { |
| uint32_t len; |
| payloadPtr = this->decode(payloadPtr, len); |
| propertyName = (const char *) payloadPtr; |
| payloadPtr += len; |
| _logger->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str()); |
| } |
| else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE) |
| { |
| uint32_t len; |
| payloadPtr = this->decode(payloadPtr, len); |
| propertyValue = (const char *) payloadPtr; |
| payloadPtr += len; |
| _logger->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str()); |
| this->_controller->updatePropertyValue(processor, propertyName, propertyValue); |
| } |
| else |
| { |
| break; |
| } |
| } |
| delete[] payload; |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } |
| else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber) |
| { |
| _logger->log_info("Flow Control Protocol trigger reregister"); |
| this->_registered = false; |
| this->_seqNumber++; |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } |
| else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) |
| { |
| _logger->log_info("Flow Control Protocol stop flow controller"); |
| this->_controller->stop(true); |
| this->_seqNumber++; |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } |
| else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) |
| { |
| _logger->log_info("Flow Control Protocol start flow controller"); |
| this->_controller->start(); |
| this->_seqNumber++; |
| close(_socket); |
| _socket = 0; |
| return 0; |
| } |
| else |
| { |
| _logger->log_info("Flow Control Protocol Report fail"); |
| close(_socket); |
| _socket = 0; |
| return -1; |
| } |
| } |
| |