// blob: 011ebcfdfab9f17201cb3744c892151179f42039 [file] [log] [blame]
/**
* @file FlowControlProtocol.cpp
* FlowControlProtocol class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <sys/time.h>
#include <stdio.h>
#include <time.h>
#include <chrono>
#include <thread>
#include <random>
#include <netinet/tcp.h>
#include <iostream>
#include "FlowController.h"
#include "FlowControlProtocol.h"
/**
 * Open a TCP connection to the flow control server.
 *
 * Resolves host to an IPv4 address, creates a stream socket, applies the
 * socket options (non-Mach builds only), binds the socket to an ephemeral
 * local port and connects it to host:port.
 *
 * @param host server host name to resolve
 * @param port server TCP port in host byte order
 * @return the connected socket descriptor, or 0 on any failure (callers in
 *         this file treat a value <= 0 as "not connected")
 */
int FlowControlProtocol::connectServer(const char *host, uint16_t port)
{
    in_addr_t addr;
    struct hostent *h = NULL;
#ifdef __MACH__
    h = gethostbyname(host);
#else
    char buf[1024];
    struct hostent he;
    int hh_errno;
    // Do not rely on the result pointer after a failed lookup.
    if (gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno) != 0)
        h = NULL;
#endif
    // The lookup can fail (unknown host, resolver error) or yield an entry
    // that is not a 4-byte IPv4 address; never copy from it unchecked.
    if (h == NULL || h->h_addrtype != AF_INET || h->h_addr_list[0] == NULL ||
        h->h_length != static_cast<int>(sizeof(addr)))
    {
        _logger->log_error("Could not resolve hostName %s", host);
        return 0;
    }
    memcpy(&addr, h->h_addr_list[0], sizeof(addr));

    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
    {
        _logger->log_error("Could not create socket to hostName %s", host);
        return 0;
    }

#ifndef __MACH__
    int opt = 1;
    // Disable Nagle so small protocol messages are sent immediately.
    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0)
    {
        _logger->log_error("setsockopt() TCP_NODELAY failed");
        close(sock);
        return 0;
    }
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
            (char *)&opt, sizeof(opt)) < 0)
    {
        _logger->log_error("setsockopt() SO_REUSEADDR failed");
        close(sock);
        return 0;
    }

    int sndsize = 256*1024;
    if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0)
    {
        _logger->log_error("setsockopt() SO_SNDBUF failed");
        close(sock);
        return 0;
    }
#endif

    struct sockaddr_in sa;
    socklen_t socklen;

    // Bind to any local address and let the kernel pick the local port.
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_addr.s_addr = htonl(INADDR_ANY);
    sa.sin_port = htons(0);
    socklen = sizeof(sa);
    if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
    {
        _logger->log_error("socket bind failed");
        close(sock);
        return 0;
    }

    // Connect to the resolved server address (already in network byte order).
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_addr.s_addr = addr;
    sa.sin_port = htons(port);
    socklen = sizeof(sa);
    if (connect(sock, (struct sockaddr *)&sa, socklen) < 0)
    {
        _logger->log_error("socket connect failed to %s %d", host, port);
        close(sock);
        return 0;
    }

    _logger->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port);
    return sock;
}
/**
 * Write the whole buffer to the protocol socket, looping over partial sends.
 *
 * @param buf    bytes to transmit
 * @param buflen number of bytes in buf
 * @return total bytes sent (buflen once everything is written, 0 when
 *         buflen is not positive), or -1 as soon as send() reports an error
 */
int FlowControlProtocol::sendData(uint8_t *buf, int buflen)
{
    int sent = 0;
    for (int chunk = 0; sent < buflen; sent += chunk)
    {
        chunk = send(_socket, buf + sent, buflen - sent, 0);
        if (chunk == -1)
        {
            // propagate the send() failure unchanged
            return chunk;
        }
    }
    return sent;
}
/**
 * Wait until the protocol socket becomes readable.
 *
 * @param msec timeout in milliseconds; a value <= 0 waits without a timeout
 * @return the select() result when it is <= 0 (timeout or error), the
 *         positive select() result when the socket is readable, otherwise 0
 */
