blob: c0da99c72bc34424167eef7389339f5741a58e5c [file] [log] [blame]
/*
* Copyright 1999-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/***************************************************************************
* Description: Load balancer worker, knows how to load balance among *
* several workers. *
* Author: Gal Shachor <shachor@il.ibm.com> *
* Author: Costin Manolache
***************************************************************************/
#include "jk_pool.h"
#include "jk_service.h"
#include "jk_worker.h"
#include "jk_logger.h"
#include "jk_config.h"
#include "jk_env.h"
#include "jk_requtil.h"
#if APR_HAS_THREADS
#include "apr_thread_proc.h"
#endif
#define DEFAULT_LB_FACTOR (1.0)
/* Default number of seconds a worker stays in error state before the lb
 * re-enables it; overridable with the "recovery" attribute. */
#define WAIT_BEFORE_RECOVER 60
/* Default for the "attempts" attribute. */
#define MAX_ATTEMPTS 3
/* Default body (or Location, for a 302 status) sent when no worker is
 * usable; overridable with the "noWorkerMsg" attribute.
 * No trailing ';' here: the macro must be usable inside expressions. */
#define NO_WORKER_MSG "The servlet container is temporary unavailable or being upgraded\n"
/* Default for the "stickySession" attribute (non-zero = enabled). */
#define STICKY_SESSION 1
/* Private state of an lb worker, stored in jk_worker_t::worker_private.
 * Allocated (zero-filled) and given its defaults in jk2_worker_lb_factory(). */
typedef struct
{
    /* Mutex taken around jk2_get_most_suitable_worker() in jk2_lb_service();
     * stays NULL if the "threadMutex" bean could not be created. */
    struct jk_mutex *cs;
    /* "attempts" attribute (default MAX_ATTEMPTS).
     * NOTE(review): only stored and reported; the retry loop in this file
     * does not read it. */
    int attempts;
    /* "recovery" attribute: seconds since a worker's error_time after which
     * it is taken out of error state (default WAIT_BEFORE_RECOVER). */
    int recovery;
    /* "timeout" attribute; only stored and reported in this file. */
    int timeout;
    /* "stickySession" attribute: when non-zero, the session route of the
     * request is used to select the worker (default STICKY_SESSION). */
    int sticky_session;
    /* Set to the current time when the lb finds no usable worker. */
    time_t error_time;
} jk_worker_lb_private_t;
/** Pick the member worker that should serve request 's' on this attempt.
 *
 * Selection order:
 *  1. Sticky session: if sticky sessions are enabled and the request has a
 *     session route, return the worker whose route matches it, unless
 *     that worker has a routeRedirect (the session was migrated) or this
 *     is a retry (attempt > 0) and the worker is in error state.
 *     Disabled/graceful flags are NOT checked on this path.
 *  2. Redirect: if the matching worker had a routeRedirect, return the
 *     worker whose route equals the redirect target (same retry rule).
 *  3. Balance: scan level by level for the worker that is not disabled,
 *     not graceful, not in error state and not lb_disabled, with the
 *     smallest lb_value. The first level with a candidate wins; with
 *     hwBalanceErr > 0 only level 0 is scanned.
 *  4. Recovery: on levels 0..currentLevel, workers in error state whose
 *     error_time is more than lb_priv->recovery seconds old are taken out
 *     of error state and given the largest lb_value of their level.
 *  5. Fallback: if step 3 found nothing, take the non-disabled,
 *     non-graceful, non-lb_disabled worker with the oldest error_time.
 *     Return NULL once 'attempt' reaches the number of such candidates.
 *
 * The chosen worker is marked as not in error. If its lb_value is
 * non-zero, lb_value is increased by lb_factor. When the sum exceeds 255,
 * every worker of the chosen level is reset to its lb_factor instead.
 *
 * @param env     environment (used for logging)
 * @param lb      the load balancer worker
 * @param s       the request being served (read for the session route)
 * @param attempt 0 on the first try, incremented by the caller per retry
 * @return the worker to use, or NULL when no worker is left to try
 *
 * This mutates shared worker state (in_error_state, lb_value).
 * jk2_lb_service() therefore calls it with lb_priv->cs held, when it exists.
 */
