/**
 * @file FlowControlProtocol.h
 * FlowControlProtocol class declaration
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#ifndef __FLOW_CONTROL_PROTOCOL_H__
#define __FLOW_CONTROL_PROTOCOL_H__

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <string>
#include <errno.h>
#include <chrono>
#include <thread>
#include "Logger.h"
#include "Configure.h"
#include "Property.h"

//! Forwarder declaration
class FlowController;

#define DEFAULT_NIFI_SERVER_PORT 9000
#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
#define MAX_READ_TIMEOUT 30000 // 30 seconds

//! FlowControl Protocol Msg Type
typedef enum {
	REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version
	REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval
	REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info
	REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property
	MAX_FLOW_CONTROL_MSG_TYPE
} FlowControlMsgType;

//! FlowControl Protocol Msg Type String
static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
{
		"REGISTER_REQ",
		"REGISTER_RESP",
		"REPORT_REQ",
		"REPORT_RESP"
};

//! Flow Control Msg Type to String
inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
{
	if (type < MAX_FLOW_CONTROL_MSG_TYPE)
		return FlowControlMsgTypeStr[type];
	else
		return NULL;
}

//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV)
typedef enum {
	//Fix length 8 bytes: client to server in register request, required field
	FLOW_SERIAL_NUMBER,
	// Flow XML name TLV: client to server in register request and report request, required field
	FLOW_XML_NAME,
	// Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server
	FLOW_XML_CONTENT,
	// Fix length, 4 bytes Report interval in msec: server to client in register respond, option field
	REPORT_INTERVAL,
	// Processor Name TLV:  server to client in report respond, option field in case server want to ask client to update processor property
	PROCESSOR_NAME,
	// Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property
	PROPERTY_NAME,
	// Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property
	PROPERTY_VALUE,
	// Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server
	REPORT_BLOB,
	MAX_FLOW_MSG_ID
} FlowControlMsgID;

//! FlowControl Protocol Msg ID String
static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
{
		"FLOW_SERIAL_NUMBER",
		"FLOW_XML_NAME",
		"FLOW_XML_CONTENT",
		"REPORT_INTERVAL",
		"PROCESSOR_NAME"
		"PROPERTY_NAME",
		"PROPERTY_VALUE",
		"REPORT_BLOB"
};

#define TYPE_HDR_LEN 4 // Fix Hdr Type
#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes

//! FlowControl Protocol Msg Len
inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
{
	if (id == FLOW_SERIAL_NUMBER)
		return (TYPE_HDR_LEN + 8);
	else if (id == REPORT_INTERVAL)
		return (TYPE_HDR_LEN + 4);
	else if (id < MAX_FLOW_MSG_ID)
		return (TLV_HDR_LEN + payLoadLen);
	else
		return -1;
}

//! Flow Control Msg Id to String
inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
{
	if (id < MAX_FLOW_MSG_ID)
		return FlowControlMsgIDStr[id];
	else
		return NULL;
}

//! Flow Control Respond status code
typedef enum {
	RESP_SUCCESS,
	RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register
	RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller
	RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller
	RESP_FAILURE,
	MAX_RESP_CODE
} FlowControlRespCode;

//! FlowControl Resp Code str
static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
{
		"RESP_SUCCESS",
		"RESP_TRIGGER_REGISTER",
		"RESP_START_FLOW_CONTROLLER",
		"RESP_STOP_FLOW_CONTROLLER",
		"RESP_FAILURE"
};

//! Flow Control Resp Code to String
inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
{
	if (code < MAX_RESP_CODE)
		return FlowControlRespCodeStr[code];
	else
		return NULL;
}

//! Common FlowControlProtocol Header
typedef struct {
	uint32_t msgType; //! Msg Type
	uint32_t seqNumber; //! Seq Number to match Req with Resp
	uint32_t status; //! Resp Code, see FlowControlRespCode
	uint32_t payloadLen; //! Msg Payload length
} FlowControlProtocolHeader;

//! FlowControlProtocol Class
class FlowControlProtocol
{
public:
	//! Constructor
	/*!
	 * Create a new control protocol
	 */
	FlowControlProtocol(FlowController *controller) {
		_controller = controller;
		_logger = Logger::getLogger();
		_configure = Configure::getConfigure();
		_socket = 0;
		_serverName = "localhost";
		_serverPort = DEFAULT_NIFI_SERVER_PORT;
		_registered = false;
		_seqNumber = 0;
		_reportBlob = NULL;
		_reportBlobLen = 0;
		_reportInterval = DEFAULT_REPORT_INTERVAL;
		_running = false;

		std::string value;

		if (_configure->get(Configure::nifi_server_name, value))
		{
			_serverName = value;
			_logger->log_info("NiFi Server Name %s", _serverName.c_str());
		}
		if (_configure->get(Configure::nifi_server_port, value) && Property::StringToInt(value, _serverPort))
		{
			_logger->log_info("NiFi Server Port: [%d]", _serverPort);
		}
		if (_configure->get(Configure::nifi_server_report_interval, value))
		{
			TimeUnit unit;
			if (Property::StringToTime(value, _reportInterval, unit) &&
						Property::ConvertTimeUnitToMS(_reportInterval, unit, _reportInterval))
			{
				_logger->log_info("NiFi server report interval: [%d] ms", _reportInterval);
			}
		}
	}
	//! Destructor
	virtual ~FlowControlProtocol()
	{
		stop();
		if (_socket)
			close(_socket);
		if (_reportBlob)
			delete [] _reportBlob;
		if (this->_thread)
			delete this->_thread;
	}

