/**
 * @file GetFile.cpp
 * GetFile class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <time.h>
#include <sstream>
#include <stdio.h>
#include <string>
#include <iostream>
#include <dirent.h>
#include <limits.h>
#include <unistd.h>

#include "TimeUtil.h"
#include "GetFile.h"
#include "ProcessContext.h"
#include "ProcessSession.h"

const std::string GetFile::ProcessorName("GetFile");
Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10");
Property GetFile::Directory("Input Directory", "The input directory from which to pull files", ".");
Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true");
Property GetFile::KeepSourceFile("Keep Source File",
		"If true, the file is not deleted after it has been copied to the Content Repository", "false");
Property GetFile::MaxAge("Maximum File Age",
		"The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored", "0 sec");
Property GetFile::MinAge("Minimum File Age",
		"The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored", "0 sec");
Property GetFile::MaxSize("Maximum File Size", "The maximum size that a file can be in order to be pulled", "0 B");
Property GetFile::MinSize("Minimum File Size", "The minimum size that a file must be in order to be pulled", "0 B");
Property GetFile::PollInterval("Polling Interval", "Indicates how long to wait before performing a directory listing", "0 sec");
Property GetFile::Recurse("Recurse Subdirectories", "Indicates whether or not to pull files from subdirectories", "true");
Relationship GetFile::Success("success", "All files are routed to success");

void GetFile::initialize()
{
	//! Set the supported properties
	std::set<Property> properties;
	properties.insert(BatchSize);
	properties.insert(Directory);
	properties.insert(IgnoreHiddenFile);
	properties.insert(KeepSourceFile);
	properties.insert(MaxAge);
	properties.insert(MinAge);
	properties.insert(MaxSize);
	properties.insert(MinSize);
	properties.insert(PollInterval);
	properties.insert(Recurse);
	setSupportedProperties(properties);
	//! Set the supported relationships
	std::set<Relationship> relationships;
	relationships.insert(Success);
	setSupportedRelationships(relationships);
}

void GetFile::onTrigger(ProcessContext *context, ProcessSession *session)
{
	std::string value;
	if (context->getProperty(Directory.getName(), value))
	{
		_directory = value;
	}
	if (context->getProperty(BatchSize.getName(), value))
	{
		Property::StringToInt(value, _batchSize);
	}
	if (context->getProperty(IgnoreHiddenFile.getName(), value))
	{
		Property::StringToBool(value, _ignoreHiddenFile);
	}
	if (context->getProperty(KeepSourceFile.getName(), value))
	{
		Property::StringToBool(value, _keepSourceFile);
	}
	if (context->getProperty(MaxAge.getName(), value))
	{
		TimeUnit unit;
		if (Property::StringToTime(value, _maxAge, unit) &&
			Property::ConvertTimeUnitToMS(_maxAge, unit, _maxAge))
		{

		}
	}
	if (context->getProperty(MinAge.getName(), value))
	{
		TimeUnit unit;
		if (Property::StringToTime(value, _minAge, unit) &&
			Property::ConvertTimeUnitToMS(_minAge, unit, _minAge))
		{

		}
	}
	if (context->getProperty(MaxSize.getName(), value))
	{
		Property::StringToInt(value, _maxSize);
	}
	if (context->getProperty(MinSize.getName(), value))
	{
		Property::StringToInt(value, _minSize);
	}
	if (context->getProperty(PollInterval.getName(), value))
	{
		TimeUnit unit;
		if (Property::StringToTime(value, _pollInterval, unit) &&
			Property::ConvertTimeUnitToMS(_pollInterval, unit, _pollInterval))
		{

		}
	}
	if (context->getProperty(Recurse.getName(), value))
	{
		Property::StringToBool(value, _recursive);
	}

	// Perform directory list
	if (isListingEmpty())
	{
		if (_pollInterval == 0 || (getTimeMillis() - _lastDirectoryListingTime) > _pollInterval)
		{
			performListing(_directory);
		}
	}

	if (!isListingEmpty())
	{
		try
		{
			std::queue<std::string> list;
			pollListing(list, _batchSize);
			while (!list.empty())
			{
				std::string fileName = list.front();
				list.pop();
				_logger->log_info("GetFile process %s", fileName.c_str());
				FlowFileRecord *flowFile = session->create();
				if (!flowFile)
					return;
				std::size_t found = fileName.find_last_of("/\\");
				std::string path = fileName.substr(0,found);
				std::string name = fileName.substr(found+1);
				flowFile->updateAttribute(FILENAME, name);
				flowFile->updateAttribute(PATH, path);
				flowFile->addAttribute(ABSOLUTE_PATH, fileName);
				session->import(fileName, flowFile, _keepSourceFile);
				session->transfer(flowFile, Success);
			}
		}
		catch (std::exception &exception)
		{
			_logger->log_debug("GetFile Caught Exception %s", exception.what());
			throw;
		}
		catch (...)
		{
			throw;
		}
	}
}

bool GetFile::isListingEmpty()
{
	std::lock_guard<std::mutex> lock(_mtx);

	return _dirList.empty();
}

void GetFile::putListing(std::string fileName)
{
	std::lock_guard<std::mutex> lock(_mtx);

	_dirList.push(fileName);
}

void GetFile::pollListing(std::queue<std::string> &list, int maxSize)
{
	std::lock_guard<std::mutex> lock(_mtx);

	while (!_dirList.empty() && (maxSize == 0 || list.size() < maxSize))
	{
		std::string fileName = _dirList.front();
		_dirList.pop();
		list.push(fileName);
	}

	return;
}

bool GetFile::acceptFile(std::string fileName)
{
	struct stat statbuf;

	if (stat(fileName.c_str(), &statbuf) == 0)
	{
		if (_minSize > 0 && statbuf.st_size <_minSize)
			return false;

		if (_maxSize > 0 && statbuf.st_size > _maxSize)
			return false;

		uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
		uint64_t fileAge = getTimeMillis() - modifiedTime;
		if (_minAge > 0 && fileAge < _minAge)
			return false;
		if (_maxAge > 0 && fileAge > _maxAge)
			return false;

		if (_ignoreHiddenFile && fileName.c_str()[0] == '.')
			return false;

		if (access(fileName.c_str(), R_OK) != 0)
			return false;

		if (_keepSourceFile == false && access(fileName.c_str(), W_OK) != 0)
			return false;

		return true;
	}

	return false;
}

void GetFile::performListing(std::string dir)
{
	DIR *d;
	d = opendir(dir.c_str());
	if (!d)
		return;
	while (1)
	{
		struct dirent *entry;
		entry = readdir(d);
		if (!entry)
			break;
		std::string d_name = entry->d_name;
		if ((entry->d_type & DT_DIR))
		{
			// if this is a directory
			if (_recursive && strcmp(d_name.c_str(), "..") != 0 && strcmp(d_name.c_str(), ".") != 0)
			{
				std::string path = dir + "/" + d_name;
				performListing(path);
			}
		}
		else
		{
			std::string fileName = dir + "/" + d_name;
			if (acceptFile(fileName))
			{
				// check whether we can take this file
				putListing(fileName);
			}
		}
	}
	closedir(d);
}