static jk_worker_t *jk2_get_most_suitable_worker(jk_env_t *env,
jk_worker_t *lb,
jk_ws_service_t *s,
int attempt)
{
jk_worker_t *rc = NULL;
int lb_min = 0;
int lb_max = 0;
int i;
int j;
int level;
/* Highest level the recovery pass scans; narrowed to the level of the
 * selected worker (or to 0 when hwBalanceErr stops the scan). */
int currentLevel = JK_LB_LEVELS - 1;
char *session_route = NULL;
char *routeRedirect = NULL;
/* Lazily initialized: time(NULL) is only called if some worker is in
 * error state. */
time_t now = 0;
jk_worker_lb_private_t *lb_priv = lb->worker_private;
if (lb_priv->sticky_session) {
session_route = jk2_requtil_getSessionRoute(env, s);
}
if (lb->mbean->debug > 0) {
env->l->jkLog(env, env->l, JK_LOG_DEBUG,
"lb.get_worker %d Session=%s\n",
lb_priv->sticky_session,
(session_route ? session_route : "NULL"));
}
/* Step 1: sticky session - look for the worker owning the session route. */
if (session_route) {
for (level = 0; level < JK_LB_LEVELS; level++) {
for (i = 0; i < lb->workerCnt[level]; i++) {
jk_worker_t *w = lb->workerTables[level][i];
if (w->route != NULL && 0 == strcmp(session_route, w->route)) {
if (w->routeRedirect != NULL) {
/* Session was migrated to another worker */
routeRedirect = w->routeRedirect;
/* NOTE(review): this break only leaves the inner loop; later
 * levels are still scanned for a worker with the same route. */
break;
}
if (attempt > 0 && w->in_error_state) {
/* We already tried to revive this worker. */
/* NOTE(review): as above, only the inner loop is left. */
break;
}
else {
return w;
}
}
}
}
}
/* We have a session - but the worker is in error state
or has a "redirect".
Try the new worker.
*/
/* Step 2: follow the redirect to the worker whose route matches it. */
if (routeRedirect != NULL) {
for (level = 0; level < JK_LB_LEVELS; level++) {
for (i = 0; i < lb->workerCnt[level]; i++) {
jk_worker_t *w = lb->workerTables[level][i];
if (w->route != NULL && 0 == strcmp(routeRedirect, w->route)) {
if (attempt > 0 && w->in_error_state) {
/* We already tried to revive this worker. */
break;
}
else {
return w;
}
}
}
}
}
/** Get one worker that is ready
*/
/* Step 3: among usable workers, pick the one with the lowest lb_value,
 * stopping at the first level that has any usable worker. */
for (level = 0; level < JK_LB_LEVELS; level++) {
for (i = 0; i < lb->workerCnt[level]; i++) {
jk_worker_t *w = lb->workerTables[level][i];
if (w->mbean->disabled)
continue;
if (w->graceful)
continue;
if (w->in_error_state)
continue;
if (w->lb_disabled)
continue;
if (rc == NULL) {
rc = w;
currentLevel = level;
lb_min = w->lb_value;
continue;
}
if (w->lb_value < lb_min) {
lb_min = w->lb_value;
rc = w;
currentLevel = level;
}
}
if (rc != NULL) {
/* We found a valid worker on the current level, don't worry about the
higher levels.
*/
break;
}
if (lb->hwBalanceErr > 0) {
/* don't go to higher levels - we'll return an error
*/
currentLevel = 0;
break;
}
}
/** Reenable workers in error state if the timeout has passed.
* Don't bother with 'higher' levels, since we'll never try them.
*/
/* Step 4: recovery. */
for (level = 0; level <= currentLevel; level++) {
for (i = 0; i < lb->workerCnt[level]; i++) {
jk_worker_t *w = lb->workerTables[level][i];
if (w->mbean->disabled)
continue;
if (w->lb_disabled)
continue;
if (w->in_error_state) {
/* Check if it's ready for recovery */
if (now == 0)
now = time(NULL);
if ((now - w->error_time) > lb_priv->recovery) {
env->l->jkLog(env, env->l, JK_LOG_ERROR,
"lb.getWorker() reenable %s\n",
w->mbean->name);
w->in_error_state = JK_FALSE;
/* Find max lb */
/* NOTE(review): lb_max is computed only once per call, from the
 * first level where a worker is re-enabled (unless that max is 0),
 * and then reused for workers on other levels. */
if (lb_max == 0) {
for (j = 0; j < lb->workerCnt[level]; j++) {
if (lb->workerTables[level][j]->lb_value > lb_max) {
lb_max = lb->workerTables[level][j]->lb_value;
}
}
}
/* Start the revived worker at the top of the range, so it does not
 * immediately win every min-lb_value comparison. */
w->lb_value = lb_max;
}
}
}
}
/* If no active worker is found, we'll try all workers in error_state,
*/
/* Step 5: fallback. */
if (rc == NULL) {
/* no workers found (rc is null), now try as hard as possible to get a
worker anyway, pick one with largest error time.. */
/* ("largest error time" meaning longest ago: the comparison below keeps
 * the smallest error_time.) */
int error_workers = 0;
env->l->jkLog(env, env->l, JK_LOG_INFO,
"lb.getWorker() All workers in error state, use the one with oldest error\n");
for (level = 0; level < JK_LB_LEVELS; level++) {
for (i = 0; i < lb->workerCnt[level]; i++) {
jk_worker_t *w = lb->workerTables[level][i];
if (w->mbean->disabled == JK_TRUE)
continue;
if (w->graceful)
continue;
if (w->lb_disabled)
continue;
error_workers++;
if (rc == NULL) {
rc = w;
currentLevel = level;
continue;
}
/* pick the oldest failed worker */
if (w->error_time < rc->error_time) {
currentLevel = level;
rc = w;
}
}
if (lb->hwBalanceErr > 0) {
/* Don't try higher levels, only level=0 */
break;
}
}
/* Each failed attempt pushes one candidate's error_time forward, so after
 * 'error_workers' attempts every candidate has been tried. */
if (attempt >= error_workers) {
env->l->jkLog(env, env->l, JK_LOG_INFO,
"lb.getWorker() We tried all possible workers %d\n",
attempt);
return NULL;
}
}
if (rc != NULL) {
/* If it's the default, it'll remain the default - we don't
increase the factor
*/
/* (i.e. a worker whose lb_value is 0 keeps lb_value 0.) */
rc->in_error_state = JK_FALSE;
if (rc->lb_value != 0) {
int newValue = rc->lb_value + rc->lb_factor;
if (newValue > 255) {
rc->lb_value = rc->lb_factor;
/* Roll over. This has 2 goals:
- avoid the lb factor becoming too big, and give a chance to run to
workers that were in error state ( I think it's cleaner than looking for "max" )
- the actual lb_value will be 1 byte. Even on the craziest platform, that
will be an atomic write. We do a lot of operations on lb_value in a MT environment,
and the chance of reading something inconsistent is considerable. Since APR
will not support atomic - and adding a CS would cost too much, this is actually
a good solution.
Note that lb_value is not used for anything critical - just to balance the load,
the worst that may happen is having a worker stay idle for 255 requests.
*/
/* NOTE(review): lb_value is an int field here, not a byte; the
 * "atomic write" argument above relies on that assumption. */
for (i = 0; i < lb->workerCnt[currentLevel]; i++) {
jk_worker_t *w = lb->workerTables[currentLevel][i];
w->lb_value = w->lb_factor;
}
}
else {
rc->lb_value = newValue;
}
}
}
return rc;
}
/** Get the best member worker and forward the request to it.
 *
 * The lb worker does not connect to anything itself, so it needs no
 * endpoint. It selects a member with jk2_get_most_suitable_worker()
 * (under lb_priv->cs when that mutex exists) and calls the member's
 * service().
 *
 * If the member fails, it is put in error state. When the failure is
 * recoverable (s->is_recoverable_error), another member is tried. The
 * loop ends when a member succeeds, the error is not recoverable, or no
 * member is left. In the last case the configured error response
 * (noWorkerCode / hwBalanceErr status, noWorkerMsg body or Location
 * header) is sent.
 *
 * @param env  environment (logging)
 * @param lb   the load balancer worker
 * @param s    the request; s->realWorker and s->jvm_route are set to the
 *             member being tried
 * @return JK_OK on success; JK_HANDLER_ERROR when the member reports a
 *         browser connection error (no failover in that case); JK_ERR
 *         otherwise.
 */