int FlowControlProtocol::selectClient(int msec)
{
    const int fd = _socket;

    fd_set readable;
    FD_ZERO(&readable);
    FD_SET(fd, &readable);

    struct timeval timeout;
    timeout.tv_sec = msec / 1000;
    timeout.tv_usec = (msec % 1000) * 1000;

    // A NULL timeout makes select() block until the descriptor is ready.
    struct timeval *wait = (msec > 0) ? &timeout : NULL;
    const int ready = select(fd + 1, &readable, NULL, NULL, wait);
    if (ready <= 0)
        return ready;

    return FD_ISSET(fd, &readable) ? ready : 0;
}
/**
 * Read exactly buflen bytes from the protocol socket into buf.
 *
 * Each read is preceded by a selectClient(MAX_READ_TIMEOUT) wait.
 *
 * @param buf    destination buffer, at least buflen bytes long
 * @param buflen number of bytes to read
 * @return buflen once all bytes have arrived (so 0 when buflen is 0);
 *         otherwise the first value <= 0 returned by selectClient() or by
 *         the socket read
 */
int FlowControlProtocol::readData(uint8_t *buf, int buflen)
{
    const int requested = buflen;
    uint8_t *cursor = buf;
    int remaining = buflen;

    while (remaining != 0)
    {
        const int ready = selectClient(MAX_READ_TIMEOUT);
        if (ready <= 0)
        {
            return ready;
        }
#ifndef __MACH__
        const int received = read(_socket, cursor, remaining);
#else
        const int received = recv(_socket, cursor, remaining, 0);
#endif
        if (received <= 0)
        {
            return received;
        }
        cursor += received;
        remaining -= received;
    }
    return requested;
}
/**
 * Read one protocol header from the socket and decode it into hdr.
 *
 * The fields are decoded in wire order: msgType, seqNumber, status,
 * payloadLen.
 *
 * @param hdr receives the decoded header fields
 * @return sizeof(FlowControlProtocolHeader) on success, otherwise the
 *         value <= 0 reported by readData()
 */
int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr)
{
    uint8_t raw[sizeof(FlowControlProtocolHeader)];
    const int rc = readData(raw, sizeof(FlowControlProtocolHeader));
    if (rc <= 0)
        return rc;

    uint8_t *cursor = raw;
    uint32_t field;

    cursor = this->decode(cursor, field);
    hdr->msgType = field;

    cursor = this->decode(cursor, field);
    hdr->seqNumber = field;

    cursor = this->decode(cursor, field);
    hdr->status = field;

    cursor = this->decode(cursor, field);
    hdr->payloadLen = field;

    return sizeof(FlowControlProtocolHeader);
}
/**
 * Start the protocol worker thread.
 *
 * Does nothing when the report interval is not positive or when the
 * protocol is already running. The worker executes run() and is detached.
 */
void FlowControlProtocol::start()
{
    if (_reportInterval <= 0 || _running)
        return;

    _running = true;
    _logger->log_info("FlowControl Protocol Start");

    _thread = new std::thread(run, this);
    _thread->detach();
}
/**
 * Ask the worker loop to finish by clearing the running flag.
 *
 * Does nothing when the protocol is not running.
 */
void FlowControlProtocol::stop()
{
    if (_running)
    {
        _running = false;
        _logger->log_info("FlowControl Protocol Stop");
    }
}
/**
 * Worker loop: while the protocol is running, sleep for the report interval
 * (milliseconds), then register with the server if not yet registered, or
 * send a report request otherwise.
 *
 * @param protocol the protocol instance driven by this loop
 */
void FlowControlProtocol::run(FlowControlProtocol *protocol)
{
    while (protocol->_running)
    {
        const std::chrono::milliseconds pause(protocol->_reportInterval);
        std::this_thread::sleep_for(pause);

        if (protocol->_registered)
        {
            protocol->sendReportReq();
            continue;
        }
        // not registered yet: register before reporting
        protocol->sendRegisterReq();
    }
}
/**
 * Send a REGISTER_REQ to the flow control server and process the response.
 *
 * The request carries the serial number and the controller's flow XML name.
 * On a successful response the instance is marked registered, the sequence
 * number is advanced and the response payload is parsed:
 *   - REPORT_INTERVAL updates the report interval;
 *   - FLOW_XML_CONTENT is written to a timestamped "flow.<stamp>.xml" file
 *     which is then handed to the controller's reload().
 * Parsing stops at the first unknown or truncated element. The socket is
 * closed before returning on every path after the connect.
 *
 * @return 0 when the server accepted the registration and the response was
 *         consumed; -1 when already registered, when connecting, sending or
 *         receiving fails, or when the server rejected the request
 */
