blob: c7118ff5e6dbd93f791f8efb2e47b4b39c201e82 [file] [log] [blame]
/**
* @file RealTimeDataCollector.cpp
* RealTimeDataCollector class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <time.h>
#include <chrono>
#include <thread>
#include <random>
#include <netinet/tcp.h>
#include "RealTimeDataCollector.h"
#include "ProcessContext.h"
#include "ProcessSession.h"
//! Static definitions for the processor: its name, its configurable
//! properties and its single relationship.
//! NOTE(review): the Property constructor arguments look like
//! (name, description, default value) -- confirm against the Property class.
const std::string RealTimeDataCollector::ProcessorName("RealTimeDataCollector");
//! Path of the sample data file that is read line by line
Property RealTimeDataCollector::FILENAME("File Name", "File Name for the real time processor to process", "data.osp");
//! Host and port of the server that receives the real time messages
Property RealTimeDataCollector::REALTIMESERVERNAME("Real Time Server Name", "Real Time Server Name", "localhost");
Property RealTimeDataCollector::REALTIMESERVERPORT("Real Time Server Port", "Real Time Server Port", "10000");
//! Host and port of the server that receives the queued batch messages
Property RealTimeDataCollector::BATCHSERVERNAME("Batch Server Name", "Batch Server Name", "localhost");
Property RealTimeDataCollector::BATCHSERVERPORT("Batch Server Port", "Batch Server Port", "10001");
Property RealTimeDataCollector::ITERATION("Iteration",
"If true, sample osp file will be iterated", "true");
//! Comma separated message IDs; the first comma separated field of each
//! data line is compared against these lists
Property RealTimeDataCollector::REALTIMEMSGID("Real Time Message ID", "Real Time Message ID", "41");
Property RealTimeDataCollector::BATCHMSGID("Batch Message ID", "Batch Message ID", "172, 30, 48");
//! Intervals carry a unit suffix; onTrigger() converts them to nanoseconds
Property RealTimeDataCollector::REALTIMEINTERVAL("Real Time Interval", "Real Time Data Collection Interval in msec", "10 ms");
Property RealTimeDataCollector::BATCHINTERVAL("Batch Time Interval", "Batch Processing Interval in msec", "100 ms");
//! Upper bound on the number of bytes kept in the batch queue
Property RealTimeDataCollector::BATCHMAXBUFFERSIZE("Batch Max Buffer Size", "Batch Buffer Maximum size in bytes", "262144");
//! The only relationship this processor exposes
Relationship RealTimeDataCollector::Success("success", "success operational on the flow record");
void RealTimeDataCollector::initialize()
{
//! Set the supported properties
std::set<Property> properties;
properties.insert(FILENAME);
properties.insert(REALTIMESERVERNAME);
properties.insert(REALTIMESERVERPORT);
properties.insert(BATCHSERVERNAME);
properties.insert(BATCHSERVERPORT);
properties.insert(ITERATION);
properties.insert(REALTIMEMSGID);
properties.insert(BATCHMSGID);
properties.insert(REALTIMEINTERVAL);
properties.insert(BATCHINTERVAL);
properties.insert(BATCHMAXBUFFERSIZE);
setSupportedProperties(properties);
//! Set the supported relationships
std::set<Relationship> relationships;
relationships.insert(Success);
setSupportedRelationships(relationships);
}
//! Resolves the host name and opens a TCP connection to host:port.
//!
//! On non-Mach builds the socket is additionally configured with
//! TCP_NODELAY, SO_REUSEADDR and a 256 KiB send buffer before connecting.
//!
//! @param host  host name of the server to connect to
//! @param port  TCP port of the server, in host byte order
//! @return the connected socket descriptor, or 0 when resolution, socket
//!         setup or the connect itself fails (callers treat 0 as
//!         "not connected")
int RealTimeDataCollector::connectServer(const char *host, uint16_t port)
{
    in_addr_t addr;
    int sock = 0;
    struct hostent *h = nullptr;
#ifdef __MACH__
    h = gethostbyname(host);
#else
    char buf[1024];
    struct hostent he;
    int hh_errno;
    // A non-zero return means the lookup itself failed; do not trust h then.
    if (gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno) != 0)
        h = nullptr;
#endif
    // The lookup yields no entry for unknown hosts; dereferencing it
    // unchecked would crash. Also make sure the entry really holds an IPv4
    // address of the size we are about to copy into addr.
    if (h == nullptr || h->h_addrtype != AF_INET || h->h_addr_list[0] == nullptr
        || h->h_length != static_cast<int>(sizeof(addr)))
    {
        _logger->log_error("Could not resolve hostName %s", host);
        return 0;
    }
    memcpy(&addr, h->h_addr_list[0], sizeof(addr));

    sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
    {
        _logger->log_error("Could not create socket to hostName %s", host);
        return 0;
    }

#ifndef __MACH__
    int opt = 1;
    // Disable Nagle's algorithm so small messages are sent immediately
    if (setsockopt(sock, SOL_TCP, TCP_NODELAY, &opt, sizeof(opt)) < 0)
    {
        _logger->log_error("setsockopt() TCP_NODELAY failed");
        close(sock);
        return 0;
    }
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0)
    {
        _logger->log_error("setsockopt() SO_REUSEADDR failed");
        close(sock);
        return 0;
    }

    int sndsize = 256*1024;
    if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &sndsize, sizeof(sndsize)) < 0)
    {
        _logger->log_error("setsockopt() SO_SNDBUF failed");
        close(sock);
        return 0;
    }
#endif

    struct sockaddr_in sa;
    socklen_t socklen;
    int status;

    //TODO bind socket to the interface
    // Bind to any local address and an ephemeral port
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_addr.s_addr = htonl(INADDR_ANY);
    sa.sin_port = htons(0);
    socklen = sizeof(sa);
    if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
    {
        _logger->log_error("socket bind failed");
        close(sock);
        return 0;
    }

    // Connect to the resolved server address (addr is already in network order)
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_addr.s_addr = addr;
    sa.sin_port = htons(port);
    socklen = sizeof(sa);
    status = connect(sock, (struct sockaddr *)&sa, socklen);
    if (status < 0)
    {
        _logger->log_error("socket connect failed to %s %d", host, port);
        close(sock);
        return 0;
    }

    _logger->log_info("socket %d connect to server %s port %d success", sock, host, port);
    return sock;
}
//! Writes the whole buffer to the socket, looping over partial sends.
//!
//! @param socket  connected socket descriptor
//! @param buf     data to send
//! @param buflen  number of bytes to send
//! @return -1 as soon as a send() call fails; otherwise the return value of
//!         the last send() call (0 when buflen is not positive)
int RealTimeDataCollector::sendData(int socket, const char *buf, int buflen)
{
    // Without MSG_NOSIGNAL a send() on a connection the peer has closed
    // raises SIGPIPE, which terminates the process by default. The callers
    // expect a -1 return so they can close the socket and reconnect.
#ifdef MSG_NOSIGNAL
    const int flags = MSG_NOSIGNAL;
#else
    const int flags = 0;
#endif
    int ret = 0, bytes = 0;
    while (bytes < buflen)
    {
        ret = send(socket, buf + bytes, buflen - bytes, flags);
        //check for errors
        if (ret == -1)
        {
            return ret;
        }
        bytes += ret;
    }

    if (ret)
        _logger->log_debug("Send data size %d over socket %d", buflen, socket);
    return ret;
}
//! Real time half of the processor.
//!
//! Once the accumulated scheduling time reaches the real time interval, the
//! message ID lists are re-read from the properties and the data file is
//! consumed line by line. The first comma separated field of each line is
//! the message ID:
//!  - lines whose ID is in the batch list are queued for the batch thread;
//!  - the first line whose ID is in the real time list is sent over the
//!    real time socket (or queued when that is not possible) and reading
//!    stops until the next interval.
//! When the end of the file is reached the file is closed, so the next
//! interval reopens it from the beginning.
//!
//! @param context  used to obtain the processor's scheduling period
//! @param session  not used
void RealTimeDataCollector::onTriggerRealTime(ProcessContext *context, ProcessSession *session)
{
    if (_realTimeAccumulated >= this->_realTimeInterval)
    {
        // Replaces ids with the trimmed, comma separated entries of csv.
        // Both lists must be trimmed because the ID read from each data
        // line is trimmed before it is compared.
        auto parseIds = [](const std::string &csv, std::vector<std::string> &ids)
        {
            ids.clear();
            std::stringstream idStream(csv);
            std::string cell;
            while (std::getline(idStream, cell, ','))
            {
                cell = Property::trim(cell);
                ids.push_back(cell);
            }
        };

        // True when id equals one of the entries of ids.
        auto matchesAny = [](const std::vector<std::string> &ids, const std::string &id) -> bool
        {
            for (std::vector<std::string>::const_iterator it = ids.begin(); it != ids.end(); ++it)
            {
                if (*it == id)
                    return true;
            }
            return false;
        };

        // Appends line to the batch queue, evicting the oldest entries
        // while the configured buffer limit would be exceeded.
        auto enqueueForBatch = [this](const std::string &line, const std::string &msgId, bool realTime)
        {
            std::lock_guard<std::mutex> lock(this->_mtx);
            // The empty() check stops the eviction when a single line is
            // larger than the whole buffer; front() on an empty queue is
            // undefined behaviour. Such a line is still queued.
            while (!this->_queue.empty() &&
                   (this->_queuedDataSize + line.size()) > this->_batchMaxBufferSize)
            {
                const std::string::size_type evicted = this->_queue.front().size();
                this->_queuedDataSize -= evicted;
                this->_logger->log_debug("Pop item size %d from batch queue, queue buffer size %d", static_cast<int>(evicted), this->_queuedDataSize);
                this->_queue.pop();
            }
            this->_queue.push(line);
            this->_queuedDataSize += line.size();
            if (realTime)
                this->_logger->log_debug("Push real time msg ID %s into batch queue, queue buffer size %d", msgId.c_str(), this->_queuedDataSize);
            else
                this->_logger->log_debug("Push batch msg ID %s into batch queue, queue buffer size %d", msgId.c_str(), this->_queuedDataSize);
        };

        // Pick up changes of the ID properties on every interval
        std::string value;
        if (this->getProperty(REALTIMEMSGID.getName(), value))
        {
            this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
            parseIds(value, this->_realTimeMsgID);
        }
        if (this->getProperty(BATCHMSGID.getName(), value))
        {
            this->_logger->log_info("Batch Msg IDs %s", value.c_str());
            parseIds(value, this->_batchMsgID);
        }

        // Open the file
        if (!this->_fileStream.is_open())
        {
            _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
            if (this->_fileStream.is_open())
                _logger->log_debug("open %s", _fileName.c_str());
        }
        if (!_fileStream.good())
        {
            // The accumulated time is left untouched, so the next trigger
            // retries immediately.
            _logger->log_error("load data file failed %s", _fileName.c_str());
            return;
        }

        if (this->_fileStream.is_open())
        {
            std::string line;
            while (std::getline(_fileStream, line))
            {
                line += "\n";
                std::stringstream lineStream(line);
                std::string cell;
                if (!std::getline(lineStream, cell, ','))
                    continue;
                cell = Property::trim(cell);

                // Batch traffic is only queued; the batch thread sends it.
                // A line is queued once even if its ID is listed twice.
                if (matchesAny(_batchMsgID, cell))
                    enqueueForBatch(line, cell, false);

                if (!matchesAny(_realTimeMsgID, cell))
                    continue;

                int status = 0;
                if (this->_realTimeSocket <= 0)
                {
                    // Connect the LTE socket
                    uint16_t port = _realTimeServerPort;
                    this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
                }
                if (this->_realTimeSocket > 0)
                {
                    // try to send the data
                    status = sendData(_realTimeSocket, line.data(), line.size());
                    if (status < 0)
                    {
                        close(_realTimeSocket);
                        _realTimeSocket = 0;
                    }
                }
                if (this->_realTimeSocket <= 0 || status < 0)
                {
                    // Real time delivery failed: fall back to the batch queue
                    enqueueForBatch(line, cell, true);
                }
                // we break the while once we find the first real time
                break;
            }
            if (_fileStream.eof())
            {
                _fileStream.close();
            }
        }
        _realTimeAccumulated = 0;
    }
    _realTimeAccumulated += context->getProcessor()->getSchedulingPeriodNano();
}
//! Batch half of the processor.
//!
//! Once the accumulated scheduling time reaches the batch interval, the
//! batch socket is (re)connected if necessary and the queued lines are sent
//! in order. A line is removed from the queue only after it was sent
//! successfully, so a failed send keeps it for the next attempt.
//!
//! @param context  used to obtain the processor's scheduling period
//! @param session  not used
void RealTimeDataCollector::onTriggerBatch(ProcessContext *context, ProcessSession *session)
{
    if (_batchAcccumulated >= this->_batchInterval)
    {
        // dequeue the batch and send over WIFI
        if (this->_batchSocket <= 0)
        {
            // Connect the WIFI socket
            uint16_t port = _batchServerPort;
            this->_batchSocket = connectServer(_batchServerName.c_str(), port);
        }
        if (this->_batchSocket > 0)
        {
            // The queue is shared with the real time thread
            std::lock_guard<std::mutex> lock(_mtx);
            while (!_queue.empty())
            {
                const std::string &line = _queue.front();
                if (sendData(_batchSocket, line.data(), line.size()) < 0)
                {
                    // Drop the broken connection but keep the line queued;
                    // popping it here would silently lose the data.
                    close(_batchSocket);
                    _batchSocket = 0;
                    break;
                }
                // line refers into the queue: account for it before popping
                _queuedDataSize -= line.size();
                _queue.pop();
            }
        }
        _batchAcccumulated = 0;
    }
    _batchAcccumulated += context->getProcessor()->getSchedulingPeriodNano();
}
//! Entry point called by the scheduler threads.
//!
//! The calling thread is dispatched by its thread id: the thread recorded
//! as real time thread runs onTriggerRealTime(), the one recorded as batch
//! thread runs onTriggerBatch(). A thread that is not recorded yet either
//! performs the one-time setup (reads the properties, connects both
//! sockets, opens the data file) and becomes the real time thread, or, if
//! the setup has already been done, becomes the batch thread.
//! NOTE(review): this relies on _firstInvoking starting out as false --
//! confirm in the constructor.
//!
//! @param context  forwarded to the real time / batch handlers
//! @param session  forwarded to the real time / batch handlers
void RealTimeDataCollector::onTrigger(ProcessContext *context, ProcessSession *session)
{
    std::thread::id id = std::this_thread::get_id();

    if (id == _realTimeThreadId)
        return onTriggerRealTime(context, session);
    if (id == _batchThreadId)
        return onTriggerBatch(context, session);

    std::lock_guard<std::mutex> lock(_mtx);
    if (this->_firstInvoking)
    {
        // Setup is done: the second unknown thread becomes the batch thread
        if (id != _realTimeThreadId)
            _batchThreadId = id;
        this->_firstInvoking = false;
        return;
    }

    // Replaces ids with the trimmed, comma separated entries of csv and
    // logs each entry with the given format. Clearing first keeps a retried
    // setup from appending duplicates; trimming matches the trimmed ID that
    // is read from each data line.
    auto parseIds = [this](const std::string &csv, std::vector<std::string> &ids, bool realTime)
    {
        ids.clear();
        std::stringstream idStream(csv);
        std::string cell;
        while (std::getline(idStream, cell, ','))
        {
            cell = Property::trim(cell);
            ids.push_back(cell);
            if (realTime)
                this->_logger->log_info("Real Time Msg ID %s", cell.c_str());
            else
                this->_logger->log_info("Batch Msg ID %s", cell.c_str());
        }
    };

    std::string value;

    this->_fileName = "data.osp";
    if (this->getProperty(FILENAME.getName(), value))
    {
        this->_fileName = value;
        this->_logger->log_info("Data Collector File Name %s", _fileName.c_str());
    }
    this->_realTimeServerName = "localhost";
    if (this->getProperty(REALTIMESERVERNAME.getName(), value))
    {
        this->_realTimeServerName = value;
        this->_logger->log_info("Real Time Server Name %s", this->_realTimeServerName.c_str());
    }
    this->_realTimeServerPort = 10000;
    if (this->getProperty(REALTIMESERVERPORT.getName(), value))
    {
        Property::StringToInt(value, _realTimeServerPort);
        this->_logger->log_info("Real Time Server Port %d", _realTimeServerPort);
    }
    // Same fallback as the property default, mirroring the real time server
    this->_batchServerName = "localhost";
    if (this->getProperty(BATCHSERVERNAME.getName(), value))
    {
        this->_batchServerName = value;
        this->_logger->log_info("Batch Server Name %s", this->_batchServerName.c_str());
    }
    this->_batchServerPort = 10001;
    if (this->getProperty(BATCHSERVERPORT.getName(), value))
    {
        Property::StringToInt(value, _batchServerPort);
        this->_logger->log_info("Batch Server Port %d", _batchServerPort);
    }
    if (this->getProperty(ITERATION.getName(), value))
    {
        Property::StringToBool(value, this->_iteration);
        _logger->log_info("Iteration %d", _iteration);
    }
    this->_realTimeInterval = 10000000; //10 msec
    if (this->getProperty(REALTIMEINTERVAL.getName(), value))
    {
        TimeUnit unit;
        if (Property::StringToTime(value, _realTimeInterval, unit) &&
            Property::ConvertTimeUnitToNS(_realTimeInterval, unit, _realTimeInterval))
        {
            _logger->log_info("Real Time Interval: [%d] ns", _realTimeInterval);
        }
    }
    this->_batchInterval = 100000000; //100 msec
    if (this->getProperty(BATCHINTERVAL.getName(), value))
    {
        TimeUnit unit;
        if (Property::StringToTime(value, _batchInterval, unit) &&
            Property::ConvertTimeUnitToNS(_batchInterval, unit, _batchInterval))
        {
            _logger->log_info("Batch Time Interval: [%d] ns", _batchInterval);
        }
    }
    this->_batchMaxBufferSize = 256*1024;
    if (this->getProperty(BATCHMAXBUFFERSIZE.getName(), value))
    {
        Property::StringToInt(value, _batchMaxBufferSize);
        this->_logger->log_info("Batch Max Buffer Size %d", _batchMaxBufferSize);
    }
    if (this->getProperty(REALTIMEMSGID.getName(), value))
    {
        this->_logger->log_info("Real Time Msg IDs %s", value.c_str());
        parseIds(value, this->_realTimeMsgID, true);
    }
    if (this->getProperty(BATCHMSGID.getName(), value))
    {
        this->_logger->log_info("Batch Msg IDs %s", value.c_str());
        parseIds(value, this->_batchMsgID, false);
    }

    // Connect the LTE socket
    uint16_t port = _realTimeServerPort;
    this->_realTimeSocket = connectServer(_realTimeServerName.c_str(), port);
    // Connect the WIFI socket
    port = _batchServerPort;
    this->_batchSocket = connectServer(_batchServerName.c_str(), port);

    // Open the file
    _fileStream.open(this->_fileName.c_str(), std::ifstream::in);
    if (!_fileStream.good())
    {
        _logger->log_error("load data file failed %s", _fileName.c_str());
        // The setup is retried on the next trigger and connects again;
        // release the sockets opened above so they are not leaked.
        if (this->_realTimeSocket > 0)
        {
            close(this->_realTimeSocket);
            this->_realTimeSocket = 0;
        }
        if (this->_batchSocket > 0)
        {
            close(this->_batchSocket);
            this->_batchSocket = 0;
        }
        return;
    }
    _logger->log_debug("open %s", _fileName.c_str());

    // The thread that completed the setup becomes the real time thread
    _realTimeThreadId = id;
    this->_firstInvoking = true;
}