log.c rewrite part two
DISPATCH-2133: qd_log_enabled() race
This closes #1366
diff --git a/src/log.c b/src/log.c
index e032bf6..5599483 100644
--- a/src/log.c
+++ b/src/log.c
@@ -26,6 +26,7 @@
#include "entity_cache.h"
#include "log_private.h"
#include "server_private.h"
+#include "schema_enum.h"
#include "qpid/dispatch/alloc.h"
#include "qpid/dispatch/atomic.h"
@@ -42,6 +43,35 @@
#define LOG_MAX (QD_LOG_TEXT_MAX+128)
#define LIST_MAX 1000
+// log.c lock strategy ========================================
+//
+// log sources ----------------------
+// 1. Log sources are created only at initialize time,
+// and are freed only during finalize time, so the
+// list itself does not need to be protected by a
+// lock.
+//
+// 2. Individual log sources do need protection, though,
+// because a management command may call qd_log_entity()
+// at any time, which may replace the log sink. So each
+// log source has its own lock, to prevent collisions
+// between write_log() and qd_log_entity().
+//
+// log sinks -----------------------
+// 1. There is a global list of log sinks, which may be
+// added to and deleted from at any time by qd_log_entity().
+// So there is a lock to protect the sinks list from
+// simultaneous additions and deletions.
+//
+// log entries ---------------------
+// 1. There is a global list of the most recent log entries
+// that may be added to at any time by any log source.
+// The list is bounded, so after some point additions
+// cause deletions.
+// So there is another lock to protect this entries list
+// from simultaneous access.
+//
+//=============================================================
const char *QD_LOG_STATS_TYPE = "logStats";
static qd_log_source_t *default_log_source=0;
@@ -49,7 +79,6 @@
int qd_log_max_len() { return TEXT_MAX; }
typedef struct qd_log_entry_t qd_log_entry_t;
-
struct qd_log_entry_t {
DEQ_LINKS(qd_log_entry_t);
char *module;
@@ -59,14 +88,13 @@
struct timeval time;
char text[TEXT_MAX];
};
-
ALLOC_DECLARE(qd_log_entry_t);
ALLOC_DEFINE(qd_log_entry_t);
-
DEQ_DECLARE(qd_log_entry_t, qd_log_list_t);
static qd_log_list_t entries = {0};
+// Protects the global 'entries' list; file-local, like log_sinks_lock.
+static sys_mutex_t *entries_lock = 0;
-static void qd_log_entry_free_lh(qd_log_entry_t* entry) {
+static void qd_log_entry_free_lh(qd_log_entry_t *entry) {
DEQ_REMOVE(entries, entry);
free(entry->file);
free(entry->module);
@@ -81,11 +109,9 @@
FILE *file;
DEQ_LINKS(struct log_sink_t);
} log_sink_t;
-
-DEQ_DECLARE(log_sink_t, log_sink_list_t);
-
-static sys_mutex_t *log_sink_list_lock = 0;
-static log_sink_list_t sink_list = {0};
+DEQ_DECLARE(log_sink_t, log_sinks_t);
+static sys_mutex_t *log_sinks_lock = 0;
+static log_sinks_t sink_list = {0};
const char *format = "%Y-%m-%d %H:%M:%S.%%06lu %z";
bool utc = false;
@@ -111,9 +137,11 @@
static const char* SINK_SYSLOG = "syslog";
static const char* SOURCE_DEFAULT = "DEFAULT";
+// Acquires the log_sinks_lock internally to prevent collision
+// with log_sink().
static void log_sink_decref(const log_sink_t *sink) {
if (!sink) return;
- sys_mutex_lock(log_sink_list_lock);
+ sys_mutex_lock(log_sinks_lock);
assert(sink->ref_count);
log_sink_t *mutable_sink = (log_sink_t *)sink;
@@ -127,12 +155,14 @@
closelog();
free(mutable_sink);
}
- sys_mutex_unlock(log_sink_list_lock);
+ sys_mutex_unlock(log_sinks_lock);
}
-static const log_sink_t* log_sink(const char* name) {
- sys_mutex_lock(log_sink_list_lock);
- log_sink_t* sink = DEQ_HEAD(sink_list);
+// Acquires the log_sinks_lock internally to prevent collision
+// with log_sink_decref().
+static const log_sink_t *log_sink(const char *name) {
+ sys_mutex_lock(log_sinks_lock);
+ log_sink_t *sink = DEQ_HEAD(sink_list);
DEQ_FIND(sink, strcmp(sink->name, name) == 0);
if (sink) {
@@ -156,11 +186,8 @@
file = fopen(name, "a");
}
- //If file is not there, return 0.
- // We are not logging an error here since we are already holding the log_source_lock
- // Writing a log message will try to re-obtain the log_source_lock lock and cause a deadlock.
if (!file && !syslog) {
- sys_mutex_unlock(log_sink_list_lock);
+ sys_mutex_unlock(log_sinks_lock);
return 0;
}
@@ -173,7 +200,7 @@
DEQ_INSERT_TAIL(sink_list, sink);
}
- sys_mutex_unlock(log_sink_list_lock);
+ sys_mutex_unlock(log_sinks_lock);
return (const log_sink_t *)sink;
}
@@ -193,16 +220,13 @@
bool syslog;
const log_sink_t *sink;
uint64_t severity_histogram[N_LEVEL_INDICES];
+ sys_mutex_t *lock;
};
-
DEQ_DECLARE(qd_log_source_t, qd_log_source_list_t);
-
-static sys_mutex_t *log_source_lock = 0;
static qd_log_source_list_t source_list = {0};
-
typedef struct level_t {
- const char* name;
+ const char *name;
int bit; // QD_LOG bit
int mask; // Bit or higher
const int syslog;
@@ -229,7 +253,7 @@
static char level_names[TEXT_MAX] = {0}; /* Set up in qd_log_initialize */
/// Return NULL and set qd_error if not a valid bit.
-static const level_t* level_for_bit(int bit) {
+static const level_t *level_for_bit(int bit) {
level_index_t i = 0;
while (i < N_LEVELS && levels[i].bit != bit) ++i;
if (i == N_LEVELS) {
@@ -239,7 +263,7 @@
}
/// Return NULL and set qd_error if not a valid level.
-static const level_t* level_for_name(const char *name, int len) {
+static const level_t *level_for_name(const char *name, int len) {
level_index_t i = 0;
while (i < N_LEVELS && strncasecmp(levels[i].name, name, len) != 0) ++i;
if (i == N_LEVELS) {
@@ -265,7 +289,7 @@
}
/// Return the name of log level or 0 if not found.
+/// `level` is used directly as an index into levels[] (it is not a QD_LOG bit
+/// value); any index outside [0, N_LEVELS) yields NULL.
-static const char* level_name(int level) {
+static const char *level_name(int level) {
return (0 <= level && level < N_LEVELS) ? levels[level].name : NULL;
}
@@ -283,18 +307,19 @@
{
int len = strlen(token);
int plus = (len > 0 && token[len-1] == '+') ? 1 : 0;
- const level_t* level = level_for_name(token, len-plus);
+ const level_t *level = level_for_name(token, len-plus);
mask |= (plus ? level->mask : level->bit);
}
free(enable);
return mask;
}
-/// Caller must hold log_source_lock
-static qd_log_source_t* lookup_log_source_lh(const char *module)
+static qd_log_source_t *lookup_log_source(const char *module)
{
- if (strcasecmp(module, SOURCE_DEFAULT) == 0)
+ if (strcasecmp(module, SOURCE_DEFAULT) == 0) {
return default_log_source;
+ }
+
qd_log_source_t *src = DEQ_HEAD(source_list);
DEQ_FIND(src, strcasecmp(module, src->module) == 0);
return src;
@@ -307,10 +332,10 @@
static void write_log(qd_log_source_t *log_source, qd_log_entry_t *entry)
{
- // Don't let the sink list change while we are writing to one of them.
+ // Hold this source's lock so qd_log_entity() cannot replace its sink while we write.
- sys_mutex_lock(log_sink_list_lock);
- const log_sink_t* sink = log_source->sink ? log_source->sink : default_log_source->sink;
+ sys_mutex_lock(log_source->lock);
+ const log_sink_t *sink = log_source->sink ? log_source->sink : default_log_source->sink;
if (!sink) {
- sys_mutex_unlock(log_sink_list_lock);
+ sys_mutex_unlock(log_source->lock);
return;
}
@@ -353,56 +378,42 @@
if (syslog_level != -1)
syslog(syslog_level, "%s", log_str);
}
- sys_mutex_unlock(log_sink_list_lock);
+ sys_mutex_unlock(log_source->lock);
}
/// Reset the log source to the default state
-static void qd_log_source_defaults(qd_log_source_t *log_source) {
- log_source->mask = -1;
- log_source->includeTimestamp = -1;
- log_source->includeSource = -1;
- log_source->sink = 0;
- memset ( log_source->severity_histogram, 0, sizeof(uint64_t) * (N_LEVEL_INDICES) );
-}
-
-/// Caller must hold the log_source_lock
-static qd_log_source_t *qd_log_source_lh(const char *module)
-{
- qd_log_source_t *log_source = lookup_log_source_lh(module);
- if (!log_source)
- {
- log_source = NEW(qd_log_source_t);
- ZERO(log_source);
- log_source->module = (char*) malloc(strlen(module) + 1);
- strcpy(log_source->module, module);
- qd_log_source_defaults(log_source);
- DEQ_INSERT_TAIL(source_list, log_source);
- qd_entity_cache_add(QD_LOG_STATS_TYPE, log_source);
- }
- return log_source;
+/// Sets mask, includeTimestamp and includeSource to -1, drops the source's
+/// reference on its current sink (log_sink_decref() ignores a NULL sink) and
+/// zeroes the severity histogram.
+/// Note: log_sink_decref() acquires log_sinks_lock itself, so this function
+/// takes that lock whenever the source has a sink.
+static void qd_log_source_defaults(qd_log_source_t *src) {
+ src->mask = -1;
+ src->includeTimestamp = -1;
+ src->includeSource = -1;
+ log_sink_decref(src->sink);
+ src->sink = 0;
+ memset ( src->severity_histogram, 0, sizeof(uint64_t) * (N_LEVEL_INDICES) );
}
+// Look up an existing log source by module name. No lock is taken and no
+// source is created here: the result is whatever lookup_log_source() finds.
+// NOTE(review): presumably NULL for a module that was never registered
+// (depends on DEQ_FIND leaving the cursor NULL on a miss) -- confirm.
qd_log_source_t *qd_log_source(const char *module)
{
- sys_mutex_lock(log_source_lock);
- qd_log_source_t* src = qd_log_source_lh(module);
- sys_mutex_unlock(log_source_lock);
+ qd_log_source_t *src = lookup_log_source(module);
return src;
}
+// This is called by management thread, and alters the
+// log sink. Take lock to avoid collision with worker threads.
+// Returns the reset source, or NULL when no source is registered for
+// `module` (sources are no longer created on demand by qd_log_source()).
qd_log_source_t *qd_log_source_reset(const char *module)
{
- sys_mutex_lock(log_source_lock);
- qd_log_source_t* src = qd_log_source_lh(module);
+ qd_log_source_t *src = qd_log_source(module);
+ // Unknown module: nothing to reset, and src->lock must not be dereferenced.
+ if (!src) {
+ return 0;
+ }
+ sys_mutex_lock(src->lock);
qd_log_source_defaults(src);
- sys_mutex_unlock(log_source_lock);
+ sys_mutex_unlock(src->lock);
return src;
}
-static void qd_log_source_free_lh(qd_log_source_t* src) {
+// This is called only during finalize, which does not hold locks.
+// Unlinks src from source_list, drops its sink reference, and releases the
+// module name, the per-source mutex and the source itself.
+static void qd_log_source_free(qd_log_source_t *src) {
DEQ_REMOVE(source_list, src);
log_sink_decref(src->sink);
free(src->module);
+ // The lock came from sys_mutex(); release it with the matching API
+ // rather than a bare free() so the underlying mutex is torn down too.
+ sys_mutex_free(src->lock);
free(src);
}
@@ -414,17 +425,18 @@
void qd_vlog_impl(qd_log_source_t *source, qd_log_level_t level, bool check_level, const char *file, int line, const char *fmt, va_list ap)
{
- /*-----------------------------------------------
- Count this log-event in this log's histogram
- whether or not this log is currently enabled.
- We can always decide not to look at it later,
- based on its used/unused status.
- -----------------------------------------------*/
+ // Count this log-event in this log's histogram
+ // whether or not this log is currently enabled.
+ // We can always decide not to look at it later,
+ // based on its used/unused status.
int level_index = level_index_for_bit(level);
if (level_index < 0)
qd_error_clear();
- else
+ else {
+ sys_mutex_lock(source->lock);
source->severity_histogram[level_index]++;
+ sys_mutex_unlock(source->lock);
+ }
if (check_level) {
if (!qd_log_enabled(source, level))
@@ -435,12 +447,7 @@
qd_log_entry_t *entry = new_qd_log_entry_t();
DEQ_ITEM_INIT(entry);
- //
- // Obtain the log_source_lock global lock. We need to do this, if not, the qd_log_entity() function
- // could free the log_source->sink from underneath you and replace it with a new sink.
- // Once we obtain this lock, we only release the lock once the log line is written to the sink.
- //
- sys_mutex_lock(log_source_lock);
+ sys_mutex_lock(entries_lock);
entry->module = source->module ? strdup(source->module) : 0;
entry->level = level;
entry->file = file ? strdup(file) : 0;
@@ -451,7 +458,7 @@
DEQ_INSERT_TAIL(entries, entry);
if (DEQ_SIZE(entries) > LIST_MAX)
qd_log_entry_free_lh(DEQ_HEAD(entries));
- sys_mutex_unlock(log_source_lock);
+ sys_mutex_unlock(entries_lock);
}
void qd_log_impl_v1(qd_log_source_t *source, qd_log_level_t level, const char *file, int line, const char *fmt, ...)
@@ -486,7 +493,7 @@
int i = 0;
// NOTE: PyList_SetItem steals a reference so no leak here.
PyList_SetItem(py_entry, i++, PyUnicode_FromString(entry->module));
- const char* level = level_name( level_index_for_bit(entry->level) + 2 );
+ const char *level = level_name( level_index_for_bit(entry->level) + 2 );
PyList_SetItem(py_entry, i++, level ? PyUnicode_FromString(level) : inc_none());
PyList_SetItem(py_entry, i++, PyUnicode_FromString(entry->text));
PyList_SetItem(py_entry, i++, entry->file ? PyUnicode_FromString(entry->file) : inc_none());
@@ -506,13 +513,39 @@
return NULL;
}
+// Create the log source for module_name: allocate and zero it, copy the name,
+// apply the default settings, give it its own mutex, then append it to
+// source_list and register it with the entity cache. The source whose name
+// equals SOURCE_DEFAULT is also remembered as default_log_source.
+static void _add_log_source(const char *module_name)
+{
+    qd_log_source_t *src = NEW(qd_log_source_t);
+    ZERO(src);
+
+    src->module = qd_strdup(module_name);
+    qd_log_source_defaults(src);
+    src->lock = sys_mutex();
+
+    DEQ_INSERT_TAIL(source_list, src);
+    qd_entity_cache_add(QD_LOG_STATS_TYPE, src);
+
+    if (strcmp(SOURCE_DEFAULT, module_name) == 0) {
+        default_log_source = src;
+    }
+}
+
void qd_log_initialize(void)
{
DEQ_INIT(entries);
DEQ_INIT(source_list);
DEQ_INIT(sink_list);
- log_sink_list_lock = sys_mutex();
+ int name_offset = strlen("QD_SCHEMA_LOG_MODULE_");
+
+ int i ;
+ for (i = 0; i < QD_SCHEMA_LOG_MODULE_ENUM_COUNT; ++ i)
+ {
+ const char *module_name = qd_schema_log_module_names[i] + name_offset;
+ _add_log_source(module_name);
+ }
+ _add_log_source("MAIN");
+ _add_log_source("DISPLAYNAME");
+
+ log_sinks_lock = sys_mutex();
// Set up level_names for use in error messages.
char *begin = level_names, *end = level_names+sizeof(level_names);
@@ -520,9 +553,8 @@
for (level_index_t i = NONE + 1; i < N_LEVELS; ++i)
aprintf(&begin, end, ", %s", levels[i].name);
- log_source_lock = sys_mutex();
+ entries_lock = sys_mutex();
- default_log_source = qd_log_source(SOURCE_DEFAULT);
default_log_source->mask = levels[INFO].mask;
default_log_source->includeTimestamp = true;
default_log_source->includeSource = 0;
@@ -532,7 +564,7 @@
void qd_log_finalize(void) {
while (DEQ_HEAD(source_list))
- qd_log_source_free_lh(DEQ_HEAD(source_list));
+ qd_log_source_free(DEQ_HEAD(source_list));
while (DEQ_HEAD(entries))
qd_log_entry_free_lh(DEQ_HEAD(entries));
while (DEQ_HEAD(sink_list))
@@ -540,11 +572,20 @@
default_log_source = NULL; // stale value would misconfigure new router started again in the same process
}
+// This is the entry point for management commands that
+// may arrive at any time and change the sink in a log
+// source.
+// If we happen to be writing to the soon-to-be-former
+// log sink when it is deleted, a paradox will be generated
+// that could destroy the entire space-time continuum in
+// which this code is being executed.
+// Thus the locks in each log source.
+//
qd_error_t qd_log_entity(qd_entity_t *entity)
{
qd_error_clear();
- char* module = 0;
+ char *module = 0;
char *outputFile = 0;
char *enable = 0;
int include_timestamp = 0;
@@ -572,12 +613,6 @@
//
QD_ERROR_BREAK();
- //
- // Obtain all attributes from the entity before obtaining the log_source_lock.
- // We do this because functions like qd_entity_get_string and qd_entity_get_bool ultimately call qd_vlog_impl() which
- // also holds the log_source_lock when calling write_log().
- //
-
if (qd_entity_has(entity, "outputFile")) {
has_output_file = true;
outputFile = qd_entity_get_string(entity, "outputFile");
@@ -602,33 +637,28 @@
QD_ERROR_BREAK();
}
- //
- // Obtain the log_source_lock lock. This lock is also used when write_log() is called.
- //
- sys_mutex_lock(log_source_lock);
+ qd_log_source_t *log_source = qd_log_source(module); /* The original(already existing) log source */
- qd_log_source_t *src = qd_log_source_lh(module); /* The original(already existing) log source */
+ sys_mutex_lock(log_source->lock);
if (has_output_file) {
- const log_sink_t* sink = log_sink(outputFile);
+ const log_sink_t *sink = log_sink(outputFile);
if (!sink) {
error_in_output = true;
- sys_mutex_unlock(log_source_lock);
+ sys_mutex_unlock(log_source->lock);
break;
}
// DEFAULT source may already have a sink, so free the old sink first
- if (src->sink) {
- log_sink_decref(src->sink);
- }
+ log_sink_decref(log_source->sink);
// Assign the new sink
- src->sink = sink;
+ log_source->sink = sink;
- if (src->sink->syslog) {
+ if (log_source->sink->syslog) {
// Timestamp should be off for syslog.
is_sink_syslog = true;
- src->includeTimestamp = 0;
+ log_source->includeTimestamp = 0;
}
}
@@ -637,28 +667,28 @@
if (mask < -1) {
error_in_enable = true;
- sys_mutex_unlock(log_source_lock);
+ sys_mutex_unlock(log_source->lock);
break;
}
else {
- src->mask = mask;
+ log_source->mask = mask;
}
- if (qd_log_enabled(src, QD_LOG_TRACE)) {
+ if (qd_log_enabled(log_source, QD_LOG_TRACE)) {
trace_enabled = true;
}
}
if (has_include_timestamp && !is_sink_syslog) {
// Timestamp should be off for syslog.
- src->includeTimestamp = include_timestamp;
+ log_source->includeTimestamp = include_timestamp;
}
if (has_include_source) {
- src->includeSource = include_source;
+ log_source->includeSource = include_source;
}
- sys_mutex_unlock(log_source_lock);
+ sys_mutex_unlock(log_source->lock);
} while(0);
@@ -679,8 +709,9 @@
free(enable);
//
- // If trace logging is enabled, loop thru all connections in the router and call the pn_transport_set_tracer callback
- // so proton frame trace can be output as part of the router trace log.
+ // If trace logging is enabled, loop thru all connections in the router and
+ // call the pn_transport_set_tracer callback so proton frame trace can be output
+ // as part of the router trace log.
//
if (trace_enabled) {
qd_server_trace_all_connections();
@@ -689,7 +720,7 @@
return qd_error_code();
}
-void qd_format_string(char* buf, int buf_size, const char *fmt, ...)
+void qd_format_string(char *buf, int buf_size, const char *fmt, ...)
{
va_list args;
va_start(args, fmt);
@@ -698,7 +729,7 @@
}
-qd_error_t qd_entity_refresh_logStats(qd_entity_t* entity, void *impl)
+qd_error_t qd_entity_refresh_logStats(qd_entity_t *entity, void *impl)
{
qd_log_source_t *log = (qd_log_source_t*)impl;
char identity_str[TEXT_MAX];
diff --git a/tests/tsan.supp b/tests/tsan.supp
index 7ab3ca0..417c619 100644
--- a/tests/tsan.supp
+++ b/tests/tsan.supp
@@ -50,7 +50,7 @@
race:qdr_process_tick_CT
# DISPATCH-2133 (harmless)
-race:qd_log_enabled
+#race:qd_log_enabled
# DISPATCH-2134
race:qdr_link_process_initial_delivery_CT