static int JK_METHOD jk2_lb_service(jk_env_t *env,
                                    jk_worker_t *lb, jk_ws_service_t *s)
{
    int attempt = 0;
    jk_workerEnv_t *wEnv = lb->workerEnv;
    jk_worker_lb_private_t *lb_priv = lb->worker_private;
    jk_worker_t *rec = NULL;

    if (s == NULL) {
        env->l->jkLog(env, env->l, JK_LOG_ERROR,
                      "lb.service() NullPointerException\n");
        return JK_ERR;
    }

    /* you can not recover on another load balancer */
    s->realWorker = NULL;

    /* Check for configuration updates. This reads a single shm location
     * without a lock; a concurrent change is harmless because the value is
     * only compared for equality. (We should check this periodically.) */
    if (wEnv->shm != NULL && wEnv->shm->head != NULL) {
        if (wEnv->config->ver != wEnv->shm->head->lbVer) {
            wEnv->config->update(env, wEnv->config, NULL);
            wEnv->config->ver = wEnv->shm->head->lbVer;
        }
    }

    /* Initialize here the recovery POST buffer */
    s->reco_buf = jk2_msg_ajp_create(env, s->pool, 0);
    s->reco_status = RECO_INITED;

    while (1) {
        int rc;

        /* Selecting a worker may change shared worker state, so do it
         * under the lb mutex when one exists. */
        if (lb_priv->cs != NULL)
            lb_priv->cs->lock(env, lb_priv->cs);
        rec = jk2_get_most_suitable_worker(env, lb, s, attempt);
        if (lb_priv->cs != NULL)
            lb_priv->cs->unLock(env, lb_priv->cs);

        attempt++;
        s->is_recoverable_error = JK_FALSE;

        if (rec == NULL) {
            /* NULL record, no more workers left ... */
            env->l->jkLog(env, env->l, JK_LOG_ERROR,
                          "lb_worker.service() all workers in error or disabled state\n");
            /* set hwBalanceErr status */
            if (lb->hwBalanceErr != 0) {
                s->status = lb->hwBalanceErr;
            }
            else {
                s->status = lb->noWorkerCode; /* SERVICE_UNAVAILABLE is the default */
            }
            /* Don't touch headers if noErrorHeader set to TRUE, ie ErrorDocument in Apache via mod_alias */
            if (!lb->noErrorHeader) {
                /* A 302 status (configurable through hwBalanceErr or
                 * noWorkerCode) uses noWorkerMsg as the redirect target;
                 * any other status sends it as an HTML body. */
                if (s->status == 302) {
                    s->headers_out->put(env, s->headers_out,
                                        "Location", lb->noWorkerMsg, NULL);
                    s->head(env, s);
                }
                else {
                    s->headers_out->put(env, s->headers_out,
                                        "Content-Type", "text/html", NULL);
                    s->head(env, s);
                    /* noWorkerMsg is configuration text: never pass it as
                     * the format string, a '%' in it would be undefined
                     * behavior (format-string injection). */
                    s->jkprintf(env, s, "%s", lb->noWorkerMsg);
                }
            }
            s->afterRequest(env, s);
            lb_priv->error_time = time(NULL);
            return JK_ERR;
        }

        if (lb->mbean->debug > 0)
            env->l->jkLog(env, env->l, JK_LOG_DEBUG,
                          "lb.service() try %s\n", rec->mbean->name);

        if (rec->route == NULL) {
            rec->route = rec->mbean->localName;
        }
        s->jvm_route = rec->route;
        s->realWorker = rec;

        rc = rec->service(env, rec, s);
        if (rc == JK_OK) {
            rec->in_error_state = JK_FALSE;
            rec->error_time = 0;
            return JK_OK;
        }

        /* If this is a browser connection error dont't check other
         * workers.
         */
        if (rc == JK_HANDLER_ERROR) {
            rec->in_error_state = JK_FALSE;
            rec->error_time = 0;
            return JK_HANDLER_ERROR;
        }

        env->l->jkLog(env, env->l, JK_LOG_ERROR,
                      "lb.service() worker failed %d for %s\n", rc,
                      rec->mbean->name);

        /* Service failed: mark the member so the selector skips it until
         * the recovery delay has passed. */
        rec->in_error_state = JK_TRUE;
        rec->error_time = time(NULL);

        if (!s->is_recoverable_error) {
            /* Error is not recoverable - break with an error. */
            env->l->jkLog(env, env->l, JK_LOG_ERROR,
                          "lb.service() unrecoverable error...\n");
            break;
        }

        /* Error is recoverable by submitting the request to another
         * worker... Lets try to do that. */
        if (lb->mbean->debug > 0) {
            env->l->jkLog(env, env->l, JK_LOG_DEBUG,
                          "lb_worker.service() try other hosts\n");
        }
    }
    return JK_ERR;
}
/** Init internal structures.
 *
 * Called any time the config changes (from jk2_lb_init() and whenever a
 * "worker" attribute is added). Each name in lbWorkerMap that resolves to
 * an existing, non-disabled worker with a valid level is appended to
 * workerTables[level], unless it is already there. A newly added worker
 * starts with lb_value = lb_factor and is not in error state. Workers
 * already present are left untouched.
 *
 * @param env  environment (logging, worker lookup)
 * @param lb   the load balancer worker whose tables are updated
 * @return JK_OK; unknown workers and full tables are only logged
 */
