blob: 904064c77801639364095131ffdd927f8dd4b8fe [file] [log] [blame]
/** @file
Record message definitions
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "inktomi++.h"
#include "P_RecCompatibility.h"
#include "P_RecMessage.h"
#include "P_RecUtils.h"
// Set to true by RecMessageInit() once initialization has completed;
// later calls to RecMessageInit() return immediately when it is set.
static bool g_message_initialized = false;
// Receive callback installed by RecMessageRegisterRecvCb() and invoked by
// RecMessageRecvThis() for every incoming message.
static RecMessageRecvCb g_recv_cb = NULL;
// Opaque cookie registered together with g_recv_cb and passed back to it.
static void *g_recv_cookie = NULL;
// Mode recorded by RecMessageInit(); RecMessageSend() only transmits when
// it is RECM_CLIENT or RECM_SERVER.
static RecModeT g_mode_type;
//-------------------------------------------------------------------------
//
// REC_BUILD_STAND_ALONE IMPLEMENTATION
//
//-------------------------------------------------------------------------
#if defined (REC_BUILD_STAND_ALONE)
// Outgoing messages: filled by RecMessageSend(), drained by send_thr().
static LLQ *g_send_llq = NULL;
// Incoming messages: filled by recv_thr(), drained by recv_cb_thr().
static LLQ *g_recv_llq = NULL;
//-------------------------------------------------------------------------
// send_thr
//-------------------------------------------------------------------------
// Sender thread body: drains g_send_llq and writes each message to the pipe.
//
// data carries the pipe handle (a RecHandle packed into a void *).  Each
// dequeued buffer is the copy queued by RecMessageSend(); it is released
// here once it has been written.  The loop never exits; the trailing return
// only satisfies the thread-function signature.
static void *
send_thr(void *data)
{
  RecHandle h_pipe = (RecHandle)(intptr_t) data;

  while (true) {
    // dequeue will block if there's nothing in the queue
    RecMessageHdr *msg_hdr = (RecMessageHdr *) dequeue(g_send_llq);
    // Bytes on the wire: the header followed by the payload [o_start, o_end).
    int msg_size = (msg_hdr->o_end - msg_hdr->o_start) + sizeof(RecMessageHdr);

    if (RecPipeWrite(h_pipe, (char *) msg_hdr, msg_size) == REC_ERR_FAIL) {
      // A bare string literal is a non-null pointer and therefore always
      // "true", so the assertion could never fire.  Negate it (same idiom
      // as the RecModeT assert in RecMessageInit) so a failed write aborts.
      ink_release_assert(!"Pipe write failed, message lost");
    }
    xfree(msg_hdr);
  }
  return NULL;
}
//-------------------------------------------------------------------------
// recv_thr
//-------------------------------------------------------------------------
// Receiver thread body: reads messages from the pipe and queues them on
// g_recv_llq.
//
// data carries the pipe handle (a RecHandle packed into a void *).  Each
// message arrives as a fixed-size RecMessageHdr followed by a payload of
// (o_end - o_start) bytes.  The assembled buffer is handed to the queue;
// recv_cb_thr() frees it after dispatch.  The loop never exits.
static void *
recv_thr(void *data)
{
  RecHandle h_pipe = (RecHandle)(intptr_t) data;

  while (true) {
    RecMessageHdr msg_hdr;

    // Read the header first; it tells us how large the payload is.
    if (RecPipeRead(h_pipe, (char *) (&msg_hdr), sizeof(RecMessageHdr)) == REC_ERR_FAIL) {
      // Negated literal: a plain string is always truthy and would make
      // this assertion a no-op.
      ink_release_assert(!"Pipe read failed");
    }

    RecMessage *msg = (RecMessage *) xmalloc((msg_hdr.o_end - msg_hdr.o_start) + sizeof(RecMessageHdr));
    memcpy(msg, &msg_hdr, sizeof(RecMessageHdr));

    // The payload lands right where the header says it starts.
    if (RecPipeRead(h_pipe, (char *) (msg) + msg_hdr.o_start, msg_hdr.o_end - msg_hdr.o_start) == REC_ERR_FAIL) {
      ink_release_assert(!"Pipe read failed");
    }
    enqueue(g_recv_llq, msg);
  }
  return NULL;
}
//-------------------------------------------------------------------------
// accept_thr
//-------------------------------------------------------------------------
static void *
accept_thr(void *data)
{
REC_NOWARN_UNUSED(data);
RecHandle h_pipe;
h_pipe = RecPipeCreate(REC_PIPE_DIR, REC_PIPE_NAME);
ink_thread_create(send_thr, (void *) h_pipe);
ink_thread_create(recv_thr, (void *) h_pipe);
return NULL;
}
//-------------------------------------------------------------------------
// recv_cb_thr
//-------------------------------------------------------------------------
static void *
recv_cb_thr(void *data)
{
REC_NOWARN_UNUSED(data);
RecMessage *msg;
while (true) {
if (g_recv_cb) {
msg = (RecMessage *) dequeue(g_recv_llq);
RecMessageRecvThis(0, (char *) msg, 0);
xfree(msg);
}
}
return NULL;
}
//-------------------------------------------------------------------------
// RecMessageInit
//-------------------------------------------------------------------------
// Stand-alone build: one-time setup of the message transport.
//
// Records the mode, creates the send/receive queues and starts the worker
// threads appropriate for mode_type:
//   RECM_CLIENT - connect to the record pipe and start send_thr/recv_thr
//   RECM_SERVER - start accept_thr, which creates the pipe itself
//   otherwise   - debug assertion; no pipe threads are started
// recv_cb_thr is started in every case that reaches the end.
//
// Returns REC_ERR_OKAY on success or when already initialized, and
// REC_ERR_FAIL when a client cannot connect to the pipe.
int
RecMessageInit(RecModeT mode_type)
{
  if (g_message_initialized) {
    return REC_ERR_OKAY;
  }

  g_mode_type = mode_type;
  g_send_llq = create_queue();
  g_recv_llq = create_queue();

  if (mode_type == RECM_CLIENT) {
    RecHandle pipe_handle = RecPipeConnect(REC_PIPE_DIR, REC_PIPE_NAME);
    if (pipe_handle == REC_HANDLE_INVALID) {
      return REC_ERR_FAIL;
    }
    ink_thread_create(send_thr, (void *) pipe_handle);
    ink_thread_create(recv_thr, (void *) pipe_handle);
  } else if (mode_type == RECM_SERVER) {
    ink_thread_create(accept_thr, NULL);
  } else {
    // RECM_NULL, RECM_STAND_ALONE and anything unknown end up here.
    ink_debug_assert(!"Unexpected RecModeT type");
  }

  ink_thread_create(recv_cb_thr, NULL);
  g_message_initialized = true;

  return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecMessageSend
//-------------------------------------------------------------------------
// Stand-alone build: queue a message for transmission by send_thr().
//
// In client or server mode a copy of msg, truncated to the bytes actually
// written (header plus [o_start, o_write)), is placed on g_send_llq; the
// caller's msg is left untouched and remains the caller's to free.  In any
// other mode nothing is sent.  Always returns REC_ERR_OKAY.
int
RecMessageSend(RecMessage * msg)
{
  if (g_mode_type != RECM_CLIENT && g_mode_type != RECM_SERVER) {
    return REC_ERR_OKAY;
  }

  int used_bytes = sizeof(RecMessageHdr) + (msg->o_write - msg->o_start);
  RecMessage *clone = (RecMessage *) xmalloc(used_bytes);

  memcpy(clone, msg, used_bytes);
  // The copy ends where the data ends, so cap its o_end accordingly.
  clone->o_end = clone->o_write;
  enqueue(g_send_llq, (void *) clone);

  return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
//
// REC_BUILD_MGMT2 IMPLEMENTATION
//
//-------------------------------------------------------------------------
#elif defined (REC_BUILD_MGMT2)
#if defined(LOCAL_MANAGER)
#include "LocalManager.h"
#elif defined(PROCESS_MANAGER)
#include "ProcessManager.h"
#else
#error "Required #define not specificed; expected LOCAL_MANAGER or PROCESS_MANAGER"
#endif
//-------------------------------------------------------------------------
// RecMessageInit
//-------------------------------------------------------------------------
// MGMT2 build: one-time setup of the message transport.
//
// Records the mode and registers RecMessageRecvThis as the management
// callback for librecords traffic (on lmgmt for LOCAL_MANAGER builds, on
// pmgmt for PROCESS_MANAGER builds).  Repeated calls are no-ops.
// Always returns REC_ERR_OKAY.
int
RecMessageInit(RecModeT mode_type)
{
  if (!g_message_initialized) {
    g_mode_type = mode_type;

#if defined (LOCAL_MANAGER)
    lmgmt->registerMgmtCallback(MGMT_SIGNAL_LIBRECORDS, RecMessageRecvThis, NULL);
#elif defined(PROCESS_MANAGER)
    pmgmt->registerMgmtCallback(MGMT_EVENT_LIBRECORDS, RecMessageRecvThis, NULL);
#endif

    g_message_initialized = true;
  }
  return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecMessageSend
//-------------------------------------------------------------------------
// MGMT2 build: hand a message to the management layer.
//
// In client or server mode the message is capped in place (o_end is set
// to o_write, so msg itself is modified) and the header plus the written
// payload is passed to lmgmt->signalEvent() or pmgmt->signalManager(),
// depending on the build.  In any other mode nothing is sent.
// Always returns REC_ERR_OKAY.
int
RecMessageSend(RecMessage * msg)
{
  if (g_mode_type != RECM_CLIENT && g_mode_type != RECM_SERVER) {
    return REC_ERR_OKAY;
  }

  // Truncate to the size actually used before computing the wire length.
  msg->o_end = msg->o_write;
  int msg_size = sizeof(RecMessageHdr) + (msg->o_write - msg->o_start);

#if defined (LOCAL_MANAGER)
  lmgmt->signalEvent(MGMT_EVENT_LIBRECORDS, (char *) msg, msg_size);
#elif defined(PROCESS_MANAGER)
  pmgmt->signalManager(MGMT_SIGNAL_LIBRECORDS, (char *) msg, msg_size);
#endif

  return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
//
// STUB IMPLEMENTATION
//
//-------------------------------------------------------------------------
#elif defined (REC_BUILD_STUB)
#else
#error "Required #define not specificed; expected REC_BUILD_STAND_ALONE, REC_BUILD_MGMT2, or REC_BUILD_STUB"
#endif
//-------------------------------------------------------------------------
// RecMessageAlloc
//-------------------------------------------------------------------------
// Allocate a zero-filled message of the given type with initial_size bytes
// of payload capacity following the header.
//
// All offsets are relative to the start of the message: o_start and
// o_write both begin just past the header, o_end marks the end of the
// allocation, and entries starts at 0.  Release with RecMessageFree().
RecMessage *
RecMessageAlloc(RecMessageT msg_type, int initial_size)
{
  RecMessage *fresh = (RecMessage *) xmalloc(sizeof(RecMessageHdr) + initial_size);

  memset(fresh, 0, sizeof(RecMessageHdr) + initial_size);

  fresh->msg_type = msg_type;
  fresh->entries = 0;
  fresh->o_start = sizeof(RecMessageHdr);
  fresh->o_write = sizeof(RecMessageHdr);
  fresh->o_end = sizeof(RecMessageHdr) + initial_size;

  return fresh;
}
//-------------------------------------------------------------------------
// RecMessageFree
//-------------------------------------------------------------------------
// Release a message buffer with xfree().  Always returns REC_ERR_OKAY.
int
RecMessageFree(RecMessage * msg)
{
  xfree(msg);

  return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecMessageMarshal_Realloc
//-------------------------------------------------------------------------
// Append a copy of *record to msg, growing the message buffer if needed.
//
// Each element is laid out as
//   RecMessageEleHdr | RecRecord | name | data string | default string | check_expr
// where the strings are present only when the corresponding pointer in
// *record is set (the two data strings only for RECD_STRING records, and
// check_expr only for config records).  The element size is rounded up to
// a multiple of 8 bytes.  Inside the stored RecRecord the string pointers
// are replaced by byte offsets relative to the stored record itself;
// RecMessageUnmarshalNext() turns them back into pointers.
//
// The buffer may be moved by xrealloc(), so callers must continue with the
// returned pointer rather than the one they passed in.
RecMessage *
RecMessageMarshal_Realloc(RecMessage * msg, const RecRecord * record)
{
  int msg_ele_size;
  int rec_name_len = -1;          // -1 means "string not present"
  int rec_data_str_len = -1;
  int rec_data_def_str_len = -1;
  int rec_cfg_chk_len = -1;
  RecMessageEleHdr *ele_hdr;
  RecRecord *r;
  char *p;

  // find out how much space we need
  msg_ele_size = sizeof(RecMessageEleHdr) + sizeof(RecRecord);
  if (record->name) {
    rec_name_len = strlen(record->name) + 1;
    msg_ele_size += rec_name_len;
  }
  if (record->data_type == RECD_STRING) {
    if (record->data.rec_string) {
      rec_data_str_len = strlen(record->data.rec_string) + 1;
      msg_ele_size += rec_data_str_len;
    }
    if (record->data_default.rec_string) {
      rec_data_def_str_len = strlen(record->data_default.rec_string) + 1;
      msg_ele_size += rec_data_def_str_len;
    }
  }
  if (REC_TYPE_IS_CONFIG(record->rec_type) && (record->config_meta.check_expr)) {
    rec_cfg_chk_len = strlen(record->config_meta.check_expr) + 1;
    msg_ele_size += rec_cfg_chk_len;
  }
  msg_ele_size = (msg_ele_size + 7) & ~7;       // 8 byte alignment

  // get some space in our buffer
  while (msg->o_end - msg->o_write < msg_ele_size) {
    int realloc_size = (msg->o_end - msg->o_start) * 2;
    if (realloc_size <= 0) {
      // A message allocated with no payload capacity cannot grow by
      // doubling (0 * 2 == 0), which previously made this loop spin
      // forever.  Seed it with the size of the element we need to store.
      realloc_size = msg_ele_size;
    }
    msg = (RecMessage *) xrealloc(msg, sizeof(RecMessageHdr) + realloc_size);
    msg->o_end = msg->o_start + realloc_size;
  }
  ele_hdr = (RecMessageEleHdr *) ((char *) msg + msg->o_write);

  // The following memset() is pretty CPU intensive, replacing it with something
  // like the below would reduce CPU usage a fair amount. /leif.
  // *((char*)msg + msg->o_write) = 0;
  memset((char *) msg + msg->o_write, 0, msg->o_end - msg->o_write);
  msg->o_write += msg_ele_size;

  // store the record
  ele_hdr->magic = REC_MESSAGE_ELE_MAGIC;
  ele_hdr->o_next = msg->o_write;       // offset of the element that follows
  p = (char *) ele_hdr + sizeof(RecMessageEleHdr);
  memcpy(p, record, sizeof(RecRecord));
  r = (RecRecord *) p;
  p += sizeof(RecRecord);

  // Copy each string behind the record and overwrite the stored pointer
  // with its offset from r.
  if (rec_name_len != -1) {
    ink_assert((msg->o_end - ((uintptr_t) p - (uintptr_t) msg)) >= (uintptr_t) rec_name_len);
    memcpy(p, record->name, rec_name_len);
    r->name = (char *) ((uintptr_t) p - (uintptr_t) r);
    p += rec_name_len;
  }
  if (rec_data_str_len != -1) {
    ink_assert((msg->o_end - ((uintptr_t) p - (uintptr_t) msg)) >= (uintptr_t) rec_data_str_len);
    memcpy(p, record->data.rec_string, rec_data_str_len);
    r->data.rec_string = (char *) ((uintptr_t) p - (uintptr_t) r);
    p += rec_data_str_len;
  }
  if (rec_data_def_str_len != -1) {
    ink_assert((msg->o_end - ((uintptr_t) p - (uintptr_t) msg)) >= (uintptr_t) rec_data_def_str_len);
    memcpy(p, record->data_default.rec_string, rec_data_def_str_len);
    r->data_default.rec_string = (char *) ((uintptr_t) p - (uintptr_t) r);
    p += rec_data_def_str_len;
  }
  if (rec_cfg_chk_len != -1) {
    ink_assert((msg->o_end - ((uintptr_t) p - (uintptr_t) msg)) >= (uintptr_t) rec_cfg_chk_len);
    memcpy(p, record->config_meta.check_expr, rec_cfg_chk_len);
    r->config_meta.check_expr = (char *) ((uintptr_t) p - (uintptr_t) r);
    p += rec_cfg_chk_len;
  }

  msg->entries += 1;

  return msg;
}
//-------------------------------------------------------------------------
// RecMessageUnmarshalFirst
//-------------------------------------------------------------------------
// Start iterating over the records stored in msg.
//
// Points itr at the first element (at offset o_start), sets itr->next to 1
// and then fetches the first record by calling RecMessageUnmarshalNext()
// with a NULL iterator.  The return value is that call's result.
int
RecMessageUnmarshalFirst(RecMessage * msg, RecMessageItr * itr, RecRecord ** record)
{
  char *base = (char *) msg;

  itr->ele_hdr = (RecMessageEleHdr *) (base + msg->o_start);
  itr->next = 1;

  return RecMessageUnmarshalNext(msg, NULL, record);
}
//-------------------------------------------------------------------------
// RecMessageUnmarshalNext
//-------------------------------------------------------------------------
// Fetch a record from msg and convert its stored string offsets back into
// pointers.
//
// With itr == NULL the first element (at o_start) is used; otherwise itr is
// advanced to the element named by the current element's o_next.  The
// record is patched in place inside the message buffer and *record is set
// to point at it.
//
// Returns REC_ERR_OKAY on success, or REC_ERR_FAIL when there are no
// (more) entries or the element's magic value does not match.
int
RecMessageUnmarshalNext(RecMessage * msg, RecMessageItr * itr, RecRecord ** record)
{
  RecMessageEleHdr *ele;

  if (itr != NULL) {
    if (itr->next >= msg->entries) {
      return REC_ERR_FAIL;
    }
    itr->ele_hdr = (RecMessageEleHdr *) ((char *) (msg) + itr->ele_hdr->o_next);
    itr->next += 1;
    ele = itr->ele_hdr;
  } else {
    if (msg->entries == 0) {
      return REC_ERR_FAIL;
    }
    ele = (RecMessageEleHdr *) ((char *) msg + msg->o_start);
  }

  ink_debug_assert(ele->magic == REC_MESSAGE_ELE_MAGIC);
  // If the file is corrupt, ignore the rest of the file.
  if (ele->magic != REC_MESSAGE_ELE_MAGIC) {
    Warning("Persistent statistics file records.stat is corrupted. Ignoring the rest of the file\n");
    return REC_ERR_FAIL;
  }

  // The record sits directly behind the element header; every stored
  // string "pointer" is really an offset from the record's own address.
  RecRecord *rec = (RecRecord *) ((char *) ele + sizeof(RecMessageEleHdr));
  char *rec_base = (char *) rec;

  if (rec->name) {
    rec->name = rec_base + (intptr_t) (rec->name);
  }
  if (rec->data_type == RECD_STRING) {
    if (rec->data.rec_string) {
      rec->data.rec_string = rec_base + (intptr_t) (rec->data.rec_string);
    }
    if (rec->data_default.rec_string) {
      rec->data_default.rec_string = rec_base + (intptr_t) (rec->data_default.rec_string);
    }
  }
  if (REC_TYPE_IS_CONFIG(rec->rec_type) && (rec->config_meta.check_expr)) {
    rec->config_meta.check_expr = rec_base + (intptr_t) (rec->config_meta.check_expr);
  }

  *record = rec;
  return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecMessageRegisterRecvCb
//-------------------------------------------------------------------------
// Install the callback (and its cookie) that receives incoming messages.
//
// Only one callback may be registered: returns REC_ERR_FAIL if one is
// already set, REC_ERR_OKAY otherwise.
int
RecMessageRegisterRecvCb(RecMessageRecvCb recv_cb, void *cookie)
{
  if (g_recv_cb != NULL) {
    return REC_ERR_FAIL;
  }

  // Store the cookie before the callback itself.
  g_recv_cookie = cookie;
  g_recv_cb = recv_cb;

  return REC_ERR_OKAY;
}
//-------------------------------------------------------------------------
// RecMessageRecvThis
//-------------------------------------------------------------------------
void *
RecMessageRecvThis(void *cookie, char *data_raw, int data_len)
{
REC_NOWARN_UNUSED(cookie);
REC_NOWARN_UNUSED(data_len);
RecMessage *msg = (RecMessage *) data_raw;
g_recv_cb(msg, msg->msg_type, g_recv_cookie);
return NULL;
}
//-------------------------------------------------------------------------
// RecMessageReadFromDisk
//-------------------------------------------------------------------------
// Load a message previously written by RecMessageWriteToDisk().
//
// Reads the fixed-size header, allocates a buffer large enough for the
// header plus (o_end - o_start) payload bytes, then reads the payload to
// offset o_start.  Returns the newly allocated message, or NULL if the
// file cannot be opened or either read fails.  The file is closed on
// every path where it was opened.
RecMessage *
RecMessageReadFromDisk(const char *fpath)
{
  RecMessageHdr msg_hdr;
  int bytes_read;

  RecHandle h_file = RecFileOpenR(fpath);
  if (h_file == REC_HANDLE_INVALID) {
    return NULL;
  }

  RecMessage *msg = NULL;

  if (RecFileRead(h_file, (char *) (&msg_hdr), sizeof(RecMessageHdr), &bytes_read) != REC_ERR_FAIL) {
    msg = (RecMessage *) xmalloc((msg_hdr.o_end - msg_hdr.o_start) + sizeof(RecMessageHdr));
    memcpy(msg, &msg_hdr, sizeof(RecMessageHdr));

    if (RecFileRead(h_file, (char *) (msg) + msg_hdr.o_start,
                    msg_hdr.o_end - msg_hdr.o_start, &bytes_read) == REC_ERR_FAIL) {
      // Payload could not be read: discard the partial message.
      xfree(msg);
      msg = NULL;
    }
  }

  RecFileClose(h_file);
  return msg;
}
//-------------------------------------------------------------------------
// RecMessageWriteToDisk
//-------------------------------------------------------------------------
// Write a message to the file at fpath.
//
// The message is capped in place first (o_end is set to o_write, so msg is
// modified) and the header plus the written payload is stored.
//
// Returns REC_ERR_FAIL if the write fails and REC_ERR_OKAY otherwise.
// Failure to open the file is not reported as an error; that existing
// behavior is kept so callers see no change.
int
RecMessageWriteToDisk(RecMessage *msg, const char *fpath)
{
  int msg_size;
  RecHandle h_file;
  int bytes_written;
  int result = REC_ERR_OKAY;

  // Cap the message (e.g. when we read it, o_end should reflect the
  // size of the new buffer that we write to disk, not the size of the
  // buffer in memory).
  msg->o_end = msg->o_write;

  msg_size = sizeof(RecMessageHdr) + (msg->o_write - msg->o_start);
  if ((h_file = RecFileOpenW(fpath)) != REC_HANDLE_INVALID) {
    if (RecFileWrite(h_file, (char *) msg, msg_size, &bytes_written) == REC_ERR_FAIL) {
      result = REC_ERR_FAIL;
    }
    // Close on both paths; returning early on a failed write used to leak
    // the file handle.
    RecFileClose(h_file);
  }

  return result;
}
//
// [hack for tsunami, bevans] -- I need this exported to the ACC module somehow;
// the ACC module can't include anything from proxy/ or mgmt2/.
//
// Forward an alarm to the management layer.
//
// LOCAL_MANAGER builds call lmgmt->signalEvent(), PROCESS_MANAGER builds
// call pmgmt->signalManager(); in any other build the arguments are
// ignored.  Always returns REC_ERR_OKAY.
int
RecAlarmSignal(int code, const char *msg, int msg_size)
{
#if defined (LOCAL_MANAGER)
  char *payload = (char *) msg;
  lmgmt->signalEvent(code, payload, msg_size);
#elif defined(PROCESS_MANAGER)
  char *payload = (char *) msg;
  pmgmt->signalManager(code, payload, msg_size);
#else
  // No manager in this build: nothing to signal.
  REC_NOWARN_UNUSED(code);
  REC_NOWARN_UNUSED(msg);
  REC_NOWARN_UNUSED(msg_size);
#endif

  return REC_ERR_OKAY;
}