blob: e6972addb4caa776a8685bc1e144f1753a7d65e1 [file] [log] [blame]
/**
* @file Site2SitePeer.h
* Site2SitePeer class declaration for site to site peer
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __SITE2SITE_PEER_H__
#define __SITE2SITE_PEER_H__
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <string>
#include <errno.h>
#include <mutex>
#include <atomic>
#include "TimeUtil.h"
#include "Logger.h"
#include "Configure.h"
#include "Property.h"
class CRC32
{
public:
CRC32() {
crc = 0;
if (tableInit)
return;
tableInit = true;
unsigned int poly = 0xedb88320;
unsigned int temp = 0;
for(unsigned int i = 0; i < 256; ++i) {
temp = i;
for(int j = 8; j > 0; --j) {
if((temp & 1) == 1) {
temp = (unsigned int)((temp >> 1) ^ poly);
}else {
temp >>= 1;
}
}
table[i] = temp;
}
}
unsigned int update(uint8_t * bytes, size_t size) {
crc = crc ^ ~0U;
for(unsigned int i = 0; i < size; ++i) {
uint8_t index = (uint8_t)(((crc) & 0xff) ^ bytes[i]);
crc = (unsigned int)((crc >> 8) ^ table[index]);
}
crc = crc ^ ~0U;
return crc;
}
long getCRC()
{
return crc;
}
private:
static unsigned int table[256];
static std::atomic<bool> tableInit;
unsigned int crc;
};
static const char MAGIC_BYTES[] = {'N', 'i', 'F', 'i'};
//! Site2SitePeer Class
class Site2SitePeer
{
public:
//! Constructor
/*!
* Create a new site2site peer
*/
Site2SitePeer(std::string host, uint16_t port) {
_logger = Logger::getLogger();
_configure = Configure::getConfigure();
_socket = 0;
_host = host;
_port = port;
_yieldExpiration = 0;
_timeOut = 30000; // 30 seconds
_url = "nifi://" + _host + ":" + std::to_string(_port);
}
//! Destructor
virtual ~Site2SitePeer() { Close();}
//! Set Processor yield period in MilliSecond
void setYieldPeriodMsec(uint64_t period) {
_yieldPeriodMsec = period;
}
//! get URL
std::string getURL() {
return _url;
}
//! Get Processor yield period in MilliSecond
uint64_t getYieldPeriodMsec(void) {
return(_yieldPeriodMsec);
}
//! Yield based on the yield period
void yield()
{
_yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
}
//! setHostName
void setHostName(std::string host)
{
_host = host;
_url = "nifi://" + _host + ":" + std::to_string(_port);
}
//! setPort
void setPort(uint16_t port)
{
_port = port;
_url = "nifi://" + _host + ":" + std::to_string(_port);
}
//! getHostName
std::string getHostName()
{
return _host;
}
//! getPort
uint16_t getPort()
{
return _port;
}
//! Yield based on the input time
void yield(uint64_t time)
{
_yieldExpiration = (getTimeMillis() + time);
}
//! whether need be to yield
bool isYield()
{
if (_yieldExpiration > 0)
return (_yieldExpiration >= getTimeMillis());
else
return false;
}
// clear yield expiration
void clearYield()
{
_yieldExpiration = 0;
}
//! Yield based on the yield period
void yield(std::string portId)
{
std::lock_guard<std::mutex> lock(_mtx);
uint64_t yieldExpiration = (getTimeMillis() + _yieldPeriodMsec);
_yieldExpirationPortIdMap[portId] = yieldExpiration;
}
//! Yield based on the input time
void yield(std::string portId, uint64_t time)
{
std::lock_guard<std::mutex> lock(_mtx);
uint64_t yieldExpiration = (getTimeMillis() + time);
_yieldExpirationPortIdMap[portId] = yieldExpiration;
}
//! whether need be to yield
bool isYield(std::string portId)
{
std::lock_guard<std::mutex> lock(_mtx);
std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId);
if (it != _yieldExpirationPortIdMap.end())
{
uint64_t yieldExpiration = it->second;
return (yieldExpiration >= getTimeMillis());
}
else
{
return false;
}
}
//! clear yield expiration
void clearYield(std::string portId)
{
std::lock_guard<std::mutex> lock(_mtx);
std::map<std::string, uint64_t>::iterator it = this->_yieldExpirationPortIdMap.find(portId);
if (it != _yieldExpirationPortIdMap.end())
{
_yieldExpirationPortIdMap.erase(portId);
}
}
//! setTimeOut
void setTimeOut(uint64_t time)
{
_timeOut = time;
}
int write(uint8_t value, CRC32 *crc = NULL)
{
return sendData(&value, 1, crc);
}
int write(char value, CRC32 *crc = NULL)
{
return sendData((uint8_t *)&value, 1, crc);
}
int write(uint32_t value, CRC32 *crc = NULL)
{
uint8_t temp[4];
temp[0] = (value & 0xFF000000) >> 24;
temp[1] = (value & 0x00FF0000) >> 16;
temp[2] = (value & 0x0000FF00) >> 8;
temp[3] = (value & 0x000000FF);
return sendData(temp, 4, crc);
}
int write(uint16_t value, CRC32 *crc = NULL)
{
uint8_t temp[2];
temp[0] = (value & 0xFF00) >> 8;
temp[1] = (value & 0xFF);
return sendData(temp, 2, crc);
}
int write(uint8_t *value, int len, CRC32 *crc = NULL)
{
return sendData(value, len, crc);
}
int write(uint64_t value, CRC32 *crc = NULL)
{
uint8_t temp[8];
temp[0] = (value >> 56) & 0xFF;
temp[1] = (value >> 48) & 0xFF;
temp[2] = (value >> 40) & 0xFF;
temp[3] = (value >> 32) & 0xFF;
temp[4] = (value >> 24) & 0xFF;
temp[5] = (value >> 16) & 0xFF;
temp[6] = (value >> 8) & 0xFF;
temp[7] = (value >> 0) & 0xFF;
return sendData(temp, 8, crc);
}
int write(bool value, CRC32 *crc = NULL)
{
uint8_t temp = value;
return write(temp, crc);
}
int writeUTF(std::string str, bool widen = false, CRC32 *crc = NULL);
int read(uint8_t &value, CRC32 *crc = NULL)
{
uint8_t buf;
int ret = readData(&buf, 1, crc);
if (ret == 1)
value = buf;
return ret;
}
int read(uint16_t &value, CRC32 *crc = NULL)
{
uint8_t buf[2];
int ret = readData(buf, 2, crc);
if (ret == 2)
value = (buf[0] << 8) | buf[1];
return ret;
}
int read(char &value, CRC32 *crc = NULL)
{
uint8_t buf;
int ret = readData(&buf, 1, crc);
if (ret == 1)
value = (char) buf;
return ret;
}
int read(uint8_t *value, int len, CRC32 *crc = NULL)
{
return readData(value, len, crc);
}
int read(uint32_t &value, CRC32 *crc = NULL)
{
uint8_t buf[4];
int ret = readData(buf, 4, crc);
if (ret == 4)
value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
return ret;
}
int read(uint64_t &value, CRC32 *crc = NULL)
{
uint8_t buf[8];
int ret = readData(buf, 8, crc);
if (ret == 8)
{
value = ((uint64_t) buf[0] << 56) |
((uint64_t) (buf[1] & 255) << 48) |
((uint64_t) (buf[2] & 255) << 40) |
((uint64_t) (buf[3] & 255) << 32) |
((uint64_t) (buf[4] & 255) << 24) |
((uint64_t) (buf[5] & 255) << 16) |
((uint64_t) (buf[6] & 255) << 8) |
((uint64_t) (buf[7] & 255) << 0);
}
return ret;
}
int readUTF(std::string &str, bool widen = false, CRC32 *crc = NULL);
//! open connection to the peer
bool Open();
//! close connection to the peer
void Close();
//! Send Data via the socket, return -1 for failure
int sendData(uint8_t *buf, int buflen, CRC32 *crc = NULL);
//! Read length into buf, return -1 for failure and 0 for EOF
int readData(uint8_t *buf, int buflen, CRC32 *crc = NULL);
//! Select on the socket
int Select(int msec);
protected:
private:
//! Mutex for protection
std::mutex _mtx;
//! S2S server Name
std::string _host;
//! S2S server port
uint16_t _port;
//! socket to server
int _socket;
//! URL
std::string _url;
//! socket timeout;
std::atomic<uint64_t> _timeOut;
//! Logger
Logger *_logger;
//! Configure
Configure *_configure;
//! Yield Period in Milliseconds
std::atomic<uint64_t> _yieldPeriodMsec;
//! Yield Expiration
std::atomic<uint64_t> _yieldExpiration;
//! Yield Expiration per destination PortID
std::map<std::string, uint64_t> _yieldExpirationPortIdMap;
// Prevent default copy constructor and assignment operation
// Only support pass by reference or pointer
Site2SitePeer(const Site2SitePeer &parent);
Site2SitePeer &operator=(const Site2SitePeer &parent);
};
#endif