/**
 * @file ProcessSession.cpp
 * ProcessSession class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <time.h>
#include <chrono>
#include <thread>
#include <iostream>

#include "ProcessSession.h"

FlowFileRecord* ProcessSession::create()
{
	std::map<std::string, std::string> empty;
	FlowFileRecord *record = new FlowFileRecord(empty);

	if (record)
	{
		_addedFlowFiles[record->getUUIDStr()] = record;
		_logger->log_debug("Create FlowFile with UUID %s", record->getUUIDStr().c_str());
	}

	return record;
}

FlowFileRecord* ProcessSession::create(FlowFileRecord *parent)
{
	FlowFileRecord *record = this->create();
	if (record)
	{
		// Copy attributes
		std::map<std::string, std::string> parentAttributes = parent->getAttributes();
	    std::map<std::string, std::string>::iterator it;
	    for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++)
	    {
	    	if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
	    			it->first == FlowAttributeKey(DISCARD_REASON) ||
					it->first == FlowAttributeKey(UUID))
	    		// Do not copy special attributes from parent
	    		continue;
	    	record->setAttribute(it->first, it->second);
	    }
	    record->_lineageStartDate = parent->_lineageStartDate;
	    record->_lineageIdentifiers = parent->_lineageIdentifiers;
	    record->_lineageIdentifiers.insert(parent->_uuidStr);

	}
	return record;
}

FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent)
{
	FlowFileRecord *record = this->create(parent);
	if (record)
	{
		// Copy Resource Claim
		record->_claim = parent->_claim;
		if (record->_claim)
		{
			record->_offset = parent->_offset;
			record->_size = parent->_size;
			record->_claim->increaseFlowFileRecordOwnedCount();
		}
	}
	return record;
}

FlowFileRecord* ProcessSession::cloneDuringTransfer(FlowFileRecord *parent)
{
	std::map<std::string, std::string> empty;
	FlowFileRecord *record = new FlowFileRecord(empty);

	if (record)
	{
		this->_clonedFlowFiles[record->getUUIDStr()] = record;
		_logger->log_debug("Clone FlowFile with UUID %s during transfer", record->getUUIDStr().c_str());
		// Copy attributes
		std::map<std::string, std::string> parentAttributes = parent->getAttributes();
		std::map<std::string, std::string>::iterator it;
		for (it = parentAttributes.begin(); it!= parentAttributes.end(); it++)
		{
			if (it->first == FlowAttributeKey(ALTERNATE_IDENTIFIER) ||
	    			it->first == FlowAttributeKey(DISCARD_REASON) ||
					it->first == FlowAttributeKey(UUID))
	    		// Do not copy special attributes from parent
	    		continue;
	    	record->setAttribute(it->first, it->second);
	    }
	    record->_lineageStartDate = parent->_lineageStartDate;
	    record->_lineageIdentifiers = parent->_lineageIdentifiers;
	    record->_lineageIdentifiers.insert(parent->_uuidStr);

	    // Copy Resource Claim
	    record->_claim = parent->_claim;
	    if (record->_claim)
	    {
	    	record->_offset = parent->_offset;
	    	record->_size = parent->_size;
	    	record->_claim->increaseFlowFileRecordOwnedCount();
	    }
	}

	return record;
}

FlowFileRecord* ProcessSession::clone(FlowFileRecord *parent, long offset, long size)
{
	FlowFileRecord *record = this->create(parent);
	if (record)
	{
		if (parent->_claim)
		{
			if ((offset + size) > (long) parent->_size)
			{
				// Set offset and size
				_logger->log_error("clone offset %d and size %d exceed parent size %d",
						offset, size, parent->_size);
				// Remove the Add FlowFile for the session
				std::map<std::string, FlowFileRecord *>::iterator it =
						this->_addedFlowFiles.find(record->getUUIDStr());
				if (it != this->_addedFlowFiles.end())
					this->_addedFlowFiles.erase(record->getUUIDStr());
				delete record;
				return NULL;
			}
			record->_offset = parent->_offset + parent->_offset;
			record->_size = size;
			// Copy Resource Claim
			record->_claim = parent->_claim;
			record->_claim->increaseFlowFileRecordOwnedCount();
		}
	}
	return record;
}

void ProcessSession::remove(FlowFileRecord *flow)
{
	flow->_markedDelete = true;
	_deletedFlowFiles[flow->getUUIDStr()] = flow;
}

void ProcessSession::putAttribute(FlowFileRecord *flow, std::string key, std::string value)
{
	flow->setAttribute(key, value);
}

void ProcessSession::removeAttribute(FlowFileRecord *flow, std::string key)
{
	flow->removeAttribute(key);
}

void ProcessSession::penalize(FlowFileRecord *flow)
{
	flow->_penaltyExpirationMs = getTimeMillis() + this->_processContext->getProcessor()->getPenalizationPeriodMsec();
}

void ProcessSession::transfer(FlowFileRecord *flow, Relationship relationship)
{
	_transferRelationship[flow->getUUIDStr()] = relationship;
}

void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback)
{
	ResourceClaim *claim = NULL;

	claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);

	try
	{
		std::ofstream fs;
		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
		if (fs.is_open())
		{
			// Call the callback to write the content
			callback->process(&fs);
			if (fs.good() && fs.tellp() >= 0)
			{
				flow->_size = fs.tellp();
				flow->_offset = 0;
				if (flow->_claim)
				{
					// Remove the old claim
					flow->_claim->decreaseFlowFileRecordOwnedCount();
					flow->_claim = NULL;
				}
				flow->_claim = claim;
				claim->increaseFlowFileRecordOwnedCount();
				/*
				_logger->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s",
						flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
				fs.close();
			}
			else
			{
				fs.close();
				throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
			}
		}
		else
		{
			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
		}
	}
	catch (std::exception &exception)
	{
		if (flow && flow->_claim == claim)
		{
			flow->_claim->decreaseFlowFileRecordOwnedCount();
			flow->_claim = NULL;
		}
		if (claim)
			delete claim;
		_logger->log_debug("Caught Exception %s", exception.what());
		throw;
	}
	catch (...)
	{
		if (flow && flow->_claim == claim)
		{
			flow->_claim->decreaseFlowFileRecordOwnedCount();
			flow->_claim = NULL;
		}
		if (claim)
			delete claim;
		_logger->log_debug("Caught Exception during process session write");
		throw;
	}
}

void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback)
{
	ResourceClaim *claim = NULL;

	if (flow->_claim == NULL)
	{
		// No existed claim for append, we need to create new claim
		return write(flow, callback);
	}

	claim = flow->_claim;

	try
	{
		std::ofstream fs;
		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::app);
		if (fs.is_open())
		{
			// Call the callback to write the content
			std::streampos oldPos = fs.tellp();
			callback->process(&fs);
			if (fs.good() && fs.tellp() >= 0)
			{
				uint64_t appendSize = fs.tellp() - oldPos;
				flow->_size += appendSize;
				/*
				_logger->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s",
						flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
				fs.close();
			}
			else
			{
				fs.close();
				throw Exception(FILE_OPERATION_EXCEPTION, "File Write Error");
			}
		}
		else
		{
			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
		}
	}
	catch (std::exception &exception)
	{
		_logger->log_debug("Caught Exception %s", exception.what());
		throw;
	}
	catch (...)
	{
		_logger->log_debug("Caught Exception during process session append");
		throw;
	}
}

void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback)
{
	try
	{
		ResourceClaim *claim = NULL;
		if (flow->_claim == NULL)
		{
			// No existed claim for read, we throw exception
			throw Exception(FILE_OPERATION_EXCEPTION, "No Content Claim existed for read");
		}

		claim = flow->_claim;
		std::ifstream fs;
		fs.open(claim->getContentFullPath().c_str(), std::fstream::in | std::fstream::binary);
		if (fs.is_open())
		{
			fs.seekg(flow->_offset, fs.beg);

			if (fs.good())
			{
				callback->process(&fs);
				/*
				_logger->log_debug("Read offset %d size %d content %s for FlowFile UUID %s",
						flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
				fs.close();
			}
			else
			{
				fs.close();
				throw Exception(FILE_OPERATION_EXCEPTION, "File Read Error");
			}
		}
		else
		{
			throw Exception(FILE_OPERATION_EXCEPTION, "File Open Error");
		}
	}
	catch (std::exception &exception)
	{
		_logger->log_debug("Caught Exception %s", exception.what());
		throw;
	}
	catch (...)
	{
		_logger->log_debug("Caught Exception during process session read");
		throw;
	}
}

void ProcessSession::import(std::string source, FlowFileRecord *flow, bool keepSource, uint64_t offset)
{
	ResourceClaim *claim = NULL;

	claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
	char *buf = NULL;
	int size = 4096;
	buf = new char [size];

	try
	{
		std::ofstream fs;
		fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc);
		std::ifstream input;
		input.open(source.c_str(), std::fstream::in | std::fstream::binary);

		if (fs.is_open() && input.is_open())
		{
			// Open the source file and stream to the flow file
			input.seekg(offset, fs.beg);
			while (input.good())
			{
				input.read(buf, size);
				if (input)
					fs.write(buf, size);
				else
					fs.write(buf, input.gcount());
			}

			if (fs.good() && fs.tellp() >= 0)
			{
				flow->_size = fs.tellp();
				flow->_offset = 0;
				if (flow->_claim)
				{
					// Remove the old claim
					flow->_claim->decreaseFlowFileRecordOwnedCount();
					flow->_claim = NULL;
				}
				flow->_claim = claim;
				claim->increaseFlowFileRecordOwnedCount();
				/*
				_logger->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s",
						flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
				fs.close();
				input.close();
				if (!keepSource)
					std::remove(source.c_str());
			}
			else
			{
				fs.close();
				input.close();
				throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
			}
		}
		else
		{
			throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error");
		}

		delete[] buf;
	}
	catch (std::exception &exception)
	{
		if (flow && flow->_claim == claim)
		{
			flow->_claim->decreaseFlowFileRecordOwnedCount();
			flow->_claim = NULL;
		}
		if (claim)
			delete claim;
		_logger->log_debug("Caught Exception %s", exception.what());
		delete[] buf;
		throw;
	}
	catch (...)
	{
		if (flow && flow->_claim == claim)
		{
			flow->_claim->decreaseFlowFileRecordOwnedCount();
			flow->_claim = NULL;
		}
		if (claim)
			delete claim;
		_logger->log_debug("Caught Exception during process session write");
		delete[] buf;
		throw;
	}
}

void ProcessSession::commit()
{
	try
	{
		// First we clone the flow record based on the transfered relationship for updated flow record
		std::map<std::string, FlowFileRecord *>::iterator it;
		for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			if (record->_markedDelete)
				continue;
			std::map<std::string, Relationship>::iterator itRelationship =
					this->_transferRelationship.find(record->getUUIDStr());
			if (itRelationship != _transferRelationship.end())
			{
				Relationship relationship = itRelationship->second;
				// Find the relationship, we need to find the connections for that relationship
				std::set<Connection *> connections =
						_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
				if (connections.empty())
				{
					// No connection
					if (!_processContext->getProcessor()->isAutoTerminated(relationship))
					{
						// Not autoterminate, we should have the connect
						std::string message = "Connect empty for non auto terminated relationship" + relationship.getName();
						throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
					}
					else
					{
						// Autoterminated
						remove(record);
					}
				}
				else
				{
					// We connections, clone the flow and assign the connection accordingly
					for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection)
					{
						Connection *connection(*itConnection);
						if (itConnection == connections.begin())
						{
							// First connection which the flow need be routed to
							record->_connection = connection;
						}
						else
						{
							// Clone the flow file and route to the connection
							FlowFileRecord *cloneRecord;
							cloneRecord = this->cloneDuringTransfer(record);
							if (cloneRecord)
								cloneRecord->_connection = connection;
							else
								throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
						}
					}
				}
			}
			else
			{
				// Can not find relationship for the flow
				throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
			}
		}

		// Do the samething for added flow file
		for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			if (record->_markedDelete)
				continue;
			std::map<std::string, Relationship>::iterator itRelationship =
					this->_transferRelationship.find(record->getUUIDStr());
			if (itRelationship != _transferRelationship.end())
			{
				Relationship relationship = itRelationship->second;
				// Find the relationship, we need to find the connections for that relationship
				std::set<Connection *> connections =
						_processContext->getProcessor()->getOutGoingConnections(relationship.getName());
				if (connections.empty())
				{
					// No connection
					if (!_processContext->getProcessor()->isAutoTerminated(relationship))
					{
						// Not autoterminate, we should have the connect
						std::string message = "Connect empty for non auto terminated relationship " + relationship.getName();
						throw Exception(PROCESS_SESSION_EXCEPTION, message.c_str());
					}
					else
					{
						// Autoterminated
						remove(record);
					}
				}
				else
				{
					// We connections, clone the flow and assign the connection accordingly
					for (std::set<Connection *>::iterator itConnection = connections.begin(); itConnection != connections.end(); ++itConnection)
					{
						Connection *connection(*itConnection);
						if (itConnection == connections.begin())
						{
							// First connection which the flow need be routed to
							record->_connection = connection;
						}
						else
						{
							// Clone the flow file and route to the connection
							FlowFileRecord *cloneRecord;
							cloneRecord = this->cloneDuringTransfer(record);
							if (cloneRecord)
								cloneRecord->_connection = connection;
							else
								throw Exception(PROCESS_SESSION_EXCEPTION, "Can not clone the flow for transfer");
						}
					}
				}
			}
			else
			{
				// Can not find relationship for the flow
				throw Exception(PROCESS_SESSION_EXCEPTION, "Can not find the transfer relationship for the flow");
			}
		}

		// Complete process the added and update flow files for the session, send the flow file to its queue
		for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			if (record->_markedDelete)
			{
				continue;
			}
			if (record->_connection)
				record->_connection->put(record);
			else
				delete record;
		}
		for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			if (record->_markedDelete)
			{
				continue;
			}
			if (record->_connection)
				record->_connection->put(record);
			else
				delete record;
		}
		// Process the clone flow files
		for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			if (record->_markedDelete)
			{
				continue;
			}
			if (record->_connection)
				record->_connection->put(record);
			else
				delete record;
		}
		// Delete the deleted flow files
		for (it = _deletedFlowFiles.begin(); it!= _deletedFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			delete record;
		}
		// Delete the snapshot
		for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			delete record;
		}
		// All done
		_updatedFlowFiles.clear();
		_addedFlowFiles.clear();
		_clonedFlowFiles.clear();
		_deletedFlowFiles.clear();
		_originalFlowFiles.clear();
		_logger->log_trace("ProcessSession committed for %s", _processContext->getProcessor()->getName().c_str());
	}
	catch (std::exception &exception)
	{
		_logger->log_debug("Caught Exception %s", exception.what());
		throw;
	}
	catch (...)
	{
		_logger->log_debug("Caught Exception during process session commit");
		throw;
	}
}


void ProcessSession::rollback()
{
	try
	{
		std::map<std::string, FlowFileRecord *>::iterator it;
		// Requeue the snapshot of the flowfile back
		for (it = _originalFlowFiles.begin(); it!= _originalFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			if (record->_orginalConnection)
			{
				record->_snapshot = false;
				record->_orginalConnection->put(record);
			}
			else
				delete record;
		}
		_originalFlowFiles.clear();
		// Process the clone flow files
		for (it = _clonedFlowFiles.begin(); it!= _clonedFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			delete record;
		}
		_clonedFlowFiles.clear();
		for (it = _addedFlowFiles.begin(); it!= _addedFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			delete record;
		}
		_addedFlowFiles.clear();
		for (it = _updatedFlowFiles.begin(); it!= _updatedFlowFiles.end(); it++)
		{
			FlowFileRecord *record = it->second;
			delete record;
		}
		_updatedFlowFiles.clear();
		_deletedFlowFiles.clear();
		_logger->log_trace("ProcessSession rollback for %s", _processContext->getProcessor()->getName().c_str());
	}
	catch (std::exception &exception)
	{
		_logger->log_debug("Caught Exception %s", exception.what());
		throw;
	}
	catch (...)
	{
		_logger->log_debug("Caught Exception during process session roll back");
		throw;
	}
}

FlowFileRecord *ProcessSession::get()
{
	Connection *first = _processContext->getProcessor()->getNextIncomingConnection();

	if (first == NULL)
		return NULL;

	Connection *current = first;

	do
	{
		std::set<FlowFileRecord *> expired;
		FlowFileRecord *ret = current->poll(expired);
		if (expired.size() > 0)
		{
			// Remove expired flow record
			for (std::set<FlowFileRecord *>::iterator it = expired.begin(); it != expired.end(); ++it)
			{
				delete (*it);
			}
		}
		if (ret)
		{
			// add the flow record to the current process session update map
			ret->_markedDelete = false;
			_updatedFlowFiles[ret->getUUIDStr()] = ret;
			std::map<std::string, std::string> empty;
			FlowFileRecord *snapshot = new FlowFileRecord(empty);
			_logger->log_debug("Create Snapshot FlowFile with UUID %s", snapshot->getUUIDStr().c_str());
			snapshot->duplicate(ret);
			// save a snapshot
			_originalFlowFiles[snapshot->getUUIDStr()] = snapshot;
			return ret;
		}
		current = _processContext->getProcessor()->getNextIncomingConnection();
	}
	while (current != NULL && current != first);

	return NULL;
}

