/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


#include <log4cxx/asyncappender.h>


#include <log4cxx/helpers/loglog.h>
#include <log4cxx/spi/loggingevent.h>
#include <log4cxx/helpers/stringhelper.h>
#include <log4cxx/helpers/optionconverter.h>
#include <log4cxx/helpers/threadutility.h>
#include <log4cxx/private/appenderskeleton_priv.h>
#include <thread>
#include <atomic>
#include <condition_variable>

#if LOG4CXX_EVENTS_AT_EXIT
#include <log4cxx/private/atexitregistry.h>
#endif

using namespace LOG4CXX_NS;
using namespace LOG4CXX_NS::helpers;
using namespace LOG4CXX_NS::spi;

#if 15 < LOG4CXX_ABI_VERSION
namespace
{
#endif

/**
 * The default buffer size is set to 128 events.
*/
enum { DEFAULT_BUFFER_SIZE = 128 };

class DiscardSummary
{
	private:
		/**
		 * First event of the highest severity.
		*/
		LoggingEventPtr maxEvent;

		/**
		* Total count of messages discarded.
		*/
		int count;

	public:
		/**
		 * Create new instance.
		 *
		 * @param event event, may not be null.
		*/
		DiscardSummary(const LoggingEventPtr& event);
		/** Copy constructor.  */
		DiscardSummary(const DiscardSummary& src);
		/** Assignment operator. */
		DiscardSummary& operator=(const DiscardSummary& src);

		/**
		 * Add discarded event to summary.
		 *
		 * @param event event, may not be null.
		*/
		void add(const LoggingEventPtr& event);

		/**
		 * Create an event with a discard count and the message from \c maxEvent.
		 *
		 * @return the new event.
		 */
		LoggingEventPtr createEvent(Pool& p);

#if LOG4CXX_ABI_VERSION <= 15
		static
		::LOG4CXX_NS::spi::LoggingEventPtr createEvent(::LOG4CXX_NS::helpers::Pool& p,
			size_t discardedCount);
#endif

		/**
		* The number of messages discarded.
		*/
		int getCount() const { return count; }
};

typedef std::map<LogString, DiscardSummary> DiscardMap;

#if 15 < LOG4CXX_ABI_VERSION
}
#endif

#ifdef __cpp_lib_hardware_interference_size
    using std::hardware_constructive_interference_size;
    using std::hardware_destructive_interference_size;
#else
    // 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ...
    constexpr std::size_t hardware_constructive_interference_size = 64;
    constexpr std::size_t hardware_destructive_interference_size = 64;
#endif

struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkeletonPrivate
{
	AsyncAppenderPriv() :
		AppenderSkeletonPrivate(),
		buffer(DEFAULT_BUFFER_SIZE),
		bufferSize(DEFAULT_BUFFER_SIZE),
		appenders(pool),
		dispatcher(),
		locationInfo(false),
		blocking(true)
#if LOG4CXX_EVENTS_AT_EXIT
		, atExitRegistryRaii([this]{stopDispatcher();})
#endif
		, eventCount(0)
		, dispatchedCount(0)
		, commitCount(0)
	{
	}

	/**
	 * Event buffer.
	*/
	struct EventData
	{
		LoggingEventPtr event;
		size_t pendingCount;
	};
	std::vector<EventData> buffer;

	/**
	 *  Mutex used to guard access to buffer and discardMap.
	 */
	std::mutex bufferMutex;

	std::condition_variable bufferNotFull;
	std::condition_variable bufferNotEmpty;

	/**
	  * Map of DiscardSummary objects keyed by logger name.
	*/
	DiscardMap discardMap;

	/**
	 * The maximum number of undispatched events.
	*/
	int bufferSize;

	/**
	 * Nested appenders.
	*/
	helpers::AppenderAttachableImpl appenders;

	/**
	 *  Dispatcher.
	 */
	std::thread dispatcher;

	void stopDispatcher()
	{
		{
			std::lock_guard<std::mutex> lock(bufferMutex);
			closed = true;
		}
		bufferNotEmpty.notify_all();
		bufferNotFull.notify_all();

		if (dispatcher.joinable())
		{
			dispatcher.join();
		}
	}

	/**
	 * Should location info be included in dispatched messages.
	*/
	bool locationInfo;

	/**
	 * Does appender block when buffer is full.
	*/
	bool blocking;

#if LOG4CXX_EVENTS_AT_EXIT
	helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif

	/**
	 * Used to calculate the buffer position at which to store the next event.
	*/
	alignas(hardware_constructive_interference_size) std::atomic<size_t> eventCount;

	/**
	 * Used to calculate the buffer position from which to extract the next event.
	*/
	alignas(hardware_constructive_interference_size) std::atomic<size_t> dispatchedCount;

	/**
	 * Used to communicate to the dispatch thread when an event is committed in buffer.
	*/
	alignas(hardware_constructive_interference_size) std::atomic<size_t> commitCount;
};


IMPLEMENT_LOG4CXX_OBJECT(AsyncAppender)

#define priv static_cast<AsyncAppenderPriv*>(m_priv.get())

