/**
 * @file Processor.cpp
 * Processor class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <time.h>
#include <chrono>
#include <thread>

#include "Processor.h"
#include "ProcessContext.h"
#include "ProcessSession.h"

Processor::Processor(std::string name, uuid_t uuid)
: _name(name)
{
	if (!uuid)
		// Generate the global UUID for the flow record
		uuid_generate(_uuid);
	else
		uuid_copy(_uuid, uuid);

	char uuidStr[37];
	uuid_unparse(_uuid, uuidStr);
	_uuidStr = uuidStr;

	// Setup the default values
	_state = DISABLED;
	_strategy = TIMER_DRIVEN;
	_lossTolerant = false;
	_triggerWhenEmpty = false;
	_schedulingPeriodNano = MINIMUM_SCHEDULING_NANOS;
	_runDurantionNano = 0;
	_yieldPeriodMsec = DEFAULT_YIELD_PERIOD_SECONDS * 1000;
	_penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000;
	_maxConcurrentTasks = 1;
	_activeTasks = 0;
	_yieldExpiration = 0;
	_incomingConnectionsIter = this->_incomingConnections.begin();
	_logger = Logger::getLogger();

	_logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str());
}

Processor::~Processor()
{

}

bool Processor::isRunning()
{
	return (_state == RUNNING && _activeTasks > 0);
}

bool Processor::setSupportedProperties(std::set<Property> properties)
{
	if (isRunning())
	{
		_logger->log_info("Can not set processor property while the process %s is running",
				_name.c_str());
		return false;
	}

	std::lock_guard<std::mutex> lock(_mtx);

	_properties.clear();
	for (std::set<Property>::iterator it = properties.begin(); it != properties.end(); ++it)
	{
		Property item(*it);
		_properties[item.getName()] = item;
		_logger->log_info("Processor %s supported property name %s", _name.c_str(), item.getName().c_str());
	}

	return true;
}

bool Processor::setSupportedRelationships(std::set<Relationship> relationships)
{
	if (isRunning())
	{
		_logger->log_info("Can not set processor supported relationship while the process %s is running",
				_name.c_str());
		return false;
	}

	std::lock_guard<std::mutex> lock(_mtx);

	_relationships.clear();
	for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it)
	{
		Relationship item(*it);
		_relationships[item.getName()] = item;
		_logger->log_info("Processor %s supported relationship name %s", _name.c_str(), item.getName().c_str());
	}

	return true;
}

bool Processor::setAutoTerminatedRelationships(std::set<Relationship> relationships)
{
	if (isRunning())
	{
		_logger->log_info("Can not set processor auto terminated relationship while the process %s is running",
				_name.c_str());
		return false;
	}

	std::lock_guard<std::mutex> lock(_mtx);

	_autoTerminatedRelationships.clear();
	for (std::set<Relationship>::iterator it = relationships.begin(); it != relationships.end(); ++it)
	{
		Relationship item(*it);
		_autoTerminatedRelationships[item.getName()] = item;
		_logger->log_info("Processor %s auto terminated relationship name %s", _name.c_str(), item.getName().c_str());
	}

	return true;
}

bool Processor::isAutoTerminated(Relationship relationship)
{
	bool isRun = isRunning();

	if (!isRun)
		_mtx.lock();

	std::map<std::string, Relationship>::iterator it = _autoTerminatedRelationships.find(relationship.getName());
	if (it != _autoTerminatedRelationships.end())
	{
		if (!isRun)
			_mtx.unlock();
		return true;
	}
	else
	{
		if (!isRun)
			_mtx.unlock();
		return false;
	}
}

bool Processor::isSupportedRelationship(Relationship relationship)
{
	bool isRun = isRunning();

	if (!isRun)
		_mtx.lock();

	std::map<std::string, Relationship>::iterator it = _relationships.find(relationship.getName());
	if (it != _relationships.end())
	{
		if (!isRun)
			_mtx.unlock();
		return true;
	}
	else
	{
		if (!isRun)
			_mtx.unlock();
		return false;
	}
}

bool Processor::getProperty(std::string name, std::string &value)
{
	bool isRun = isRunning();

	if (!isRun)
		// Because set property only allowed in non running state, we need to obtain lock avoid rack condition
		_mtx.lock();

	std::map<std::string, Property>::iterator it = _properties.find(name);
	if (it != _properties.end())
	{
		Property item = it->second;
		value = item.getValue();
		if (!isRun)
			_mtx.unlock();
		return true;
	}
	else
	{
		if (!isRun)
			_mtx.unlock();
		return false;
	}
}

bool Processor::setProperty(std::string name, std::string value)
{

	std::lock_guard<std::mutex> lock(_mtx);
	std::map<std::string, Property>::iterator it = _properties.find(name);

	if (it != _properties.end())
	{
		Property item = it->second;
		item.setValue(value);
		_properties[item.getName()] = item;
		_logger->log_info("Processor %s property name %s value %s", _name.c_str(), item.getName().c_str(), value.c_str());
		return true;
	}
	else
	{
		return false;
	}
}

std::set<Connection *> Processor::getOutGoingConnections(std::string relationship)
{
	std::set<Connection *> empty;

	std::map<std::string, std::set<Connection *>>::iterator it = _outGoingConnections.find(relationship);
	if (it != _outGoingConnections.end())
	{
		return _outGoingConnections[relationship];
	}
	else
	{
		return empty;
	}
}

bool Processor::addConnection(Connection *connection)
{
	bool ret = false;

	if (isRunning())
	{
		_logger->log_info("Can not add connection while the process %s is running",
				_name.c_str());
		return false;
	}

	std::lock_guard<std::mutex> lock(_mtx);

	uuid_t srcUUID;
	uuid_t destUUID;

	connection->getSourceProcessorUUID(srcUUID);
	connection->getDestinationProcessorUUID(destUUID);

	if (uuid_compare(_uuid, destUUID) == 0)
	{
		// Connection is destination to the current processor
		if (_incomingConnections.find(connection) == _incomingConnections.end())
		{
			_incomingConnections.insert(connection);
			connection->setDestinationProcessor(this);
			_logger->log_info("Add connection %s into Processor %s incoming connection",
					connection->getName().c_str(), _name.c_str());
			_incomingConnectionsIter = this->_incomingConnections.begin();
			ret = true;
		}
	}

	if (uuid_compare(_uuid, srcUUID) == 0)
	{
		std::string relationship = connection->getRelationship().getName();
		// Connection is source from the current processor
		std::map<std::string, std::set<Connection *>>::iterator it =
				_outGoingConnections.find(relationship);
		if (it != _outGoingConnections.end())
		{
			// We already has connection for this relationship
			std::set<Connection *> existedConnection = it->second;
			if (existedConnection.find(connection) == existedConnection.end())
			{
				// We do not have the same connection for this relationship yet
				existedConnection.insert(connection);
				connection->setSourceProcessor(this);
				_outGoingConnections[relationship] = existedConnection;
				_logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
												connection->getName().c_str(), _name.c_str(), relationship.c_str());
				ret = true;
			}
		}
		else
		{
			// We do not have any outgoing connection for this relationship yet
			std::set<Connection *> newConnection;
			newConnection.insert(connection);
			connection->setSourceProcessor(this);
			_outGoingConnections[relationship] = newConnection;
			_logger->log_info("Add connection %s into Processor %s outgoing connection for relationship %s",
								connection->getName().c_str(), _name.c_str(), relationship.c_str());
			ret = true;
		}
	}

	return ret;
}

void Processor::removeConnection(Connection *connection)
{
	if (isRunning())
	{
		_logger->log_info("Can not remove connection while the process %s is running",
				_name.c_str());
		return;
	}

	std::lock_guard<std::mutex> lock(_mtx);

	uuid_t srcUUID;
	uuid_t destUUID;

	connection->getSourceProcessorUUID(srcUUID);
	connection->getDestinationProcessorUUID(destUUID);

	if (uuid_compare(_uuid, destUUID) == 0)
	{
		// Connection is destination to the current processor
		if (_incomingConnections.find(connection) != _incomingConnections.end())
		{
			_incomingConnections.erase(connection);
			connection->setDestinationProcessor(NULL);
			_logger->log_info("Remove connection %s into Processor %s incoming connection",
					connection->getName().c_str(), _name.c_str());
			_incomingConnectionsIter = this->_incomingConnections.begin();
		}
	}

	if (uuid_compare(_uuid, srcUUID) == 0)
	{
		std::string relationship = connection->getRelationship().getName();
		// Connection is source from the current processor
		std::map<std::string, std::set<Connection *>>::iterator it =
				_outGoingConnections.find(relationship);
		if (it == _outGoingConnections.end())
		{
			return;
		}
		else
		{
			if (_outGoingConnections[relationship].find(connection) != _outGoingConnections[relationship].end())
			{
				_outGoingConnections[relationship].erase(connection);
				connection->setSourceProcessor(NULL);
				_logger->log_info("Remove connection %s into Processor %s outgoing connection for relationship %s",
								connection->getName().c_str(), _name.c_str(), relationship.c_str());
			}
		}
	}
}

Connection *Processor::getNextIncomingConnection()
{
	std::lock_guard<std::mutex> lock(_mtx);

	if (_incomingConnections.size() == 0)
		return NULL;

	if (_incomingConnectionsIter == _incomingConnections.end())
		_incomingConnectionsIter = _incomingConnections.begin();

	Connection *ret = *_incomingConnectionsIter;
	_incomingConnectionsIter++;

	if (_incomingConnectionsIter == _incomingConnections.end())
		_incomingConnectionsIter = _incomingConnections.begin();

	return ret;
}

bool Processor::flowFilesQueued()
{
	std::lock_guard<std::mutex> lock(_mtx);

	if (_incomingConnections.size() == 0)
		return false;

	for (std::set<Connection *>::iterator it = _incomingConnections.begin(); it != _incomingConnections.end(); ++it)
	{
		Connection *connection = *it;
		if (connection->getQueueSize() > 0)
			return true;
	}

	return false;
}

bool Processor::flowFilesOutGoingFull()
{
	std::lock_guard<std::mutex> lock(_mtx);

	std::map<std::string, std::set<Connection *>>::iterator it;

	for (it = _outGoingConnections.begin(); it != _outGoingConnections.end(); ++it)
	{
		// We already has connection for this relationship
		std::set<Connection *> existedConnection = it->second;
		for (std::set<Connection *>::iterator itConnection = existedConnection.begin(); itConnection != existedConnection.end(); ++itConnection)
		{
			Connection *connection = *itConnection;
			if (connection->isFull())
				return true;
		}
	}

	return false;
}

void Processor::onTrigger()
{
	ProcessContext *context = new ProcessContext(this);
	ProcessSession *session = new ProcessSession(context);
	try {
		// Call the child onTrigger function
		this->onTrigger(context, session);
		session->commit();
		delete session;
		delete context;
	}
	catch (std::exception &exception)
	{
		_logger->log_debug("Caught Exception %s", exception.what());
		session->rollback();
		delete session;
		delete context;
		throw;
	}
	catch (...)
	{
		_logger->log_debug("Caught Exception Processor::onTrigger");
		session->rollback();
		delete session;
		delete context;
		throw;
	}
}
