blob: 8cab1de88341c49b655539c3e73107f587b8f030 [file] [log] [blame]
/** @file
Record process definitions
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "libts.h"
#include "I_Tasks.h"
#include "P_EventSystem.h"
#include "P_RecCore.h"
#include "P_RecProcess.h"
#include "P_RecMessage.h"
#include "P_RecUtils.h"
#include "P_RecFile.h"
#include "mgmtapi.h"
static bool g_initialized = false;
static bool g_message_initialized = false;
static bool g_started = false;
static EventNotify g_force_req_notify;
static int g_rec_raw_stat_sync_interval_ms = REC_RAW_STAT_SYNC_INTERVAL_MS;
static int g_rec_config_update_interval_ms = REC_CONFIG_UPDATE_INTERVAL_MS;
static int g_rec_remote_sync_interval_ms = REC_REMOTE_SYNC_INTERVAL_MS;
static Event *raw_stat_sync_cont_event;
static Event *config_update_cont_event;
static Event *sync_cont_event;
//-------------------------------------------------------------------------
// i_am_the_record_owner, only used for librecprocess.a
//-------------------------------------------------------------------------
bool
i_am_the_record_owner(RecT rec_type)
{
if (g_mode_type == RECM_CLIENT) {
switch (rec_type) {
case RECT_PROCESS:
case RECT_PLUGIN:
return true;
case RECT_CONFIG:
case RECT_NODE:
case RECT_CLUSTER:
case RECT_LOCAL:
return false;
default:
ink_assert(!"Unexpected RecT type");
return false;
}
} else if (g_mode_type == RECM_STAND_ALONE) {
switch (rec_type) {
case RECT_CONFIG:
case RECT_PROCESS:
case RECT_NODE:
case RECT_CLUSTER:
case RECT_LOCAL:
case RECT_PLUGIN:
return true;
default:
ink_assert(!"Unexpected RecT type");
return false;
}
}
return false;
}
//-------------------------------------------------------------------------
// Simple setters for the intervals to decouple this from the proxy
//-------------------------------------------------------------------------
void
RecProcess_set_raw_stat_sync_interval_ms(int ms) {
Debug("statsproc", "g_rec_raw_stat_sync_interval_ms -> %d", ms);
g_rec_raw_stat_sync_interval_ms = ms;
if (raw_stat_sync_cont_event) {
Debug("statsproc", "Rescheduling raw-stat syncer");
raw_stat_sync_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_raw_stat_sync_interval_ms));
}
}
void
RecProcess_set_config_update_interval_ms(int ms) {
Debug("statsproc", "g_rec_config_update_interval_ms -> %d", ms);
g_rec_config_update_interval_ms = ms;
if (config_update_cont_event) {
Debug("statsproc", "Rescheduling config syncer");
config_update_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_config_update_interval_ms));
}
}
void
RecProcess_set_remote_sync_interval_ms(int ms) {
Debug("statsproc", "g_rec_remote_sync_interval_ms -> %d", ms);
g_rec_remote_sync_interval_ms = ms;
if (sync_cont_event) {
Debug("statsproc", "Rescheduling remote syncer");
sync_cont_event->schedule_every(HRTIME_MSECONDS(g_rec_remote_sync_interval_ms));
}
}
//-------------------------------------------------------------------------
// raw_stat_get_total
//-------------------------------------------------------------------------
static int
raw_stat_get_total(RecRawStatBlock *rsb, int id, RecRawStat *total)
{
int i;
RecRawStat *tlp;
total->sum = 0;
total->count = 0;
// get global values
total->sum = rsb->global[id]->sum;
total->count = rsb->global[id]->count;
// get thread local values
for (i = 0; i < eventProcessor.n_ethreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
total->sum += tlp->sum;
total->count += tlp->count;
}
for (i = 0; i < eventProcessor.n_dthreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
total->sum += tlp->sum;
total->count += tlp->count;
}
if (total->sum < 0) { // Assure that we stay positive
total->sum = 0;
}
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// raw_stat_sync_to_global
//-------------------------------------------------------------------------
static int
raw_stat_sync_to_global(RecRawStatBlock *rsb, int id)
{
int i;
RecRawStat *tlp;
RecRawStat total;
total.sum = 0;
total.count = 0;
// sum the thread local values
for (i = 0; i < eventProcessor.n_ethreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
total.sum += tlp->sum;
total.count += tlp->count;
}
for (i = 0; i < eventProcessor.n_dthreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
total.sum += tlp->sum;
total.count += tlp->count;
}
if (total.sum < 0) { // Assure that we stay positive
total.sum = 0;
}
// lock so the setting of the globals and last values are atomic
ink_mutex_acquire(&(rsb->mutex));
// get the delta from the last sync
RecRawStat delta;
delta.sum = total.sum - rsb->global[id]->last_sum;
delta.count = total.count - rsb->global[id]->last_count;
// This is too verbose now, so leaving it out / leif
//Debug("stats", "raw_stat_sync_to_global(): rsb pointer:%p id:%d delta:%" PRId64 " total:%" PRId64 " last:%" PRId64 " global:%" PRId64 "\n",
//rsb, id, delta.sum, total.sum, rsb->global[id]->last_sum, rsb->global[id]->sum);
// increment the global values by the delta
ink_atomic_increment(&(rsb->global[id]->sum), delta.sum);
ink_atomic_increment(&(rsb->global[id]->count), delta.count);
// set the new totals as the last values seen
ink_atomic_swap(&(rsb->global[id]->last_sum), total.sum);
ink_atomic_swap(&(rsb->global[id]->last_count), total.count);
ink_mutex_release(&(rsb->mutex));
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// raw_stat_clear
//-------------------------------------------------------------------------
static int
raw_stat_clear(RecRawStatBlock *rsb, int id)
{
Debug("stats", "raw_stat_clear(): rsb pointer:%p id:%d\n", rsb, id);
// the globals need to be reset too
// lock so the setting of the globals and last values are atomic
ink_mutex_acquire(&(rsb->mutex));
ink_atomic_swap(&(rsb->global[id]->sum), (int64_t)0);
ink_atomic_swap(&(rsb->global[id]->last_sum), (int64_t)0);
ink_atomic_swap(&(rsb->global[id]->count), (int64_t)0);
ink_atomic_swap(&(rsb->global[id]->last_count), (int64_t)0);
ink_mutex_release(&(rsb->mutex));
// reset the local stats
RecRawStat *tlp;
for (int i = 0; i < eventProcessor.n_ethreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
ink_atomic_swap(&(tlp->sum), (int64_t)0);
ink_atomic_swap(&(tlp->count), (int64_t)0);
}
for (int i = 0; i < eventProcessor.n_dthreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
ink_atomic_swap(&(tlp->sum), (int64_t)0);
ink_atomic_swap(&(tlp->count), (int64_t)0);
}
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// raw_stat_clear_sum
//-------------------------------------------------------------------------
static int
raw_stat_clear_sum(RecRawStatBlock *rsb, int id)
{
Debug("stats", "raw_stat_clear_sum(): rsb pointer:%p id:%d\n", rsb, id);
// the globals need to be reset too
// lock so the setting of the globals and last values are atomic
ink_mutex_acquire(&(rsb->mutex));
ink_atomic_swap(&(rsb->global[id]->sum), (int64_t)0);
ink_atomic_swap(&(rsb->global[id]->last_sum), (int64_t)0);
ink_mutex_release(&(rsb->mutex));
// reset the local stats
RecRawStat *tlp;
for (int i = 0; i < eventProcessor.n_ethreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
ink_atomic_swap(&(tlp->sum), (int64_t)0);
}
for (int i = 0; i < eventProcessor.n_dthreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
ink_atomic_swap(&(tlp->sum), (int64_t)0);
}
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// raw_stat_clear_count
//-------------------------------------------------------------------------
static int
raw_stat_clear_count(RecRawStatBlock *rsb, int id)
{
Debug("stats", "raw_stat_clear_count(): rsb pointer:%p id:%d\n", rsb, id);
// the globals need to be reset too
// lock so the setting of the globals and last values are atomic
ink_mutex_acquire(&(rsb->mutex));
ink_atomic_swap(&(rsb->global[id]->count), (int64_t)0);
ink_atomic_swap(&(rsb->global[id]->last_count), (int64_t)0);
ink_mutex_release(&(rsb->mutex));
// reset the local stats
RecRawStat *tlp;
for (int i = 0; i < eventProcessor.n_ethreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_ethreads[i]) + rsb->ethr_stat_offset)) + id;
ink_atomic_swap(&(tlp->count), (int64_t)0);
}
for (int i = 0; i < eventProcessor.n_dthreads; i++) {
tlp = ((RecRawStat *) ((char *) (eventProcessor.all_dthreads[i]) + rsb->ethr_stat_offset)) + id;
ink_atomic_swap(&(tlp->count), (int64_t)0);
}
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// recv_message_cb__process
//-------------------------------------------------------------------------
static int
recv_message_cb__process(RecMessage *msg, RecMessageT msg_type, void *cookie)
{
int err;
if ((err = recv_message_cb(msg, msg_type, cookie)) == REC_ERR_OKAY) {
if (msg_type == RECG_PULL_ACK) {
g_force_req_notify.lock();
g_force_req_notify.signal();
g_force_req_notify.unlock();
}
}
return err;
}
//-------------------------------------------------------------------------
// raw_stat_sync_cont
//-------------------------------------------------------------------------
struct raw_stat_sync_cont: public Continuation
{
raw_stat_sync_cont(ProxyMutex *m)
: Continuation(m)
{
SET_HANDLER(&raw_stat_sync_cont::exec_callbacks);
}
int exec_callbacks(int event, Event *e)
{
RecExecRawStatSyncCbs();
Debug("statsproc", "raw_stat_sync_cont() processed");
return EVENT_CONT;
}
};
//-------------------------------------------------------------------------
// config_update_cont
//-------------------------------------------------------------------------
struct config_update_cont: public Continuation
{
config_update_cont(ProxyMutex *m)
: Continuation(m)
{
SET_HANDLER(&config_update_cont::exec_callbacks);
}
int exec_callbacks(int event, Event *e)
{
RecExecConfigUpdateCbs(REC_PROCESS_UPDATE_REQUIRED);
Debug("statsproc", "config_update_cont() processed");
return EVENT_CONT;
}
};
//-------------------------------------------------------------------------
// sync_cont
//-------------------------------------------------------------------------
struct sync_cont: public Continuation
{
textBuffer *m_tb;
sync_cont(ProxyMutex *m)
: Continuation(m)
{
SET_HANDLER(&sync_cont::sync);
m_tb = NEW(new textBuffer(65536));
}
~sync_cont()
{
if (m_tb != NULL) {
delete m_tb;
m_tb = NULL;
}
}
int sync(int event, Event *e)
{
send_push_message();
RecSyncStatsFile();
if (RecSyncConfigToTB(m_tb) == REC_ERR_OKAY) {
RecWriteConfigFile(m_tb);
}
Debug("statsproc", "sync_cont() processed");
return EVENT_CONT;
}
};
//-------------------------------------------------------------------------
// RecProcessInit
//-------------------------------------------------------------------------
int
RecProcessInit(RecModeT mode_type, Diags *_diags)
{
if (g_initialized) {
return REC_ERR_OKAY;
}
g_mode_type = mode_type;
if (RecCoreInit(mode_type, _diags) == REC_ERR_FAIL) {
return REC_ERR_FAIL;
}
/* -- defer RecMessageInit() until ProcessManager is initialized and
* started
if (RecMessageInit(mode_type) == REC_ERR_FAIL) {
return REC_ERR_FAIL;
}
if (RecMessageRegisterRecvCb(recv_message_cb__process, NULL)) {
return REC_ERR_FAIL;
}
ink_cond_init(&g_force_req_cond);
ink_mutex_init(&g_force_req_mutex, NULL);
if (mode_type == RECM_CLIENT) {
send_pull_message(RECG_PULL_REQ);
ink_cond_wait(&g_force_req_cond, &g_force_req_mutex);
ink_mutex_release(&g_force_req_mutex);
}
*/
g_initialized = true;
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecProcessInitMessage
//-------------------------------------------------------------------------
int
RecProcessInitMessage(RecModeT mode_type)
{
if (g_message_initialized) {
return REC_ERR_OKAY;
}
if (RecMessageInit() == REC_ERR_FAIL) {
return REC_ERR_FAIL;
}
if (RecMessageRegisterRecvCb(recv_message_cb__process, NULL)) {
return REC_ERR_FAIL;
}
if (mode_type == RECM_CLIENT) {
send_pull_message(RECG_PULL_REQ);
g_force_req_notify.lock();
g_force_req_notify.wait();
g_force_req_notify.unlock();
}
g_message_initialized = true;
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecProcessStart
//-------------------------------------------------------------------------
int
RecProcessStart(void)
{
if (g_started) {
return REC_ERR_OKAY;
}
Debug("statsproc", "Starting sync continuations:");
raw_stat_sync_cont *rssc = NEW(new raw_stat_sync_cont(new_ProxyMutex()));
Debug("statsproc", "\traw-stat syncer");
raw_stat_sync_cont_event = eventProcessor.schedule_every(rssc, HRTIME_MSECONDS(g_rec_raw_stat_sync_interval_ms), ET_TASK);
config_update_cont *cuc = NEW(new config_update_cont(new_ProxyMutex()));
Debug("statsproc", "\tconfig syncer");
config_update_cont_event = eventProcessor.schedule_every(cuc, HRTIME_MSECONDS(g_rec_config_update_interval_ms), ET_TASK);
sync_cont *sc = NEW(new sync_cont(new_ProxyMutex()));
Debug("statsproc", "\tremote syncer");
sync_cont_event = eventProcessor.schedule_every(sc, HRTIME_MSECONDS(g_rec_remote_sync_interval_ms), ET_TASK);
g_started = true;
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecAllocateRawStatBlock
//-------------------------------------------------------------------------
RecRawStatBlock *
RecAllocateRawStatBlock(int num_stats)
{
off_t ethr_stat_offset;
RecRawStatBlock *rsb;
// allocate thread-local raw-stat memory
if ((ethr_stat_offset = eventProcessor.allocate(num_stats * sizeof(RecRawStat))) == -1) {
return NULL;
}
// create the raw-stat-block structure
rsb = (RecRawStatBlock *)ats_malloc(sizeof(RecRawStatBlock));
memset(rsb, 0, sizeof(RecRawStatBlock));
rsb->ethr_stat_offset = ethr_stat_offset;
rsb->global = (RecRawStat **)ats_malloc(num_stats * sizeof(RecRawStat *));
memset(rsb->global, 0, num_stats * sizeof(RecRawStat *));
rsb->num_stats = 0;
rsb->max_stats = num_stats;
ink_mutex_init(&(rsb->mutex),"net stat mutex");
return rsb;
}
//-------------------------------------------------------------------------
// RecRegisterRawStat
//-------------------------------------------------------------------------
int
_RecRegisterRawStat(RecRawStatBlock *rsb, RecT rec_type, const char *name, RecDataT data_type, RecPersistT persist_type, int id,
RecRawStatSyncCb sync_cb)
{
Debug("stats", "RecRawStatSyncCb(%s): rsb pointer:%p id:%d\n", name, rsb, id);
// check to see if we're good to proceed
ink_assert(id < rsb->max_stats);
int err = REC_ERR_OKAY;
RecRecord *r;
RecData data_default;
memset(&data_default, 0, sizeof(RecData));
// register the record
if ((r = RecRegisterStat(rec_type, name, data_type, data_default, persist_type)) == NULL) {
err = REC_ERR_FAIL;
goto Ldone;
}
r->rsb_id = id; // This is the index within the RSB raw block for this stat, used for lookups by name.
if (i_am_the_record_owner(r->rec_type)) {
r->sync_required = r->sync_required | REC_PEER_SYNC_REQUIRED;
} else {
send_register_message(r);
}
// store a pointer to our record->stat_meta.data_raw in our rsb
rsb->global[id] = &(r->stat_meta.data_raw);
rsb->global[id]->last_sum = 0;
rsb->global[id]->last_count = 0;
// setup the periodic sync callback
RecRegisterRawStatSyncCb(name, sync_cb, rsb, id);
Ldone:
return err;
}
//-------------------------------------------------------------------------
// RecRawStatSync...
//-------------------------------------------------------------------------
// Note: On these RecRawStatSync callbacks, our 'data' is protected
// under its lock by the caller, so no need to worry!
int
RecRawStatSyncSum(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
RecRawStat total;
Debug("stats", "raw sync:sum for %s", name);
raw_stat_sync_to_global(rsb, id);
total.sum = rsb->global[id]->sum;
total.count = rsb->global[id]->count;
RecDataSetFromInk64(data_type, data, total.sum);
return REC_ERR_OKAY;
}
int
RecRawStatSyncCount(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
RecRawStat total;
Debug("stats", "raw sync:count for %s", name);
raw_stat_sync_to_global(rsb, id);
total.sum = rsb->global[id]->sum;
total.count = rsb->global[id]->count;
RecDataSetFromInk64(data_type, data, total.count);
return REC_ERR_OKAY;
}
int
RecRawStatSyncAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
RecRawStat total;
RecFloat avg = 0.0f;
Debug("stats", "raw sync:avg for %s", name);
raw_stat_sync_to_global(rsb, id);
total.sum = rsb->global[id]->sum;
total.count = rsb->global[id]->count;
if (total.count != 0)
avg = (float) ((double) total.sum / (double) total.count);
RecDataSetFromFloat(data_type, data, avg);
return REC_ERR_OKAY;
}
int
RecRawStatSyncHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
RecRawStat total;
RecFloat r;
Debug("stats", "raw sync:hr-timeavg for %s", name);
raw_stat_sync_to_global(rsb, id);
total.sum = rsb->global[id]->sum;
total.count = rsb->global[id]->count;
if (total.count == 0) {
r = 0.0f;
} else {
r = (float) ((double) total.sum / (double) total.count);
r = r / (float) (HRTIME_SECOND);
}
RecDataSetFromFloat(data_type, data, r);
return REC_ERR_OKAY;
}
int
RecRawStatSyncIntMsecsToFloatSeconds(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
RecRawStat total;
RecFloat r;
Debug("stats", "raw sync:seconds for %s", name);
raw_stat_sync_to_global(rsb, id);
total.sum = rsb->global[id]->sum;
total.count = rsb->global[id]->count;
if (total.count == 0) {
r = 0.0f;
} else {
r = (float) ((double) total.sum / 1000);
}
RecDataSetFromFloat(data_type, data, r);
return REC_ERR_OKAY;
}
int
RecRawStatSyncMHrTimeAvg(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id)
{
RecRawStat total;
RecFloat r;
Debug("stats", "raw sync:mhr-timeavg for %s", name);
raw_stat_sync_to_global(rsb, id);
total.sum = rsb->global[id]->sum;
total.count = rsb->global[id]->count;
if (total.count == 0) {
r = 0.0f;
} else {
r = (float) ((double) total.sum / (double) total.count);
r = r / (float) (HRTIME_MSECOND);
}
RecDataSetFromFloat(data_type, data, r);
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecIncrRawStatXXX
//-------------------------------------------------------------------------
int
RecIncrRawStatBlock(RecRawStatBlock */* rsb ATS_UNUSED */, EThread */* ethread ATS_UNUSED */,
RecRawStat */* stat_array ATS_UNUSED */)
{
return REC_ERR_FAIL;
}
//-------------------------------------------------------------------------
// RecSetRawStatXXX
//-------------------------------------------------------------------------
int
RecSetRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
{
raw_stat_clear_sum(rsb, id);
ink_atomic_swap(&(rsb->global[id]->sum), data);
return REC_ERR_OKAY;
}
int
RecSetRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
{
raw_stat_clear_count(rsb, id);
ink_atomic_swap(&(rsb->global[id]->count), data);
return REC_ERR_OKAY;
}
int
RecSetRawStatBlock(RecRawStatBlock */* rsb ATS_UNUSED */, RecRawStat */* stat_array ATS_UNUSED */)
{
return REC_ERR_FAIL;
}
//-------------------------------------------------------------------------
// RecGetRawStatXXX
//-------------------------------------------------------------------------
int
RecGetRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
{
RecRawStat total;
raw_stat_get_total(rsb, id, &total);
*data = total.sum;
return REC_ERR_OKAY;
}
int
RecGetRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
{
RecRawStat total;
raw_stat_get_total(rsb, id, &total);
*data = total.count;
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecIncrGlobalRawStatXXX
//-------------------------------------------------------------------------
int
RecIncrGlobalRawStat(RecRawStatBlock *rsb, int id, int64_t incr)
{
ink_atomic_increment(&(rsb->global[id]->sum), incr);
ink_atomic_increment(&(rsb->global[id]->count), 1);
return REC_ERR_OKAY;
}
int
RecIncrGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t incr)
{
ink_atomic_increment(&(rsb->global[id]->sum), incr);
return REC_ERR_OKAY;
}
int
RecIncrGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t incr)
{
ink_atomic_increment(&(rsb->global[id]->count), incr);
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecSetGlobalRawStatXXX
//-------------------------------------------------------------------------
int
RecSetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t data)
{
ink_atomic_swap(&(rsb->global[id]->sum), data);
return REC_ERR_OKAY;
}
int
RecSetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t data)
{
ink_atomic_swap(&(rsb->global[id]->count), data);
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecGetGlobalRawStatXXX
//-------------------------------------------------------------------------
int
RecGetGlobalRawStatSum(RecRawStatBlock *rsb, int id, int64_t *data)
{
*data = rsb->global[id]->sum;
return REC_ERR_OKAY;
}
int
RecGetGlobalRawStatCount(RecRawStatBlock *rsb, int id, int64_t *data)
{
*data = rsb->global[id]->count;
return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RegGetGlobalRawStatXXXPtr
//-------------------------------------------------------------------------
RecRawStat *
RecGetGlobalRawStatPtr(RecRawStatBlock *rsb, int id)
{
return rsb->global[id];
}
int64_t *
RecGetGlobalRawStatSumPtr(RecRawStatBlock *rsb, int id)
{
return &(rsb->global[id]->sum);
}
int64_t *
RecGetGlobalRawStatCountPtr(RecRawStatBlock *rsb, int id)
{
return &(rsb->global[id]->count);
}
//-------------------------------------------------------------------------
// RecRegisterRawStatSyncCb
//-------------------------------------------------------------------------
int
RecRegisterRawStatSyncCb(const char *name, RecRawStatSyncCb sync_cb, RecRawStatBlock *rsb, int id)
{
int err = REC_ERR_FAIL;
RecRecord *r;
ink_rwlock_rdlock(&g_records_rwlock);
if (ink_hash_table_lookup(g_records_ht, name, (void **) &r)) {
rec_mutex_acquire(&(r->lock));
if (REC_TYPE_IS_STAT(r->rec_type)) {
if (!(r->stat_meta.sync_cb)) {
r->stat_meta.sync_rsb = rsb;
r->stat_meta.sync_id = id;
r->stat_meta.sync_cb = sync_cb;
r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
err = REC_ERR_OKAY;
} else {
ink_release_assert(false); // We shouldn't register CBs twice...
}
}
rec_mutex_release(&(r->lock));
}
ink_rwlock_unlock(&g_records_rwlock);
return err;
}
//-------------------------------------------------------------------------
// RecExecRawStatSyncCbs
//-------------------------------------------------------------------------
int
RecExecRawStatSyncCbs()
{
RecRecord *r;
int i, num_records;
num_records = g_num_records;
for (i = 0; i < num_records; i++) {
r = &(g_records[i]);
rec_mutex_acquire(&(r->lock));
if (REC_TYPE_IS_STAT(r->rec_type)) {
if (r->stat_meta.sync_cb) {
if (r->version && r->version != r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version) {
raw_stat_clear(r->stat_meta.sync_rsb, r->stat_meta.sync_id);
r->stat_meta.sync_rsb->global[r->stat_meta.sync_id]->version = r->version;
} else {
(*(r->stat_meta.sync_cb)) (r->name, r->data_type, &(r->data), r->stat_meta.sync_rsb, r->stat_meta.sync_id);
}
r->sync_required = REC_SYNC_REQUIRED;
}
}
rec_mutex_release(&(r->lock));
}
return REC_ERR_OKAY;
}