blob: feeb09d8b87fb8b46a7b6812aae8285c19125cbb [file] [log] [blame]
/** @file
Private record core definitions
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "TextBuffer.h"
#include "Tokenizer.h"
#include "ink_string.h"
#include "P_RecCompatibility.h"
#include "P_RecUtils.h"
//-------------------------------------------------------------------------
// i_am_the_record_owner
//-------------------------------------------------------------------------
static bool
i_am_the_record_owner(RecT rec_type)
{
#if defined (REC_LOCAL)
switch (rec_type) {
case RECT_CONFIG:
case RECT_NODE:
case RECT_CLUSTER:
case RECT_LOCAL:
return true;
case RECT_PROCESS:
case RECT_PLUGIN:
return false;
default:
ink_debug_assert(!"Unexpected RecT type");
return false;
}
#elif defined (REC_PROCESS)
// g_mode_type is defined in either RecLocal.cc or RecProcess.cc.
// We can access it since we're inlined by on of these two files.
if (g_mode_type == RECM_CLIENT) {
switch (rec_type) {
case RECT_PROCESS:
case RECT_PLUGIN:
return true;
case RECT_CONFIG:
case RECT_NODE:
case RECT_CLUSTER:
case RECT_LOCAL:
return false;
default:
ink_debug_assert(!"Unexpected RecT type");
return false;
}
} else if (g_mode_type == RECM_STAND_ALONE) {
switch (rec_type) {
case RECT_CONFIG:
case RECT_PROCESS:
case RECT_NODE:
case RECT_CLUSTER:
case RECT_LOCAL:
case RECT_PLUGIN:
return true;
default:
ink_debug_assert(!"Unexpected RecT type");
return false;
}
}
#else
#error "Required #define not specificed; expected REC_LOCAL or REC_PROCESS"
#endif
return false;
}
//-------------------------------------------------------------------------
// send_set_message
//-------------------------------------------------------------------------
static int
send_set_message(RecRecord * record)
{
RecMessage *m;
rec_mutex_acquire(&(record->lock));
m = RecMessageAlloc(RECG_SET);
m = RecMessageMarshal_Realloc(m, record);
RecDebug(DL_Note, "[send] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
RecMessageSend(m);
RecMessageFree(m);
rec_mutex_release(&(record->lock));
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// send_register_message
//-------------------------------------------------------------------------
static int
send_register_message(RecRecord * record)
{
RecMessage *m;
rec_mutex_acquire(&(record->lock));
m = RecMessageAlloc(RECG_REGISTER);
m = RecMessageMarshal_Realloc(m, record);
RecDebug(DL_Note, "[send] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
RecMessageSend(m);
RecMessageFree(m);
rec_mutex_release(&(record->lock));
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// send_push_message
//-------------------------------------------------------------------------
static int
send_push_message()
{
RecRecord *r;
RecMessage *m;
int i, num_records;
bool send_msg = false;
m = RecMessageAlloc(RECG_PUSH);
num_records = g_num_records;
for (i = 0; i < num_records; i++) {
r = &(g_records[i]);
rec_mutex_acquire(&(r->lock));
if (i_am_the_record_owner(r->rec_type)) {
if (r->sync_required & REC_PEER_SYNC_REQUIRED) {
m = RecMessageMarshal_Realloc(m, r);
r->sync_required = r->sync_required & ~REC_PEER_SYNC_REQUIRED;
send_msg = true;
}
}
rec_mutex_release(&(r->lock));
}
if (send_msg) {
RecDebug(DL_Note, "[send] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
RecMessageSend(m);
}
RecMessageFree(m);
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// send_pull_message
//-------------------------------------------------------------------------
static int
send_pull_message(RecMessageT msg_type)
{
RecRecord *r;
RecMessage *m;
int i, num_records;
m = RecMessageAlloc(msg_type);
switch (msg_type) {
case RECG_PULL_REQ:
// We're requesting all of the records from our peer. No payload
// here, just send the message.
RecDebug(DL_Note, "[send] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
break;
case RECG_PULL_ACK:
// Respond to a RECG_PULL_REQ message from our peer. Send ALL
// records! Also be sure to send a response even if it has no
// payload. Our peer may be blocking and waiting for a response!
num_records = g_num_records;
for (i = 0; i < num_records; i++) {
r = &(g_records[i]);
if (i_am_the_record_owner(r->rec_type) ||
(REC_TYPE_IS_STAT(r->rec_type) && !(r->registered)) ||
(REC_TYPE_IS_STAT(r->rec_type) && !(r->stat_meta.persist_type != RECP_NON_PERSISTENT))) {
rec_mutex_acquire(&(r->lock));
m = RecMessageMarshal_Realloc(m, r);
r->sync_required = r->sync_required & ~REC_PEER_SYNC_REQUIRED;
rec_mutex_release(&(r->lock));
}
}
RecDebug(DL_Note, "[send] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + m->o_write - m->o_start);
break;
default:
RecMessageFree(m);
return REC_ERR_FAIL;
}
RecMessageSend(m);
RecMessageFree(m);
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// recv_message_cb
//-------------------------------------------------------------------------
static int
recv_message_cb(RecMessage * msg, RecMessageT msg_type, void *cookie)
{
REC_NOWARN_UNUSED(cookie);
RecRecord *r;
RecMessageItr itr;
switch (msg_type) {
case RECG_SET:
RecDebug(DL_Note, "[recv] RECG_SET [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
do {
if (REC_TYPE_IS_STAT(r->rec_type)) {
RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw));
} else {
RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), NULL);
}
} while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
}
break;
case RECG_REGISTER:
RecDebug(DL_Note, "[recv] RECG_REGISTER [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
do {
if (REC_TYPE_IS_STAT(r->rec_type)) {
RecRegisterStat(r->rec_type, r->name, r->data_type, r->data_default, r->stat_meta.persist_type);
} else if (REC_TYPE_IS_CONFIG(r->rec_type)) {
RecRegisterConfig(r->rec_type, r->name, r->data_type,
r->data_default, r->config_meta.update_type,
r->config_meta.check_type, r->config_meta.check_expr, r->config_meta.access_type);
}
} while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
}
break;
case RECG_PUSH:
RecDebug(DL_Note, "[recv] RECG_PUSH [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
do {
RecForceInsert(r);
} while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
}
break;
case RECG_PULL_ACK:
RecDebug(DL_Note, "[recv] RECG_PULL_ACK [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
if (RecMessageUnmarshalFirst(msg, &itr, &r) != REC_ERR_FAIL) {
do {
RecForceInsert(r);
} while (RecMessageUnmarshalNext(msg, &itr, &r) != REC_ERR_FAIL);
}
break;
case RECG_PULL_REQ:
RecDebug(DL_Note, "[recv] RECG_PULL_REQ [%d bytes]", sizeof(RecMessageHdr) + msg->o_end - msg->o_start);
send_pull_message(RECG_PULL_ACK);
break;
default:
ink_debug_assert(!"Unexpected RecG type");
return REC_ERR_FAIL;
}
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecRegisterStatXXX
//-------------------------------------------------------------------------
#define REC_REGISTER_STAT_XXX(A, B) \
ink_debug_assert((rec_type == RECT_NODE) || \
(rec_type == RECT_CLUSTER) || \
(rec_type == RECT_PROCESS) || \
(rec_type == RECT_LOCAL) || \
(rec_type == RECT_PLUGIN)); \
RecRecord *r; \
RecData my_data_default; \
my_data_default.A = data_default; \
if ((r = RecRegisterStat(rec_type, name, B, my_data_default, \
persist_type)) != NULL) { \
if (i_am_the_record_owner(r->rec_type)) { \
r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED; \
} else { \
send_register_message(r); \
} \
return REC_ERR_OKAY; \
} else { \
return REC_ERR_FAIL; \
}
int
RecRegisterStatInt(RecT rec_type, const char *name, RecInt data_default, RecPersistT persist_type)
{
REC_REGISTER_STAT_XXX(rec_int, RECD_INT);
}
int
RecRegisterStatLLong(RecT rec_type, const char *name, RecLLong data_default, RecPersistT persist_type)
{
REC_REGISTER_STAT_XXX(rec_llong, RECD_LLONG);
}
int
RecRegisterStatFloat(RecT rec_type, const char *name, RecFloat data_default, RecPersistT persist_type)
{
REC_REGISTER_STAT_XXX(rec_float, RECD_FLOAT);
}
int
RecRegisterStatString(RecT rec_type, const char *name, RecString data_default, RecPersistT persist_type)
{
REC_REGISTER_STAT_XXX(rec_string, RECD_STRING);
}
int
RecRegisterStatCounter(RecT rec_type, const char *name, RecCounter data_default, RecPersistT persist_type)
{
REC_REGISTER_STAT_XXX(rec_counter, RECD_COUNTER);
}
//-------------------------------------------------------------------------
// RecRegisterConfigXXX
//-------------------------------------------------------------------------
#define REC_REGISTER_CONFIG_XXX(A, B) \
RecRecord *r; \
RecData my_data_default; \
my_data_default.A = data_default; \
if ((r = RecRegisterConfig(rec_type, name, B, my_data_default, \
update_type, check_type, \
check_regex, access_type)) != NULL) { \
if (i_am_the_record_owner(r->rec_type)) { \
r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED; \
} else { \
send_register_message(r); \
} \
return REC_ERR_OKAY; \
} else { \
return REC_ERR_FAIL; \
}
int
RecRegisterConfigInt(RecT rec_type, const char *name,
RecInt data_default, RecUpdateT update_type,
RecCheckT check_type, const char *check_regex, RecAccessT access_type)
{
ink_debug_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
REC_REGISTER_CONFIG_XXX(rec_int, RECD_INT);
}
int
RecRegisterConfigLLong(RecT rec_type, const char *name,
RecLLong data_default, RecUpdateT update_type,
RecCheckT check_type, const char *check_regex, RecAccessT access_type)
{
ink_debug_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
REC_REGISTER_CONFIG_XXX(rec_llong, RECD_LLONG);
}
int
RecRegisterConfigFloat(RecT rec_type, const char *name,
RecFloat data_default, RecUpdateT update_type,
RecCheckT check_type, const char *check_regex, RecAccessT access_type)
{
ink_debug_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
REC_REGISTER_CONFIG_XXX(rec_float, RECD_FLOAT);
}
int
RecRegisterConfigString(RecT rec_type, const char *name,
const char *data_default_tmp, RecUpdateT update_type,
RecCheckT check_type, const char *check_regex, RecAccessT access_type)
{
RecString data_default = (RecString)data_default_tmp;
ink_debug_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
REC_REGISTER_CONFIG_XXX(rec_string, RECD_STRING);
}
int
RecRegisterConfigCounter(RecT rec_type, const char *name,
RecCounter data_default, RecUpdateT update_type,
RecCheckT check_type, const char *check_regex, RecAccessT access_type)
{
ink_debug_assert((rec_type == RECT_CONFIG) || (rec_type == RECT_LOCAL));
REC_REGISTER_CONFIG_XXX(rec_counter, RECD_COUNTER);
}
//-------------------------------------------------------------------------
// RecSetRecordXXX
//-------------------------------------------------------------------------
int
RecSetRecord(RecT rec_type, const char *name, RecDataT data_type, RecData *data, RecRawStat *data_raw, bool lock)
{
int err = REC_ERR_OKAY;
RecRecord *r1;
// FIXME: Most of the time we set, we don't actually need to wrlock
// since we are not modifying the g_records_ht.
if (lock) {
ink_rwlock_wrlock(&g_records_rwlock);
}
if (ink_hash_table_lookup(g_records_ht, name, (void **) &r1)) {
if (i_am_the_record_owner(r1->rec_type)) {
rec_mutex_acquire(&(r1->lock));
if ((data_type != RECD_NULL) && (r1->data_type != data_type)) {
err = REC_ERR_FAIL;
} else {
if (data_type == RECD_NULL) {
ink_assert(data->rec_string);
switch (r1->data_type) {
case RECD_INT:
r1->data.rec_int = atoi(data->rec_string);
data_type = RECD_INT;
break;
case RECD_LLONG:
r1->data.rec_llong = ink_atoll(data->rec_string);
data_type = RECD_LLONG;
break;
case RECD_FLOAT:
r1->data.rec_float = atof(data->rec_string);
data_type = RECD_FLOAT;
break;
case RECD_STRING:
data_type = RECD_STRING;
r1->data.rec_string = data->rec_string;
break;
case RECD_COUNTER:
r1->data.rec_int = atoi(data->rec_string);
data_type = RECD_COUNTER;
break;
default:
err = REC_ERR_FAIL;
break;
}
}
g_num_update[r1->rec_type]++;
if (RecDataSet(data_type, &(r1->data), data)) {
r1->sync_required = REC_SYNC_REQUIRED;
if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
r1->config_meta.update_required = REC_UPDATE_REQUIRED;
}
}
if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != NULL)) {
r1->stat_meta.data_raw = *data_raw;
}
}
rec_mutex_release(&(r1->lock));
} else {
// We don't need to xstrdup() here as we will make copies of any
// strings when we marshal them into our RecMessage buffer.
RecRecord r2;
memset(&r2, 0, sizeof(RecRecord));
r2.rec_type = rec_type;
r2.name = name;
r2.data_type = (data_type != RECD_NULL) ? data_type : r1->data_type;
r2.data = *data;
if (REC_TYPE_IS_STAT(r2.rec_type) && (data_raw != NULL)) {
r2.stat_meta.data_raw = *data_raw;
}
err = send_set_message(&r2);
}
} else {
// Add the record but do not set the 'registered' flag, as this
// record really hasn't been registered yet. Also, in order to
// add the record, we need to have a rec_type, so if the user
// calls RecSetRecord on a record we haven't registered yet, we
// should fail out here.
if ((rec_type == RECT_NULL) || (data_type == RECD_NULL)) {
err = REC_ERR_FAIL;
goto Ldone;
}
r1 = RecAlloc(rec_type, name, data_type);
RecDataSet(data_type, &(r1->data), data);
if (REC_TYPE_IS_STAT(r1->rec_type) && (data_raw != NULL)) {
r1->stat_meta.data_raw = *data_raw;
}
if (i_am_the_record_owner(r1->rec_type)) {
r1->sync_required = r1->sync_required | REC_PEER_SYNC_REQUIRED;
} else {
err = send_set_message(r1);
}
ink_hash_table_insert(g_records_ht, name, (void *) r1);
}
Ldone:
if (lock) {
ink_rwlock_unlock(&g_records_rwlock);
}
return err;
}
int
RecSetRecordConvert(const char *name, const RecString rec_string, bool lock)
{
RecData data;
data.rec_string = rec_string;
return RecSetRecord(RECT_NULL, name, RECD_NULL, &data, NULL, lock);
}
int
RecSetRecordInt(const char *name, RecInt rec_int, bool lock)
{
RecData data;
data.rec_int = rec_int;
return RecSetRecord(RECT_NULL, name, RECD_INT, &data, NULL, lock);
}
int
RecSetRecordLLong(const char *name, RecLLong rec_llong, bool lock)
{
RecData data;
data.rec_llong = rec_llong;
return RecSetRecord(RECT_NULL, name, RECD_LLONG, &data, NULL, lock);
}
int
RecSetRecordFloat(const char *name, RecFloat rec_float, bool lock)
{
RecData data;
data.rec_float = rec_float;
return RecSetRecord(RECT_NULL, name, RECD_FLOAT, &data, NULL, lock);
}
int
RecSetRecordString(const char *name, const RecString rec_string, bool lock)
{
RecData data;
data.rec_string = rec_string;
return RecSetRecord(RECT_NULL, name, RECD_STRING, &data, NULL, lock);
}
int
RecSetRecordCounter(const char *name, RecCounter rec_counter, bool lock)
{
RecData data;
data.rec_counter = rec_counter;
return RecSetRecord(RECT_NULL, name, RECD_COUNTER, &data, NULL, lock);
}
//-------------------------------------------------------------------------
// RecReadStatsFile
//-------------------------------------------------------------------------
int
RecReadStatsFile()
{
RecRecord *r;
RecMessage *m;
RecMessageItr itr;
// lock our hash table
ink_rwlock_wrlock(&g_records_rwlock);
if ((m = RecMessageReadFromDisk(g_stats_snap_fpath)) != NULL) {
if (RecMessageUnmarshalFirst(m, &itr, &r) != REC_ERR_FAIL) {
do {
if ((r->name == NULL) || (!strlen(r->name)))
continue;
RecSetRecord(r->rec_type, r->name, r->data_type, &(r->data), &(r->stat_meta.data_raw), false);
} while (RecMessageUnmarshalNext(m, &itr, &r) != REC_ERR_FAIL);
}
}
ink_rwlock_unlock(&g_records_rwlock);
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecSyncStatsFile
//-------------------------------------------------------------------------
int
RecSyncStatsFile()
{
RecRecord *r;
RecMessage *m;
int i, num_records;
bool sync_to_disk;
// g_mode_type is defined in either RecLocal.cc or RecProcess.cc.
// We can access it since we're inlined by on of these two files.
if (g_mode_type == RECM_SERVER || g_mode_type == RECM_STAND_ALONE) {
m = RecMessageAlloc(RECG_NULL);
num_records = g_num_records;
sync_to_disk = false;
for (i = 0; i < num_records; i++) {
r = &(g_records[i]);
rec_mutex_acquire(&(r->lock));
if (REC_TYPE_IS_STAT(r->rec_type)) {
if (r->stat_meta.persist_type != RECP_NON_PERSISTENT) {
m = RecMessageMarshal_Realloc(m, r);
sync_to_disk = true;
}
}
rec_mutex_release(&(r->lock));
}
if (sync_to_disk) {
RecDebug(DL_Note, "Writing '%s' [%d bytes]", g_stats_snap_fpath, m->o_write - m->o_start + sizeof(RecMessageHdr));
RecMessageWriteToDisk(m, g_stats_snap_fpath);
}
RecMessageFree(m);
}
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecExecStatUpdateFuncs()
//
// Wrapper for RecExecRawStatUpdateFuncs(). Used for non-raw statistics.
//
//-------------------------------------------------------------------------
int
RecExecStatUpdateFuncs()
{
return RecExecRawStatUpdateFuncs();
}
//-------------------------------------------------------------------------
// RecExecRawStatUpdateFuncs()
//
// Parameters 'rsb' and 'id' only applicabel to raw stats.
//
// Note: 1. Although we support a list of update functions, currently,
// only the first function is invovled.
// 2. The update function is responsible to set the value. For
// example: RecSetGlobalRawStatSum(rsb, id, ???);
//
//-------------------------------------------------------------------------
int
RecExecRawStatUpdateFuncs()
{
RecRecord *r;
int i, num_records;
RecStatUpdateFuncList *cur_function = NULL;
num_records = g_num_records;
for (i = 0; i < num_records; i++) {
r = &(g_records[i]);
rec_mutex_acquire(&(r->lock));
if (REC_TYPE_IS_STAT(r->rec_type)) {
if (r->stat_meta.update_func_list) {
cur_function = r->stat_meta.update_func_list;
(*(cur_function->update_func)) (r->name, r->data_type, &(r->data),
cur_function->rsb, cur_function->id, cur_function->update_cookie);
}
}
rec_mutex_release(&(r->lock));
}
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecReadConfigFile
//-------------------------------------------------------------------------
int
RecReadConfigFile()
{
char *fbuf;
int fsize;
const char *line;
int line_num;
char *rec_type_str, *name_str, *data_type_str, *data_str;
RecT rec_type;
RecDataT data_type;
RecData data;
Tokenizer line_tok("\r\n");
tok_iter_state line_tok_state;
RecConfigFileEntry *cfe;
RecDebug(DL_Note, "Reading '%s'", g_rec_config_fpath);
// watch out, we're altering our g_rec_config_xxx structures
ink_mutex_acquire(&g_rec_config_lock);
if (RecFileImport_Xmalloc(g_rec_config_fpath, &fbuf, &fsize) == REC_ERR_FAIL) {
RecLog(DL_Warning, "Could not import '%s'", g_rec_config_fpath);
ink_mutex_release(&g_rec_config_lock);
return REC_ERR_FAIL;
}
// clear our g_rec_config_contents_xxx structures
while (!queue_is_empty(g_rec_config_contents_llq)) {
cfe = (RecConfigFileEntry *) dequeue(g_rec_config_contents_llq);
if (cfe->entry) {
xfree(cfe->entry);
}
xfree(cfe);
}
ink_hash_table_destroy(g_rec_config_contents_ht);
g_rec_config_contents_ht = ink_hash_table_create(InkHashTableKeyType_String);
// lock our hash table
ink_rwlock_wrlock(&g_records_rwlock);
memset(&data, 0, sizeof(RecData));
line_tok.Initialize(fbuf, SHARE_TOKS);
line = line_tok.iterFirst(&line_tok_state);
line_num = 1;
while (line) {
char *lc = xstrdup(line);
char *lt = lc;
char *ln;
while (isspace(*lt))
lt++;
rec_type_str = ink_strtok_r(lt, " \t", &ln);
// check for blank lines and comments
if ((!rec_type_str) || (rec_type_str && (*rec_type_str == '#'))) {
goto L_next_line;
}
name_str = ink_strtok_r(NULL, " \t", &ln);
data_type_str = ink_strtok_r(NULL, " \t", &ln);
// extract the string data (a little bit tricker since it can have spaces)
if (ln) {
// 'ln' will point to either the next token or a bunch of spaces
// if the user didn't supply a value (e.g. 'STRING '). First
// scan past all of the spaces. If we hit a '\0', then we we
// know we didn't have a valid value. If not, set 'data_str' to
// the start of the token and scan until we find the end. Once
// the end is found, back-peddle to remove any trailing spaces.
while (isspace(*ln))
ln++;
if (*ln == '\0') {
data_str = NULL;
} else {
data_str = ln;
while (*ln != '\0')
ln++;
ln--;
while (isspace(*ln) && (ln > data_str))
ln--;
ln++;
*ln = '\0';
}
} else {
data_str = NULL;
}
// check for errors
if (!(rec_type_str && name_str && data_type_str && data_str)) {
RecLog(DL_Warning, "Could not parse line at '%s:%d' -- skipping line: '%s'", g_rec_config_fpath, line_num, line);
goto L_next_line;
}
// record type
rec_type = RECT_NULL;
if (strcmp(rec_type_str, "CONFIG") == 0) {
rec_type = RECT_CONFIG;
} else if (strcmp(rec_type_str, "PROCESS") == 0) {
rec_type = RECT_PROCESS;
} else if (strcmp(rec_type_str, "NODE") == 0) {
rec_type = RECT_NODE;
} else if (strcmp(rec_type_str, "CLUSTER") == 0) {
rec_type = RECT_CLUSTER;
} else if (strcmp(rec_type_str, "LOCAL") == 0) {
rec_type = RECT_LOCAL;
} else {
RecLog(DL_Warning, "Unknown record type '%s' at '%s:%d' -- skipping line",
rec_type_str, g_rec_config_fpath, line_num);
goto L_next_line;
}
// data_type
data_type = RECD_NULL;
if (strcmp(data_type_str, "INT") == 0) {
data_type = RECD_INT;
} else if (strcmp(data_type_str, "LLONG") == 0) {
data_type = RECD_LLONG;
} else if (strcmp(data_type_str, "FLOAT") == 0) {
data_type = RECD_FLOAT;
} else if (strcmp(data_type_str, "STRING") == 0) {
data_type = RECD_STRING;
} else if (strcmp(data_type_str, "COUNTER") == 0) {
data_type = RECD_COUNTER;
} else {
RecLog(DL_Warning, "Unknown data type '%s' at '%s:%d' -- skipping line",
data_type_str, g_rec_config_fpath, line_num);
goto L_next_line;
}
// set the record
RecDataSetFromString(data_type, &data, data_str);
RecSetRecord(rec_type, name_str, data_type, &data, NULL, false);
RecDataClear(data_type, &data);
// update our g_rec_config_contents_xxx
cfe = (RecConfigFileEntry *) xmalloc(sizeof(RecConfigFileEntry));
cfe->entry_type = RECE_RECORD;
cfe->entry = xstrdup(name_str);
enqueue(g_rec_config_contents_llq, (void *) cfe);
ink_hash_table_insert(g_rec_config_contents_ht, name_str, NULL);
goto L_done;
L_next_line:
// store this line into g_rec_config_contents_llq so that we can
// write it out later
cfe = (RecConfigFileEntry *) xmalloc(sizeof(RecConfigFileEntry));
cfe->entry_type = RECE_COMMENT;
cfe->entry = xstrdup(line);
enqueue(g_rec_config_contents_llq, (void *) cfe);
L_done:
line = line_tok.iterNext(&line_tok_state);
line_num++;
xfree(lc);
}
// release our hash table
ink_rwlock_unlock(&g_records_rwlock);
ink_mutex_release(&g_rec_config_lock);
xfree(fbuf);
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecSyncConfigFile
//-------------------------------------------------------------------------
int
RecSyncConfigToTB(textBuffer * tb)
{
int err = REC_ERR_FAIL;
// g_mode_type is defined in either RecLocal.cc or RecProcess.cc.
// We can access it since we're inlined by on of these two files.
if (g_mode_type == RECM_SERVER || g_mode_type == RECM_STAND_ALONE) {
RecRecord *r;
int i, num_records;
RecConfigFileEntry *cfe;
bool sync_to_disk;
ink_mutex_acquire(&g_rec_config_lock);
num_records = g_num_records;
sync_to_disk = false;
for (i = 0; i < num_records; i++) {
r = &(g_records[i]);
rec_mutex_acquire(&(r->lock));
if (REC_TYPE_IS_CONFIG(r->rec_type)) {
if (r->sync_required & REC_DISK_SYNC_REQUIRED) {
if (!ink_hash_table_isbound(g_rec_config_contents_ht, r->name)) {
cfe = (RecConfigFileEntry *) xmalloc(sizeof(RecConfigFileEntry));
cfe->entry_type = RECE_RECORD;
cfe->entry = xstrdup(r->name);
enqueue(g_rec_config_contents_llq, (void *) cfe);
ink_hash_table_insert(g_rec_config_contents_ht, r->name, NULL);
}
r->sync_required = r->sync_required & ~REC_DISK_SYNC_REQUIRED;
sync_to_disk = true;
}
}
rec_mutex_release(&(r->lock));
}
if (sync_to_disk) {
char b[1024];
// okay, we're going to write into our textBuffer
err = REC_ERR_OKAY;
tb->reUse();
ink_rwlock_rdlock(&g_records_rwlock);
LLQrec *llq_rec = g_rec_config_contents_llq->head;
while (llq_rec != NULL) {
cfe = (RecConfigFileEntry *) llq_rec->data;
if (cfe->entry_type == RECE_COMMENT) {
tb->copyFrom(cfe->entry, strlen(cfe->entry));
tb->copyFrom("\n", 1);
} else {
if (ink_hash_table_lookup(g_records_ht, cfe->entry, (void **) &r)) {
rec_mutex_acquire(&(r->lock));
// rec_type
switch (r->rec_type) {
case RECT_CONFIG:
tb->copyFrom("CONFIG ", 7);
break;
case RECT_PROCESS:
tb->copyFrom("PROCESS ", 8);
break;
case RECT_NODE:
tb->copyFrom("NODE ", 5);
break;
case RECT_CLUSTER:
tb->copyFrom("CLUSTER ", 8);
break;
case RECT_LOCAL:
tb->copyFrom("LOCAL ", 6);
break;
default:
ink_debug_assert(!"Unexpected RecT type");
break;
}
// name
tb->copyFrom(cfe->entry, strlen(cfe->entry));
tb->copyFrom(" ", 1);
// data_type and value
switch (r->data_type) {
case RECD_INT:
tb->copyFrom("INT ", 4);
snprintf(b, 1023, "%lld", r->data.rec_int);
tb->copyFrom(b, strlen(b));
break;
case RECD_LLONG:
tb->copyFrom("LLONG ", 6);
snprintf(b, 1023, "%lld", r->data.rec_llong);
tb->copyFrom(b, strlen(b));
break;
case RECD_FLOAT:
tb->copyFrom("FLOAT ", 6);
snprintf(b, 1023, "%f", r->data.rec_float);
tb->copyFrom(b, strlen(b));
break;
case RECD_STRING:
tb->copyFrom("STRING ", 7);
if (r->data.rec_string) {
tb->copyFrom(r->data.rec_string, strlen(r->data.rec_string));
} else {
tb->copyFrom("NULL", strlen("NULL"));
}
break;
case RECD_COUNTER:
tb->copyFrom("COUNTER ", 8);
snprintf(b, 1023, "%lld", r->data.rec_counter);
tb->copyFrom(b, strlen(b));
break;
default:
ink_debug_assert(!"Unexpected RecD type");
break;
}
tb->copyFrom("\n", 1);
rec_mutex_release(&(r->lock));
}
}
llq_rec = llq_rec->next;
}
ink_rwlock_unlock(&g_records_rwlock);
}
ink_mutex_release(&g_rec_config_lock);
}
return err;
}
//-------------------------------------------------------------------------
// RecExecConifgUpdateCbs
//-------------------------------------------------------------------------
int
RecExecConfigUpdateCbs()
{
RecRecord *r;
int i, num_records;
unsigned int update_required_type;
#if defined (REC_LOCAL)
update_required_type = REC_LOCAL_UPDATE_REQUIRED;
#elif defined (REC_PROCESS)
update_required_type = REC_PROCESS_UPDATE_REQUIRED;
#else
#error "Required #define not specificed; expected REC_LOCAL or REC_PROCESS"
#endif
num_records = g_num_records;
for (i = 0; i < num_records; i++) {
r = &(g_records[i]);
rec_mutex_acquire(&(r->lock));
if (REC_TYPE_IS_CONFIG(r->rec_type)) {
/* -- upgrade to support a list of callback functions
if ((r->config_meta.update_required & update_required_type) &&
(r->config_meta.update_cb)) {
(*(r->config_meta.update_cb))(r->name, r->data_type, r->data,
r->config_meta.update_cookie);
r->config_meta.update_required =
r->config_meta.update_required & ~update_required_type;
}
*/
if ((r->config_meta.update_required & update_required_type) && (r->config_meta.update_cb_list)) {
RecConfigUpdateCbList *cur_callback = NULL;
for (cur_callback = r->config_meta.update_cb_list; cur_callback; cur_callback = cur_callback->next) {
(*(cur_callback->update_cb)) (r->name, r->data_type, r->data, cur_callback->update_cookie);
}
r->config_meta.update_required = r->config_meta.update_required & ~update_required_type;
}
}
rec_mutex_release(&(r->lock));
}
return REC_ERR_OKAY;
}
//------------------------------------------------------------------------
// RecResetStatRecord
//------------------------------------------------------------------------
int
RecResetStatRecord(char *name)
{
RecRecord *r1 = NULL;
int err = REC_ERR_OKAY;
if (ink_hash_table_lookup(g_records_ht, name, (void **) &r1)) {
if (i_am_the_record_owner(r1->rec_type)) {
rec_mutex_acquire(&(r1->lock));
RecDataSet(r1->data_type, &(r1->data), &(r1->data_default));
rec_mutex_release(&(r1->lock));
err = REC_ERR_OKAY;
} else {
RecRecord r2;
memset(&r2, 0, sizeof(RecRecord));
r2.rec_type = r1->rec_type;
r2.name = r1->name;
r2.data_type = r1->data_type;
r2.data = r1->data_default;
err = send_set_message(&r2);
}
} else {
err = REC_ERR_FAIL;
}
return err;
}
//------------------------------------------------------------------------
// RecResetStatRecord
//------------------------------------------------------------------------
int
RecResetStatRecord(RecT type)
{
int i, num_records;
int err = REC_ERR_OKAY;
RecDebug(DL_Note, "Reset Statistics Records");
num_records = g_num_records;
for (i = 0; i < num_records; i++) {
RecRecord *r1 = &(g_records[i]);
if (REC_TYPE_IS_STAT(r1->rec_type) &&
((type == RECT_NULL) || (r1->rec_type == type)) &&
(r1->stat_meta.persist_type != RECP_NON_PERSISTENT) && (r1->data_type != RECD_STRING)) {
if (i_am_the_record_owner(r1->rec_type)) {
rec_mutex_acquire(&(r1->lock));
if (!RecDataSet(r1->data_type, &(r1->data), &(r1->data_default))) {
err = REC_ERR_FAIL;
}
rec_mutex_release(&(r1->lock));
} else {
RecRecord r2;
memset(&r2, 0, sizeof(RecRecord));
r2.rec_type = r1->rec_type;
r2.name = r1->name;
r2.data_type = r1->data_type;
r2.data = r1->data_default;
err = send_set_message(&r2);
}
}
}
return err;
}
int
RecSetSyncRequired(char *name, bool lock)
{
int err = REC_ERR_FAIL;
RecRecord *r1;
// FIXME: Most of the time we set, we don't actually need to wrlock
// since we are not modifying the g_records_ht.
if (lock) {
ink_rwlock_wrlock(&g_records_rwlock);
}
if (ink_hash_table_lookup(g_records_ht, name, (void **) &r1)) {
if (i_am_the_record_owner(r1->rec_type)) {
rec_mutex_acquire(&(r1->lock));
r1->sync_required = REC_SYNC_REQUIRED;
if (REC_TYPE_IS_CONFIG(r1->rec_type)) {
r1->config_meta.update_required = REC_UPDATE_REQUIRED;
}
rec_mutex_release(&(r1->lock));
err = REC_ERR_OKAY;
} else {
// No point of doing the following because our peer will
// set the value with RecDataSet. However, since
// r2.name == r1->name, the sync_required bit will not be
// set.
/*
RecRecord r2;
memset(&r2, 0, sizeof(RecRecord));
r2.rec_type = r1->rec_type;
r2.name = r1->name;
r2.data_type = r1->data_type;
r2.data = r1->data_default;
err = send_set_message(&r2);
*/
}
}
if (lock) {
ink_rwlock_unlock(&g_records_rwlock);
}
return err;
}