blob: 760b566a22c51d855883abe751b648a5019dd7b4 [file] [log] [blame]
/**
* @file RealTimeDataCollector.h
* RealTimeDataCollector class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __REAL_TIME_DATA_COLLECTOR_H__
#define __REAL_TIME_DATA_COLLECTOR_H__
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <netdb.h>
#include <string>
#include <errno.h>
#include "FlowFileRecord.h"
#include "Processor.h"
#include "ProcessSession.h"
//! RealTimeDataCollector Class
class RealTimeDataCollector : public Processor
{
public:
//! Constructor
/*!
* Create a new processor
*/
RealTimeDataCollector(std::string name, uuid_t uuid = NULL)
: Processor(name, uuid)
{
_realTimeSocket = 0;
_batchSocket = 0;
_logger = Logger::getLogger();
_firstInvoking = false;
_realTimeAccumulated = 0;
_batchAcccumulated = 0;
_queuedDataSize = 0;
}
//! Destructor
virtual ~RealTimeDataCollector()
{
if (_realTimeSocket)
close(_realTimeSocket);
if (_batchSocket)
close(_batchSocket);
if (_fileStream.is_open())
_fileStream.close();
}
//! Processor Name
static const std::string ProcessorName;
//! Supported Properties
static Property REALTIMESERVERNAME;
static Property REALTIMESERVERPORT;
static Property BATCHSERVERNAME;
static Property BATCHSERVERPORT;
static Property FILENAME;
static Property ITERATION;
static Property REALTIMEMSGID;
static Property BATCHMSGID;
static Property REALTIMEINTERVAL;
static Property BATCHINTERVAL;
static Property BATCHMAXBUFFERSIZE;
//! Supported Relationships
static Relationship Success;
//! Connect to the socket
int connectServer(const char *host, uint16_t port);
int sendData(int socket, const char *buf, int buflen);
void onTriggerRealTime(ProcessContext *context, ProcessSession *session);
void onTriggerBatch(ProcessContext *context, ProcessSession *session);
public:
//! OnTrigger method, implemented by NiFi RealTimeDataCollector
virtual void onTrigger(ProcessContext *context, ProcessSession *session);
//! Initialize, over write by NiFi RealTimeDataCollector
virtual void initialize(void);
protected:
private:
//! realtime server Name
std::string _realTimeServerName;
int64_t _realTimeServerPort;
std::string _batchServerName;
int64_t _batchServerPort;
int64_t _realTimeInterval;
int64_t _batchInterval;
int64_t _batchMaxBufferSize;
//! Match pattern for Real time Message ID
std::vector<std::string> _realTimeMsgID;
//! Match pattern for Batch Message ID
std::vector<std::string> _batchMsgID;
//! file for which the realTime collector will tail
std::string _fileName;
//! Whether we need to iterate from the beginning for demo
bool _iteration;
int _realTimeSocket;
int _batchSocket;
//! Logger
Logger *_logger;
//! Mutex for protection
std::mutex _mtx;
//! Queued data size
uint64_t _queuedDataSize;
//! Queue for the batch process
std::queue<std::string> _queue;
std::thread::id _realTimeThreadId;
std::thread::id _batchThreadId;
std::atomic<bool> _firstInvoking;
int64_t _realTimeAccumulated;
int64_t _batchAcccumulated;
std::ifstream _fileStream;
};
#endif