blob: 48a304657d7164d08571d276f7b2e8fe84ac808f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "kudu/gutil/macros.h"
#include <cstdint>
#include <ctime>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include "kudu/util/condition_variable.h"
#include "kudu/util/mutex.h"
namespace kudu {
// Wrapper for a glog Logger which asynchronously writes log messages.
// This class starts a new thread responsible for forwarding the messages
// to the logger, and performs double buffering. Writers append to the
// current buffer and then wake up the logger thread. The logger swaps in
// a new buffer and writes any accumulated messages to the wrapped
// Logger.
//
// This double-buffering behavior dramatically improves performance, especially
// for logging messages which require flushing the underlying file (i.e. WARNING
// and above by default). The flush can take a couple of milliseconds, and in
// some cases can even block for hundreds of milliseconds or more. With the
// double-buffered approach, threads can proceed with useful work while the IO
// thread blocks.
//
// The semantics provided by this wrapper are slightly weaker than the default
// glog semantics. By default, glog will immediately (synchronously) flush WARNING
// and above to the underlying file, whereas here we are deferring that flush to
// the separate thread. This means that a crash just after a 'LOG(WARNING)' could
// leave that message missing from the logs, but the perf benefit is probably
// worth it. We do take care that a glog FATAL message flushes all buffered log
// messages before exiting.
//
// NOTE: the logger limits the total amount of buffer space, so if the underlying
// log blocks for too long, eventually the threads generating the log messages
// will block as well. This prevents runaway memory usage.
class AsyncLogger : public google::base::Logger {
 public:
  // Wraps the glog logger 'wrapped'. 'max_buffer_bytes' bounds the total
  // buffer space used by this class (see 'max_buffer_bytes_').
  //
  // NOTE(review): 'wrapped' is held as a raw pointer; whether this class
  // takes ownership of it is decided in the implementation -- presumably
  // not, TODO confirm.
  AsyncLogger(google::base::Logger* wrapped,
              int max_buffer_bytes);

  // Marked 'override' for consistency with the other overridden members of
  // google::base::Logger below.
  ~AsyncLogger() override;

  // Start the background logger thread.
  void Start();

  // Stop the thread. Flush() and Write() must not be called after this.
  //
  // NOTE: this is currently only used in tests: in real life, we enable async
  // logging once when the program starts and then never disable it.
  //
  // REQUIRES: Start() must have been called.
  void Stop();

  // Write a message to the log.
  //
  // 'force_flush' is set by the GLog library based on the configured '--logbuflevel'
  // flag. Any messages logged at the configured level or higher result in 'force_flush'
  // being set to true, indicating that the message should be immediately written to the
  // log rather than buffered in memory. See the class-level docs above for more detail
  // about the implementation provided here.
  //
  // REQUIRES: Start() must have been called.
  void Write(bool force_flush,
             time_t timestamp,
             const char* message,
             size_t message_len) override;

  // Flush any buffered messages.
  void Flush() override;

  // Get the current LOG file size.
  // The returned value is approximate since some
  // logged data may not have been flushed to disk yet.
  uint32_t LogSize() override;

  // Return a count of how many times an application thread was
  // blocked due to the buffers being full and the writer thread
  // not keeping up. Takes 'lock_' to read the counter.
  int app_threads_blocked_count_for_tests() const {
    MutexLock l(lock_);
    return app_threads_blocked_count_for_tests_;
  }

 private:
  // A buffered message.
  //
  // TODO(todd): using std::string for buffered messages is convenient but not
  // as efficient as it could be. Better would be to make the buffers just be
  // Arenas and allocate both the message data and Msg struct from them, forming
  // a linked list.
  struct Msg {
    time_t ts;
    std::string message;

    Msg(time_t ts, std::string message)
        : ts(ts),
          message(std::move(message)) {
    }
  };

  // A buffer of messages waiting to be flushed.
  struct Buffer {
    std::vector<Msg> messages;

    // Estimate of the size of 'messages'.
    int size = 0;

    // Whether this buffer needs an explicit flush of the
    // underlying logger.
    bool flush = false;

    Buffer() = default;

    // Reset the buffer to its empty, no-flush-needed state.
    void clear() {
      messages.clear();
      size = 0;
      flush = false;
    }

    // Append 'msg', updating the size estimate. If 'force_flush' is true the
    // buffer is marked as needing an explicit flush of the underlying logger.
    //
    // The parameter is deliberately not named 'flush' so it does not shadow
    // the member of the same name.
    void add(Msg msg, bool force_flush) {
      // The estimate counts the Msg struct itself plus its message payload.
      size += static_cast<int>(sizeof(msg) + msg.message.size());
      messages.emplace_back(std::move(msg));
      flush |= force_flush;
    }

    // True if flushing this buffer would do any work: either there are
    // messages to write or an explicit flush was requested.
    bool needs_flush_or_write() const {
      return flush || !messages.empty();
    }

   private:
    DISALLOW_COPY_AND_ASSIGN(Buffer);
  };

  bool BufferFull(const Buffer& buf) const;
  void RunThread();

  // The maximum number of bytes used by the entire class.
  const int max_buffer_bytes_;
  google::base::Logger* const wrapped_;
  std::thread thread_;

  // Count of how many times an application thread was blocked due to
  // a full buffer.
  int app_threads_blocked_count_for_tests_ = 0;

  // Count of how many times the writer thread has flushed the buffers.
  // 64 bits should be enough to never worry about overflow.
  uint64_t flush_count_ = 0;

  // Protects buffers as well as 'state_'.
  mutable Mutex lock_;

  // Signaled by app threads to wake up the flusher, either for new
  // data or because 'state_' changed.
  ConditionVariable wake_flusher_cond_;

  // Signaled by the flusher thread when the flusher has swapped in
  // a free buffer to write to.
  ConditionVariable free_buffer_cond_;

  // Signaled by the flusher thread when it has completed flushing
  // the current buffer.
  ConditionVariable flush_complete_cond_;

  // The buffer to which application threads append new log messages.
  std::unique_ptr<Buffer> active_buf_;

  // The buffer currently being flushed by the logger thread, cleared
  // after a successful flush.
  std::unique_ptr<Buffer> flushing_buf_;

  // Lifecycle state of the logger thread, protected by 'lock_'. State changes
  // are signaled through 'wake_flusher_cond_'; a change to STOPPED is the
  // trigger for the logger thread to stop.
  enum State {
    INITTED,
    RUNNING,
    STOPPED
  };
  State state_ = INITTED;

  DISALLOW_COPY_AND_ASSIGN(AsyncLogger);
};
} // namespace kudu