AsyncAppender::AsyncAppender()
	: AppenderSkeleton(std::make_unique<AsyncAppenderPriv>())
{
}

AsyncAppender::~AsyncAppender()
{
	finalize();
}

void AsyncAppender::addAppender(const AppenderPtr newAppender)
{
	priv->appenders.addAppender(newAppender);
}


void AsyncAppender::setOption(const LogString& option,
	const LogString& value)
{
	if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("LOCATIONINFO"), LOG4CXX_STR("locationinfo")))
	{
		setLocationInfo(OptionConverter::toBoolean(value, false));
	}

	if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BUFFERSIZE"), LOG4CXX_STR("buffersize")))
	{
		setBufferSize(OptionConverter::toInt(value, DEFAULT_BUFFER_SIZE));
	}

	if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BLOCKING"), LOG4CXX_STR("blocking")))
	{
		setBlocking(OptionConverter::toBoolean(value, true));
	}
	else
	{
		AppenderSkeleton::setOption(option, value);
	}
}


void AsyncAppender::doAppend(const spi::LoggingEventPtr& event, Pool& pool1)
{
	doAppendImpl(event, pool1);
}

void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p)
{
	if (priv->bufferSize <= 0)
	{
		priv->appenders.appendLoopOnAppenders(event, p);
	}

	// Set the NDC and MDC for the calling thread as these
	// LoggingEvent fields were not set at event creation time.
	LogString ndcVal;
	event->getNDC(ndcVal);
	// Get a copy of this thread's MDC.
	event->getMDCCopy();

	if (!priv->dispatcher.joinable())
	{
		std::lock_guard<std::mutex> lock(priv->bufferMutex);
		if (!priv->dispatcher.joinable())
			priv->dispatcher = ThreadUtility::instance()->createThread( LOG4CXX_STR("AsyncAppender"), &AsyncAppender::dispatch, this );
	}
	while (true)
	{
		auto pendingCount = priv->eventCount - priv->dispatchedCount;
		if (0 <= pendingCount && pendingCount < priv->bufferSize)
		{
			// Claim a slot in the ring buffer
			auto oldEventCount = priv->eventCount++;
			auto index = oldEventCount % priv->buffer.size();
			// Wait for a free slot
			while (priv->bufferSize <= oldEventCount - priv->dispatchedCount)
				;
			// Write to the ring buffer
			priv->buffer[index] = AsyncAppenderPriv::EventData{event, pendingCount};
			// Notify the dispatch thread that an event has been added
			auto savedEventCount = oldEventCount;
			while (!priv->commitCount.compare_exchange_weak(oldEventCount, oldEventCount + 1, std::memory_order_release))
			{
				 oldEventCount = savedEventCount;
			}
			priv->bufferNotEmpty.notify_all();
			break;
		}
		priv->bufferNotEmpty.notify_all();
		//
		//   Following code is only reachable if buffer is full or eventCount has overflowed
		//
		std::unique_lock<std::mutex> lock(priv->bufferMutex);
		//
		//   if blocking and thread is not already interrupted
		//      and not the dispatcher then
		//      wait for a buffer notification
		bool discard = true;

		if (priv->blocking
			&& !priv->closed
			&& (priv->dispatcher.get_id() != std::this_thread::get_id()) )
		{
			priv->bufferNotFull.wait(lock, [this]()
			{
				return priv->eventCount - priv->dispatchedCount < priv->bufferSize;
			});
			discard = false;
		}

		//
		//   if blocking is false or thread has been interrupted
		//   add event to discard map.
		//
		if (discard)
		{
			LogString loggerName = event->getLoggerName();
			DiscardMap::iterator iter = priv->discardMap.find(loggerName);

			if (iter == priv->discardMap.end())
			{
				DiscardSummary summary(event);
				priv->discardMap.insert(DiscardMap::value_type(loggerName, summary));
			}
			else
			{
				(*iter).second.add(event);
			}

			break;
		}
	}
}

void AsyncAppender::close()
{
	priv->stopDispatcher();
	for (auto item : priv->appenders.getAllAppenders())
	{
		item->close();
	}
}

AppenderList AsyncAppender::getAllAppenders() const
{
	return priv->appenders.getAllAppenders();
}

AppenderPtr AsyncAppender::getAppender(const LogString& n) const
{
	return priv->appenders.getAppender(n);
}

bool AsyncAppender::isAttached(const AppenderPtr appender) const
{
	return priv->appenders.isAttached(appender);
}

bool AsyncAppender::requiresLayout() const
{
	return false;
}

void AsyncAppender::removeAllAppenders()
{
	priv->appenders.removeAllAppenders();
}

void AsyncAppender::removeAppender(const AppenderPtr appender)
{
	priv->appenders.removeAppender(appender);
}

void AsyncAppender::removeAppender(const LogString& n)
{
	priv->appenders.removeAppender(n);
}

bool AsyncAppender::getLocationInfo() const
{
	return priv->locationInfo;
}

void AsyncAppender::setLocationInfo(bool flag)
{
	priv->locationInfo = flag;
}