static int JK_METHOD jk2_lb_refresh(jk_env_t *env, jk_worker_t *lb)
{
    int i;
    int num_of_workers = lb->lbWorkerMap->size(env, lb->lbWorkerMap);

    for (i = 0; i < num_of_workers; i++) {
        char *name = lb->lbWorkerMap->nameAt(env, lb->lbWorkerMap, i);
        jk_worker_t *w = env->getByName(env, name);
        int level;
        int pos;
        int workerCnt;

        if (w == NULL) {
            env->l->jkLog(env, env->l, JK_LOG_ERROR,
                          "lb_worker.init(): no worker found %s\n", name);
            continue;
        }
        if (w->mbean->disabled)
            continue;

        level = w->level;
        /* An out-of-range level is treated like disabled. The lower bound
         * matters too: a negative level (e.g. a bad config value) would
         * otherwise index before workerCnt[] / workerTables[]. */
        if (level < 0 || level >= JK_LB_LEVELS)
            continue;

        /* check if worker is already in the table */
        workerCnt = lb->workerCnt[level];
        for (pos = 0; pos < workerCnt; pos++) {
            if (lb->workerTables[level][pos] == w) {
                break;
            }
        }
        if (pos < workerCnt)
            continue;

        if (pos >= JK_LB_MAX_WORKERS) {
            env->l->jkLog(env, env->l, JK_LOG_ERROR,
                          "lb_worker.init(): maximum lb workers reached %s\n",
                          name);
            continue;
        }
        pos = lb->workerCnt[level]++;
        lb->workerTables[level][pos] = w;
        w->lb_value = w->lb_factor;
        w->in_error_state = JK_FALSE;
    }
    return JK_OK;
}
/* Attribute names flagged as multi-valued: each "worker" setting adds one
 * more member to the lb (jk2_lb_setAttribute ignores duplicates). */
