blob: e036b89b600c9018d063b858ac63b59daa5bad50 [file] [log] [blame]
/**
* @file Connection.cpp
* Connection class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <time.h>
#include <chrono>
#include <thread>
#include <iostream>
#include "Connection.h"
Connection::Connection(std::string name, uuid_t uuid, uuid_t srcUUID, uuid_t destUUID)
: _name(name)
{
if (!uuid)
// Generate the global UUID for the flow record
uuid_generate(_uuid);
else
uuid_copy(_uuid, uuid);
if (srcUUID)
uuid_copy(_srcUUID, srcUUID);
if (destUUID)
uuid_copy(_destUUID, destUUID);
_srcProcessor = NULL;
_destProcessor = NULL;
_maxQueueSize = 0;
_maxQueueDataSize = 0;
_expiredDuration = 0;
_queuedDataSize = 0;
_logger = Logger::getLogger();
_logger->log_info("Connection %s created", _name.c_str());
}
bool Connection::isEmpty()
{
std::lock_guard<std::mutex> lock(_mtx);
return _queue.empty();
}
bool Connection::isFull()
{
std::lock_guard<std::mutex> lock(_mtx);
if (_maxQueueSize <= 0 && _maxQueueDataSize <= 0)
// No back pressure setting
return false;
if (_maxQueueSize > 0 && _queue.size() >= _maxQueueSize)
return true;
if (_maxQueueDataSize > 0 && _queuedDataSize >= _maxQueueDataSize)
return true;
return false;
}
void Connection::put(FlowFileRecord *flow)
{
std::lock_guard<std::mutex> lock(_mtx);
_queue.push(flow);
_queuedDataSize += flow->getSize();
_logger->log_debug("Enqueue flow file UUID %s to connection %s",
flow->getUUIDStr().c_str(), _name.c_str());
}
FlowFileRecord* Connection::poll(std::set<FlowFileRecord *> &expiredFlowRecords)
{
std::lock_guard<std::mutex> lock(_mtx);
while (!_queue.empty())
{
FlowFileRecord *item = _queue.front();
_queue.pop();
_queuedDataSize -= item->getSize();
if (_expiredDuration > 0)
{
// We need to check for flow expiration
if (getTimeMillis() > (item->getEntryDate() + _expiredDuration))
{
// Flow record expired
expiredFlowRecords.insert(item);
}
else
{
// Flow record not expired
if (item->isPenalized())
{
// Flow record was penalized
_queue.push(item);
_queuedDataSize += item->getSize();
break;
}
item->setOriginalConnection(this);
_logger->log_debug("Dequeue flow file UUID %s from connection %s",
item->getUUIDStr().c_str(), _name.c_str());
return item;
}
}
else
{
// Flow record not expired
if (item->isPenalized())
{
// Flow record was penalized
_queue.push(item);
_queuedDataSize += item->getSize();
break;
}
item->setOriginalConnection(this);
_logger->log_debug("Dequeue flow file UUID %s from connection %s",
item->getUUIDStr().c_str(), _name.c_str());
return item;
}
}
return NULL;
}
void Connection::drain()
{
std::lock_guard<std::mutex> lock(_mtx);
while (!_queue.empty())
{
FlowFileRecord *item = _queue.front();
_queue.pop();
delete item;
}
_logger->log_debug("Drain connection %s", _name.c_str());
}