| /* A simple server in the internet domain using TCP |
| The port number is passed as an argument */ |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <unistd.h> |
| #include <sys/types.h> |
| #include <sys/socket.h> |
| #include <netinet/in.h> |
| #include <errno.h> |
| #include <arpa/inet.h> |
| #include <fcntl.h> |
| #include <netdb.h> |
| #include <string> |
| #include <errno.h> |
| #include <chrono> |
| #include <thread> |
| #include <iostream> // std::cout |
| #include <fstream> // std::ifstream |
| #include <signal.h> |
| |
| #define DEFAULT_NIFI_SERVER_PORT 9000 |
| #define DEFAULT_REPORT_INTERVAL 1000 // 1 sec |
| #define MAX_READ_TIMEOUT 30000 // 30 seconds |
| |
| //! FlowControl Protocol Msg Type |
| typedef enum { |
| REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version |
| REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval |
| REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info |
| REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property |
| MAX_FLOW_CONTROL_MSG_TYPE |
| } FlowControlMsgType; |
| |
| //! FlowControl Protocol Msg Type String |
| static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = |
| { |
| "REGISTER_REQ", |
| "REGISTER_RESP", |
| "REPORT_REQ", |
| "REPORT_RESP" |
| }; |
| |
| //! Flow Control Msg Type to String |
| inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) |
| { |
| if (type < MAX_FLOW_CONTROL_MSG_TYPE) |
| return FlowControlMsgTypeStr[type]; |
| else |
| return NULL; |
| } |
| |
| //! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) |
| typedef enum { |
| //Fix length 8 bytes: client to server in register request, required field |
| FLOW_SERIAL_NUMBER, |
| // Flow XML name TLV: client to server in register request and report request, required field |
| FLOW_XML_NAME, |
| // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server |
| FLOW_XML_CONTENT, |
| // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field |
| REPORT_INTERVAL, |
| // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property |
| PROCESSOR_NAME, |
| // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property |
| PROPERTY_NAME, |
| // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property |
| PROPERTY_VALUE, |
| // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server |
| REPORT_BLOB, |
| MAX_FLOW_MSG_ID |
| } FlowControlMsgID; |
| |
| //! FlowControl Protocol Msg ID String |
| static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = |
| { |
| "FLOW_SERIAL_NUMBER", |
| "FLOW_XML_NAME", |
| "FLOW_XML_CONTENT", |
| "REPORT_INTERVAL", |
| "PROCESSOR_NAME" |
| "PROPERTY_NAME", |
| "PROPERTY_VALUE", |
| "REPORT_BLOB" |
| }; |
| |
| #define TYPE_HDR_LEN 4 // Fix Hdr Type |
| #define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes |
| |
| //! FlowControl Protocol Msg Len |
| inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) |
| { |
| if (id == FLOW_SERIAL_NUMBER) |
| return (TYPE_HDR_LEN + 8); |
| else if (id == REPORT_INTERVAL) |
| return (TYPE_HDR_LEN + 4); |
| else if (id < MAX_FLOW_MSG_ID) |
| return (TLV_HDR_LEN + payLoadLen); |
| else |
| return -1; |
| } |
| |
| //! Flow Control Msg Id to String |
| inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) |
| { |
| if (id < MAX_FLOW_MSG_ID) |
| return FlowControlMsgIDStr[id]; |
| else |
| return NULL; |
| } |
| |
| //! Flow Control Respond status code |
| typedef enum { |
| RESP_SUCCESS, |
| RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register |
| RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller |
| RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller |
| RESP_FAILURE, |
| MAX_RESP_CODE |
| } FlowControlRespCode; |
| |
| //! FlowControl Resp Code str |
| static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = |
| { |
| "RESP_SUCCESS", |
| "RESP_TRIGGER_REGISTER", |
| "RESP_START_FLOW_CONTROLLER", |
| "RESP_STOP_FLOW_CONTROLLER", |
| "RESP_FAILURE" |
| }; |
| |
| //! Flow Control Resp Code to String |
| inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) |
| { |
| if (code < MAX_RESP_CODE) |
| return FlowControlRespCodeStr[code]; |
| else |
| return NULL; |
| } |
| |
| //! Common FlowControlProtocol Header |
| typedef struct { |
| uint32_t msgType; //! Msg Type |
| uint32_t seqNumber; //! Seq Number to match Req with Resp |
| uint32_t status; //! Resp Code, see FlowControlRespCode |
| uint32_t payloadLen; //! Msg Payload length |
| } FlowControlProtocolHeader; |
| |
| |
| //! encode uint32_t |
| uint8_t *encode(uint8_t *buf, uint32_t value) |
| { |
| *buf++ = (value & 0xFF000000) >> 24; |
| *buf++ = (value & 0x00FF0000) >> 16; |
| *buf++ = (value & 0x0000FF00) >> 8; |
| *buf++ = (value & 0x000000FF); |
| return buf; |
| } |
| |
| //! encode uint32_t |
| uint8_t *decode(uint8_t *buf, uint32_t &value) |
| { |
| value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3])); |
| return (buf + 4); |
| } |
| |
| //! encode byte array |
| uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) |
| { |
| memcpy(buf, bufArray, size); |
| buf += size; |
| return buf; |
| } |
| |
| //! encode std::string |
| uint8_t *encode(uint8_t *buf, std::string value) |
| { |
| // add the \0 for size |
| buf = encode(buf, value.size()+1); |
| buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1); |
| return buf; |
| } |
| |
| int sendData(int socket, uint8_t *buf, int buflen) |
| { |
| int ret = 0, bytes = 0; |
| |
| while (bytes < buflen) |
| { |
| ret = send(socket, buf+bytes, buflen-bytes, 0); |
| //check for errors |
| if (ret == -1) |
| { |
| return ret; |
| } |
| bytes+=ret; |
| } |
| |
| return bytes; |
| } |
| |
| void error(const char *msg) |
| { |
| perror(msg); |
| exit(1); |
| } |
| |
| /* readline - read a '\n' terminated line from socket fd |
| into buffer bufptr of size len. The line in the |
| buffer is terminated with '\0'. |
| It returns -1 in case of error or if |
| the capacity of the buffer is exceeded. |
| It returns 0 if EOF is encountered before reading '\n'. |
| */ |
| int readline( int fd, char *bufptr, size_t len ) |
| { |
| /* Note that this function is very tricky. It uses the |
| static variables bp, cnt, and b to establish a local buffer. |
| The recv call requests large chunks of data (the size of the buffer). |
| Then if the recv call reads more than one line, the overflow |
| remains in the buffer and it is made available to the next call |
| to readline. |
| Notice also that this routine reads up to '\n' and overwrites |
| it with '\0'. Thus if the line is really terminated with |
| "\r\n", the '\r' will remain unchanged. |
| */ |
| char *bufx = bufptr; |
| static char *bp; |
| static int cnt = 0; |
| static char b[ 4096 ]; |
| char c; |
| |
| while ( --len > 0 ) |
| { |
| if ( --cnt <= 0 ) |
| { |
| cnt = recv( fd, b, sizeof( b ), 0 ); |
| if ( cnt < 0 ) |
| { |
| if ( errno == EINTR ) |
| { |
| len++; /* the while will decrement */ |
| continue; |
| } |
| return -1; |
| } |
| if ( cnt == 0 ) |
| return 0; |
| bp = b; |
| } |
| c = *bp++; |
| *bufptr++ = c; |
| if ( c == '\n' ) |
| { |
| *bufptr = '\0'; |
| return bufptr - bufx; |
| } |
| } |
| return -1; |
| } |
| |
| int readData(int socket, uint8_t *buf, int buflen) |
| { |
| int sendSize = buflen; |
| int status; |
| |
| while (buflen) |
| { |
| #ifndef __MACH__ |
| status = read(socket, buf, buflen); |
| #else |
| status = recv(socket, buf, buflen, 0); |
| #endif |
| if (status <= 0) |
| { |
| return status; |
| } |
| buflen -= status; |
| buf += status; |
| } |
| |
| return sendSize; |
| } |
| |
| int readHdr(int socket, FlowControlProtocolHeader *hdr) |
| { |
| uint8_t buffer[sizeof(FlowControlProtocolHeader)]; |
| |
| uint8_t *data = buffer; |
| |
| int status = readData(socket, buffer, sizeof(FlowControlProtocolHeader)); |
| if (status <= 0) |
| return status; |
| |
| uint32_t value; |
| data = decode(data, value); |
| hdr->msgType = value; |
| |
| data = decode(data, value); |
| hdr->seqNumber = value; |
| |
| data = decode(data, value); |
| hdr->status = value; |
| |
| data = decode(data, value); |
| hdr->payloadLen = value; |
| |
| return sizeof(FlowControlProtocolHeader); |
| } |
| |
| int readXML(char **xmlContent) |
| { |
| std::ifstream is ("conf/flowServer.xml", std::ifstream::binary); |
| if (is) { |
| // get length of file: |
| is.seekg (0, is.end); |
| int length = is.tellg(); |
| is.seekg (0, is.beg); |
| |
| char * buffer = new char [length]; |
| |
| printf("Reading %s len %d\n", "conf/flowServer.xml", length); |
| // read data as a block: |
| is.read (buffer,length); |
| |
| is.close(); |
| |
| // ...buffer contains the entire file... |
| *xmlContent = buffer; |
| |
| return length; |
| } |
| return 0; |
| } |
| |
| static int sockfd = 0, newsockfd = 0; |
| void sigHandler(int signal) |
| { |
| if (signal == SIGINT || signal == SIGTERM) |
| { |
| close(newsockfd); |
| close(sockfd); |
| exit(1); |
| } |
| } |
| |
| int main(int argc, char *argv[]) |
| { |
| int portno; |
| socklen_t clilen; |
| struct sockaddr_in serv_addr, cli_addr; |
| char buffer[4096]; |
| int flag = 0; |
| int number = 0; |
| |
| int n; |
| if (argc < 2) { |
| fprintf(stderr,"ERROR, no port provided\n"); |
| exit(1); |
| } |
| |
| if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR) |
| { |
| |
| return -1; |
| } |
| sockfd = socket(AF_INET, SOCK_STREAM, 0); |
| if (sockfd < 0) |
| error("ERROR opening socket"); |
| bzero((char *) &serv_addr, sizeof(serv_addr)); |
| portno = atoi(argv[1]); |
| serv_addr.sin_family = AF_INET; |
| serv_addr.sin_addr.s_addr = INADDR_ANY; |
| serv_addr.sin_port = htons(portno); |
| if (bind(sockfd, (struct sockaddr *) &serv_addr, |
| sizeof(serv_addr)) < 0) |
| error("ERROR on binding"); |
| listen(sockfd,5); |
| if (portno == DEFAULT_NIFI_SERVER_PORT) |
| { |
| while (true) |
| { |
| clilen = sizeof(cli_addr); |
| newsockfd = accept(sockfd, |
| (struct sockaddr *) &cli_addr, |
| &clilen); |
| if (newsockfd < 0) |
| { |
| error("ERROR on accept"); |
| break; |
| } |
| // process request |
| FlowControlProtocolHeader hdr; |
| int status = readHdr(newsockfd, &hdr); |
| if (status > 0) |
| { |
| printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); |
| printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber); |
| printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); |
| printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen); |
| if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ) |
| { |
| printf("Flow Control Protocol Register Req receive\n"); |
| uint8_t *payload = new uint8_t[hdr.payloadLen]; |
| uint8_t *payloadPtr = payload; |
| status = readData(newsockfd, payload, hdr.payloadLen); |
| while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) |
| { |
| uint32_t msgID = 0xFFFFFFFF; |
| payloadPtr = decode(payloadPtr, msgID); |
| if (((FlowControlMsgID) msgID) == FLOW_SERIAL_NUMBER) |
| { |
| // Fixed 8 bytes |
| uint8_t seqNum[8]; |
| memcpy(seqNum, payloadPtr, 8); |
| printf("Flow Control Protocol Register Req receive serial num\n"); |
| payloadPtr += 8; |
| } |
| else if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) |
| { |
| uint32_t len; |
| payloadPtr = decode(payloadPtr, len); |
| printf("Flow Control Protocol receive XML name length %d\n", len); |
| std::string flowName = (const char *) payloadPtr; |
| payloadPtr += len; |
| printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); |
| } |
| else |
| { |
| break; |
| } |
| } |
| delete[] payload; |
| // Send Register Respond |
| // Calculate the total payload msg size |
| char *xmlContent; |
| uint32_t xmlLen = readXML(&xmlContent); |
| uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0); |
| if (xmlLen > 0) |
| payloadSize += FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen); |
| |
| uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; |
| uint8_t *data = new uint8_t[size]; |
| uint8_t *start = data; |
| |
| // encode the HDR |
| hdr.msgType = REGISTER_RESP; |
| hdr.payloadLen = payloadSize; |
| hdr.status = RESP_SUCCESS; |
| data = encode(data, hdr.msgType); |
| data = encode(data, hdr.seqNumber); |
| data = encode(data, hdr.status); |
| data = encode(data, hdr.payloadLen); |
| |
| // encode the report interval |
| data = encode(data, REPORT_INTERVAL); |
| data = encode(data, DEFAULT_REPORT_INTERVAL); |
| |
| // encode the XML content |
| if (xmlLen > 0) |
| { |
| data = encode(data, FLOW_XML_CONTENT); |
| data = encode(data, xmlLen); |
| data = encode(data, (uint8_t *) xmlContent, xmlLen); |
| delete[] xmlContent; |
| } |
| |
| // send it |
| status = sendData(newsockfd, start, size); |
| delete[] start; |
| } |
| else if (((FlowControlMsgType) hdr.msgType) == REPORT_REQ) |
| { |
| printf("Flow Control Protocol Report Req receive\n"); |
| uint8_t *payload = new uint8_t[hdr.payloadLen]; |
| uint8_t *payloadPtr = payload; |
| status = readData(newsockfd, payload, hdr.payloadLen); |
| while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) |
| { |
| uint32_t msgID = 0xFFFFFFFF; |
| payloadPtr = decode(payloadPtr, msgID); |
| if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) |
| { |
| uint32_t len; |
| payloadPtr = decode(payloadPtr, len); |
| printf("Flow Control Protocol receive XML name length %d\n", len); |
| std::string flowName = (const char *) payloadPtr; |
| payloadPtr += len; |
| printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); |
| } |
| else |
| { |
| break; |
| } |
| } |
| delete[] payload; |
| // Send Register Respond |
| // Calculate the total payload msg size |
| std::string processor = "RealTimeDataCollector"; |
| std::string propertyName1 = "real Time Message ID"; |
| std::string propertyValue1 = "41"; |
| std::string propertyName2 = "Batch Message ID"; |
| std::string propertyValue2 = "172,30,48"; |
| if (flag == 0) |
| { |
| propertyName1 = "Real Time Message ID"; |
| propertyValue1 = "41"; |
| propertyName2 = "Batch Message ID"; |
| propertyValue2 = "172,48"; |
| flag = 1; |
| } |
| else if (flag == 1) |
| { |
| propertyName1 = "Real Time Message ID"; |
| propertyValue1 = "172,48"; |
| propertyName2 = "Batch Message ID"; |
| propertyValue2 = "41"; |
| flag = 0; |
| } |
| uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1); |
| payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1); |
| payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1); |
| payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1); |
| payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1); |
| |
| uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; |
| uint8_t *data = new uint8_t[size]; |
| uint8_t *start = data; |
| |
| // encode the HDR |
| hdr.msgType = REPORT_RESP; |
| hdr.payloadLen = payloadSize; |
| hdr.status = RESP_SUCCESS; |
| |
| if (number >= 10 && number < 20) |
| { |
| // After 10 second report, stop the flow controller for 10 second |
| hdr.status = RESP_STOP_FLOW_CONTROLLER; |
| } |
| else if (number == 20) |
| { |
| // restart the flow controller after 10 second |
| hdr.status = RESP_START_FLOW_CONTROLLER; |
| } |
| else if (number == 30) |
| { |
| // retrigger register |
| hdr.status = RESP_TRIGGER_REGISTER; |
| number = 0; |
| } |
| |
| number++; |
| |
| data = encode(data, hdr.msgType); |
| data = encode(data, hdr.seqNumber); |
| data = encode(data, hdr.status); |
| data = encode(data, hdr.payloadLen); |
| |
| // encode the processorName |
| data = encode(data, PROCESSOR_NAME); |
| data = encode(data, processor); |
| |
| // encode the propertyName and value TLV |
| data = encode(data, PROPERTY_NAME); |
| data = encode(data, propertyName1); |
| data = encode(data, PROPERTY_VALUE); |
| data = encode(data, propertyValue1); |
| data = encode(data, PROPERTY_NAME); |
| data = encode(data, propertyName2); |
| data = encode(data, PROPERTY_VALUE); |
| data = encode(data, propertyValue2); |
| // send it |
| status = sendData(newsockfd, start, size); |
| delete[] start; |
| } |
| } |
| close(newsockfd); |
| } |
| close(sockfd); |
| } |
| else |
| { |
| clilen = sizeof(cli_addr); |
| newsockfd = accept(sockfd, |
| (struct sockaddr *) &cli_addr, |
| &clilen); |
| if (newsockfd < 0) |
| error("ERROR on accept"); |
| while (1) |
| { |
| bzero(buffer,4096); |
| n = readline(newsockfd,buffer,4095); |
| if (n <= 0 ) |
| { |
| close(newsockfd); |
| newsockfd = accept(sockfd, |
| (struct sockaddr *) &cli_addr, |
| &clilen); |
| continue; |
| } |
| printf("%s",buffer); |
| } |
| close(newsockfd); |
| close(sockfd); |
| } |
| return 0; |
| } |