static char *jk2_worker_lb_multiValueInfo[] = {
    "worker",
    NULL
};

/* Attribute names accepted by jk2_lb_setAttribute(), NULL-terminated. */
static char *jk2_worker_lb_setAttributeInfo[] = {
    "attempts",
    "stickySession",
    "recovery",
    "timeout",
    "hwBalanceErr",
    "noErrorHeader",
    "noWorkerMsg",
    "noWorkerCode",
    "worker",
    NULL
};

/* Attribute names answered by jk2_lb_getAttribute(), NULL-terminated.
 * "workers" reports all member names joined with ':'. */
static char *jk2_worker_lb_getAttributeInfo[] = {
    "attempts",
    "stickySession",
    "recovery",
    "timeout",
    "hwBalanceErr",
    "noErrorHeader",
    "noWorkerMsg",
    "noWorkerCode",
    "workers",
    NULL
};
/** Report an lb attribute as a string.
 *
 * "workers" yields the member names joined with ':'. "noWorkerMsg" yields
 * the stored message. The numeric attributes are converted with
 * jk2_env_itoa().
 *
 * @param env    environment
 * @param mbean  the lb bean (its object is the jk_worker_t)
 * @param name   attribute name
 * @return the attribute value, or NULL for an unknown name
 */
static void *JK_METHOD jk2_lb_getAttribute(jk_env_t *env, jk_bean_t *mbean,
                                           char *name)
{
    jk_worker_t *lbw = mbean->object;
    jk_worker_lb_private_t *priv = lbw->worker_private;

    if (!strcmp(name, "workers"))
        return jk2_map_concatKeys(env, lbw->lbWorkerMap, ":");
    if (!strcmp(name, "noWorkerMsg"))
        return lbw->noWorkerMsg;

    /* Fields of the generic worker struct. */
    if (!strcmp(name, "noWorkerCode"))
        return jk2_env_itoa(env, lbw->noWorkerCode);
    if (!strcmp(name, "hwBalanceErr"))
        return jk2_env_itoa(env, lbw->hwBalanceErr);
    if (!strcmp(name, "noErrorHeader"))
        return jk2_env_itoa(env, lbw->noErrorHeader);

    /* Fields of the lb-private struct. */
    if (!strcmp(name, "timeout"))
        return jk2_env_itoa(env, priv->timeout);
    if (!strcmp(name, "recovery"))
        return jk2_env_itoa(env, priv->recovery);
    if (!strcmp(name, "stickySession"))
        return jk2_env_itoa(env, priv->sticky_session);
    if (!strcmp(name, "attempts"))
        return jk2_env_itoa(env, priv->attempts);

    return NULL;
}
/** Set an lb attribute from its configuration string.
 *
 * "worker" adds a member, which is pool-copied and ignored if already
 * present, and then refreshes the member tables. "noWorkerMsg" stores a
 * pool copy of the message. The numeric attributes are parsed with atoi().
 *
 * @param env     environment
 * @param mbean   the lb bean (its object is the jk_worker_t)
 * @param name    attribute name
 * @param valueP  attribute value (a NUL-terminated string)
 * @return JK_OK when the attribute was handled; JK_ERR for an unknown
 *         attribute or when the value could not be copied
 */