void AsyncAppender::setBufferSize(int size)
{
	if (size < 0)
	{
		throw IllegalArgumentException(LOG4CXX_STR("size argument must be non-negative"));
	}

	std::lock_guard<std::mutex> lock(priv->bufferMutex);
	priv->bufferSize = (size < 1) ? 1 : size;
	priv->buffer.resize(priv->bufferSize);
	priv->bufferNotFull.notify_all();
}

int AsyncAppender::getBufferSize() const
{
	return priv->bufferSize;
}

void AsyncAppender::setBlocking(bool value)
{
	std::lock_guard<std::mutex> lock(priv->bufferMutex);
	priv->blocking = value;
	priv->bufferNotFull.notify_all();
}

bool AsyncAppender::getBlocking() const
{
	return priv->blocking;
}

DiscardSummary::DiscardSummary(const LoggingEventPtr& event) :
	maxEvent(event), count(1)
{
}

DiscardSummary::DiscardSummary(const DiscardSummary& src) :
	maxEvent(src.maxEvent), count(src.count)
{
}

DiscardSummary& DiscardSummary::operator=(const DiscardSummary& src)
{
	maxEvent = src.maxEvent;
	count = src.count;
	return *this;
}

void DiscardSummary::add(const LoggingEventPtr& event)
{
	if (event->getLevel()->toInt() > maxEvent->getLevel()->toInt())
	{
		maxEvent = event;
	}

	count++;
}

LoggingEventPtr DiscardSummary::createEvent(Pool& p)
 {
	LogString msg(LOG4CXX_STR("Discarded "));
	StringHelper::toString(count, p, msg);
	msg.append(LOG4CXX_STR(" messages due to a full event buffer including: "));
	msg.append(maxEvent->getMessage());
	return std::make_shared<LoggingEvent>(
				maxEvent->getLoggerName(),
				maxEvent->getLevel(),
				msg,
				LocationInfo::getLocationUnavailable() );
}

#if LOG4CXX_ABI_VERSION <= 15
::LOG4CXX_NS::spi::LoggingEventPtr
DiscardSummary::createEvent(::LOG4CXX_NS::helpers::Pool& p,
	size_t discardedCount)
{
	LogString msg(LOG4CXX_STR("Discarded "));
	StringHelper::toString(discardedCount, p, msg);
	msg.append(LOG4CXX_STR(" messages due to a full event buffer"));

	return std::make_shared<LoggingEvent>(
				LOG4CXX_STR(""),
				LOG4CXX_NS::Level::getError(),
				msg,
				LocationInfo::getLocationUnavailable() );
}
#endif


void AsyncAppender::dispatch()
{
	size_t discardCount = 0;
	std::vector<size_t> pendingCountHistogram(priv->bufferSize, 0);
	bool isActive = true;

	while (isActive)
	{
		Pool p;
		LoggingEventList events;
		events.reserve(priv->bufferSize);
		for (int count = 0; count < 2 && priv->dispatchedCount == priv->commitCount; ++count)
			std::this_thread::yield(); // Wait a bit
		if (priv->dispatchedCount == priv->commitCount)
		{
			std::unique_lock<std::mutex> lock(priv->bufferMutex);
			priv->bufferNotEmpty.wait(lock, [this]() -> bool
				{ return priv->dispatchedCount != priv->commitCount || priv->closed; }
			);
		}
		isActive = !priv->closed;

		while (events.size() < priv->bufferSize && priv->dispatchedCount != priv->commitCount)
		{
			auto index = priv->dispatchedCount % priv->buffer.size();
			const auto& data = priv->buffer[index];
			events.push_back(data.event);
			if (data.pendingCount < pendingCountHistogram.size())
				++pendingCountHistogram[data.pendingCount];
			++priv->dispatchedCount;
		}
		priv->bufferNotFull.notify_all();
		{
			std::lock_guard<std::mutex> lock(priv->bufferMutex);
			for (auto discardItem : priv->discardMap)
			{
				events.push_back(discardItem.second.createEvent(p));
				discardCount += discardItem.second.getCount();
			}
			priv->discardMap.clear();
		}

		for (auto item : events)
		{
			try
			{
				priv->appenders.appendLoopOnAppenders(item, p);
			}
			catch (std::exception& ex)
			{
				if (isActive)
				{
					priv->errorHandler->error(LOG4CXX_STR("async dispatcher"), ex, 0, item);
					isActive = false;
				}
			}
			catch (...)
			{
				if (isActive)
				{
					priv->errorHandler->error(LOG4CXX_STR("async dispatcher"));
					isActive = false;
				}
			}
		}
		if (!isActive)
		{
			LogString msg(LOG4CXX_STR("AsyncAppender")); 
			msg += LOG4CXX_STR(" discardCount ");
			StringHelper::toString(discardCount, p, msg);
			msg += LOG4CXX_STR(" pendingCountHistogram");
			for (auto item : pendingCountHistogram)
			{
				msg += logchar(' ');
				StringHelper::toString(item, p, msg);
			}
			LogLog::debug(msg);
		}
	}

}