int FlowControlProtocol::sendRegisterReq()
{
    if (_registered)
    {
        _logger->log_info("Already registered");
        return -1;
    }

    uint16_t port = this->_serverPort;
    if (this->_socket <= 0)
        this->_socket = connectServer(_serverName.c_str(), port);
    if (this->_socket <= 0)
        return -1;

    // Calculate the total payload msg size
    uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) +
        FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1);
    uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
    uint8_t *data = new uint8_t[size];
    uint8_t *start = data;

    // encode the HDR
    FlowControlProtocolHeader hdr;
    hdr.msgType = REGISTER_REQ;
    hdr.payloadLen = payloadSize;
    hdr.seqNumber = this->_seqNumber;
    hdr.status = RESP_SUCCESS;
    data = this->encode(data, hdr.msgType);
    data = this->encode(data, hdr.seqNumber);
    data = this->encode(data, hdr.status);
    data = this->encode(data, hdr.payloadLen);

    // encode the serial number
    data = this->encode(data, FLOW_SERIAL_NUMBER);
    data = this->encode(data, this->_serialNumber, 8);

    // encode the XML name
    data = this->encode(data, FLOW_XML_NAME);
    data = this->encode(data, this->_controller->getName());

    // send it
    int status = sendData(start, size);
    delete[] start;
    if (status <= 0)
    {
        close(_socket);
        _socket = 0;
        _logger->log_error("Flow Control Protocol Send Register Req failed");
        return -1;
    }

    // Look for the register response
    status = readHdr(&hdr);
    if (status <= 0)
    {
        close(_socket);
        _socket = 0;
        _logger->log_error("Flow Control Protocol Read Register Resp header failed");
        return -1;
    }
    _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
    _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
    _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
    _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);

    if (hdr.status != RESP_SUCCESS || hdr.seqNumber != this->_seqNumber)
    {
        _logger->log_info("Flow Control Protocol Register fail");
        close(_socket);
        _socket = 0;
        return -1;
    }

    this->_registered = true;
    this->_seqNumber++;
    _logger->log_info("Flow Control Protocol Register success");

    const uint32_t payloadLen = hdr.payloadLen;
    if (payloadLen == 0)
    {
        // Nothing more to read. readData() returns 0 for a zero-length
        // request, which must not be mistaken for a failure.
        close(_socket);
        _socket = 0;
        return 0;
    }
    if (payloadLen > 0x7fffffffU)
    {
        // readData() takes an int length; a larger value cannot be read safely.
        _logger->log_error("Flow Control Protocol Register Resp payload length invalid");
        close(_socket);
        _socket = 0;
        return -1;
    }

    uint8_t *payload = new uint8_t[payloadLen];
    status = readData(payload, payloadLen);
    if (status <= 0)
    {
        delete[] payload;
        _logger->log_info("Flow Control Protocol Register Read Payload fail");
        close(_socket);
        _socket = 0;
        return -1;
    }

    // The payload comes from the network: every field is bounds-checked
    // before it is decoded. decode() is assumed to consume one 32-bit word
    // per call, as it does for the header fields in readHdr().
    const int word = static_cast<int>(sizeof(uint32_t));
    uint8_t *payloadPtr = payload;
    uint8_t *const payloadEnd = payload + payloadLen;
    while (static_cast<int>(payloadEnd - payloadPtr) >= word)
    {
        uint32_t msgID;
        payloadPtr = this->decode(payloadPtr, msgID);
        int remaining = static_cast<int>(payloadEnd - payloadPtr);

        if (((FlowControlMsgID) msgID) == REPORT_INTERVAL)
        {
            // Fixed 4 bytes
            if (remaining < word)
            {
                _logger->log_error("Flow Control Protocol Register Resp payload truncated");
                break;
            }
            uint32_t reportInterval;
            payloadPtr = this->decode(payloadPtr, reportInterval);
            _logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval);
            this->_reportInterval = reportInterval;
        }
        else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT)
        {
            if (remaining < word)
            {
                _logger->log_error("Flow Control Protocol Register Resp payload truncated");
                break;
            }
            uint32_t xmlLen;
            payloadPtr = this->decode(payloadPtr, xmlLen);
            _logger->log_info("Flow Control Protocol receive XML content length %d", xmlLen);
            remaining = static_cast<int>(payloadEnd - payloadPtr);
            if (remaining < 0 || xmlLen > static_cast<uint32_t>(remaining))
            {
                // Declared length exceeds what was actually received.
                _logger->log_error("Flow Control Protocol Register Resp XML content length invalid");
                break;
            }

            // Build "flow.<YYYYmmddHHMMSS>.xml". strftime() is used instead
            // of asctime(), whose result ends in a newline; localtime_r()
            // avoids the shared static buffer since this runs on the worker
            // thread.
            time_t rawtime;
            time(&rawtime);
            struct tm timeinfo;
            char stamp[32];
            std::string xmlFileName = "flow.";
            if (localtime_r(&rawtime, &timeinfo) != NULL &&
                strftime(stamp, sizeof(stamp), "%Y%m%d%H%M%S", &timeinfo) > 0)
            {
                xmlFileName += stamp;
            }
            xmlFileName += ".xml";

            std::ofstream fs;
            fs.open(xmlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
            if (fs.is_open())
            {
                fs.write(reinterpret_cast<const char *>(payloadPtr), xmlLen);
                fs.close();
                if (fs.fail())
                {
                    // Do not reload from a file that was not fully written.
                    _logger->log_error("Flow Control Protocol failed to write %s", xmlFileName.c_str());
                }
                else
                {
                    this->_controller->reload(xmlFileName.c_str());
                }
            }
            else
            {
                _logger->log_error("Flow Control Protocol failed to open %s", xmlFileName.c_str());
            }
            // Skip over the XML bytes so the next element is decoded from
            // the right offset.
            payloadPtr += xmlLen;
        }
        else
        {
            break;
        }
    }

    delete[] payload;
    close(_socket);
    _socket = 0;
    return 0;
}
int FlowControlProtocol::sendReportReq()
{
uint16_t port = this->_serverPort;
if (this->_socket <= 0)
this->_socket = connectServer(_serverName.c_str(), port);
if (this->_socket <= 0)
return -1;
// Calculate the total payload msg size
uint32_t payloadSize =
FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1);
uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
uint8_t *data = new uint8_t[size];
uint8_t *start = data;
// encode the HDR
FlowControlProtocolHeader hdr;
hdr.msgType = REPORT_REQ;
hdr.payloadLen = payloadSize;
hdr.seqNumber = this->_seqNumber;
hdr.status = RESP_SUCCESS;
data = this->encode(data, hdr.msgType);
data = this->encode(data, hdr.seqNumber);
data = this->encode(data, hdr.status);
data = this->encode(data, hdr.payloadLen);
// encode the XML name
data = this->encode(data, FLOW_XML_NAME);
data = this->encode(data, this->_controller->getName());
// send it
int status = sendData(start, size);
delete[] start;
if (status <= 0)
{
close(_socket);
_socket = 0;
_logger->log_error("Flow Control Protocol Send Report Req failed");
return -1;
}
// Looking for report respond
status = readHdr(&hdr);
if (status <= 0)
{
close(_socket);
_socket = 0;
_logger->log_error("Flow Control Protocol Read Report Resp header failed");
return -1;
}
_logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
_logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber);
_logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
_logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen);
if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
{
this->_seqNumber++;
uint8_t *payload = new uint8_t[hdr.payloadLen];
uint8_t *payloadPtr = payload;
status = readData(payload, hdr.payloadLen);
if (status <= 0)
{
delete[] payload;
_logger->log_info("Flow Control Protocol Report Resp Read Payload fail");
close(_socket);
_socket = 0;
return -1;
}
std::string processor;
std::string propertyName;
std::string propertyValue;
while (payloadPtr < (payload + hdr.payloadLen))
{
uint32_t msgID;
payloadPtr = this->decode(payloadPtr, msgID);
if (((FlowControlMsgID) msgID) == PROCESSOR_NAME)
{
uint32_t len;
payloadPtr = this->decode(payloadPtr, len);
processor = (const char *) payloadPtr;
payloadPtr += len;
_logger->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str());
}
else if (((FlowControlMsgID) msgID) == PROPERTY_NAME)
{
uint32_t len;
payloadPtr = this->decode(payloadPtr, len);
propertyName = (const char *) payloadPtr;
payloadPtr += len;
_logger->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str());
}
else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE)
{
uint32_t len;
payloadPtr = this->decode(payloadPtr, len);
propertyValue = (const char *) payloadPtr;
payloadPtr += len;
_logger->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str());
this->_controller->updatePropertyValue(processor, propertyName, propertyValue);
}
else
{
break;
}
}
delete[] payload;
close(_socket);
_socket = 0;
return 0;
}
else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber)
{
_logger->log_info("Flow Control Protocol trigger reregister");
this->_registered = false;
this->_seqNumber++;
close(_socket);
_socket = 0;
return 0;
}
else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
{
_logger->log_info("Flow Control Protocol stop flow controller");
this->_controller->stop(true);
this->_seqNumber++;
close(_socket);
_socket = 0;
return 0;
}
else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber)
{
_logger->log_info("Flow Control Protocol start flow controller");
this->_controller->start();
this->_seqNumber++;
close(_socket);
_socket = 0;
return 0;
}
else
{
_logger->log_info("Flow Control Protocol Report fail");
close(_socket);
_socket = 0;
return -1;
}
}