static int JK_METHOD jk2_lb_setAttribute(jk_env_t *env, jk_bean_t *mbean,
                                         char *name, void *valueP)
{
    jk_worker_t *lb = mbean->object;
    char *value = valueP;
    jk_worker_lb_private_t *lb_priv = lb->worker_private;

    if (strcmp(name, "worker") == 0) {
        if (lb->lbWorkerMap->get(env, lb->lbWorkerMap, value) != NULL) {
            /* Already added */
            return JK_OK;
        }
        value = lb->mbean->pool->pstrdup(env, lb->mbean->pool, value);
        lb->lbWorkerMap->add(env, lb->lbWorkerMap, value, "");
        if (lb->mbean->debug > 0)
            env->l->jkLog(env, env->l, JK_LOG_DEBUG,
                          "lb_worker.setAttribute(): Adding to %s: %s\n",
                          lb->mbean->localName, value);
        jk2_lb_refresh(env, lb);
        return JK_OK;
    }
    else if (strcmp(name, "noWorkerMsg") == 0) {
        /* Copy into the bean pool, like "worker" values, so the message does
         * not depend on the lifetime of the caller's buffer. */
        char *msg = lb->mbean->pool->pstrdup(env, lb->mbean->pool, value);
        if (msg == NULL)
            return JK_ERR;
        lb->noWorkerMsg = msg;
    }
    else if (strcmp(name, "noWorkerCode") == 0) {
        lb->noWorkerCode = atoi(value);
    }
    else if (strcmp(name, "hwBalanceErr") == 0) {
        lb->hwBalanceErr = atoi(value);
    }
    else if (strcmp(name, "noErrorHeader") == 0) {
        lb->noErrorHeader = atoi(value);
    }
    else if (strcmp(name, "timeout") == 0) {
        lb_priv->timeout = atoi(value);
    }
    else if (strcmp(name, "recovery") == 0) {
        lb_priv->recovery = atoi(value);
    }
    else if (strcmp(name, "stickySession") == 0) {
        lb_priv->sticky_session = atoi(value);
    }
    else if (strcmp(name, "attempts") == 0) {
        lb_priv->attempts = atoi(value);
    }
    else {
        /* Not an lb attribute. */
        return JK_ERR;
    }
    /* A known attribute was stored: report success instead of the
     * unconditional JK_ERR the handled branches used to fall through to. */
    return JK_OK;
}
/** Init hook for the lb bean: build the per-level member tables from the
 * workers configured so far, using jk2_lb_refresh().
 *
 * @param env   environment
 * @param bean  the lb bean (its object is the jk_worker_t)
 * @return the jk2_lb_refresh() result on failure, JK_OK otherwise
 */
