blob: a3a267e518e300c84a6dac6cd3c423f7cfa22e75 [file] [log] [blame]
/** @file
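Cluster load monitor: periodically pings a cluster peer, buckets the round-trip latencies, and uses recent latency history to decide whether the cluster interconnect is overloaded.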
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/****************************************************************************
ClusterLoadMonitor.cc
****************************************************************************/
#include "P_Cluster.h"
// Class-wide configuration values.  The code that populates them is not in
// this file (presumably the cluster configuration layer -- confirm).  The
// constructor copies each tunable into a per-instance member, and a value of
// zero makes it fall back to the compiled-in default noted below.

// Non-zero enables the periodic ping / load computation work.
int ClusterLoadMonitor::cf_monitor_enabled;
// Minimum time between ping messages, in msecs (default 100).
int ClusterLoadMonitor::cf_ping_message_send_msec_interval;
// Number of response-time histogram buckets (default 100).
int ClusterLoadMonitor::cf_num_ping_response_buckets;
// Width of one histogram bucket, in msecs (default 50).
int ClusterLoadMonitor::cf_msecs_per_ping_response_bucket;
// Latency at or above which a sample counts as "exceeded", in msecs (default 500).
int ClusterLoadMonitor::cf_ping_latency_threshold_msecs;
// Minimum time between load computations, in msecs (default 5000).
int ClusterLoadMonitor::cf_cluster_load_compute_msec_interval;
// Period of the scheduled callback, in msecs (default 100).
int ClusterLoadMonitor::cf_cluster_periodic_msec_interval;
// Number of entries in the latency history ring buffer (default 120).
int ClusterLoadMonitor::cf_ping_history_buf_length;
// History entries examined when deciding to leave the overloaded state (default 24).
int ClusterLoadMonitor::cf_cluster_load_clear_duration;
// History entries examined when deciding to enter the overloaded state (default 4).
int ClusterLoadMonitor::cf_cluster_load_exceed_duration;
// Select a usable tunable: the configured value when it is strictly
// positive, otherwise the compiled-in fallback.
static inline int
clm_config_or_default(int configured, int fallback)
{
  return (configured > 0) ? configured : fallback;
}

/**
  Build a load monitor bound to the given ClusterHandler.

  The monitor shares the handler's mutex and uses cluster_load_periodic()
  as its continuation handler.  Each tunable is snapshotted from the
  corresponding cf_* static; any non-positive configured value is replaced
  by the built-in default.  All of these values are later used as an array
  length, a divisor, a modulus or a scan window, so a zero or negative
  setting would otherwise produce a bogus allocation size, an out-of-range
  bucket index or a history scan that never terminates.

  @param ch Owning ClusterHandler; must be non-null since its mutex is read here.
*/
ClusterLoadMonitor::ClusterLoadMonitor(ClusterHandler * ch)
  :Continuation(0), ch(ch), ping_history_buf_head(0),
   periodic_action(0), cluster_overloaded(0), cancel_periodic(0),
   cluster_load_msg_sequence_number(0), cluster_load_msg_start_sequence_number(0)
{
  mutex = this->ch->mutex;
  SET_HANDLER(&ClusterLoadMonitor::cluster_load_periodic);

  ping_message_send_msec_interval = clm_config_or_default(cf_ping_message_send_msec_interval, 100);
  Debug("cluster_monitor", "ping_message_send_msec_interval=%d", ping_message_send_msec_interval);

  num_ping_response_buckets = clm_config_or_default(cf_num_ping_response_buckets, 100);
  Debug("cluster_monitor", "num_ping_response_buckets=%d", num_ping_response_buckets);

  msecs_per_ping_response_bucket = clm_config_or_default(cf_msecs_per_ping_response_bucket, 50);
  Debug("cluster_monitor", "msecs_per_ping_response_bucket=%d", msecs_per_ping_response_bucket);

  ping_latency_threshold_msecs = clm_config_or_default(cf_ping_latency_threshold_msecs, 500);
  Debug("cluster_monitor", "ping_latency_threshold_msecs=%d", ping_latency_threshold_msecs);

  cluster_load_compute_msec_interval = clm_config_or_default(cf_cluster_load_compute_msec_interval, 5000);
  Debug("cluster_monitor", "cluster_load_compute_msec_interval=%d", cluster_load_compute_msec_interval);

  cluster_periodic_msec_interval = clm_config_or_default(cf_cluster_periodic_msec_interval, 100);
  Debug("cluster_monitor", "cluster_periodic_msec_interval=%d", cluster_periodic_msec_interval);

  ping_history_buf_length = clm_config_or_default(cf_ping_history_buf_length, 120);
  Debug("cluster_monitor", "ping_history_buf_length=%d", ping_history_buf_length);

  cluster_load_clear_duration = clm_config_or_default(cf_cluster_load_clear_duration, 24);
  Debug("cluster_monitor", "cluster_load_clear_duration=%d", cluster_load_clear_duration);

  cluster_load_exceed_duration = clm_config_or_default(cf_cluster_load_exceed_duration, 4);
  Debug("cluster_monitor", "cluster_load_exceed_duration=%d", cluster_load_exceed_duration);

  // Both lengths are guaranteed positive at this point, so the byte counts
  // can be computed directly in size_t without a sign conversion surprise.

  // Zero-filled histogram of ping response times.
  size_t nbytes = sizeof(int) * static_cast<size_t>(num_ping_response_buckets);
  ping_response_buckets = (int *)ats_malloc(nbytes);
  memset((char *) ping_response_buckets, 0, nbytes);

  // Zero-filled ring buffer of computed latencies.
  nbytes = sizeof(ink_hrtime) * static_cast<size_t>(ping_history_buf_length);
  ping_response_history_buf = (ink_hrtime *)ats_malloc(nbytes);
  memset((char *) ping_response_history_buf, 0, nbytes);

  // Time zero means the first periodic callback will both ping and compute.
  last_ping_message_sent = HRTIME_SECONDS(0);
  last_cluster_load_compute = HRTIME_SECONDS(0);
}
void
ClusterLoadMonitor::init()
{
periodic_action = eventProcessor.schedule_every(this, HRTIME_MSECONDS(cluster_periodic_msec_interval), ET_CALL);
}
ClusterLoadMonitor::~ClusterLoadMonitor()
{
  //
  // Note: The ClusterLoadMonitor is associated only with its
  //       ClusterHandler, and the ClusterHandler is deleted only after
  //       several minutes.  That leaves plenty of time for the periodic
  //       callback to cancel itself via the "cancel_periodic" flag, so a
  //       periodic callback running on a freed ClusterLoadMonitor is not
  //       possible.  The assert below enforces that the cancel happened.
  //
  ink_release_assert(!periodic_action);

  // Release the latency history ring buffer.
  if (ping_response_history_buf != NULL) {
    ats_free(ping_response_history_buf);
    ping_response_history_buf = NULL;
  }

  // Release the response-time histogram.
  if (ping_response_buckets != NULL) {
    ats_free(ping_response_buckets);
    ping_response_buckets = NULL;
  }
}
// Request shutdown of the periodic callback.  This only raises the
// "cancel_periodic" flag; the actual cancel is performed by
// cluster_load_periodic() the next time it runs.
void
ClusterLoadMonitor::cancel_monitor()
{
  if (cancel_periodic) {
    return;  // already requested
  }
  cancel_periodic = 1;
}
// Report the overload state last decided by compute_cluster_load().
bool
ClusterLoadMonitor::is_cluster_overloaded()
{
  return cluster_overloaded != 0;
}
void
ClusterLoadMonitor::compute_cluster_load()
{
// Compute ping message latency by scanning the response time
// buckets and averaging the results.
int n;
int sum = 0;
int entries = 0;
int n_bucket = 0;
for (n = 0; n < num_ping_response_buckets; ++n) {
if (ping_response_buckets[n]) {
entries += ping_response_buckets[n];
sum += (ping_response_buckets[n] * (n + 1));
}
ping_response_buckets[n] = 0;
}
if (entries) {
n_bucket = sum / entries;
} else {
n_bucket = 1;
}
ink_hrtime current_ping_latency = HRTIME_MSECONDS(n_bucket * msecs_per_ping_response_bucket);
// Invalidate messages associated with this sample interval
cluster_load_msg_start_sequence_number = cluster_load_msg_sequence_number;
// Log ping latency in history buffer.
ping_response_history_buf[ping_history_buf_head++] = current_ping_latency;
ping_history_buf_head = ping_history_buf_head % ping_history_buf_length;
// Determine the current state of the cluster interconnect using
// the configured limits. We determine the state as follows.
// if (cluster overloaded)
// Determine if it is still in the overload state by examining
// the last 'cluster_load_clear_duration' entries in the history
// buffer and declaring it not overloaded if none of the entries
// exceed the threshold.
// else
// Determine if it is now in the overload state by examining
// the last 'cluster_load_exceed_duration' entries in the history
// buffer and declaring it overloaded if all of the entries
// exceed the threshold.
int start, end;
ink_hrtime ping_latency_threshold = HRTIME_MSECONDS(ping_latency_threshold_msecs);
start = ping_history_buf_head - 1;
if (start < 0)
start += ping_history_buf_length;
end = start;
if (cluster_overloaded) {
end -= (cluster_load_clear_duration <= ping_history_buf_length ?
cluster_load_clear_duration : ping_history_buf_length);
} else {
end -= (cluster_load_exceed_duration <= ping_history_buf_length ?
cluster_load_exceed_duration : ping_history_buf_length);
}
if (end < 0)
end += ping_history_buf_length;
int threshold_clear = 0;
int threshold_exceeded = 0;
do {
if (ping_response_history_buf[start] >= ping_latency_threshold)
++threshold_exceeded;
else
++threshold_clear;
if (--start < 0)
start = start + ping_history_buf_length;
} while (start != end);
if (cluster_overloaded) {
if (threshold_exceeded == 0)
cluster_overloaded = 0;
} else {
if (threshold_exceeded && (threshold_clear == 0))
cluster_overloaded = 1;
}
Debug("cluster_monitor",
"[%u.%u.%u.%u] overload=%d, clear=%d, exceed=%d, latency=%d",
DOT_SEPARATED(this->ch->machine->ip), cluster_overloaded, threshold_clear, threshold_exceeded, n_bucket);
}
// Record one ping round-trip time in the response-time histogram.
//
// The response time is added to the CLUSTER_PING_TIME_STAT statistic and
// then mapped to a bucket of 'msecs_per_ping_response_bucket' width.  The
// bucket index is clamped to [0, num_ping_response_buckets - 1] while it
// is still a 64-bit value.  A negative response time, or a quotient too
// large to fit in an int, therefore cannot index outside the
// 'ping_response_buckets' array.
//
//   response_time   - measured round-trip time (ink_hrtime units)
//   sequence_number - sequence number of the ping; only used for logging
void
ClusterLoadMonitor::note_ping_response_time(ink_hrtime response_time, int sequence_number)
{
#ifdef CLUSTER_TOMCAT
  ProxyMutex *mutex = this->ch->mutex;  // hack for stats
#endif

  CLUSTER_SUM_DYN_STAT(CLUSTER_PING_TIME_STAT, response_time);

  const ink_hrtime raw_bucket = response_time / HRTIME_MSECONDS(msecs_per_ping_response_bucket);

  // The log line reports the unclamped value, exactly as before.
  Debug("cluster_monitor_ping", "[%u.%u.%u.%u] ping: %d %d", DOT_SEPARATED(this->ch->machine->ip), (int) raw_bucket, sequence_number);

  int bucket;
  if (raw_bucket < 0) {
    bucket = 0;
  } else if (raw_bucket >= num_ping_response_buckets) {
    bucket = num_ping_response_buckets - 1;
  } else {
    bucket = (int) raw_bucket;
  }
  ink_atomic_increment(&ping_response_buckets[bucket], 1);
}
// Handle one of our own ping messages coming back.
//
// The message is accepted only if its send time is not in the future and
// its sequence number lies in the current sample interval, i.e. in
// [cluster_load_msg_start_sequence_number, cluster_load_msg_sequence_number).
// Accepted messages have their round-trip time recorded; all others are
// silently dropped.
void
ClusterLoadMonitor::recv_cluster_load_msg(cluster_load_ping_msg * m)
{
  const ink_hrtime now = ink_get_hrtime();

  if (now < m->send_time) {
    return;  // send time is ahead of our clock
  }
  if (m->sequence_number < cluster_load_msg_start_sequence_number) {
    return;  // belongs to an interval that was already invalidated
  }
  if (m->sequence_number >= cluster_load_msg_sequence_number) {
    return;  // sequence number we have not issued yet
  }

  note_ping_response_time(now - m->send_time, m->sequence_number);
}
// Global return handler for cluster load ping messages.  Unmarshals the
// payload and hands it to the ClusterLoadMonitor instance named inside the
// message.  Payloads with the wrong size, a null monitor pointer, or a
// magic number / version that does not match are ignored.
void
ClusterLoadMonitor::cluster_load_ping_rethandler(ClusterHandler *ch, void *data, int len)
{
  if (!ch) {
    return;
  }
  if (len != sizeof(struct cluster_load_ping_msg)) {
    return;
  }

  // Unmarshal into a local copy of the message.
  struct cluster_load_ping_msg reply;
  memcpy(static_cast<void *>(&reply), data, len);

  const bool recognized = reply.monitor
    && (reply.magicno == cluster_load_ping_msg::CL_MSG_MAGICNO)
    && (reply.version == cluster_load_ping_msg::CL_MSG_VERSION);

  if (recognized) {
    reply.monitor->recv_cluster_load_msg(&reply);
  }
}
// Build a ping message stamped with 'current_time' and the next sequence
// number, then send it through cluster_ping() with
// cluster_load_ping_rethandler() as the return handler.
void
ClusterLoadMonitor::send_cluster_load_msg(ink_hrtime current_time)
{
  struct cluster_load_ping_msg ping(this);

  ping.send_time = current_time;
  ping.sequence_number = cluster_load_msg_sequence_number;
  ++cluster_load_msg_sequence_number;  // consumed; the next ping gets a fresh number

  cluster_ping(ch, cluster_load_ping_rethandler, static_cast<void *>(&ping), sizeof(ping));
}
int
ClusterLoadMonitor::cluster_load_periodic(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
// Perform periodic cluster load computation actions.
if (cancel_periodic) {
periodic_action->cancel();
periodic_action = 0;
return EVENT_DONE;
}
if (!cf_monitor_enabled) {
return EVENT_CONT;
}
// Generate periodic ping messages.
ink_hrtime current_time = ink_get_hrtime();
if ((current_time - last_ping_message_sent) > HRTIME_MSECONDS(ping_message_send_msec_interval)) {
send_cluster_load_msg(current_time);
last_ping_message_sent = current_time;
}
// Compute cluster load.
if ((current_time - last_cluster_load_compute) > HRTIME_MSECONDS(cluster_load_compute_msec_interval)) {
compute_cluster_load();
last_cluster_load_compute = current_time;
}
return EVENT_CONT;
}
// End of ClusterLoadMonitor.cc