blob: dffd8ab89c0976bff131cc3f1ee4fec292bc5edf [file] [log] [blame]
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/****************************************************************************
Raf.cc
Description:
****************************************************************************/
#include "Raf.h"
#include "P_RecProcess.h"
#include "rafencode.h"
#include "congest/Congestion.h"
void
start_raf()
{
RecInt enabled = 0;
REC_ReadConfigInteger(enabled, "proxy.config.raf.enabled");
if (enabled) {
RecInt port = 0;
REC_ReadConfigInteger(port, "proxy.config.raf.port");
RafAcceptCont *c = new RafAcceptCont();
c->start(port);
}
}
RafAcceptCont::RafAcceptCont():
Continuation(new_ProxyMutex()), accept_action(NULL), accept_port(0)
{
SET_HANDLER(&RafAcceptCont::state_handle_accept);
}
RafAcceptCont::~RafAcceptCont()
{
mutex = NULL;
if (accept_action != NULL) {
accept_action->cancel();
}
}
void
RafAcceptCont::start(int accept_port_in)
{
ink_debug_assert(accept_action == NULL);
accept_port = accept_port_in;
accept_action = netProcessor.accept(this, accept_port);
}
int
RafAcceptCont::state_handle_accept(int event, void *data)
{
switch (event) {
case NET_EVENT_ACCEPT:
{
NetVConnection *new_vc = (NetVConnection *) data;
unsigned int client_ip = new_vc->get_remote_ip();
// Only allow connections from localhost for security reasons
unsigned int lip = 0;
unsigned char *plip = (unsigned char *) &lip;
plip[0] = 127;
plip[1] = 0;
plip[2] = 0;
plip[3] = 1;
if (client_ip != lip) {
char ip_string[32];
unsigned char *p = (unsigned char *) &(client_ip);
snprintf(ip_string, sizeof(ip_string), "%d.%d.%d.%d", p[0], p[1], p[2], p[3]);
Warning("raf connect by disallowed client %s, closing", ip_string);
new_vc->do_io_close();
return EVENT_DONE;
}
RafCont *c = new RafCont(new_vc);
MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
c->run();
break;
}
case NET_EVENT_ACCEPT_FAILED:
Warning("Raf accept failed on port %d", accept_port);
accept_action = NULL;
delete this;
break;
default:
ink_release_assert(0);
break;
}
return EVENT_DONE;
}
struct RafCmdEntry
{
const char *name;
RafCmdHandler handler;
};
const RafCmdEntry raf_cmd_table[] = {
{"query", &RafCont::process_query_cmd},
{"congest", &RafCont::process_congestion_cmd},
{"isalive", &RafCont::process_isalive_cmd},
{"exit", &RafCont::process_exit_cmd},
{"quit", &RafCont::process_exit_cmd}
};
int raf_cmd_entries = SIZE(raf_cmd_table);
RafCont::RafCont(NetVConnection * nvc):
Continuation(new_ProxyMutex()),
net_vc(nvc),
read_vio(NULL), write_vio(NULL), input_buffer(NULL), input_reader(NULL), output_buffer(NULL), pending_action(NULL)
{
mime_scanner_init(&scanner);
SET_HANDLER(&RafCont::main_handler);
Debug("raf", "New Raf Connection Accepted");
}
RafCont::~RafCont()
{
if (pending_action) {
pending_action->cancel();
pending_action = NULL;
}
if (net_vc) {
net_vc->do_io_close();
net_vc = NULL;
}
mime_scanner_clear(&scanner);
if (input_buffer) {
free_MIOBuffer(input_buffer);
input_buffer = NULL;
}
if (output_buffer) {
free_MIOBuffer(output_buffer);
output_buffer = NULL;
}
}
void
RafCont::kill()
{
delete this;
}
void
RafCont::run()
{
ink_debug_assert(input_buffer == NULL);
ink_debug_assert(read_vio == NULL);
input_buffer = new_MIOBuffer();
input_reader = input_buffer->alloc_reader();
output_buffer = new_MIOBuffer();
IOBufferReader *output_reader = output_buffer->alloc_reader();
net_vc->set_inactivity_timeout(HRTIME_MINUTES(10));
read_vio = net_vc->do_io_read(this, INT_MAX, input_buffer);
write_vio = net_vc->do_io_write(this, INT_MAX, output_reader);
}
int
RafCont::main_handler(int event, void *data)
{
if (event == CONGESTION_EVENT_CONGESTED_LIST_DONE) {
return state_handle_congest_list(event, data);
}
if (data == read_vio) {
return state_handle_input(event, data);
} else if (data == write_vio) {
return state_handle_output(event, data);
} else {
ink_release_assert(0);
}
return EVENT_DONE;
}
int
RafCont::state_handle_congest_list(int event, void *data)
{
ink_assert(event == CONGESTION_EVENT_CONGESTED_LIST_DONE);
write_vio->reenable();
read_vio->reenable();
/*
output_buffer->write("\r\n", 2);
mime_scanner_clear(&scanner);
// Final cmd
read_vio->nbytes = read_vio->ndone;
write_vio->nbytes =
write_vio->ndone + write_vio->get_reader()->read_avail();
write_vio->reenable();
*/
return EVENT_DONE;
}
int
RafCont::state_handle_output(int event, void *data)
{
Debug("raf", "state_handler_output received event %d", event);
switch (event) {
case VC_EVENT_WRITE_READY:
break;
case VC_EVENT_WRITE_COMPLETE:
case VC_EVENT_ERROR:
case VC_EVENT_INACTIVITY_TIMEOUT:
kill();
break;
default:
ink_release_assert(0);
break;
}
return EVENT_DONE;
}
int
RafCont::state_handle_input(int event, void *data)
{
Debug("raf", "state_handler_input received event %d", event);
switch (event) {
case VC_EVENT_READ_READY:
{
int read_avail = input_reader->read_avail();
const char *output_s;
const char *output_e;
MIMEParseResult r = PARSE_CONT;
while (read_avail > 0) {
int bavail = input_reader->block_read_avail();
const char *start = input_reader->start();
const char *scan_start = start;
bool output_share;
r = mime_scanner_get(&scanner, &scan_start, scan_start + bavail,
&output_s, &output_e, &output_share, false, MIME_SCANNER_TYPE_LINE);
input_reader->consume(scan_start - start);
read_avail = input_reader->read_avail();
if (r != PARSE_CONT) {
break;
}
}
switch (r) {
case PARSE_CONT:
read_vio->reenable();
break;
case PARSE_OK:
{
int r = process_raf_cmd(output_s, output_e);
output_buffer->write("\r\n", 2);
mime_scanner_clear(&scanner);
if (r == 0) {
// Final cmd
read_vio->nbytes = read_vio->ndone;
write_vio->nbytes = write_vio->ndone + write_vio->get_reader()->read_avail();
write_vio->reenable();
} else if (input_reader->read_avail() > 0) {
write_vio->reenable();
state_handle_input(event, data);
} else {
write_vio->reenable();
read_vio->reenable();
}
break;
}
case PARSE_ERROR:
case PARSE_DONE:
// These only occur if eof is set to true on the
// call to mime_scanner_get. Since we never set
// eof to true, this case should never occur
ink_release_assert(0);
break;
}
break;
}
case VC_EVENT_EOS:
case VC_EVENT_ERROR:
case VC_EVENT_INACTIVITY_TIMEOUT:
kill();
break;
}
return EVENT_DONE;
}
void
RafCont::free_cmd_strs(char **argv, int argc)
{
for (int i = 0; i < argc; i++) {
arena.str_free(argv[i]);
}
}
// int RafCont::process_raf_cmd(char* cmd_s, char* cmd_e)
//
// Process raf cmd input.
//
// Returns 1 is we are to keep the socket open & continue reading
// cmds and 0 if we are to close the socket after sending the
// response
//
int
RafCont::process_raf_cmd(const char *cmd_s, const char *cmd_e)
{
int arg_len;
const char *lastp = NULL;
int argc = 0;
char *cmd_ptrs[8];
memset(cmd_ptrs, 0, sizeof(cmd_ptrs));
// Loop over the input extracting the first four arguments
for (int i = 0; i < 8; i++) {
// Make sure we haven't run out of data
if (cmd_s >= cmd_e) {
break;
}
arg_len = raf_decodelen(cmd_s, cmd_e - cmd_s, &lastp);
if (arg_len <= 0) {
break;
}
argc++;
cmd_ptrs[i] = arena.str_alloc(arg_len);
arg_len = raf_decode(cmd_s, cmd_e - cmd_s, cmd_ptrs[i], arg_len, &lastp);
cmd_s = lastp;
(cmd_ptrs[i])[arg_len] = '\0';
}
// Trim CRLF off of last argument
if (argc > 0) {
char *last_arg = cmd_ptrs[argc - 1];
int last_len = strlen(last_arg);
if (last_len >= 2) {
if (last_arg[last_len - 1] == '\n') {
if (last_arg[last_len - 2] == '\r') {
last_arg[last_len - 2] = '\0';
} else {
last_arg[last_len - 1] = '\0';
}
}
}
}
// Send error if insufficient number of arguments
if (argc < 2) {
char *id;
if (argc < 1) {
id = "?";
} else {
id = cmd_ptrs[0];
}
output_raf_error(id, "No command sent");
free_cmd_strs(cmd_ptrs, argc);
return 1;
}
RafCmdHandler jump_point;
for (int j = 0; j < raf_cmd_entries; j++) {
if (strcmp(cmd_ptrs[1], raf_cmd_table[j].name) == 0) {
jump_point = raf_cmd_table[j].handler;
int r = (this->*jump_point) (cmd_ptrs, argc);
free_cmd_strs(cmd_ptrs, argc);
return r;
}
}
char msg[257];
ink_snprintf(msg, 256, "Unknown cmd '%s' sent", cmd_ptrs[1]);
msg[256] = '\0';
output_raf_error(cmd_ptrs[0], msg);
free_cmd_strs(cmd_ptrs, argc);
return 1;
}
void
RafCont::process_query_stat(const char *id, char *var)
{
char val_output[257];
bool r = false;
RecDataT val_type;
int rec_err = RecGetRecordDataType(var, &val_type);
r = (rec_err == REC_ERR_OKAY);
if (r) {
switch (val_type) {
case RECD_INT:
case RECD_COUNTER:
{
RecInt i = 0;
bool tmp = false;
if (val_type == RECD_COUNTER) {
i = REC_readCounter(var, &tmp);
} else {
i = REC_readInteger(var, &tmp);
}
ink_snprintf(val_output, 256, "%lld", i);
break;
}
case RECD_LLONG:
{
bool tmp = false;
RecLLong i = REC_readLLong(var, &tmp);
ink_snprintf(val_output, 256, "%lld", i);
break;
}
case RECD_FLOAT:
{
bool tmp = false;
RecFloat f = REC_readFloat(var, &tmp);
ink_snprintf(val_output, 256, "%f", f);
break;
}
case RECD_STRING:
{
bool tmp;
char *s = REC_readString(var, &tmp);
ink_snprintf(val_output, 256, "%s", s);
val_output[256] = '\0';
xfree(s);
break;
}
default:
r = false;
break;
}
}
if (r) {
output_resp_hdr(id, 0);
output_raf_arg(var);
output_raf_arg(val_output);
} else {
char msg[257];
ink_snprintf(msg, 256, "%s not found", var);
msg[256] = '\0';
output_raf_error(id, msg);
}
}
int
RafCont::process_congestion_cmd(char **argv, int argc)
{
const char list_cmd[] = "list";
const char remove_cmd[] = "remove";
int qstring_index = 2;
while (qstring_index < argc) {
if ((argv[qstring_index])[0] == '-') {
qstring_index++;
} else {
break;
}
}
if (qstring_index >= argc) {
output_raf_error(argv[0], "no arguments sent to congest cmd");
return 1;
}
if (strncmp(argv[qstring_index], list_cmd, sizeof(list_cmd) - 1) == 0) {
qstring_index++;
process_congest_list(argv + qstring_index, argc - qstring_index);
} else if (strncmp(argv[qstring_index], remove_cmd, sizeof(remove_cmd) - 1) == 0) {
qstring_index++;
process_congest_remove_entries(argv + qstring_index, argc - qstring_index);
} else {
char msg[257];
ink_snprintf(msg, 256, "Node %s not found", argv[qstring_index]);
msg[256] = '\0';
output_raf_error(argv[0], msg);
}
return 1;
}
void
RafCont::process_congest_remove_entries(char **argv, int argc)
{
int index = 0;
while (index < argc) {
remove_congested_entry(argv[index++], output_buffer);
}
}
void
RafCont::process_congest_list(char **argv, int argc)
{
int list_format = 0;
if (argc > 0) {
if (strncasecmp(argv[0], "short", 5) == 0) {
list_format = 0;
} else if (strncasecmp(argv[0], "long", 4) == 0) {
list_format = 1;
if (argc > 1) {
list_format = atoi(argv[1]);
}
}
}
Action *action = get_congest_list(this, output_buffer, list_format);
if (action == ACTION_RESULT_DONE) {
// state_handle_congest_list(CONGESTION_EVENT_CONGESTED_LIST_DONE, NULL);
} else {
pending_action = action;
}
}
void
RafCont::process_query_deadhosts(const char *id)
{
Action *action = get_congest_list(this, output_buffer, 0);
if (action == ACTION_RESULT_DONE) {
// state_handle_congest_list(CONGESTION_EVENT_CONGESTED_LIST_DONE, NULL);
} else {
pending_action = action;
}
}
int
RafCont::process_query_cmd(char **argv, int argc)
{
const char stats[] = "/stats/";
const char config[] = "/etc/trafficserver/";
int qstring_index = 2;
while (qstring_index < argc) {
if ((argv[qstring_index])[0] == '-') {
qstring_index++;
} else {
break;
}
}
if (qstring_index >= argc) {
output_raf_error(argv[0], "no arguments sent to query cmd");
return 1;
}
if (strcmp(argv[qstring_index], "/*") == 0) {
output_resp_hdr(argv[0], 0);
output_raf_msg(" /stats {} /etc/trafficserver {}");
} else if (strcmp(argv[qstring_index], "deadhosts") == 0) {
process_query_deadhosts(argv[0]);
// return 0;
} else {
if (strncmp(argv[qstring_index], stats, sizeof(stats) - 1) == 0) {
char *var = argv[qstring_index] + sizeof(stats) - 1;
process_query_stat(argv[0], var);
} else if (strncmp(argv[qstring_index], config, sizeof(config) - 1) == 0) {
/* Current both stats & config use the same routine to get their info */
char *var = argv[qstring_index] + sizeof(config) - 1;
process_query_stat(argv[0], var);
} else {
char msg[257];
ink_snprintf(msg, 256, "Node %s not found", argv[qstring_index]);
msg[256] = '\0';
output_raf_error(argv[0], msg);
}
}
return 1;
}
int
RafCont::process_exit_cmd(char **argv, int argc)
{
output_resp_hdr(argv[0], 0);
output_raf_arg("Bye!");
return 0;
}
int
RafCont::process_isalive_cmd(char **argv, int argc)
{
output_resp_hdr(argv[0], 0);
output_raf_arg("alive");
return 1;
}
// void RafCont::output_raf_error(const char* msg)
//
void
RafCont::output_raf_error(const char *id, const char *msg)
{
output_resp_hdr(id, 1);
output_raf_msg(msg);
}
void
RafCont::output_resp_hdr(const char *id, int result_code)
{
output_buffer->write(id, strlen(id));
if (result_code<0 || result_code> 1) {
result_code = 1;
}
// len= space + result code(0 or 1) + space + terminator
char buf[4];
snprintf(buf, sizeof(buf), " %d ", result_code);
// Don't output trailing space for success
if (result_code == 0) {
output_buffer->write(buf, 2);
} else {
output_buffer->write(buf, 3);
}
}
// void RafCont::output_raf_arg(const char* arg)
//
// Outputs an encoded raf argument. Adds a leading space to it
//
void
RafCont::output_raf_arg(const char *arg)
{
int len = raf_encodelen(arg, -1, 0);
char *encd = (char *) arena.alloc(len + 1);
encd[0] = ' ';
int elen = raf_encode(arg, -1, encd + 1, len, 0);
output_buffer->write(encd, elen + 1);
arena.free(encd, len + 1);
}
// void RafCont::output_raf_msg(const char* arg)
//
// outputs unencoded raf msg (for error msgs)
//
void
RafCont::output_raf_msg(const char *arg)
{
output_buffer->write(arg, strlen(arg));
}