static int JK_METHOD jk2_lb_init(jk_env_t *env, jk_bean_t *bean)
{
    jk_worker_t *lb = bean->object;
    int num_of_workers = lb->lbWorkerMap->size(env, lb->lbWorkerMap);
    int err = jk2_lb_refresh(env, lb);

    if (err != JK_OK)
        return err;

    if (lb->mbean->debug > 0)
        env->l->jkLog(env, env->l, JK_LOG_DEBUG, "lb.init() %s %d workers\n",
                      lb->mbean->name, num_of_workers);
    return JK_OK;
}
/** Destroy hook for the lb bean.
 *
 * Member workers are not destroyed here: they are owned by the workerEnv,
 * and one worker may belong to several lbs. Nothing else is released
 * either, so the hook only reports success.
 */
static int JK_METHOD jk2_lb_destroy(jk_env_t *env, jk_bean_t *bean)
{
    (void)env;
    (void)bean;
    return JK_OK;
}
/** Factory for lb workers.
 *
 * Allocates the jk_worker_t and its private state from 'pool'. It creates
 * the two "threadMutex" beans (each is left NULL if creation fails),
 * applies the defaults (MAX_ATTEMPTS, WAIT_BEFORE_RECOVER, STICKY_SESSION,
 * status 503, NO_WORKER_MSG, noErrorHeader = 1) and wires the bean
 * callbacks. Finally it registers the worker with the "workerEnv".
 *
 * @param env     environment
 * @param pool    pool the worker and its private data are allocated from
 * @param result  bean to fill in; result->object becomes the worker
 * @param type    bean type (unused)
 * @param name    bean name; must not be NULL
 * @return JK_OK, or JK_ERR on a NULL name, allocation failure or missing
 *         workerEnv
 */