public:

	//! SendRegisterRequest and Process Register Respond, return 0 for success
	int sendRegisterReq();
	//! SendReportReq and Process Report Respond, return 0 for success
	int sendReportReq();
	//! Start the flow control protocol
	void start();
	//! Stop the flow control protocol
	void stop();
	//! Set Report BLOB for periodically report
	void setReportBlob(char *blob, int len)
	{
		std::lock_guard<std::mutex> lock(_mtx);
		if (_reportBlob && _reportBlobLen >= len)
		{
			memcpy(_reportBlob, blob, len);
			_reportBlobLen = len;
		}
		else
		{
			if (_reportBlob)
				delete[] _reportBlob;
			_reportBlob = new char[len];
			_reportBlobLen = len;
		}
	}
	//! Run function for the thread
	static void run(FlowControlProtocol *protocol);
	//! set 8 bytes SerialNumber
	void setSerialNumber(uint8_t *number)
	{
		memcpy(_serialNumber, number, 8);
	}

protected:

private:
	//! Connect to the socket, return sock descriptor if success, 0 for failure
	int connectServer(const char *host, uint16_t port);
	//! Send Data via the socket, return -1 for failure
	int sendData(uint8_t *buf, int buflen);
	//! Read length into buf, return -1 for failure and 0 for EOF
	int readData(uint8_t *buf, int buflen);
	//! Select on the socket
	int selectClient(int msec);
	//! Read the header
	int readHdr(FlowControlProtocolHeader *hdr);
	//! encode uint32_t
	uint8_t *encode(uint8_t *buf, uint32_t value)
	{
		*buf++ = (value & 0xFF000000) >> 24;
		*buf++ = (value & 0x00FF0000) >> 16;
		*buf++ = (value & 0x0000FF00) >> 8;
		*buf++ = (value & 0x000000FF);
		return buf;
	}
	//! encode uint32_t
	uint8_t *decode(uint8_t *buf, uint32_t &value)
	{
		value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3]));
		return (buf + 4);
	}
	//! encode byte array
	uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
	{
		memcpy(buf, bufArray, size);
		buf += size;
		return buf;
	}
	//! encode std::string
	uint8_t *encode(uint8_t *buf, std::string value)
	{
		// add the \0 for size
		buf = encode(buf, value.size()+1);
		buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1);
		return buf;
	}
	//! Mutex for protection
	std::mutex _mtx;
	//! Logger
	Logger *_logger;
	//! Configure
	Configure *_configure;
	//! NiFi server Name
	std::string _serverName;
	//! NiFi server port
	int64_t _serverPort;
	//! Serial Number
	uint8_t _serialNumber[8];
	//! socket to server
	int _socket;
	//! report interal in msec
	int64_t _reportInterval;
	//! whether it was registered to the NiFi server
	bool _registered;
	//! seq number
	uint32_t _seqNumber;
	//! FlowController
	FlowController *_controller;
	//! report Blob
	char *_reportBlob;
	//! report Blob len;
	int _reportBlobLen;
	//! thread
	std::thread *_thread;
	//! whether it is running
	bool _running;
	// Prevent default copy constructor and assignment operation
	// Only support pass by reference or pointer
	FlowControlProtocol(const FlowControlProtocol &parent);
	FlowControlProtocol &operator=(const FlowControlProtocol &parent);

};

#endif
