| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| #include <log4cxx/asyncappender.h> |
| |
| |
| #include <log4cxx/helpers/loglog.h> |
| #include <log4cxx/spi/loggingevent.h> |
| #include <log4cxx/helpers/stringhelper.h> |
| #include <log4cxx/helpers/optionconverter.h> |
| #include <log4cxx/helpers/threadutility.h> |
| #include <log4cxx/private/appenderskeleton_priv.h> |
| #include <thread> |
| #include <atomic> |
| #include <condition_variable> |
| |
| #if LOG4CXX_EVENTS_AT_EXIT |
| #include <log4cxx/private/atexitregistry.h> |
| #endif |
| |
| using namespace LOG4CXX_NS; |
| using namespace LOG4CXX_NS::helpers; |
| using namespace LOG4CXX_NS::spi; |
| |
| #if 15 < LOG4CXX_ABI_VERSION |
| namespace |
| { |
| #endif |
| |
| /** |
| * The default buffer size is set to 128 events. |
| */ |
| enum { DEFAULT_BUFFER_SIZE = 128 }; |
| |
| class DiscardSummary |
| { |
| private: |
| /** |
| * First event of the highest severity. |
| */ |
| LoggingEventPtr maxEvent; |
| |
| /** |
| * Total count of messages discarded. |
| */ |
| int count; |
| |
| public: |
| /** |
| * Create new instance. |
| * |
| * @param event event, may not be null. |
| */ |
| DiscardSummary(const LoggingEventPtr& event); |
| /** Copy constructor. */ |
| DiscardSummary(const DiscardSummary& src); |
| /** Assignment operator. */ |
| DiscardSummary& operator=(const DiscardSummary& src); |
| |
| /** |
| * Add discarded event to summary. |
| * |
| * @param event event, may not be null. |
| */ |
| void add(const LoggingEventPtr& event); |
| |
| /** |
| * Create an event with a discard count and the message from \c maxEvent. |
| * |
| * @return the new event. |
| */ |
| LoggingEventPtr createEvent(Pool& p); |
| |
| #if LOG4CXX_ABI_VERSION <= 15 |
| static |
| ::LOG4CXX_NS::spi::LoggingEventPtr createEvent(::LOG4CXX_NS::helpers::Pool& p, |
| size_t discardedCount); |
| #endif |
| |
| /** |
| * The number of messages discarded. |
| */ |
| int getCount() const { return count; } |
| }; |
| |
| typedef std::map<LogString, DiscardSummary> DiscardMap; |
| |
| #if 15 < LOG4CXX_ABI_VERSION |
| } |
| #endif |
| |
| #ifdef __cpp_lib_hardware_interference_size |
| using std::hardware_constructive_interference_size; |
| using std::hardware_destructive_interference_size; |
| #else |
| // 64 bytes on x86-64 │ L1_CACHE_BYTES │ L1_CACHE_SHIFT │ __cacheline_aligned │ ... |
| constexpr std::size_t hardware_constructive_interference_size = 64; |
| constexpr std::size_t hardware_destructive_interference_size = 64; |
| #endif |
| |
| struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkeletonPrivate |
| { |
| AsyncAppenderPriv() : |
| AppenderSkeletonPrivate(), |
| buffer(DEFAULT_BUFFER_SIZE), |
| bufferSize(DEFAULT_BUFFER_SIZE), |
| appenders(pool), |
| dispatcher(), |
| locationInfo(false), |
| blocking(true) |
| #if LOG4CXX_EVENTS_AT_EXIT |
| , atExitRegistryRaii([this]{stopDispatcher();}) |
| #endif |
| , eventCount(0) |
| , dispatchedCount(0) |
| , commitCount(0) |
| { |
| } |
| |
| /** |
| * Event buffer. |
| */ |
| struct EventData |
| { |
| LoggingEventPtr event; |
| size_t pendingCount; |
| }; |
| std::vector<EventData> buffer; |
| |
| /** |
| * Mutex used to guard access to buffer and discardMap. |
| */ |
| std::mutex bufferMutex; |
| |
| std::condition_variable bufferNotFull; |
| std::condition_variable bufferNotEmpty; |
| |
| /** |
| * Map of DiscardSummary objects keyed by logger name. |
| */ |
| DiscardMap discardMap; |
| |
| /** |
| * The maximum number of undispatched events. |
| */ |
| int bufferSize; |
| |
| /** |
| * Nested appenders. |
| */ |
| helpers::AppenderAttachableImpl appenders; |
| |
| /** |
| * Dispatcher. |
| */ |
| std::thread dispatcher; |
| |
| void stopDispatcher() |
| { |
| { |
| std::lock_guard<std::mutex> lock(bufferMutex); |
| closed = true; |
| } |
| bufferNotEmpty.notify_all(); |
| bufferNotFull.notify_all(); |
| |
| if (dispatcher.joinable()) |
| { |
| dispatcher.join(); |
| } |
| } |
| |
| /** |
| * Should location info be included in dispatched messages. |
| */ |
| bool locationInfo; |
| |
| /** |
| * Does appender block when buffer is full. |
| */ |
| bool blocking; |
| |
| #if LOG4CXX_EVENTS_AT_EXIT |
| helpers::AtExitRegistry::Raii atExitRegistryRaii; |
| #endif |
| |
| /** |
| * Used to calculate the buffer position at which to store the next event. |
| */ |
| alignas(hardware_constructive_interference_size) std::atomic<size_t> eventCount; |
| |
| /** |
| * Used to calculate the buffer position from which to extract the next event. |
| */ |
| alignas(hardware_constructive_interference_size) std::atomic<size_t> dispatchedCount; |
| |
| /** |
| * Used to communicate to the dispatch thread when an event is committed in buffer. |
| */ |
| alignas(hardware_constructive_interference_size) std::atomic<size_t> commitCount; |
| }; |
| |
| |
| IMPLEMENT_LOG4CXX_OBJECT(AsyncAppender) |
| |
| #define priv static_cast<AsyncAppenderPriv*>(m_priv.get()) |
| |
| AsyncAppender::AsyncAppender() |
| : AppenderSkeleton(std::make_unique<AsyncAppenderPriv>()) |
| { |
| } |
| |
| AsyncAppender::~AsyncAppender() |
| { |
| finalize(); |
| } |
| |
| void AsyncAppender::addAppender(const AppenderPtr newAppender) |
| { |
| priv->appenders.addAppender(newAppender); |
| } |
| |
| |
| void AsyncAppender::setOption(const LogString& option, |
| const LogString& value) |
| { |
| if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("LOCATIONINFO"), LOG4CXX_STR("locationinfo"))) |
| { |
| setLocationInfo(OptionConverter::toBoolean(value, false)); |
| } |
| |
| if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BUFFERSIZE"), LOG4CXX_STR("buffersize"))) |
| { |
| setBufferSize(OptionConverter::toInt(value, DEFAULT_BUFFER_SIZE)); |
| } |
| |
| if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("BLOCKING"), LOG4CXX_STR("blocking"))) |
| { |
| setBlocking(OptionConverter::toBoolean(value, true)); |
| } |
| else |
| { |
| AppenderSkeleton::setOption(option, value); |
| } |
| } |
| |
| |
| void AsyncAppender::doAppend(const spi::LoggingEventPtr& event, Pool& pool1) |
| { |
| doAppendImpl(event, pool1); |
| } |
| |
| void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p) |
| { |
| if (priv->bufferSize <= 0) |
| { |
| priv->appenders.appendLoopOnAppenders(event, p); |
| } |
| |
| // Set the NDC and MDC for the calling thread as these |
| // LoggingEvent fields were not set at event creation time. |
| LogString ndcVal; |
| event->getNDC(ndcVal); |
| // Get a copy of this thread's MDC. |
| event->getMDCCopy(); |
| |
| if (!priv->dispatcher.joinable()) |
| { |
| std::lock_guard<std::mutex> lock(priv->bufferMutex); |
| if (!priv->dispatcher.joinable()) |
| priv->dispatcher = ThreadUtility::instance()->createThread( LOG4CXX_STR("AsyncAppender"), &AsyncAppender::dispatch, this ); |
| } |
| while (true) |
| { |
| auto pendingCount = priv->eventCount - priv->dispatchedCount; |
| if (0 <= pendingCount && pendingCount < priv->bufferSize) |
| { |
| // Claim a slot in the ring buffer |
| auto oldEventCount = priv->eventCount++; |
| auto index = oldEventCount % priv->buffer.size(); |
| // Wait for a free slot |
| while (priv->bufferSize <= oldEventCount - priv->dispatchedCount) |
| ; |
| // Write to the ring buffer |
| priv->buffer[index] = AsyncAppenderPriv::EventData{event, pendingCount}; |
| // Notify the dispatch thread that an event has been added |
| auto savedEventCount = oldEventCount; |
| while (!priv->commitCount.compare_exchange_weak(oldEventCount, oldEventCount + 1, std::memory_order_release)) |
| { |
| oldEventCount = savedEventCount; |
| } |
| priv->bufferNotEmpty.notify_all(); |
| break; |
| } |
| priv->bufferNotEmpty.notify_all(); |
| // |
| // Following code is only reachable if buffer is full or eventCount has overflowed |
| // |
| std::unique_lock<std::mutex> lock(priv->bufferMutex); |
| // |
| // if blocking and thread is not already interrupted |
| // and not the dispatcher then |
| // wait for a buffer notification |
| bool discard = true; |
| |
| if (priv->blocking |
| && !priv->closed |
| && (priv->dispatcher.get_id() != std::this_thread::get_id()) ) |
| { |
| priv->bufferNotFull.wait(lock, [this]() |
| { |
| return priv->eventCount - priv->dispatchedCount < priv->bufferSize; |
| }); |
| discard = false; |
| } |
| |
| // |
| // if blocking is false or thread has been interrupted |
| // add event to discard map. |
| // |
| if (discard) |
| { |
| LogString loggerName = event->getLoggerName(); |
| DiscardMap::iterator iter = priv->discardMap.find(loggerName); |
| |
| if (iter == priv->discardMap.end()) |
| { |
| DiscardSummary summary(event); |
| priv->discardMap.insert(DiscardMap::value_type(loggerName, summary)); |
| } |
| else |
| { |
| (*iter).second.add(event); |
| } |
| |
| break; |
| } |
| } |
| } |
| |
| void AsyncAppender::close() |
| { |
| priv->stopDispatcher(); |
| for (auto item : priv->appenders.getAllAppenders()) |
| { |
| item->close(); |
| } |
| } |
| |
| AppenderList AsyncAppender::getAllAppenders() const |
| { |
| return priv->appenders.getAllAppenders(); |
| } |
| |
| AppenderPtr AsyncAppender::getAppender(const LogString& n) const |
| { |
| return priv->appenders.getAppender(n); |
| } |
| |
| bool AsyncAppender::isAttached(const AppenderPtr appender) const |
| { |
| return priv->appenders.isAttached(appender); |
| } |
| |
| bool AsyncAppender::requiresLayout() const |
| { |
| return false; |
| } |
| |
| void AsyncAppender::removeAllAppenders() |
| { |
| priv->appenders.removeAllAppenders(); |
| } |
| |
| void AsyncAppender::removeAppender(const AppenderPtr appender) |
| { |
| priv->appenders.removeAppender(appender); |
| } |
| |
| void AsyncAppender::removeAppender(const LogString& n) |
| { |
| priv->appenders.removeAppender(n); |
| } |
| |
| bool AsyncAppender::getLocationInfo() const |
| { |
| return priv->locationInfo; |
| } |
| |
| void AsyncAppender::setLocationInfo(bool flag) |
| { |
| priv->locationInfo = flag; |
| } |
| |
| |
| void AsyncAppender::setBufferSize(int size) |
| { |
| if (size < 0) |
| { |
| throw IllegalArgumentException(LOG4CXX_STR("size argument must be non-negative")); |
| } |
| |
| std::lock_guard<std::mutex> lock(priv->bufferMutex); |
| priv->bufferSize = (size < 1) ? 1 : size; |
| priv->buffer.resize(priv->bufferSize); |
| priv->bufferNotFull.notify_all(); |
| } |
| |
| int AsyncAppender::getBufferSize() const |
| { |
| return priv->bufferSize; |
| } |
| |
| void AsyncAppender::setBlocking(bool value) |
| { |
| std::lock_guard<std::mutex> lock(priv->bufferMutex); |
| priv->blocking = value; |
| priv->bufferNotFull.notify_all(); |
| } |
| |
| bool AsyncAppender::getBlocking() const |
| { |
| return priv->blocking; |
| } |
| |
| DiscardSummary::DiscardSummary(const LoggingEventPtr& event) : |
| maxEvent(event), count(1) |
| { |
| } |
| |
| DiscardSummary::DiscardSummary(const DiscardSummary& src) : |
| maxEvent(src.maxEvent), count(src.count) |
| { |
| } |
| |
| DiscardSummary& DiscardSummary::operator=(const DiscardSummary& src) |
| { |
| maxEvent = src.maxEvent; |
| count = src.count; |
| return *this; |
| } |
| |
| void DiscardSummary::add(const LoggingEventPtr& event) |
| { |
| if (event->getLevel()->toInt() > maxEvent->getLevel()->toInt()) |
| { |
| maxEvent = event; |
| } |
| |
| count++; |
| } |
| |
| LoggingEventPtr DiscardSummary::createEvent(Pool& p) |
| { |
| LogString msg(LOG4CXX_STR("Discarded ")); |
| StringHelper::toString(count, p, msg); |
| msg.append(LOG4CXX_STR(" messages due to a full event buffer including: ")); |
| msg.append(maxEvent->getMessage()); |
| return std::make_shared<LoggingEvent>( |
| maxEvent->getLoggerName(), |
| maxEvent->getLevel(), |
| msg, |
| LocationInfo::getLocationUnavailable() ); |
| } |
| |
| #if LOG4CXX_ABI_VERSION <= 15 |
| ::LOG4CXX_NS::spi::LoggingEventPtr |
| DiscardSummary::createEvent(::LOG4CXX_NS::helpers::Pool& p, |
| size_t discardedCount) |
| { |
| LogString msg(LOG4CXX_STR("Discarded ")); |
| StringHelper::toString(discardedCount, p, msg); |
| msg.append(LOG4CXX_STR(" messages due to a full event buffer")); |
| |
| return std::make_shared<LoggingEvent>( |
| LOG4CXX_STR(""), |
| LOG4CXX_NS::Level::getError(), |
| msg, |
| LocationInfo::getLocationUnavailable() ); |
| } |
| #endif |
| |
| |
| void AsyncAppender::dispatch() |
| { |
| size_t discardCount = 0; |
| std::vector<size_t> pendingCountHistogram(priv->bufferSize, 0); |
| bool isActive = true; |
| |
| while (isActive) |
| { |
| Pool p; |
| LoggingEventList events; |
| events.reserve(priv->bufferSize); |
| for (int count = 0; count < 2 && priv->dispatchedCount == priv->commitCount; ++count) |
| std::this_thread::yield(); // Wait a bit |
| if (priv->dispatchedCount == priv->commitCount) |
| { |
| std::unique_lock<std::mutex> lock(priv->bufferMutex); |
| priv->bufferNotEmpty.wait(lock, [this]() -> bool |
| { return priv->dispatchedCount != priv->commitCount || priv->closed; } |
| ); |
| } |
| isActive = !priv->closed; |
| |
| while (events.size() < priv->bufferSize && priv->dispatchedCount != priv->commitCount) |
| { |
| auto index = priv->dispatchedCount % priv->buffer.size(); |
| const auto& data = priv->buffer[index]; |
| events.push_back(data.event); |
| if (data.pendingCount < pendingCountHistogram.size()) |
| ++pendingCountHistogram[data.pendingCount]; |
| ++priv->dispatchedCount; |
| } |
| priv->bufferNotFull.notify_all(); |
| { |
| std::lock_guard<std::mutex> lock(priv->bufferMutex); |
| for (auto discardItem : priv->discardMap) |
| { |
| events.push_back(discardItem.second.createEvent(p)); |
| discardCount += discardItem.second.getCount(); |
| } |
| priv->discardMap.clear(); |
| } |
| |
| for (auto item : events) |
| { |
| try |
| { |
| priv->appenders.appendLoopOnAppenders(item, p); |
| } |
| catch (std::exception& ex) |
| { |
| if (isActive) |
| { |
| priv->errorHandler->error(LOG4CXX_STR("async dispatcher"), ex, 0, item); |
| isActive = false; |
| } |
| } |
| catch (...) |
| { |
| if (isActive) |
| { |
| priv->errorHandler->error(LOG4CXX_STR("async dispatcher")); |
| isActive = false; |
| } |
| } |
| } |
| if (!isActive) |
| { |
| LogString msg(LOG4CXX_STR("AsyncAppender")); |
| msg += LOG4CXX_STR(" discardCount "); |
| StringHelper::toString(discardCount, p, msg); |
| msg += LOG4CXX_STR(" pendingCountHistogram"); |
| for (auto item : pendingCountHistogram) |
| { |
| msg += logchar(' '); |
| StringHelper::toString(item, p, msg); |
| } |
| LogLog::debug(msg); |
| } |
| } |
| |
| } |