int JK_METHOD jk2_worker_lb_factory(jk_env_t *env, jk_pool_t *pool,
                                    jk_bean_t *result, char *type, char *name)
{
    jk_worker_t *w;
    int i;
    jk_bean_t *jkb;
    jk_worker_lb_private_t *worker_private;

    if (NULL == name) {
        env->l->jkLog(env, env->l, JK_LOG_ERROR,
                      "lb_worker.factory() NullPointerException\n");
        return JK_ERR;
    }

    w = (jk_worker_t *)pool->calloc(env, pool, sizeof(jk_worker_t));
    if (w == NULL) {
        env->l->jkLog(env, env->l, JK_LOG_ERROR,
                      "lb_worker.factory() OutOfMemoryException\n");
        return JK_ERR;
    }

    jkb = env->createBean2(env, pool, "threadMutex", NULL);
    if (jkb != NULL) {
        w->cs = jkb->object;
        jkb->init(env, jkb);
    }

    worker_private = (jk_worker_lb_private_t *)pool->calloc(env, pool,
                                                            sizeof(jk_worker_lb_private_t));
    if (worker_private == NULL) {
        /* Without this check the mutex setup below dereferences NULL. */
        env->l->jkLog(env, env->l, JK_LOG_ERROR,
                      "lb_worker.factory() OutOfMemoryException\n");
        return JK_ERR;
    }

    /* Mutex serializing worker selection in jk2_lb_service(). */
    jkb = env->createBean2(env, pool, "threadMutex", NULL);
    if (jkb != NULL) {
        worker_private->cs = jkb->object;
        jkb->init(env, jkb);
    }
    worker_private->attempts = MAX_ATTEMPTS;
    worker_private->recovery = WAIT_BEFORE_RECOVER;
    worker_private->sticky_session = STICKY_SESSION;

    w->worker_private = worker_private;
    w->service = jk2_lb_service;
    for (i = 0; i < JK_LB_LEVELS; i++) {
        w->workerCnt[i] = 0;
    }
    jk2_map_default_create(env, &w->lbWorkerMap, pool);

    result->init = jk2_lb_init;
    result->destroy = jk2_lb_destroy;
    result->setAttribute = jk2_lb_setAttribute;
    /* Register the getter too; previously jk2_lb_getAttribute and
     * jk2_worker_lb_getAttributeInfo were defined but never hooked up. */
    result->getAttribute = jk2_lb_getAttribute;
    result->getAttributeInfo = jk2_worker_lb_getAttributeInfo;
    result->multiValueInfo = jk2_worker_lb_multiValueInfo;
    result->setAttributeInfo = jk2_worker_lb_setAttributeInfo;
    result->object = w;
    w->mbean = result;

    w->hwBalanceErr = 0;
    w->noWorkerCode = 503;
    w->noWorkerMsg = NO_WORKER_MSG;
    /* Let Apache handle the error via ErrorDocument and mod_alias */
    w->noErrorHeader = 1;

    w->workerEnv = env->getByName(env, "workerEnv");
    if (w->workerEnv == NULL) {
        env->l->jkLog(env, env->l, JK_LOG_ERROR,
                      "lb_worker.factory() no workerEnv found\n");
        return JK_ERR;
    }
    w->workerEnv->addWorker(env, w->workerEnv, w);
    return JK_OK;
}