blob: 02c090bc5c8bcf41d2b82f3b84f8f4348433fac3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/***************************************************************************
* Description: common stuff for bi-directional protocols ajp13/ajp14. *
* Author: Gal Shachor <shachor@il.ibm.com> *
* Author: Henri Gomez <hgomez@apache.org> *
***************************************************************************/
#include "jk_global.h"
#include "jk_util.h"
#include "jk_ajp13.h"
#include "jk_ajp14.h"
#include "jk_ajp_common.h"
#include "jk_connect.h"
#if defined(AS400) && !defined(AS400_UTF8)
#include "util_ebcdic.h"
#endif
/* Macro for checking the availability of the cache slot.
 * Evaluates to true only when the slot pointer (s) is non-NULL and
 * its `avail` member is non-zero. Note that (s) is evaluated twice,
 * so it must not have side effects.
 */
#define IS_SLOT_AVAIL(s) ((s) != NULL && (s)->avail)
/* Full names of the response headers that can be sent in coded form.
 * long_res_header_for_sc() reads entry (code - 1), so the position of
 * each name in this table defines its header code (first entry is
 * code 1).
 */
const char *response_trans_headers[] = {
"Content-Type",
"Content-Language",
"Content-Length",
"Date",
"Last-Modified",
"Location",
"Set-Cookie",
"Set-Cookie2",
"Servlet-Engine",
"Status",
"WWW-Authenticate"
};
/* Map a coded response header to its full header name.
 *
 * Only the low byte of sc is looked at. Returns the matching entry of
 * response_trans_headers, or NULL when the low byte is not in the
 * range 1..SC_RES_HEADERS_NUM.
 */
static const char *long_res_header_for_sc(int sc)
{
    int code = sc & 0X00FF;

    if (code < 1 || code > SC_RES_HEADERS_NUM)
        return NULL;

    return response_trans_headers[code - 1];
}
/* Textual names of the AJP worker states.
 * jk_ajp_get_state() indexes this table directly with the worker's
 * state value, so the order of the entries has to follow the numeric
 * state values.
 */
static const char *ajp_state_type[] = {
JK_AJP_STATE_TEXT_IDLE,
JK_AJP_STATE_TEXT_OK,
JK_AJP_STATE_TEXT_ERROR,
JK_AJP_STATE_TEXT_PROBE,
"unknown",
NULL
};
/* One flag character per cping mode bit.
 * jk_ajp_get_cping_text() emits entry k when bit k (value 1 << k)
 * is set in the mode bit mask.
 */
static char ajp_cping_mode[] = {
AJP_CPING_CONNECT_TEXT,
AJP_CPING_PREPOST_TEXT,
AJP_CPING_INTERVAL_TEXT,
};
/* Returned by sc_for_req_method() and sc_for_req_header() when the
 * given name has no coded representation.
 */
#define UNKNOWN_METHOD (-1)
/* Translate an HTTP request method name into its coded form.
 *
 * method: the method name; only the first len bytes are examined.
 * len:    number of bytes in method.
 *
 * Returns the SC_M_* code of the method, or UNKNOWN_METHOD when the
 * name is not one of the methods listed below. The comparison is an
 * exact, case sensitive match of all len bytes.
 */
static int sc_for_req_method(const char *method, size_t len)
{
    /* Known methods, grouped by name length. */
    static const struct {
        const char *name;
        size_t      len;
        int         code;
    } known_methods[] = {
        { "ACL",               3, SC_M_ACL },
        { "PUT",               3, SC_M_PUT },
        { "GET",               3, SC_M_GET },
        { "HEAD",              4, SC_M_HEAD },
        { "POST",              4, SC_M_POST },
        { "MOVE",              4, SC_M_MOVE },
        { "LOCK",              4, SC_M_LOCK },
        { "COPY",              4, SC_M_COPY },
        { "MERGE",             5, SC_M_MERGE },
        { "MKCOL",             5, SC_M_MKCOL },
        { "LABEL",             5, SC_M_LABEL },
        { "TRACE",             5, SC_M_TRACE },
        { "UNLOCK",            6, SC_M_UNLOCK },
        { "UPDATE",            6, SC_M_UPDATE },
        { "REPORT",            6, SC_M_REPORT },
        { "SEARCH",            6, SC_M_SEARCH },
        { "DELETE",            6, SC_M_DELETE },
        { "OPTIONS",           7, SC_M_OPTIONS },
        { "CHECKIN",           7, SC_M_CHECKIN },
        { "PROPFIND",          8, SC_M_PROPFIND },
        { "CHECKOUT",          8, SC_M_CHECKOUT },
        { "PROPPATCH",         9, SC_M_PROPPATCH },
        { "UNCHECKOUT",       10, SC_M_UNCHECKOUT },
        { "MKACTIVITY",       10, SC_M_MKACTIVITY },
        { "MKWORKSPACE",      11, SC_M_MKWORKSPACE },
        { "VERSION-CONTROL",  15, SC_M_VERSION_CONTROL },
        { "BASELINE-CONTROL", 16, SC_M_BASELINE_CONTROL }
    };
    size_t k;

    for (k = 0; k < sizeof(known_methods) / sizeof(known_methods[0]); k++) {
        /* The length test comes first so that method is only read
         * when a candidate of exactly that length exists.
         */
        if (known_methods[k].len == len &&
            memcmp(method, known_methods[k].name, len) == 0)
            return known_methods[k].code;
    }

    return UNKNOWN_METHOD;
}
/* Translate a request header name into its coded form.
 *
 * header_name: NUL terminated header name, compared case insensitively.
 *
 * Returns the SC_* code of the header, or UNKNOWN_METHOD when the
 * header has no coded form (the caller then sends the name as a
 * string).
 */
static int sc_for_req_header(const char *header_name)
{
    /* Zero filled so that every byte a memcmp() below may look at
     * has a defined value.
     */
    char header[16] = { 0 };
    size_t len = strlen(header_name);
    const char *p;
    size_t i;

    /* ACCEPT-LANGUAGE is the longest header
     * that is of interest.
     */
    if (len < 4 || len > 15)
        return UNKNOWN_METHOD;

    /* Work on an upper case copy; len <= 15 always fits. */
    for (i = 0; i < len; i++)
        header[i] = (char)toupper((unsigned char)header_name[i]);
    header[len] = '\0';

    p = &header[1];

    /* Unless noted otherwise, do memcmp including the final
     * \0-termination character so that only exact names match.
     */
    switch (header[0]) {
    case 'A':
        /* Prefix test only (no \0): "ACCEPT" alone and the
         * "ACCEPT-..." family share these bytes; header[6]
         * tells them apart.
         */
        if (memcmp(p, "CCEPT", 5) == 0) {
            if (!header[6])
                return SC_ACCEPT;
            if (header[6] == '-') {
                /* Step over "CCEPT-" to the part after the dash. */
                p += 6;
                if (memcmp(p, "CHARSET", 8) == 0)
                    return SC_ACCEPT_CHARSET;
                if (memcmp(p, "ENCODING", 9) == 0)
                    return SC_ACCEPT_ENCODING;
                if (memcmp(p, "LANGUAGE", 9) == 0)
                    return SC_ACCEPT_LANGUAGE;
            }
            return UNKNOWN_METHOD;
        }
        if (memcmp(p, "UTHORIZATION", 13) == 0)
            return SC_AUTHORIZATION;
        break;
    case 'C':
        if (memcmp(p, "OOKIE2", 7) == 0)
            return SC_COOKIE2;
        if (memcmp(p, "OOKIE", 6) == 0)
            return SC_COOKIE;
        if (memcmp(p, "ONNECTION", 10) == 0)
            return SC_CONNECTION;
        if (memcmp(p, "ONTENT-TYPE", 12) == 0)
            return SC_CONTENT_TYPE;
        if (memcmp(p, "ONTENT-LENGTH", 14) == 0)
            return SC_CONTENT_LENGTH;
        break;
    case 'H':
        if (memcmp(p, "OST", 4) == 0)
            return SC_HOST;
        break;
    case 'P':
        if (memcmp(p, "RAGMA", 6) == 0)
            return SC_PRAGMA;
        break;
    case 'R':
        if (memcmp(p, "EFERER", 7) == 0)
            return SC_REFERER;
        break;
    case 'U':
        if (memcmp(p, "SER-AGENT", 10) == 0)
            return SC_USER_AGENT;
        break;
    default:
        break;
    }

    return UNKNOWN_METHOD;
}
/* Return the string representation of the worker state.
 * The state value kept in the worker's shared data (aw->s) is used
 * directly as index into ajp_state_type. The log context is unused.
 */
const char *jk_ajp_get_state(ajp_worker_t *aw, jk_log_context_t *l)
{
    const char *state_text = ajp_state_type[aw->s->state];

    return state_text;
}
/* Return the int representation of the worker state.
 * Only the first character of v is inspected:
 *   i, I, n, N, 0  ->  JK_AJP_STATE_IDLE
 *   o, O, 1        ->  JK_AJP_STATE_OK
 *   e, E, 4        ->  JK_AJP_STATE_ERROR
 *   p, P, 6        ->  JK_AJP_STATE_PROBE
 * A NULL pointer or any other first character yields JK_AJP_STATE_DEF.
 */
int jk_ajp_get_state_code(const char *v)
{
    if (v == NULL)
        return JK_AJP_STATE_DEF;

    switch (*v) {
    case 'i':
    case 'I':
    case 'n':
    case 'N':
    case '0':
        return JK_AJP_STATE_IDLE;
    case 'o':
    case 'O':
    case '1':
        return JK_AJP_STATE_OK;
    case 'e':
    case 'E':
    case '4':
        return JK_AJP_STATE_ERROR;
    case 'p':
    case 'P':
    case '6':
        return JK_AJP_STATE_PROBE;
    default:
        return JK_AJP_STATE_DEF;
    }
}
/* Write the textual form of a cping mode bit mask into buf.
 *
 * For every bit set in mode, from the lowest bit up to and including
 * AJP_CPING_MAX, the matching character of ajp_cping_mode is appended.
 * buf is always NUL terminated; the caller has to provide room for
 * one character per mode bit plus the terminator.
 */
void jk_ajp_get_cping_text(int mode, char *buf)
{
    char *out = buf;
    int idx = 0;
    int mask;

    for (mask = 1; mask <= mode && mask <= AJP_CPING_MAX; mask *= 2) {
        if (mode & mask)
            *out++ = ajp_cping_mode[idx];
        idx++;
    }
    *out = '\0';
}
/* Parse a cping mode string into a bit mask.
 *
 * m:   string of mode flag characters, may be NULL. Each flag is
 *      accepted as defined by its AJP_CPING_*_TEXT constant or as the
 *      tolower() form of that constant. Unknown characters are skipped.
 * def: value returned when m is NULL or contains no known flag.
 *
 * The "all" flag selects connect, prepost and interval together and
 * ends the scan.
 */
int jk_ajp_get_cping_mode(const char *m, int def)
{
    int mv = 0;
    const char *cur;

    if (m == NULL)
        return def;

    for (cur = m; *cur != '\0'; cur++) {
        const char c = *cur;

        if (c == AJP_CPING_CONNECT_TEXT ||
            c == tolower(AJP_CPING_CONNECT_TEXT))
            mv |= AJP_CPING_CONNECT;
        if (c == AJP_CPING_PREPOST_TEXT ||
            c == tolower(AJP_CPING_PREPOST_TEXT))
            mv |= AJP_CPING_PREPOST;
        if (c == AJP_CPING_INTERVAL_TEXT ||
            c == tolower(AJP_CPING_INTERVAL_TEXT))
            mv |= AJP_CPING_INTERVAL;
        if (c == AJP_CPING_ALL_TEXT ||
            c == tolower(AJP_CPING_ALL_TEXT)) {
            mv = AJP_CPING_CONNECT | AJP_CPING_PREPOST | AJP_CPING_INTERVAL;
            break;
        }
    }

    return mv ? mv : def;
}
/*
* Message structure
*
*
AJPV13_REQUEST/AJPV14_REQUEST=
request_prefix (1) (byte)
method (byte)
protocol (string)
req_uri (string)
remote_addr (string)
remote_host (string)
server_name (string)
server_port (short)
is_ssl (boolean)
num_headers (short)
num_headers*(req_header_name header_value)
?context (byte)(string)
?servlet_path (byte)(string)
?remote_user (byte)(string)
?auth_type (byte)(string)
?query_string (byte)(string)
?route (byte)(string)
?ssl_cert (byte)(string)
?ssl_cipher (byte)(string)
?ssl_session (byte)(string)
?ssl_key_size (byte)(int) via JkOptions +ForwardKeySize
request_terminator (byte)
?body content_length*(var binary)
*/
/* Marshal the request described by s into the message buffer msg.
 *
 * msg: message buffer the request is appended to
 * s:   web server service record holding the request data
 * l:   log context
 * ae:  endpoint, only used for the worker name in log messages
 *
 * Returns JK_TRUE when the complete request was appended, JK_FALSE as
 * soon as one of the jk_b_append_*() calls reports a failure (they
 * return non-zero on failure). In the failure case msg is left
 * partially filled.
 */
static int ajp_marshal_into_msgb(jk_msg_buf_t *msg,
                                 jk_ws_service_t *s,
                                 jk_log_context_t *l, ajp_endpoint_t * ae)
{
    int method;
    unsigned int i;

    JK_TRACE_ENTER(l);

    /* Methods without a code are sent as SC_M_JK_STORED here and by
     * name in a SC_A_STORED_METHOD attribute further down.
     */
    if ((method = sc_for_req_method(s->method,
                                    strlen(s->method))) == UNKNOWN_METHOD)
        method = SC_M_JK_STORED;

    /* Fixed part of the request */
    if (jk_b_append_byte(msg, JK_AJP13_FORWARD_REQUEST) ||
        jk_b_append_byte(msg, (unsigned char)method) ||
        jk_b_append_string(msg, s->protocol) ||
        jk_b_append_string(msg, s->req_uri) ||
        jk_b_append_string(msg, s->remote_addr) ||
        jk_b_append_string(msg, s->remote_host) ||
        jk_b_append_string(msg, s->server_name) ||
        jk_b_append_int(msg, (unsigned short)s->server_port) ||
        jk_b_append_byte(msg, (unsigned char)(s->is_ssl)) ||
        jk_b_append_int(msg, (unsigned short)(s->num_headers))) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) failed appending the message begining", ae->worker->name);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    /* Request headers: the name goes out as a code when one exists,
     * otherwise as a string; the value is always a string.
     */
    for (i = 0; i < s->num_headers; i++) {
        int sc;

        if ((sc = sc_for_req_header(s->headers_names[i])) != UNKNOWN_METHOD) {
            if (jk_b_append_int(msg, (unsigned short)sc)) {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) failed appending the header code for '%s'",
                       ae->worker->name, s->headers_names[i]);
                JK_TRACE_EXIT(l);
                return JK_FALSE;
            }
        }
        else {
            if (jk_b_append_string(msg, s->headers_names[i])) {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) failed appending the header name '%s'",
                       ae->worker->name, s->headers_names[i]);
                JK_TRACE_EXIT(l);
                return JK_FALSE;
            }
        }

        if (jk_b_append_string(msg, s->headers_values[i])) {
            /* Report the length of the value that did not fit. The
             * cast keeps the argument in line with the %u conversion
             * (strlen() returns size_t).
             */
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the header value for header '%s' of "
                   "length %u", ae->worker->name, s->headers_names[i],
                   s->headers_values[i] ?
                   (unsigned int)strlen(s->headers_values[i]) : 0U);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    /* Optional attributes: each one is an attribute code byte
     * followed by its value and is only sent when it is set.
     */
    if (s->secret) {
        if (jk_b_append_byte(msg, SC_A_SECRET) ||
            jk_b_append_string(msg, s->secret)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending secret", ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    if (s->remote_user) {
        if (jk_b_append_byte(msg, SC_A_REMOTE_USER) ||
            jk_b_append_string(msg, s->remote_user)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the remote user", ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    if (s->auth_type) {
        if (jk_b_append_byte(msg, SC_A_AUTH_TYPE) ||
            jk_b_append_string(msg, s->auth_type)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the auth type", ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    if (s->query_string) {
        if (jk_b_append_byte(msg, SC_A_QUERY_STRING) ||
#if defined(AS400) && !defined(AS400_UTF8)
            jk_b_append_asciistring(msg, s->query_string)) {
#else
            jk_b_append_string(msg, s->query_string)) {
#endif
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the query string of length %u",
                   ae->worker->name, (unsigned int)strlen(s->query_string));
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    if (s->route) {
        if (jk_b_append_byte(msg, SC_A_ROUTE) ||
            jk_b_append_string(msg, s->route)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the route", ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    if (s->ssl_cert_len) {
        if (jk_b_append_byte(msg, SC_A_SSL_CERT) ||
            jk_b_append_string(msg, s->ssl_cert)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the SSL certificates",
                   ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    if (s->ssl_cipher) {
        if (jk_b_append_byte(msg, SC_A_SSL_CIPHER) ||
            jk_b_append_string(msg, s->ssl_cipher)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the SSL ciphers", ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    if (s->ssl_session) {
        if (jk_b_append_byte(msg, SC_A_SSL_SESSION) ||
            jk_b_append_string(msg, s->ssl_session)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the SSL session", ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    /*
     * ssl_key_size is required by Servlet 2.3 API
     * added support only in ajp14 mode
     * JFC removed: ae->proto == AJP14_PROTO
     *
     * A value of -1 means that no key size is sent.
     */
    if (s->ssl_key_size != -1) {
        if (jk_b_append_byte(msg, SC_A_SSL_KEY_SIZE) ||
            jk_b_append_int(msg, (unsigned short)s->ssl_key_size)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the SSL key size of length %d",
                   ae->worker->name, s->ssl_key_size);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    /* If the method was unrecognized, encode it as an attribute */
    if (method == SC_M_JK_STORED) {
        if (JK_IS_DEBUG_LEVEL(l))
            jk_log(l, JK_LOG_DEBUG, "(%s) unknown method %s",
                   ae->worker->name, s->method);
        if (jk_b_append_byte(msg, SC_A_STORED_METHOD) ||
            jk_b_append_string(msg, s->method)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the request method", ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    /* Forward the SSL protocol name.
     * Modern Tomcat versions know how to retrieve
     * the protocol name from this attribute.
     */
    if (s->ssl_protocol && *s->ssl_protocol) {
        if (jk_b_append_byte(msg, SC_A_REQ_ATTRIBUTE) ||
            jk_b_append_string(msg, SC_A_SSL_PROTOCOL) ||
            jk_b_append_string(msg, s->ssl_protocol)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed appending the ssl protocol name %s",
                   ae->worker->name, s->ssl_protocol);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    /* Forward the remote port information, which was forgotten
     * from the builtin data of the AJP 13 protocol.
     * Since the servlet spec allows to retrieve it via getRemotePort(),
     * we provide the port to the Tomcat connector as a request
     * attribute. Modern Tomcat versions know how to retrieve
     * the remote port from this attribute.
     */
    if (jk_b_append_byte(msg, SC_A_REQ_ATTRIBUTE) ||
        jk_b_append_string(msg, SC_A_REQ_REMOTE_PORT) ||
        jk_b_append_string(msg, s->remote_port)) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) failed appending the remote port %s",
               ae->worker->name, s->remote_port);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    /* Forward the local ip address information, which was forgotten
     * from the builtin data of the AJP 13 protocol.
     * Since the servlet spec allows to retrieve it via getLocalAddr(),
     * we provide the address to the Tomcat connector as a request
     * attribute. Modern Tomcat versions know how to retrieve
     * the local address from this attribute.
     */
    if (jk_b_append_byte(msg, SC_A_REQ_ATTRIBUTE) ||
        jk_b_append_string(msg, SC_A_REQ_LOCAL_ADDR) ||
        jk_b_append_string(msg, s->local_addr)) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) failed appending the local address %s",
               ae->worker->name, s->local_addr);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    /* Forward activation information from the load balancer.
     * It can be used by the backend to deny access by requests,
     * which come with a session id but for an invalid session.
     * Such requests get forwarded to backends even if they
     * are "disabled" in the load balancer, because the balancer
     * does not know, which sessions are valid.
     * If the backend can check, that it was "disabled" it can
     * delete the session cookie and respond with a self-referential
     * redirect. The new request will then be balanced to some
     * other node that is not disabled.
     */
    if (jk_b_append_byte(msg, SC_A_REQ_ATTRIBUTE) ||
        jk_b_append_string(msg, SC_A_JK_LB_ACTIVATION) ||
        jk_b_append_string(msg, s->activation)) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) failed appending the activation state %s",
               ae->worker->name, s->activation);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    /* Additional request attributes supplied in s as name/value pairs */
    if (s->num_attributes > 0) {
        for (i = 0; i < s->num_attributes; i++) {
            if (jk_b_append_byte(msg, SC_A_REQ_ATTRIBUTE) ||
                jk_b_append_string(msg, s->attributes_names[i]) ||
                jk_b_append_string(msg, s->attributes_values[i])) {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) failed appending attribute %s=%s",
                       ae->worker->name, s->attributes_names[i],
                       s->attributes_values[i]);
                JK_TRACE_EXIT(l);
                return JK_FALSE;
            }
        }
    }

    /* Request terminator */
    if (jk_b_append_byte(msg, SC_A_ARE_DONE)) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) failed appending the message end", ae->worker->name);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG, "(%s) ajp marshaling done", ae->worker->name);
    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/*
AJPV13_RESPONSE/AJPV14_RESPONSE:=
response_prefix (2)
status (short)
status_msg (short)
num_headers (short)
num_headers*(res_header_name header_value)
*body_chunk
terminator boolean <! -- recycle connection or not -->
req_header_name :=
sc_req_header_name | (string)
res_header_name :=
sc_res_header_name | (string)
header_value :=
(string)
body_chunk :=
length (short)
body length*(var binary)
*/
/* Unmarshal the response head (status, status message and headers)
 * from msg into d.
 *
 * msg: message buffer positioned at the status field
 * d:   result record; status, msg, num_headers, header_names and
 *      header_values are filled in
 * ae:  endpoint; its pool provides the memory for the header arrays
 * l:   log context
 *
 * Returns JK_TRUE on success. Returns JK_FALSE when the status is 0,
 * a header name or value can not be read, a coded header name is not
 * known, or the header arrays can not be allocated.
 */
static int ajp_unmarshal_response(jk_msg_buf_t *msg,
                                  jk_res_data_t * d,
                                  ajp_endpoint_t * ae, jk_log_context_t *l)
{
    jk_pool_t *p = &ae->pool;

    JK_TRACE_ENTER(l);

    d->status = jk_b_get_int(msg);
    if (!d->status) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) NULL status", ae->worker->name);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    /* The status message may be missing; that is not an error. */
    d->msg = jk_b_get_string(msg);
    if (d->msg) {
#if (defined(AS400) && !defined(AS400_UTF8)) || defined(_OSD_POSIX)
        jk_xlate_from_ascii(d->msg, strlen(d->msg));
#endif
    }
    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG,
               "(%s) status = %d", ae->worker->name, d->status);

    d->num_headers = jk_b_get_int(msg);
    d->header_names = d->header_values = NULL;
    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG,
               "Number of headers is = %d",
               d->num_headers);

    if (d->num_headers) {
        d->header_names = jk_pool_alloc(p, sizeof(char *) * d->num_headers);
        d->header_values = jk_pool_alloc(p, sizeof(char *) * d->num_headers);
        if (d->header_names && d->header_values) {
            unsigned int i;

            for (i = 0; i < d->num_headers; i++) {
                /* Peek at the next two bytes without consuming them:
                 * a high byte of 0xA0 marks a coded header name,
                 * anything else is the length of a string.
                 */
                unsigned short name = jk_b_pget_int(msg, msg->pos);

                if ((name & 0XFF00) == 0XA000) {
                    const char *long_name;

                    /* Consume the code that was peeked at above. */
                    jk_b_get_int(msg);
                    name = name & 0X00FF;
                    /* NULL is returned for code 0 as well as for
                     * codes above SC_RES_HEADERS_NUM; never hand a
                     * NULL header name on to the caller.
                     */
                    long_name = long_res_header_for_sc(name);
                    if (long_name == NULL) {
                        jk_log(l, JK_LOG_ERROR,
                               "(%s) No such sc (%d)", ae->worker->name, name);
                        JK_TRACE_EXIT(l);
                        return JK_FALSE;
                    }
                    d->header_names[i] = (char *)long_name;
                }
                else {
                    d->header_names[i] = jk_b_get_string(msg);
                    if (!d->header_names[i]) {
                        jk_log(l, JK_LOG_ERROR,
                               "(%s) NULL header name", ae->worker->name);
                        JK_TRACE_EXIT(l);
                        return JK_FALSE;
                    }
#if (defined(AS400) && !defined(AS400_UTF8)) || defined(_OSD_POSIX)
                    jk_xlate_from_ascii(d->header_names[i],
                                        strlen(d->header_names[i]));
#endif
                }

                d->header_values[i] = jk_b_get_string(msg);
                if (!d->header_values[i]) {
                    jk_log(l, JK_LOG_ERROR,
                           "(%s) NULL header value", ae->worker->name);
                    JK_TRACE_EXIT(l);
                    return JK_FALSE;
                }
#if (defined(AS400) && !defined(AS400_UTF8)) || defined(_OSD_POSIX)
                jk_xlate_from_ascii(d->header_values[i],
                                    strlen(d->header_values[i]));
#endif
                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG,
                           "(%s) Header[%d] [%s] = [%s]",
                           ae->worker->name, i, d->header_names[i],
                           d->header_values[i]);
            }
        }
        else {
            /* Do not report success with a non-zero header count
             * but without the arrays that should hold the headers.
             */
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed allocating memory for %d response headers",
                   ae->worker->name, d->num_headers);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }

    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/*
 * Abort endpoint use
 *
 * When the endpoint holds a valid socket, the socket is dropped from
 * the endpoint and the worker's connected counter is decremented.
 * With shutdown == JK_TRUE the socket is also closed: by
 * jk_close_socket() if ae->hard_close is set, by jk_shutdown_socket()
 * otherwise. For any other shutdown value the socket itself is not
 * touched here.
 * The last operation is always reset to JK_AJP13_END_RESPONSE.
 */
static void ajp_abort_endpoint(ajp_endpoint_t * ae, int shutdown,
                               jk_log_context_t *l)
{
    JK_TRACE_ENTER(l);

    if (JK_IS_DEBUG_LEVEL(l)) {
        jk_log(l, JK_LOG_DEBUG,
               "(%s) aborting endpoint with socket %d",
               ae->worker->name, ae->sd);
    }

    if (IS_VALID_SOCKET(ae->sd)) {
        if (shutdown == JK_TRUE && ae->hard_close) {
            /* Force unclean connection close to communicate client write
             * errors back to Tomcat by aborting AJP response writes.
             */
            jk_close_socket(ae->sd, l);
        }
        else if (shutdown == JK_TRUE) {
            jk_shutdown_socket(ae->sd, l);
        }
        JK_ATOMIC_DECREMENT(&(ae->worker->s->connected));
        ae->sd = JK_INVALID_SOCKET;
    }

    ae->last_op = JK_AJP13_END_RESPONSE;
    JK_TRACE_EXIT(l);
}
/*
 * Reset the endpoint (clean buf and close socket)
 *
 * The socket is only given up (via ajp_abort_endpoint with shutdown)
 * when the endpoint is not marked for reuse; the endpoint pool is
 * reset in any case.
 */
static void ajp_reset_endpoint(ajp_endpoint_t * ae, jk_log_context_t *l)
{
    JK_TRACE_ENTER(l);

    if (JK_IS_DEBUG_LEVEL(l)) {
        jk_log(l, JK_LOG_DEBUG,
               "(%s) resetting endpoint with socket %d%s",
               ae->worker->name, ae->sd,
               ae->reuse ? "" : " (socket shutdown)");
    }

    if (ae->reuse) {
        /* Connection stays open for the next request. */
    }
    else {
        ajp_abort_endpoint(ae, JK_TRUE, l);
    }

    jk_reset_pool(&(ae->pool));
    JK_TRACE_EXIT(l);
}
/*
 * Close the endpoint (close pool and close socket)
 *
 * A valid socket is shut down and the worker's connected counter is
 * decremented. The endpoint pool is closed and the endpoint itself is
 * released with free(), so ae must not be used after this call.
 */
void ajp_close_endpoint(ajp_endpoint_t * ae, jk_log_context_t *l)
{
    JK_TRACE_ENTER(l);

    if (JK_IS_DEBUG_LEVEL(l)) {
        jk_log(l, JK_LOG_DEBUG,
               "(%s) closing endpoint with socket %d%s",
               ae->worker->name, ae->sd,
               ae->reuse ? "" : " (socket shutdown)");
    }

    if (IS_VALID_SOCKET(ae->sd)) {
        jk_shutdown_socket(ae->sd, l);
        JK_ATOMIC_DECREMENT(&(ae->worker->s->connected));
        ae->sd = JK_INVALID_SOCKET;
    }

    jk_close_pool(&(ae->pool));
    free(ae);

    JK_TRACE_EXIT(l);
}
/** Steal a connection from an idle cache endpoint
 * @param ae   endpoint that needs a new connection
 * @param l    log context
 * @return     JK_FALSE: no cached slot with a usable socket was found
 *             JK_TRUE:  ae->sd now holds a socket taken from the cache
 * @remark     Always closes old socket endpoint
 * @remark     The cache is searched while holding the worker's
 *             critical section; the donor slot is left without socket.
 */
static int ajp_next_connection(ajp_endpoint_t *ae, jk_log_context_t *l)
{
    ajp_worker_t *aw = ae->worker;
    unsigned int slot;

    JK_TRACE_ENTER(l);

    /* Close previous socket
     */
    if (IS_VALID_SOCKET(ae->sd)) {
        jk_shutdown_socket(ae->sd, l);
        JK_ATOMIC_DECREMENT(&(ae->worker->s->connected));
        ae->sd = JK_INVALID_SOCKET;
    }

    JK_ENTER_CS(&aw->cs);
    for (slot = 0; slot < aw->ep_cache_sz; slot++) {
        /* Skip slots that are in use or have no usable socket
         */
        if (!(IS_SLOT_AVAIL(aw->ep_cache[slot]) &&
              IS_VALID_SOCKET(aw->ep_cache[slot]->sd)))
            continue;

        /* Take the socket over and stop searching
         */
        ae->sd = aw->ep_cache[slot]->sd;
        aw->ep_cache[slot]->sd = JK_INVALID_SOCKET;
        break;
    }
    JK_LEAVE_CS(&aw->cs);

    if (!IS_VALID_SOCKET(ae->sd)) {
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG,
               "(%s) Will try pooled connection socket %d from slot %d",
               ae->worker->name, ae->sd, slot);
    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/** Handle the cping/cpong query
 * @param ae       endpoint
 * @param timeout  wait timeout in milliseconds
 * @param l        log context
 * @return         JK_FALSE: failure
 *                 JK_TRUE: success
 * @remark         Always closes socket in case of
 *                 a socket error
 * @remark         Sends one CPing and reads at most two packets: a
 *                 single unexpected packet in front of the CPong is
 *                 tolerated under the conditions checked below.
 */
static int ajp_handle_cping_cpong(ajp_endpoint_t * ae, int timeout,
                                  jk_log_context_t *l)
{
    jk_msg_buf_t *msg;
    int attempt;

    JK_TRACE_ENTER(l);

    ae->last_errno = 0;

    msg = jk_b_new(&ae->pool);
    if (msg == NULL) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) Failed allocating AJP message",
               ae->worker->name);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }
    if (jk_b_set_buffer_size(msg, 16)) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) Failed allocating AJP message buffer",
               ae->worker->name);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }
    jk_b_reset(msg);
    jk_b_append_byte(msg, AJP13_CPING_REQUEST);

    /* Send CPing query
     */
    if (ajp_connection_tcp_send_message(ae, msg, l) != JK_TRUE) {
        jk_log(l, JK_LOG_INFO,
               "(%s) can't send cping query",
               ae->worker->name);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    for (attempt = 0; attempt < 2; attempt++) {
        int cmd;

        /* wait for Pong reply for timeout milliseconds
         */
        if (jk_is_input_event(ae->sd, timeout, l) == JK_FALSE) {
            ae->last_errno = errno;
            jk_log(l, JK_LOG_INFO, "(%s) timeout in reply cpong after %d ms. "
                   "Socket = %d (event=%d)",
                   ae->worker->name, timeout, ae->sd, errno);
            /* We can't trust this connection any more.
             */
            ajp_abort_endpoint(ae, JK_TRUE, l);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }

        /* Read and check for Pong reply
         */
        if (ajp_connection_tcp_get_message(ae, msg, l) != JK_TRUE) {
            jk_log(l, JK_LOG_INFO,
                   "(%s) awaited reply cpong, not received",
                   ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }

        cmd = jk_b_get_byte(msg);
        if (cmd == AJP13_CPONG_REPLY) {
            /* We have received Pong reply
             */
            ae->last_op = AJP13_CPONG_REPLY;
            break;
        }

        /* If the response was not CPONG it means that
         * the previous response was not consumed by the
         * client but the AJP message was already in
         * the network buffer.
         * Silently drop this single extra packet instead of
         * recycling the connection, unless this is already the
         * second packet, the previous operation had ended its
         * response, or the packet type is out of the expected range.
         */
        if (attempt || ae->last_op == JK_AJP13_END_RESPONSE ||
            cmd < JK_AJP13_SEND_BODY_CHUNK ||
            cmd > AJP13_CPONG_REPLY) {
            jk_log(l, JK_LOG_WARNING,
                   "(%s) awaited reply cpong, received %d instead. "
                   "Closing connection",
                   ae->worker->name, cmd);
            /* We can't trust this connection any more.
             */
            ajp_abort_endpoint(ae, JK_TRUE, l);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }

        jk_log(l, JK_LOG_INFO,
               "(%s) awaited reply cpong, received %d instead. "
               "Retrying next packet",
               ae->worker->name, cmd);
    }

    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/** Connect an endpoint to a backend
 * @param ae       endpoint
 * @param l        log context
 * @return         JK_FALSE: failure
 *                 JK_TRUE: success
 * @remark         Always closes socket in case of
 *                 a socket error
 * @remark         Cares about ae->last_errno
 * @remark         After the socket is open either the worker's logon
 *                 callback is run or, when there is no callback and a
 *                 connect timeout is configured, a cping/cpong check.
 */
int ajp_connect_to_endpoint(ajp_endpoint_t * ae, jk_log_context_t *l)
{
    char addr_text[64];
    int connected;
    int rc = JK_TRUE;

    JK_TRACE_ENTER(l);

    ae->last_errno = 0;
    /* The source address is only passed on when one is set
     */
    ae->sd = jk_open_socket(&ae->worker->worker_inet_addr,
                            ae->worker->worker_source_inet_addr.ipaddr_ptr != NULL ?
                                &ae->worker->worker_source_inet_addr :
                                NULL,
                            ae->worker->keepalive,
                            ae->worker->socket_timeout,
                            ae->worker->socket_connect_timeout,
                            ae->worker->socket_buf, l);

    if (!IS_VALID_SOCKET(ae->sd)) {
        ae->last_errno = errno;
        jk_log(l, JK_LOG_INFO,
               "(%s) Failed opening socket to (%s) (errno=%d)",
               ae->worker->name, jk_dump_hinfo(&ae->worker->worker_inet_addr,
               addr_text, sizeof(addr_text)), ae->last_errno);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    /* Count the new connection and update the maximum number of
     * connections
     */
    connected = JK_ATOMIC_INCREMENT(&(ae->worker->s->connected));
    if (connected > ae->worker->s->max_connected)
        ae->worker->s->max_connected = connected;

    /* set last_access only if needed
     */
    if (ae->worker->cache_timeout > 0)
        ae->last_access = time(NULL);

    if (ae->worker->logon != NULL) {
        /* Execute the logon after the physical connect
         * XXX: Not sure, if we really should do logon before cping/cpong
         * and if no cping/cpong is allowed before or after logon.
         */
        rc = ae->worker->logon(ae, l);
        if (rc == JK_FALSE) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) ajp14 worker logon to the backend server failed",
                   ae->worker->name);
            /* Close the socket if unable to logon
             */
            ajp_abort_endpoint(ae, JK_TRUE, l);
        }
    }
    else if (ae->worker->connect_timeout > 0) {
        /* XXX: Should we send a cping also after logon to validate
         * the connection?
         */
        rc = ajp_handle_cping_cpong(ae, ae->worker->connect_timeout, l);
        if (rc == JK_FALSE) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) cping/cpong after connecting to the backend server "
                   "failed (errno=%d)",
                   ae->worker->name, ae->last_errno);
        }
    }

    JK_TRACE_EXIT(l);
    return rc;
}
/* Syncing config values from shm
 *
 * aw:     worker whose local settings are refreshed from aw->s
 * locked: JK_FALSE when this function has to take the shm lock
 *         itself; any other value skips the locking here
 * l:      log context
 *
 * When the address sequence in shm differs from the local one and the
 * new port is not 0, the new address is resolved. On success all
 * available cached connections are closed and the worker address is
 * replaced; on failure the worker port is set to 0.
 */
void jk_ajp_pull(ajp_worker_t * aw, int locked, jk_log_context_t *l)
{
    int address_change = JK_FALSE;
    int port = 0;
    /* One byte more than strncpy() may fill, so that the copy can
     * always be NUL terminated.
     */
    char host[JK_SHM_STR_SIZ + 1];
    jk_sockaddr_t inet_addr;

    JK_TRACE_ENTER(l);

    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG,
               "syncing mem for ajp worker '%s' from shm (%d->%d) [%d->%d]",
               aw->name, aw->sequence, aw->s->h.sequence, aw->addr_sequence,
               aw->s->addr_sequence);
    if (locked == JK_FALSE)
        jk_shm_lock();

    aw->cache_timeout = aw->s->cache_timeout;
    aw->connect_timeout = aw->s->connect_timeout;
    aw->ping_timeout = aw->s->ping_timeout;
    aw->reply_timeout = aw->s->reply_timeout;
    aw->prepost_timeout = aw->s->prepost_timeout;
    aw->recovery_opts = aw->s->recovery_opts;
    aw->retries = aw->s->retries;
    aw->retry_interval = aw->s->retry_interval;
    aw->busy_limit = aw->s->busy_limit;
    aw->max_packet_size = aw->s->max_packet_size;
    aw->sequence = aw->s->h.sequence;
    if (aw->addr_sequence != aw->s->addr_sequence) {
        address_change = JK_TRUE;
        aw->addr_sequence = aw->s->addr_sequence;
        /* Take a private copy; the address is resolved below
         * after the shm lock handling.
         */
        strncpy(host, aw->s->host, JK_SHM_STR_SIZ);
        host[JK_SHM_STR_SIZ] = '\0';
        port = aw->s->port;
    }
    if (locked == JK_FALSE)
        jk_shm_unlock();

    if (address_change == JK_TRUE && port != 0) {
        aw->port = port;
        /* NOTE(review): relies on aw->host providing at least
         * JK_SHM_STR_SIZ bytes - confirm against its declaration.
         */
        strncpy(aw->host, host, JK_SHM_STR_SIZ);
        if (!jk_resolve(host, port, &inet_addr,
                        aw->worker.we->pool, aw->prefer_ipv6, l)) {
            jk_log(l, JK_LOG_ERROR,
                   "Failed resolving address '%s:%d' for worker '%s'.",
                   host, port, aw->name);
            /* Disable contact */
            aw->port = 0;
        }
        else {
            unsigned int i;

            JK_ENTER_CS(&aw->cs);
            for (i = 0; i < aw->ep_cache_sz; i++) {
                /* Close all avail connections in the cache
                 * Note that this won't change active connections.
                 */
                if (IS_SLOT_AVAIL(aw->ep_cache[i]) &&
                    IS_VALID_SOCKET(aw->ep_cache[i]->sd)) {
                    jk_sock_t sd = aw->ep_cache[i]->sd;
                    aw->ep_cache[i]->sd = JK_INVALID_SOCKET;
                    aw->ep_cache[i]->addr_sequence = aw->addr_sequence;
                    jk_shutdown_socket(sd, l);
                    JK_ATOMIC_DECREMENT(&(aw->s->connected));
                }
            }
            /* Switch to the new address inside the same critical
             * section that emptied the cache.
             */
            jk_clone_sockaddr(&(aw->worker_inet_addr), &inet_addr);
            JK_LEAVE_CS(&aw->cs);
        }
    }

    JK_TRACE_EXIT(l);
}
/* Syncing config values to shm
 *
 * aw:     worker whose local settings are written to aw->s
 * locked: JK_FALSE when this function has to take the shm lock
 *         itself; any other value skips the locking here
 * l:      log context
 *
 * The shm sequence is always incremented. When the local address
 * sequence differs from the one in shm, host and port are pushed too,
 * the address sequence is incremented and all available cached
 * connections are closed.
 */
void jk_ajp_push(ajp_worker_t * aw, int locked, jk_log_context_t *l)
{
    int address_change = JK_FALSE;

    JK_TRACE_ENTER(l);

    if (JK_IS_DEBUG_LEVEL(l)) {
        jk_log(l, JK_LOG_DEBUG,
               "syncing shm for ajp worker '%s' from mem (%d->%d) [%d->%d]",
               aw->name, aw->s->h.sequence, aw->sequence, aw->s->addr_sequence,
               aw->addr_sequence);
    }

    if (locked == JK_FALSE) {
        jk_shm_lock();
    }

    aw->s->cache_timeout = aw->cache_timeout;
    aw->s->connect_timeout = aw->connect_timeout;
    aw->s->ping_timeout = aw->ping_timeout;
    aw->s->reply_timeout = aw->reply_timeout;
    aw->s->prepost_timeout = aw->prepost_timeout;
    aw->s->recovery_opts = aw->recovery_opts;
    aw->s->retries = aw->retries;
    aw->s->retry_interval = aw->retry_interval;
    aw->s->busy_limit = aw->busy_limit;
    aw->s->max_packet_size = aw->max_packet_size;

    /* Force sequence update on push
     */
    ++aw->s->h.sequence;
    aw->sequence = aw->s->h.sequence;

    if (aw->s->addr_sequence != aw->addr_sequence) {
        ++aw->s->addr_sequence;
        address_change = JK_TRUE;
        strncpy(aw->s->host, aw->host, JK_SHM_STR_SIZ);
        aw->s->port = aw->port;
        aw->addr_sequence = aw->s->addr_sequence;
    }

    if (locked == JK_FALSE) {
        jk_shm_unlock();
    }

    if (address_change == JK_TRUE) {
        unsigned int slot;

        JK_ENTER_CS(&aw->cs);
        for (slot = 0; slot < aw->ep_cache_sz; slot++) {
            jk_sock_t sd;

            /* Only available slots with a usable socket are touched
             */
            if (!(IS_SLOT_AVAIL(aw->ep_cache[slot]) &&
                  IS_VALID_SOCKET(aw->ep_cache[slot]->sd)))
                continue;

            /* Close the cached connection
             */
            sd = aw->ep_cache[slot]->sd;
            aw->ep_cache[slot]->sd = JK_INVALID_SOCKET;
            aw->ep_cache[slot]->addr_sequence = aw->addr_sequence;
            jk_shutdown_socket(sd, l);
            JK_ATOMIC_DECREMENT(&(aw->s->connected));
        }
        JK_LEAVE_CS(&aw->cs);
    }

    JK_TRACE_EXIT(l);
}
/** Send a message to an endpoint, using corresponding PROTO HEADER
 * @param ae       endpoint
 * @param msg      message to send
 * @param l        log context
 * @return         JK_FATAL_ERROR: endpoint contains unknown protocol
 *                 JK_FALSE: other failure
 *                 JK_TRUE: success
 * @remark         Always closes socket in case of
 *                 a socket error, or JK_FATAL_ERROR
 * @remark         Cares about ae->last_errno
 * @remark         On success the number of bytes sent is added to
 *                 ae->endpoint.wr
 */
int ajp_connection_tcp_send_message(ajp_endpoint_t * ae,
                                    jk_msg_buf_t *msg, jk_log_context_t *l)
{
    int rc;

    JK_TRACE_ENTER(l);

    ae->last_errno = 0;

    if (ae->proto != AJP13_PROTO && ae->proto != AJP14_PROTO) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) unknown protocol %d, supported are AJP13/AJP14",
               ae->worker->name, ae->proto);
        /* We've got a protocol error.
         * We can't trust this connection any more,
         * because we might have send already parts of the request.
         */
        ajp_abort_endpoint(ae, JK_TRUE, l);
        JK_TRACE_EXIT(l);
        return JK_FATAL_ERROR;
    }

    /* Finish the message with the header of the protocol in use
     */
    if (ae->proto == AJP13_PROTO) {
        jk_b_end(msg, AJP13_WS_HEADER);
        if (JK_IS_DEBUG_LEVEL(l))
            jk_dump_buff(l, JK_LOG_DEBUG, "sending to ajp13", msg);
    }
    else {
        jk_b_end(msg, AJP14_WS_HEADER);
        if (JK_IS_DEBUG_LEVEL(l))
            jk_dump_buff(l, JK_LOG_DEBUG, "sending to ajp14", msg);
    }

    /* This is the only place in this function where we use the socket.
     * If sendfull gets an error, it implicitly closes the socket.
     * So any socket error inside ajp_connection_tcp_send_message
     * results in a socket close and invalidated endpoint connection.
     */
    rc = jk_tcp_socket_sendfull(ae->sd, msg->buf, msg->len, l);
    if (rc <= 0) {
        ae->last_errno = errno;
        jk_log(l, JK_LOG_INFO,
               "(%s) sendfull for socket %d returned %d (errno=%d)",
               ae->worker->name, ae->sd, rc, ae->last_errno);
        /* No shutdown requested here; only the endpoint state is
         * invalidated.
         */
        ajp_abort_endpoint(ae, JK_FALSE, l);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    ae->endpoint.wr += (jk_uint64_t)rc;
    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/** Receive a message from an endpoint, checking PROTO HEADER
 * @param ae  endpoint the packet is read from
 * @param msg message buffer that receives the packet payload
 * @param l   log context
 * @return    JK_TRUE: success
 *            JK_FALSE: could not read the AJP packet header
 *            JK_AJP_PROTOCOL_ERROR: failure after reading
 *                                   the AJP packet header
 * @remark    Calls ajp_abort_endpoint() on every failure path
 * @remark    Cares about ae->last_errno
 */
int ajp_connection_tcp_get_message(ajp_endpoint_t * ae,
                                   jk_msg_buf_t *msg, jk_log_context_t *l)
{
    unsigned char head[AJP_HEADER_LEN];
    char peer[64];
    unsigned int magic;
    int nread;
    int payload_len;

    JK_TRACE_ENTER(l);

    ae->last_errno = 0;

    /* Step 1: read the fixed size packet header.
     * If recvfull gets an error, it implicitly closes the socket.
     * We will invalidate the endpoint connection.
     * A non-negative return code means the full header was received.
     */
    nread = jk_tcp_socket_recvfull(ae->sd, head, AJP_HEADER_LEN, l);
    if (nread < 0) {
        if (nread != JK_SOCKET_EOF) {
            ae->last_errno = -nread;
            jk_log(l, JK_LOG_INFO,
                   "(%s) can't receive the response header message from tomcat, "
                   "network problems or tomcat (%s) is down (errno=%d)",
                   ae->worker->name,
                   jk_dump_hinfo(&ae->worker->worker_inet_addr,
                                 peer, sizeof(peer)),
                   ae->last_errno);
        }
        else {
            ae->last_errno = EPIPE;
            jk_log(l, JK_LOG_INFO,
                   "(%s) can't receive the response header message from tomcat, "
                   "tomcat (%s) has forced a connection close for socket %d",
                   ae->worker->name,
                   jk_dump_hinfo(&ae->worker->worker_inet_addr,
                                 peer, sizeof(peer)),
                   ae->sd);
        }
        ajp_abort_endpoint(ae, JK_FALSE, l);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }
    ae->endpoint.rd += (jk_uint64_t)nread;

    /* Step 2: the first two header bytes carry the protocol magic
     * (big endian). It is only verified for AJP13 and AJP14 endpoints.
     */
    magic = ((unsigned int)head[0] << 8) | head[1];
    if (ae->proto == AJP13_PROTO || ae->proto == AJP14_PROTO) {
        const int is_ajp13 = (ae->proto == AJP13_PROTO);
        const unsigned int expected =
            is_ajp13 ? AJP13_SW_HEADER : AJP14_SW_HEADER;
        const unsigned int sibling =
            is_ajp13 ? AJP14_SW_HEADER : AJP13_SW_HEADER;

        if (magic != expected) {
            if (magic != sibling) {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) wrong message format 0x%04x from %s",
                       ae->worker->name, magic,
                       jk_dump_hinfo(&ae->worker->worker_inet_addr,
                                     peer, sizeof(peer)));
            }
            else if (is_ajp13) {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) received AJP14 reply on an AJP13 connection from %s",
                       ae->worker->name,
                       jk_dump_hinfo(&ae->worker->worker_inet_addr,
                                     peer, sizeof(peer)));
            }
            else {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) received AJP13 reply on an AJP14 connection from %s",
                       ae->worker->name,
                       jk_dump_hinfo(&ae->worker->worker_inet_addr,
                                     peer, sizeof(peer)));
            }
            /* We've got a protocol error.
             * We can't trust this connection any more.
             */
            ajp_abort_endpoint(ae, JK_TRUE, l);
            JK_TRACE_EXIT(l);
            return JK_AJP_PROTOCOL_ERROR;
        }
    }

    /* Step 3: header bytes 2 and 3 hold the payload length (big endian);
     * it must fit into the caller's buffer.
     */
    payload_len = ((head[2] & 0xff) << 8) + (head[3] & 0xff);
    if (payload_len > msg->maxlen) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) wrong message size %d %d from %s",
               ae->worker->name, payload_len, msg->maxlen,
               jk_dump_hinfo(&ae->worker->worker_inet_addr,
                             peer, sizeof(peer)));
        /* We've got a protocol error.
         * We can't trust this connection any more.
         */
        ajp_abort_endpoint(ae, JK_TRUE, l);
        JK_TRACE_EXIT(l);
        return JK_AJP_PROTOCOL_ERROR;
    }
    msg->len = payload_len;
    msg->pos = 0;

    /* Step 4: read the payload.
     * If recvfull gets an error, it implicitly closes the socket.
     * We will invalidate the endpoint connection.
     * A non-negative return code means the full payload was received.
     */
    nread = jk_tcp_socket_recvfull(ae->sd, msg->buf, payload_len, l);
    if (nread < 0) {
        if (nread != JK_SOCKET_EOF) {
            ae->last_errno = -nread;
            jk_log(l, JK_LOG_ERROR,
                   "(%s) can't receive the response body message from tomcat, "
                   "network problems or tomcat (%s) is down (errno=%d)",
                   ae->worker->name,
                   jk_dump_hinfo(&ae->worker->worker_inet_addr,
                                 peer, sizeof(peer)),
                   ae->last_errno);
        }
        else {
            ae->last_errno = EPIPE;
            jk_log(l, JK_LOG_ERROR,
                   "(%s) can't receive the response body message from tomcat, "
                   "tomcat (%s) has forced a connection close for socket %d",
                   ae->worker->name,
                   jk_dump_hinfo(&ae->worker->worker_inet_addr,
                                 peer, sizeof(peer)),
                   ae->sd);
        }
        ajp_abort_endpoint(ae, JK_FALSE, l);
        JK_TRACE_EXIT(l);
        /* Although we have a connection, this is effectively a protocol
         * error. We received the AJP header packet, but not the packet
         * payload.
         */
        return JK_AJP_PROTOCOL_ERROR;
    }
    ae->endpoint.rd += (jk_uint64_t)nread;

    if (JK_IS_DEBUG_LEVEL(l)) {
        if (ae->proto == AJP13_PROTO) {
            jk_dump_buff(l, JK_LOG_DEBUG, "received from ajp13", msg);
        }
        else if (ae->proto == AJP14_PROTO) {
            jk_dump_buff(l, JK_LOG_DEBUG, "received from ajp14", msg);
        }
    }

    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/*
 * Read request body data from the web server (i.e. from the client).
 *
 * A single s->read() call is not guaranteed to deliver all the data,
 * so we keep reading until the wanted amount has arrived, the source
 * reports end of data (a read of 0 bytes), or the read fails.
 *
 * Returns the number of bytes stored in buf, 0 if a chunked request
 * has already been read to its end, or JK_CLIENT_RD_ERROR if s->read()
 * reports a failure.
 */
static int ajp_read_fully_from_server(jk_ws_service_t *s, jk_log_context_t *l,
                                      unsigned char *buf, unsigned int len)
{
    unsigned int total = 0;
    unsigned int wanted = len;

    JK_TRACE_ENTER(l);

    if (s->is_chunked) {
        if (s->no_more_chunks) {
            /* The last chunk has been seen before, nothing left to read. */
            JK_TRACE_EXIT(l);
            return 0;
        }
        /* Corner case: buf must be large enough to hold next
         * chunk size (if we're on or near a chunk border).
         * Pad the length to a reasonable value, otherwise the
         * read fails and the remaining chunks are tossed.
         */
        if (len >= CHUNK_BUFFER_PAD) {
            wanted = len - CHUNK_BUFFER_PAD;
        }
    }

    while (total < wanted) {
        unsigned int got = 0;

        /* The read may use all the space left in buf, not only the
         * padded amount.
         */
        if (!s->read(s, buf + total, len - total, &got)) {
            /* Remote Client read failed.
             */
            JK_TRACE_EXIT(l);
            return JK_CLIENT_RD_ERROR;
        }
        if (got == 0) {
            /* End of data; for chunked bodies remember to read no more. */
            if (s->is_chunked) {
                s->no_more_chunks = 1;
            }
            break;
        }
        total += got;
    }

    JK_TRACE_EXIT(l);
    return (int)total;
}
/*
 * Read a piece of the request body into an AJP13/AJP14 message buffer.
 *
 * The data is stored behind the space reserved for the packet header
 * (AJP_HEADER_LEN) and the length field (AJP_HEADER_SZ_LEN). A negative
 * len asks for as much as fits.
 *
 * Returns JK_CLIENT_RD_ERROR on error, else the number of bytes read.
 */
static int ajp_read_into_msg_buff(ajp_endpoint_t * ae,
                                  jk_ws_service_t *s,
                                  jk_msg_buf_t *msg, int len,
                                  jk_log_context_t *l)
{
    unsigned char *payload = msg->buf;
    int capacity;

    JK_TRACE_ENTER(l);

    jk_b_reset(msg);

    /* Skip the room kept for the packet header and the read length. */
    payload += AJP_HEADER_LEN;
    payload += AJP_HEADER_SZ_LEN;

    capacity = ae->worker->max_packet_size - AJP_HEADER_LEN - AJP_HEADER_SZ_LEN;

    if (!s->is_chunked || ae->left_bytes_to_send != 0) {
        /* Never read more than what is still announced for sending. */
        if ((jk_uint64_t)capacity > ae->left_bytes_to_send) {
            capacity = (int)ae->left_bytes_to_send;
        }
        if (len < 0 || len > capacity) {
            len = capacity;
        }
    }
    else {
        /* Pick the max size since we don't know the content_length
         */
        len = capacity;
    }

    len = ajp_read_fully_from_server(s, l, payload, len);
    if (len < 0) {
        jk_log(l, JK_LOG_INFO,
               "(%s) receiving data from client failed. "
               "Connection aborted or network problems",
               ae->worker->name);
        JK_TRACE_EXIT(l);
        return JK_CLIENT_RD_ERROR;
    }

    if (!s->is_chunked) {
        ae->left_bytes_to_send -= len;
    }

    /* Recipient recognizes empty packet as end of stream, not
     * an empty body packet, so the length is only added for real data.
     */
    if (len > 0 && jk_b_append_int(msg, (unsigned short)len) != 0) {
        jk_log(l, JK_LOG_INFO,
               "(%s) Failed appending message length", ae->worker->name);
        JK_TRACE_EXIT(l);
        return JK_CLIENT_RD_ERROR;
    }

    msg->len += len;

    JK_TRACE_EXIT(l);
    return len;
}
/*
 * send request to Tomcat via Ajp13
 * - first try to find a reusable socket
 * - if no such socket is available, try to connect
 * - send request, but send must be seen as asynchronous,
 *   since the send() call will report no error about 95% of the time.
 *   Hopefully we'll get more information on the next read.
 *
 * nb: op->request is the original request msg buffer
 *     op->post    is the request body buffer, kept for a possible resend
 *
 * Return values of ajp_send_request() function:
 * return value        op->recoverable   reason
 * JK_FATAL_ERROR      JK_FALSE          ajp_connection_tcp_send_message()
 *                                       returns JK_FATAL_ERROR
 *           Endpoint belongs to unknown protocol.
 * JK_FATAL_ERROR      JK_TRUE           ajp_connection_tcp_send_message()
 *                                       returns JK_FALSE
 *           Sending request or request body in jk_tcp_socket_sendfull()
 *           returns with error.
 * JK_FATAL_ERROR      JK_TRUE           Could not connect to backend
 * JK_CLIENT_RD_ERROR  JK_FALSE          Error during reading parts of POST
 *                                       body from client
 * JK_TRUE             JK_TRUE           All other cases (OK)
 */
static int ajp_send_request(jk_endpoint_t *e,
                            jk_ws_service_t *s,
                            jk_log_context_t *l,
                            ajp_endpoint_t * ae, ajp_operation_t * op)
{
    int err_conn = 0;
    int err_cping = 0;
    int err_send = 0;
    int rc;
    int postlen;

    JK_TRACE_ENTER(l);

    ae->last_errno = 0;
    /* Up to now, we can recover
     */
    op->recoverable = JK_TRUE;

    /* Check if the previous request really ended
     */
    if (ae->last_op != JK_AJP13_END_RESPONSE &&
        ae->last_op != AJP13_CPONG_REPLY) {
        jk_log(l, JK_LOG_INFO,
               "(%s) did not receive END_RESPONSE, "
               "closing socket %d",
               ae->worker->name, ae->sd);
        ajp_abort_endpoint(ae, JK_TRUE, l);
    }

    /* First try to check open connections...
     */
    while (IS_VALID_SOCKET(ae->sd)) {
        int err = JK_FALSE;

        if (jk_is_socket_connected(ae->sd, l) == JK_FALSE) {
            ae->last_errno = errno;
            jk_log(l, JK_LOG_DEBUG,
                   "(%s) failed sending request, "
                   "socket %d is not connected any more (errno=%d)",
                   ae->worker->name, ae->sd, ae->last_errno);
            ajp_abort_endpoint(ae, JK_FALSE, l);
            err = JK_TRUE;
            err_conn++;
        }
        if (ae->worker->prepost_timeout > 0 && !err) {
            /* handle cping/cpong if prepost_timeout is set
             * If the socket is disconnected no need to handle
             * the cping/cpong
             */
            if (ajp_handle_cping_cpong(ae,
                                       ae->worker->prepost_timeout, l) == JK_FALSE) {
                jk_log(l, JK_LOG_INFO,
                       "(%s) failed sending request, "
                       "socket %d prepost cping/cpong failure (errno=%d)",
                       ae->worker->name, ae->sd, ae->last_errno);
                /* XXX: Is there any reason to try other
                 * connections to the node if one of them fails
                 * the cping/cpong heartbeat?
                 * Tomcat can be either too busy or simply dead, so
                 * there is a chance that all other connections would
                 * fail as well.
                 */
                err = JK_TRUE;
                err_cping++;
            }
        }

        /* We've got a connected socket and the optional
         * cping/cpong worked, so let's send the request now.
         */
        if (err == JK_FALSE) {
            rc = ajp_connection_tcp_send_message(ae, op->request, l);
            /* If this worked, we can break out of the loop
             * and proceed with the request.
             */
            if (rc == JK_TRUE) {
                ae->last_op = JK_AJP13_FORWARD_REQUEST;
                break;
            }
            /* Error during sending the request.
             */
            err_send++;
            if (rc == JK_FATAL_ERROR)
                op->recoverable = JK_FALSE;
            jk_log(l, JK_LOG_INFO,
                   "(%s) failed sending request (%srecoverable) "
                   "(errno=%d)",
                   ae->worker->name,
                   op->recoverable ? "" : "un",
                   ae->last_errno);
            JK_TRACE_EXIT(l);
            return JK_FATAL_ERROR;
        }

        /* If we got an error or can't send data, then try to steal another
         * pooled connection and try again. If we are not successful, break
         * out of this loop and try to open a new connection after the loop.
         */
        if (ajp_next_connection(ae, l) == JK_FALSE)
            break;
    }

    /* If we failed to reuse a connection, try to reconnect.
     */
    if (!IS_VALID_SOCKET(ae->sd)) {
        /* Could not steal any connection from an endpoint - backend
         * is disconnected or all connections are in use.
         * Failures found by cping or send are reported at INFO level,
         * pure connect check failures only at DEBUG level.
         */
        if (err_conn + err_cping + err_send > 0) {
            if (err_cping + err_send > 0) {
                jk_log(l, JK_LOG_INFO,
                       "(%s) no usable connection found, will create a new one, "
                       "detected by connect check (%d), cping (%d), send (%d).",
                       ae->worker->name, err_conn, err_cping, err_send);
            }
            else {
                jk_log(l, JK_LOG_DEBUG,
                       "(%s) no usable connection found, will create a new one, "
                       "detected by connect check (%d), cping (%d), send (%d).",
                       ae->worker->name, err_conn, err_cping, err_send);
            }
        }
        else if (JK_IS_DEBUG_LEVEL(l)) {
            jk_log(l, JK_LOG_DEBUG,
                   "(%s) no usable connection found, will create a new one.",
                   ae->worker->name);
        }

        /* Connect to the backend.
         */
        if (ajp_connect_to_endpoint(ae, l) != JK_TRUE) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) connecting to backend failed. "
                   "Tomcat is probably not started "
                   "or is listening on the wrong port (errno=%d)",
                   ae->worker->name, ae->last_errno);
            JK_TRACE_EXIT(l);
            return JK_FATAL_ERROR;
        }
        if (ae->worker->connect_timeout <= 0 &&
            ae->worker->prepost_timeout > 0) {
            /* handle cping/cpong if prepost_timeout is set
             * and we didn't already do a connect cping/cpong.
             */
            if (ajp_handle_cping_cpong(ae,
                                       ae->worker->prepost_timeout, l) == JK_FALSE) {
                jk_log(l, JK_LOG_INFO,
                       "(%s) failed sending request, "
                       "socket %d prepost cping/cpong failure (errno=%d)",
                       ae->worker->name, ae->sd, ae->last_errno);
                JK_TRACE_EXIT(l);
                return JK_FATAL_ERROR;
            }
        }

        /* We've got a connected socket and the optional
         * cping/cpong worked, so let's send the request now.
         */
        rc = ajp_connection_tcp_send_message(ae, op->request, l);
        /* Error during sending the request.
         */
        if (rc != JK_TRUE) {
            if (rc == JK_FATAL_ERROR)
                op->recoverable = JK_FALSE;
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed sending request on a fresh connection "
                   "(%srecoverable), socket %d (errno=%d)",
                   ae->worker->name, op->recoverable ? "" : "un",
                   ae->sd, ae->last_errno);
            JK_TRACE_EXIT(l);
            return JK_FATAL_ERROR;
        }
        ae->last_op = JK_AJP13_FORWARD_REQUEST;
    }
    else if (JK_IS_DEBUG_LEVEL(l)) {
        jk_log(l, JK_LOG_DEBUG,
               "(%s) Statistics about invalid connections: "
               "connect check (%d), cping (%d), send (%d)",
               ae->worker->name, err_conn, err_cping, err_send);
    }

    /*
     * From now on an error means that we have an internal server error
     * or Tomcat crashed.
     */
    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG,
               "(%s) request body to send %" JK_UINT64_T_FMT
               " - request body to resend %d",
               ae->worker->name, ae->left_bytes_to_send,
               op->post->len > AJP_HEADER_LEN ?
               op->post->len - AJP_HEADER_LEN : 0);

    /*
     * POST recovery job is done here and will work when data to
     * POST are less than 8k, since it's the maximum size of op->post buffer.
     * We send here the first part of data which was sent previously to the
     * remote Tomcat.
     *
     * Do we have something to resend (i.e. has op->post been fed previously)?
     */
    postlen = op->post->len;
    if (postlen > AJP_HEADER_LEN) {
        rc = ajp_connection_tcp_send_message(ae, op->post, l);
        /* Error during sending the request body.
         */
        if (rc != JK_TRUE) {
            if (rc == JK_FATAL_ERROR)
                op->recoverable = JK_FALSE;
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed sending request body of size %d "
                   "(%srecoverable), socket %d (errno=%d)",
                   ae->worker->name, postlen, op->recoverable ? "" : "un",
                   ae->sd, ae->last_errno);
            JK_TRACE_EXIT(l);
            return JK_FATAL_ERROR;
        }
        if (JK_IS_DEBUG_LEVEL(l))
            jk_log(l, JK_LOG_DEBUG, "(%s) Resent the request body (%d)",
                   ae->worker->name, postlen);
    }
    else if (s->reco_status == RECO_FILLED) {
        /* Recovery in LB MODE
         */
        postlen = s->reco_buf->len;
        if (postlen > AJP_HEADER_LEN) {
            rc = ajp_connection_tcp_send_message(ae, s->reco_buf, l);
            /* Error during sending the request body.
             */
            if (rc != JK_TRUE) {
                if (rc == JK_FATAL_ERROR)
                    op->recoverable = JK_FALSE;
                jk_log(l, JK_LOG_ERROR,
                       "(%s) failed sending request body of size %d (lb mode) "
                       "(%srecoverable), socket %d (errno=%d)",
                       ae->worker->name, postlen, op->recoverable ? "" : "un",
                       ae->sd, ae->last_errno);
                JK_TRACE_EXIT(l);
                return JK_FATAL_ERROR;
            }
            /* Report the resend only when the recovery buffer really
             * went out, mirroring the op->post branch above.
             */
            if (JK_IS_DEBUG_LEVEL(l))
                jk_log(l, JK_LOG_DEBUG,
                       "(%s) Resent the request body (lb mode) (%d)",
                       ae->worker->name, postlen);
        }
    }
    else {
        /* We never sent any POST data and we check if we have to send at
         * least one block of data (max 8k). These data will be kept in
         * op->post for resend if the remote Tomcat is down, a fact we will
         * learn only doing a read (not yet)
         *
         * || s->is_chunked - this can't be done here. The original protocol
         * sends the first chunk of post data (based on Content-Length),
         * and that's what the java side expects.
         * Sending this data for chunked would break other ajp13 servers.
         *
         * Note that chunking will continue to work - using the normal read.
         */
        if (ae->left_bytes_to_send > 0) {
            int len;

            if ((len = ajp_read_into_msg_buff(ae, s, op->post, -1, l)) <= 0) {
                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG,
                           "(%s) browser stop sending data, no need to recover",
                           ae->worker->name);
                op->recoverable = JK_FALSE;
                /* Send an empty POST message since per AJP protocol
                 * spec whenever we have content length the message
                 * packet must be followed with initial POST packet.
                 * Size zero will be handled as error in container.
                 * This is best effort only: the request fails anyway,
                 * so the results of these calls are not checked.
                 */
                jk_b_reset(op->post);
                jk_b_append_int(op->post, 0);
                ajp_connection_tcp_send_message(ae, op->post, l);
                JK_TRACE_EXIT(l);
                return JK_CLIENT_RD_ERROR;
            }

            /* If a RECOVERY buffer is available in LB mode, fill it
             */
            if (s->reco_status == RECO_INITED) {
                jk_b_copy(op->post, s->reco_buf);
                s->reco_status = RECO_FILLED;
            }
            if (JK_IS_DEBUG_LEVEL(l))
                jk_log(l, JK_LOG_DEBUG,
                       "(%s) sending %d bytes of request body",
                       ae->worker->name, len);

            s->content_read = (jk_uint64_t)len;
            rc = ajp_connection_tcp_send_message(ae, op->post, l);
            /* Error during sending the request body.
             */
            if (rc != JK_TRUE) {
                if (rc == JK_FATAL_ERROR)
                    op->recoverable = JK_FALSE;
                jk_log(l, JK_LOG_ERROR,
                       "(%s) failed sending request body of size %d "
                       "(%srecoverable), socket %d (errno=%d)",
                       ae->worker->name, len, op->recoverable ? "" : "un",
                       ae->sd, ae->last_errno);
                JK_TRACE_EXIT(l);
                return JK_FATAL_ERROR;
            }
        }
    }

    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/*
 * What to do with incoming data (dispatcher)
 *
 * Reads the message type byte from msg and handles the packet:
 * - JK_AJP13_SEND_HEADERS:    unmarshal the response head, merge in the
 *                             extra headers of the service and start the
 *                             response
 * - JK_AJP13_SEND_BODY_CHUNK: write the chunk to the client (or flush
 *                             for an empty chunk)
 * - JK_AJP13_GET_BODY_CHUNK:  read more request body into pmsg
 * - JK_AJP13_END_RESPONSE:    record the reuse flag and finish the response
 *
 * Returns JK_AJP13_SEND_HEADERS, JK_AJP13_HAS_RESPONSE (pmsg has to be
 * sent to the backend), JK_AJP13_END_RESPONSE, JK_AJP13_NO_RESPONSE
 * (nothing else to do for this packet), ae->last_op for an ignored
 * left-over flush packet, or one of the error codes JK_AJP13_ERROR,
 * JK_INTERNAL_ERROR, JK_STATUS_ERROR, JK_STATUS_FATAL_ERROR,
 * JK_CLIENT_RD_ERROR, JK_CLIENT_WR_ERROR.
 */
static int ajp_process_callback(jk_msg_buf_t *msg,
                                jk_msg_buf_t *pmsg,
                                ajp_endpoint_t * ae,
                                jk_ws_service_t *s, jk_log_context_t *l)
{
    int code = (int)jk_b_get_byte(msg);

    JK_TRACE_ENTER(l);

    switch (code) {
    case JK_AJP13_SEND_HEADERS:
        {
            int rc;
            jk_res_data_t res;

            if (ae->last_op == JK_AJP13_SEND_HEADERS) {
                /* Do not send anything to the client.
                 * Backend already sent us the headers.
                 */
                if (JK_IS_DEBUG_LEVEL(l)) {
                    jk_log(l, JK_LOG_DEBUG,
                           "(%s) Already received AJP13_SEND HEADERS",
                           ae->worker->name);
                }
                JK_TRACE_EXIT(l);
                return JK_AJP13_ERROR;
            }
            if (!ajp_unmarshal_response(msg, &res, ae, l)) {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) ajp_unmarshal_response failed", ae->worker->name);
                JK_TRACE_EXIT(l);
                return JK_AJP13_ERROR;
            }
            if (s->num_resp_headers > 0) {
                /* Append the headers stored in the service struct to
                 * the ones received from the backend. If the allocation
                 * fails, only the backend headers are forwarded.
                 */
                char **old_names = res.header_names;
                char **old_values = res.header_values;

                if (JK_IS_DEBUG_LEVEL(l))
                    jk_log(l, JK_LOG_DEBUG, "(%s) Adding %d response headers "
                           "to %d headers received from tomcat",
                           ae->worker->name, s->num_resp_headers,
                           res.num_headers);
                res.header_names = jk_pool_alloc(s->pool,
                                                 (s->num_resp_headers +
                                                  res.num_headers) *
                                                 sizeof(char *));
                res.header_values = jk_pool_alloc(s->pool,
                                                  (s->num_resp_headers +
                                                   res.num_headers) *
                                                  sizeof(char *));
                if (!res.header_names || !res.header_values) {
                    jk_log(l, JK_LOG_ERROR,
                           "(%s) Failed allocating one %d response headers.",
                           ae->worker->name,
                           s->num_resp_headers + res.num_headers);
                    res.header_names = old_names;
                    res.header_values = old_values;
                }
                else {
                    if (res.num_headers) {
                        memcpy(res.header_names, old_names,
                               res.num_headers * sizeof(char *));
                        memcpy(res.header_values, old_values,
                               res.num_headers * sizeof(char *));
                    }
                    if (s->num_resp_headers) {
                        memcpy(res.header_names + res.num_headers,
                               s->resp_headers_names,
                               s->num_resp_headers * sizeof(char *));
                        memcpy(res.header_values + res.num_headers,
                               s->resp_headers_values,
                               s->num_resp_headers * sizeof(char *));
                    }
                    res.num_headers = res.num_headers + s->num_resp_headers;
                }
            }
            s->http_response_status = res.status;

            /* A per-request fail_on_status list takes precedence over
             * the list configured for the worker.
             */
            if (s->extension.fail_on_status_size > 0)
                rc = is_http_status_fail(s->extension.fail_on_status_size,
                                         s->extension.fail_on_status,
                                         res.status);
            else
                rc = is_http_status_fail(ae->worker->http_status_fail_num,
                                         ae->worker->http_status_fail,
                                         res.status);
            if (rc > 0) {
                JK_TRACE_EXIT(l);
                return JK_STATUS_FATAL_ERROR;
            }
            if (rc < 0) {
                JK_TRACE_EXIT(l);
                return JK_STATUS_ERROR;
            }

            if (s->extension.use_server_error_pages &&
                s->http_response_status >= s->extension.use_server_error_pages)
                s->response_blocked = JK_TRUE;

            /*
             * Call even if response is blocked, since it also handles
             * forwarding some headers for special http status codes
             * even if the server uses an own error page.
             * Example: The WWW-Authenticate header in case of
             * HTTP_UNAUTHORIZED (401).
             */
            s->start_response(s, res.status, res.msg,
                              (const char *const *)res.header_names,
                              (const char *const *)res.header_values,
                              res.num_headers);
            if (!s->response_blocked) {
                if (s->flush && s->flush_header)
                    s->flush(s);
            }
        }
        JK_TRACE_EXIT(l);
        return JK_AJP13_SEND_HEADERS;

    case JK_AJP13_SEND_BODY_CHUNK:
        if (ae->last_op == JK_AJP13_FORWARD_REQUEST) {
            /* AJP13_SEND_BODY_CHUNK with length 0 is
             * explicit flush packet message.
             * Ignore those if they are left over from previous responses.
             * Reportedly some versions of JBoss suffer from that problem.
             */
            if (jk_b_get_int(msg) == 0) {
                jk_log(l, JK_LOG_DEBUG,
                       "(%s) Ignoring flush message received while sending the "
                       "request", ae->worker->name);
                JK_TRACE_EXIT(l);
                return ae->last_op;
            }
            /* We have just sent a request but received something
             * that probably originates from buffered response.
             */
            if (JK_IS_DEBUG_LEVEL(l)) {
                jk_log(l, JK_LOG_DEBUG,
                       "(%s) Unexpected AJP13_SEND_BODY_CHUNK",
                       ae->worker->name);
            }
            JK_TRACE_EXIT(l);
            return JK_AJP13_ERROR;
        }
        if (!s->response_blocked) {
            unsigned int len = (unsigned int)jk_b_get_int(msg);
            /*
             * Do a sanity check on len to prevent write reading beyond buffer
             * boundaries and thus revealing possible sensitive memory
             * contents to the client.
             * len cannot be larger than msg->len - 3 because the ajp message
             * contains the magic byte for JK_AJP13_SEND_BODY_CHUNK (1 byte)
             * and the length of the chunk (2 bytes). The remaining part of
             * the message is the chunk.
             * A message shorter than those 3 bytes is rejected as well:
             * msg->len - 3 would be negative and wrap to a huge unsigned
             * value, silently disabling the check.
             */
            int max_chunk = msg->len - 3;

            if (max_chunk < 0 || len > (unsigned int)max_chunk) {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) Chunk length too large. Length of AJP message "
                       "is %d, chunk length is %d.",
                       ae->worker->name, msg->len, len);
                JK_TRACE_EXIT(l);
                return JK_INTERNAL_ERROR;
            }
            if (len == 0) {
                /* AJP13_SEND_BODY_CHUNK with length 0 is
                 * explicit flush packet message.
                 */
                if (s->response_started) {
                    if (s->flush) {
                        s->flush(s);
                    }
                }
                else {
                    jk_log(l, JK_LOG_DEBUG,
                           "(%s) Ignoring flush message received before headers",
                           ae->worker->name);
                }
            }
            else {
                if (!s->write(s, msg->buf + msg->pos, len)) {
                    jk_log(l, JK_LOG_INFO,
                           "(%s) Writing to client aborted or client network "
                           "problems", ae->worker->name);
                    JK_TRACE_EXIT(l);
                    return JK_CLIENT_WR_ERROR;
                }
                if (s->flush && s->flush_packets)
                    s->flush(s);
            }
        }
        break;

    case JK_AJP13_GET_BODY_CHUNK:
        {
            int len = (int)jk_b_get_int(msg);

            if (len < 0) {
                len = 0;
            }
            /* the right place to add file storage for upload
             */
            if ((len = ajp_read_into_msg_buff(ae, s, pmsg, len, l)) >= 0) {
                s->content_read += (jk_uint64_t)len;
                JK_TRACE_EXIT(l);
                return JK_AJP13_HAS_RESPONSE;
            }
            jk_log(l, JK_LOG_INFO,
                   "(%s) Reading from client aborted or client network problems",
                   ae->worker->name);
            JK_TRACE_EXIT(l);
            return JK_CLIENT_RD_ERROR;
        }

    case JK_AJP13_END_RESPONSE:
        ae->reuse = (int)jk_b_get_byte(msg);
        if (!ae->reuse) {
            /*
             * AJP13 protocol reuse flag set to false.
             * Tomcat will close its side of the connection.
             */
            jk_log(l, JK_LOG_WARNING, "(%s) AJP13 protocol: Reuse is set to false",
                   ae->worker->name);
        }
        else if (s->disable_reuse) {
            if (JK_IS_DEBUG_LEVEL(l)) {
                jk_log(l, JK_LOG_DEBUG, "(%s) AJP13 protocol: Reuse is disabled",
                       ae->worker->name);
            }
            ae->reuse = JK_FALSE;
        }
        else {
            /* Reuse in all cases
             */
            if (JK_IS_DEBUG_LEVEL(l)) {
                jk_log(l, JK_LOG_DEBUG, "(%s) AJP13 protocol: Reuse is OK",
                       ae->worker->name);
            }
            ae->reuse = JK_TRUE;
        }
        if (!s->response_blocked) {
            if (s->done) {
                /* Done with response
                 */
                s->done(s);
            }
            else if (s->flush && !s->flush_packets) {
                /* Flush after the last write
                 */
                s->flush(s);
            }
        }
        JK_TRACE_EXIT(l);
        return JK_AJP13_END_RESPONSE;

    default:
        jk_log(l, JK_LOG_ERROR,
               "(%s) Unknown AJP protocol code: %02X", ae->worker->name, code);
        JK_TRACE_EXIT(l);
        return JK_AJP13_ERROR;
    }

    JK_TRACE_EXIT(l);
    return JK_AJP13_NO_RESPONSE;
}
/*
 * Make a request recoverable again if the recovery options ask for
 * HEAD and/or GET requests to be always recovered.
 *
 * Only acts when op->recoverable is JK_FALSE. Each option is checked
 * together with its own method, so setting both RECOVER_ALWAYS_HTTP_HEAD
 * and RECOVER_ALWAYS_HTTP_GET covers both methods.
 */
static void ajp_restore_recoverable_for_method(ajp_endpoint_t *p,
                                               jk_ws_service_t *s,
                                               ajp_operation_t *op)
{
    if (op->recoverable != JK_FALSE)
        return;

    if ((p->worker->recovery_opts & RECOVER_ALWAYS_HTTP_HEAD) &&
        !strcmp(s->method, "HEAD")) {
        op->recoverable = JK_TRUE;
    }
    else if ((p->worker->recovery_opts & RECOVER_ALWAYS_HTTP_GET) &&
             !strcmp(s->method, "GET")) {
        op->recoverable = JK_TRUE;
    }
}

/*
 * get replies from Tomcat via Ajp13/Ajp14
 * ajp13/ajp14 is async but handling read/send this way prevents nice recovery.
 * In fact if the tomcat link is broken during upload (browser -> apache -> tomcat)
 * we'll lose data and we'll have to abort the whole request.
 *
 * Return values of ajp_get_reply() function:
 * return value           op->recoverable    reason
 * JK_REPLY_TIMEOUT       ?recovery_options  Reply timeout while waiting for
 *                                           response packet
 * JK_FALSE               ?recovery_options  Error during
 *                                           ajp_connection_tcp_get_message()
 *           Could not read the AJP packet header
 * JK_AJP_PROTOCOL_ERROR: ?recovery_options  Error during
 *                                           ajp_connection_tcp_get_message()
 *           Failure after reading the AJP packet header
 * JK_STATUS_ERROR        mostly JK_TRUE     ajp_process_callback() returns
 *                                           JK_STATUS_ERROR
 *           Recoverable, if callback didn't return with a JK_HAS_RESPONSE before.
 *           JK_HAS_RESPONSE: parts of the post buffer are consumed.
 * JK_STATUS_FATAL_ERROR  mostly JK_TRUE     ajp_process_callback() returns
 *                                           JK_STATUS_FATAL_ERROR
 *           Recoverable, if callback didn't return with a JK_HAS_RESPONSE before.
 *           JK_HAS_RESPONSE: parts of the post buffer are consumed.
 * JK_FATAL_ERROR         ?                  ajp_process_callback() returns
 *                                           JK_AJP13_ERROR or JK_INTERNAL_ERROR
 *           JK_AJP13_ERROR: protocol error, or JK_INTERNAL_ERROR:
 *           chunk size too large
 * JK_CLIENT_RD_ERROR     ?                  ajp_process_callback() returns
 *                                           JK_CLIENT_RD_ERROR
 *           JK_CLIENT_RD_ERROR: could not read post from client.
 * JK_CLIENT_WR_ERROR     ?                  ajp_process_callback() returns
 *                                           JK_CLIENT_WR_ERROR
 *           JK_CLIENT_WR_ERROR: could not write back result to client
 * JK_TRUE                ?                  ajp_process_callback() returns
 *                                           JK_AJP13_END_RESPONSE
 * JK_FALSE               ?                  Other unhandled cases
 *                                           (unknown return codes)
 */
static int ajp_get_reply(jk_endpoint_t *e,
                         jk_ws_service_t *s,
                         jk_log_context_t *l,
                         ajp_endpoint_t * p, ajp_operation_t * op)
{
    /* No response headers have been passed on to the client yet
     */
    int headeratclient = JK_FALSE;

    JK_TRACE_ENTER(l);

    p->last_errno = 0;

    /* Start read all reply message
     */
    while (1) {
        int rc = 0;
        /* Allow to overwrite reply_timeout on a per URL basis via service struct
         */
        int reply_timeout = s->extension.reply_timeout;
        if (reply_timeout < 0)
            reply_timeout = p->worker->reply_timeout;

        /* If we set a reply timeout, check if something is available
         */
        if (reply_timeout > 0) {
            if (jk_is_input_event(p->sd, reply_timeout, l) ==
                JK_FALSE) {
                p->last_errno = errno;
                jk_log(l, JK_LOG_ERROR,
                       "(%s) Timeout with waiting reply from tomcat. "
                       "Tomcat is down, stopped or network problems (errno=%d)",
                       p->worker->name, p->last_errno);
                /* We can't trust this connection any more.
                 */
                ajp_abort_endpoint(p, JK_TRUE, l);
                if (headeratclient == JK_FALSE) {
                    if (p->worker->recovery_opts & RECOVER_ABORT_IF_TCGETREQUEST)
                        op->recoverable = JK_FALSE;
                    /*
                     * We revert back to recoverable, if recovery_opts allow it
                     * for GET or HEAD
                     */
                    ajp_restore_recoverable_for_method(p, s, op);
                }
                else {
                    if (p->worker->recovery_opts & RECOVER_ABORT_IF_TCSENDHEADER)
                        op->recoverable = JK_FALSE;
                }
                JK_TRACE_EXIT(l);
                return JK_REPLY_TIMEOUT;
            }
        }

        if ((rc = ajp_connection_tcp_get_message(p, op->reply, l)) != JK_TRUE) {
            if (headeratclient == JK_FALSE) {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) Tomcat is down or refused connection. "
                       "No response has been sent to the client (yet)",
                       p->worker->name);
                /*
                 * communication with tomcat has been interrupted BEFORE
                 * headers have been sent to the client.
                 *
                 * We mark it unrecoverable if recovery_opts set to
                 * RECOVER_ABORT_IF_TCGETREQUEST
                 */
                if (p->worker->recovery_opts & RECOVER_ABORT_IF_TCGETREQUEST)
                    op->recoverable = JK_FALSE;
                /*
                 * We revert back to recoverable, if recovery_opts allow it
                 * for GET or HEAD
                 */
                ajp_restore_recoverable_for_method(p, s, op);
            }
            else {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) Tomcat is down or network problems. "
                       "Part of the response has already been sent to the client",
                       p->worker->name);
                /* communication with tomcat has been interrupted AFTER
                 * headers have been sent to the client.
                 * headers (and maybe parts of the body) have already been
                 * sent, therefore the response is "complete" in a sense
                 * that nobody should append any data, especially no 500 error
                 * page of the webserver!
                 *
                 * We mark it unrecoverable if recovery_opts set to
                 * RECOVER_ABORT_IF_TCSENDHEADER
                 */
                if (p->worker->recovery_opts & RECOVER_ABORT_IF_TCSENDHEADER)
                    op->recoverable = JK_FALSE;
            }
            JK_TRACE_EXIT(l);
            return rc;
        }

        rc = ajp_process_callback(op->reply, op->post, p, s, l);
        p->last_op = rc;

        /* no more data to be sent, fine we have finished here
         */
        if (JK_AJP13_END_RESPONSE == rc) {
            JK_TRACE_EXIT(l);
            return JK_TRUE;
        }
        if (JK_AJP13_SEND_HEADERS == rc) {
            if (headeratclient == JK_FALSE) {
                headeratclient = JK_TRUE;
                continue;
            }
            else {
                /* Backend sent headers twice?
                 * This is a protocol violation
                 */
                jk_log(l, JK_LOG_ERROR,
                       "(%s) Tomcat already send headers",
                       p->worker->name);
                op->recoverable = JK_FALSE;
                JK_TRACE_EXIT(l);
                return JK_FALSE;
            }
        }
        if (JK_STATUS_ERROR == rc || JK_STATUS_FATAL_ERROR == rc) {
            jk_log(l, JK_LOG_INFO,
                   "(%s) request failed%s, "
                   "because of response status %d, ",
                   p->worker->name,
                   rc == JK_STATUS_FATAL_ERROR ? "" : " (soft)",
                   s->http_response_status);
            JK_TRACE_EXIT(l);
            return rc;
        }
        if (JK_AJP13_HAS_RESPONSE == rc) {
            /*
             * in upload-mode there is no second chance since
             * we may have already sent part of the uploaded data
             * to Tomcat.
             * In this case if Tomcat connection is broken we must
             * abort request and indicate error.
             * A possible work-around could be to store the uploaded
             * data to file and replay for it
             */
            op->recoverable = JK_FALSE;
            /* rc now holds the send result; on success none of the
             * checks below match and the loop reads the next packet.
             */
            rc = ajp_connection_tcp_send_message(p, op->post, l);
            if (rc != JK_TRUE) {
                jk_log(l, JK_LOG_ERROR,
                       "(%s) Tomcat is down or network problems",
                       p->worker->name);
                JK_TRACE_EXIT(l);
                return JK_FALSE;
            }
        }
        if (JK_AJP13_ERROR == rc) {
            /*
             * Tomcat has sent an invalid AJP message.
             * Loadbalancer if present will decide if
             * failover is possible.
             */
            JK_TRACE_EXIT(l);
            return JK_FATAL_ERROR;
        }
        if (JK_CLIENT_RD_ERROR == rc) {
            /*
             * Client has stopped sending to us, so get out.
             * We assume this isn't our fault, so just a normal exit.
             */
            JK_TRACE_EXIT(l);
            return JK_CLIENT_RD_ERROR;
        }
        if (JK_CLIENT_WR_ERROR == rc) {
            /*
             * Client has stopped receiving from us, so get out.
             * We assume this isn't our fault, so just a normal exit.
             */
            JK_TRACE_EXIT(l);
            return JK_CLIENT_WR_ERROR;
        }
        if (JK_INTERNAL_ERROR == rc) {
            /*
             * Internal error, like memory allocation or invalid packet lengths.
             */
            JK_TRACE_EXIT(l);
            return JK_FATAL_ERROR;
        }
        if (JK_AJP13_NO_RESPONSE == rc) {
            /*
             * This is fine, loop again, more data to send.
             */
            continue;
        }
        if (rc < 0) {
            op->recoverable = JK_FALSE;
            jk_log(l, JK_LOG_ERROR,
                   "(%s) Callback returns with unknown value %d",
                   p->worker->name, rc);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
    }
    /* XXX: Not reached?
     */
    JK_TRACE_EXIT(l);
    return JK_FALSE;
}
/*
 * Fold the result of one request into the worker's statistics record
 * (aw->s): add the bytes read and written by the endpoint, decrement the
 * busy counter and set state and error counters according to rc.
 */
static void ajp_update_stats(jk_endpoint_t *e, ajp_worker_t *aw, int rc,
                             jk_log_context_t *l)
{
    /* Traffic counters of the finished request. */
    aw->s->readed += e->rd;
    aw->s->transferred += e->wr;

    JK_ATOMIC_DECREMENT(&(aw->s->busy));

    if (rc != JK_TRUE && rc != JK_CLIENT_ERROR) {
        /* Everything except success and client errors counts
         * against the worker.
         */
        aw->s->state = JK_AJP_STATE_ERROR;
        aw->s->errors++;
        aw->s->error_time = time(NULL);
        return;
    }

    /* Success and client side failures both leave the worker OK;
     * client errors are counted separately.
     */
    aw->s->state = JK_AJP_STATE_OK;
    if (rc == JK_CLIENT_ERROR) {
        aw->s->client_errors++;
    }
}
/*
* service is now split into ajp_send_request and ajp_get_reply,
* which makes error recovery much easier
*
* We serve here the request, using AJP13/AJP14 (e->proto)
*
* Return values of service() method for ajp13/ajp14 worker:
* return value is_error e->recoverable reason
* JK_FALSE JK_HTTP_SERVER_ERROR TRUE Invalid Parameters
* (null values)
* JK_SERVER_ERROR JK_HTTP_SERVER_ERROR TRUE Error during
* initializing empty
* request, response or
* post body objects
* JK_CLIENT_ERROR JK_HTTP_REQUEST_TOO_LARGE JK_TRUE Request doesn't fit
* into buffer
* (error during
* ajp_marshal_into_msgb())
* JK_CLIENT_ERROR JK_HTTP_BAD_REQUEST JK_FALSE Error during
* reading parts of
* POST body from client
* JK_FATAL_ERROR JK_HTTP_SERVER_ERROR JK_FALSE If ajp_send_request()
* returns JK_FATAL_ERROR
* and !op->recoverable.
* JK_FATAL_ERROR JK_HTTP_SERVER_ERROR JK_FALSE If ajp_send_request()
* returns JK_TRUE
* but !op->recoverable.
* This should never happen.
* JK_CLIENT_ERROR JK_HTTP_BAD_REQUEST ? ajp_get_reply()
* returns JK_CLIENT_RD_ERROR
* JK_CLIENT_ERROR JK_HTTP_OK ? ajp_get_reply()
* returns JK_CLIENT_WR_ERROR
* JK_FATAL_ERROR JK_HTTP_SERVER_ERROR JK_TRUE ajp_get_reply()
* returns JK_FATAL_ERROR
* JK_FATAL_ERROR: protocol error or internal error
* JK_FATAL_ERROR JK_HTTP_SERVER_ERROR JK_FALSE ajp_get_reply()
* returns JK_FATAL_ERROR
* JK_FATAL_ERROR: protocol error or internal error
* JK_STATUS_ERROR JK_HTTP_SERVER_BUSY JK_TRUE ajp_get_reply()
* returns JK_STATUS_ERROR
* Only if op->recoverable and no more ajp13/ajp14 direct retries
* JK_STATUS_ERROR JK_HTTP_SERVER_BUSY JK_FALSE ajp_get_reply()
* returns JK_STATUS_ERROR
* Only if !op->recoverable
* JK_STATUS_FATAL_ERROR JK_HTTP_SERVER_BUSY JK_TRUE ajp_get_reply()
* returns JK_STATUS_FATAL_ERROR
* Only if op->recoverable and no more ajp13/ajp14 direct retries
* JK_STATUS_FATAL_ERROR JK_HTTP_SERVER_BUSY JK_FALSE ajp_get_reply()
* returns JK_STATUS_FATAL_ERROR
* Only if !op->recoverable
* JK_REPLY_TIMEOUT JK_HTTP_GATEWAY_TIME_OUT JK_TRUE ajp_get_reply()
* returns JK_REPLY_TIMEOUT
* JK_AJP_PROTOCOL_ERROR JK_HTTP_GATEWAY_TIME_OUT ? ajp_get_reply()
* returns JK_AJP_PROTOCOL_ERROR
* ??? JK_FATAL_ERROR JK_HTTP_GATEWAY_TIME_OUT JK_FALSE ajp_get_reply()
* returns something else
* Only if !op->recoverable
* ??? JK_FALSE JK_HTTP_SERVER_BUSY JK_TRUE ajp_get_reply()
* returns JK_FALSE
* Only if op->recoverable and no more ajp13/ajp14 direct retries
* JK_TRUE JK_HTTP_OK ? OK
*/
static int JK_METHOD ajp_service(jk_endpoint_t *e,
jk_ws_service_t *s,
jk_log_context_t *l, int *is_error)
{
/* Overview:
 *   1. validate arguments and allocate the request/reply/post buffers,
 *   2. marshal the web server request into op->request,
 *   3. account for the request in the worker's busy counter,
 *   4. try up to aw->retries times to send the request and read the
 *      reply, stopping early on success or on an unrecoverable error.
 *
 * e        endpoint to use; e->endpoint_private must be the ajp endpoint
 * s        web server request to forward
 * l        log context
 * is_error out: HTTP status code describing the outcome
 *
 * Returns JK_TRUE on success, otherwise one of the error codes listed in
 * the table preceding this function.
 */
int i;
int err = JK_TRUE;
ajp_operation_t oper;
ajp_operation_t *op = &oper;
ajp_endpoint_t *p;
ajp_worker_t *aw;
int log_error;
int rc = JK_UNSET;
char *msg = "";
int retry_interval;
int busy;
JK_TRACE_ENTER(l);
if (!e || !e->endpoint_private || !s || !is_error) {
JK_LOG_NULL_PARAMS(l);
/* is_error itself may be the missing argument, so only write
 * through it when it is usable.
 */
if (is_error)
*is_error = JK_HTTP_SERVER_ERROR;
JK_TRACE_EXIT(l);
return JK_FALSE;
}
/* Reset endpoint read and write sizes for
* this request.
*/
e->rd = e->wr = 0;
e->recoverable = JK_TRUE;
p = e->endpoint_private;
aw = p->worker;
/* Refresh the local worker data when the sequence number in aw->s
 * differs from the local copy.
 */
if (aw->sequence != aw->s->h.sequence)
jk_ajp_pull(aw, JK_FALSE, l);
aw->s->used++;
/* Set returned error to SERVER ERROR
*/
*is_error = JK_HTTP_SERVER_ERROR;
/* The three message buffers are taken from the endpoint's pool.
 * Every allocation failure below returns JK_SERVER_ERROR with
 * *is_error still set to JK_HTTP_SERVER_ERROR.
 */
op->request = jk_b_new(&(p->pool));
if (!op->request) {
jk_log(l, JK_LOG_ERROR,
"(%s) Failed allocating AJP request message", aw->name);
JK_TRACE_EXIT(l);
return JK_SERVER_ERROR;
}
if (jk_b_set_buffer_size(op->request, aw->max_packet_size)) {
jk_log(l, JK_LOG_ERROR,
"(%s) Failed allocating AJP request message buffer", aw->name);
JK_TRACE_EXIT(l);
return JK_SERVER_ERROR;
}
jk_b_reset(op->request);
op->reply = jk_b_new(&(p->pool));
if (!op->reply) {
jk_log(l, JK_LOG_ERROR,
"(%s) Failed allocating AJP reply message", aw->name);
JK_TRACE_EXIT(l);
return JK_SERVER_ERROR;
}
if (jk_b_set_buffer_size(op->reply, aw->max_packet_size)) {
jk_log(l, JK_LOG_ERROR,
"(%s) Failed allocating AJP reply message buffer", aw->name);
JK_TRACE_EXIT(l);
return JK_SERVER_ERROR;
}
/* The reply buffer is not reset here; it is reset at the start of
 * every attempt in the retry loop below.
 */
op->post = jk_b_new(&(p->pool));
if (!op->post) {
jk_log(l, JK_LOG_ERROR,
"(%s) Failed allocating AJP post message", aw->name);
JK_TRACE_EXIT(l);
return JK_SERVER_ERROR;
}
if (jk_b_set_buffer_size(op->post, aw->max_packet_size)) {
jk_log(l, JK_LOG_ERROR,
"(%s) Failed allocating AJP post message buffer", aw->name);
JK_TRACE_EXIT(l);
return JK_SERVER_ERROR;
}
jk_b_reset(op->post);
/* Set returned error to OK
*/
*is_error = JK_HTTP_OK;
op->recoverable = JK_TRUE;
op->uploadfd = -1; /* not yet used, later ;) */
p->left_bytes_to_send = s->content_length;
p->reuse = JK_FALSE;
p->hard_close = JK_FALSE;
/* NOTE(review): presumably the secret is consumed while marshalling
 * the request below - confirm in ajp_marshal_into_msgb().
 */
s->secret = aw->secret;
/*
* We get here initial request (in op->request)
*/
if (!ajp_marshal_into_msgb(op->request, s, l, p)) {
*is_error = JK_HTTP_REQUEST_TOO_LARGE;
jk_log(l, JK_LOG_INFO,
"(%s) Creating AJP message failed "
"without recovery - check max_packet_size", aw->name);
aw->s->client_errors++;
JK_TRACE_EXIT(l);
return JK_CLIENT_ERROR;
}
if (JK_IS_DEBUG_LEVEL(l)) {
jk_log(l, JK_LOG_DEBUG, "processing %s with %d retries",
aw->name, aw->retries);
}
/* Count this request as in flight. When a busy limit is configured
 * and exceeded, undo the increment and fail immediately with a
 * recoverable JK_BUSY_ERROR.
 * NOTE(review): on all other paths the matching decrement is not
 * visible in this function - presumably done by ajp_update_stats();
 * confirm.
 */
busy = JK_ATOMIC_INCREMENT(&(aw->s->busy));
if (aw->busy_limit > 0 && busy > aw->busy_limit) {
JK_ATOMIC_DECREMENT(&(aw->s->busy));
e->recoverable = JK_TRUE;
aw->s->errors++;
aw->s->error_time = time(NULL);
*is_error = JK_HTTP_SERVER_BUSY;
rc = JK_BUSY_ERROR;
jk_log(l, JK_LOG_ERROR,
"(%s) sending request to tomcat failed (recoverable), "
"busy limit %d reached (rc=%d, errors=%d, client_errors=%d).",
aw->name, aw->busy_limit, rc, aw->s->errors,
aw->s->client_errors);
JK_TRACE_EXIT(l);
return rc;
}
/* A worker currently in error state is moved to the probe state
 * while this request tries it.
 */
if (aw->s->state == JK_AJP_STATE_ERROR)
aw->s->state = JK_AJP_STATE_PROBE;
/* Track the high-water mark of concurrent requests. */
if (busy > aw->s->max_busy)
aw->s->max_busy = busy;
retry_interval = p->worker->retry_interval;
/* Each iteration is one complete attempt: send the request, then
 * read the reply. The loop is left early on success and on any
 * unrecoverable error.
 */
for (i = 0; i < aw->retries; i++) {
/* Reset reply message buffer for each retry
*/
jk_b_reset(op->reply);
/*
* ajp_send_request() already locally handles
* reconnecting and broken connection detection.
* So if we already failed in it, wait a bit before
* retrying the same backend.
*/
if (i > 0 && retry_interval >= 0) {
if (JK_IS_DEBUG_LEVEL(l))
jk_log(l, JK_LOG_DEBUG,
"(%s) retry %d, sleeping for %d ms before retrying",
aw->name, i, retry_interval);
jk_sleep(retry_interval);
/* Pull shared memory if something changed during sleep
*/
if (aw->sequence != aw->s->h.sequence)
jk_ajp_pull(aw, JK_FALSE, l);
}
/*
* We're using op->request which hold initial request
* if Tomcat is stopped or restarted, we will pass op->request
* to next valid tomcat.
*/
/* Per-attempt state: log_error selects the log level used after the
 * attempt, rc is the value returned if this attempt ends the request,
 * msg is the reason appended to the log line.
 */
log_error = JK_TRUE;
rc = JK_UNSET;
msg = "";
err = ajp_send_request(e, s, l, p, op);
e->recoverable = op->recoverable;
if (err == JK_CLIENT_RD_ERROR) {
*is_error = JK_HTTP_BAD_REQUEST;
msg = "because of client read error";
aw->s->client_errors++;
rc = JK_CLIENT_ERROR;
log_error = JK_FALSE;
e->recoverable = JK_FALSE;
/* NOTE(review): unlike the JK_CLIENT_RD_ERROR branch after
 * ajp_get_reply() below, op->recoverable is not cleared here, so
 * whether the loop stops depends on what ajp_send_request() left
 * in op->recoverable - confirm this is intended.
 */
/* Ajp message set reuse to TRUE in END_REQUEST message
* However due to client bad request if the recovery
* RECOVER_ABORT_IF_CLIENTERROR is set the physical connection
* will be closed and application in Tomcat can catch that
* generated exception, knowing the client aborted the
* connection. This AJP protocol limitation, where we
* should actually send some packet informing the backend
* that client broke the connection in a middle of
* request/response cycle.
*/
if (aw->recovery_opts & RECOVER_ABORT_IF_CLIENTERROR) {
/* Mark the endpoint for shutdown */
p->reuse = JK_FALSE;
p->hard_close = JK_TRUE;
}
}
else if (err == JK_FATAL_ERROR) {
/* Sending failed. Report 503 while the operation is still
 * recoverable, 500 once it is not.
 */
*is_error = JK_HTTP_SERVER_BUSY;
msg = "because of error during request sending";
rc = err;
if (!op->recoverable) {
*is_error = JK_HTTP_SERVER_ERROR;
msg = "because of protocol error during request sending";
}
}
else if (err == JK_TRUE && op->recoverable) {
/* Up to there we can recover
*/
err = ajp_get_reply(e, s, l, p, op);
e->recoverable = op->recoverable;
if (err == JK_TRUE) {
*is_error = JK_HTTP_OK;
/* Done with the request
*/
ajp_update_stats(e, aw, JK_TRUE, l);
JK_TRACE_EXIT(l);
return JK_TRUE;
}
/* Map the ajp_get_reply() failure to an HTTP status, a return
 * code and a log message. Client-side errors additionally force
 * the operation to be unrecoverable and are not logged as errors.
 */
if (err == JK_CLIENT_RD_ERROR) {
*is_error = JK_HTTP_BAD_REQUEST;
msg = "because of client read error";
aw->s->client_errors++;
rc = JK_CLIENT_ERROR;
log_error = JK_FALSE;
e->recoverable = JK_FALSE;
op->recoverable = JK_FALSE;
if (aw->recovery_opts & RECOVER_ABORT_IF_CLIENTERROR) {
/* Mark the endpoint for shutdown */
p->reuse = JK_FALSE;
p->hard_close = JK_TRUE;
}
}
else if (err == JK_CLIENT_WR_ERROR) {
/* XXX: Is this correct to log this as 200?
*/
*is_error = JK_HTTP_OK;
msg = "because of client write error";
aw->s->client_errors++;
rc = JK_CLIENT_ERROR;
log_error = JK_FALSE;
e->recoverable = JK_FALSE;
op->recoverable = JK_FALSE;
if (aw->recovery_opts & RECOVER_ABORT_IF_CLIENTERROR) {
/* Mark the endpoint for shutdown
*/
p->reuse = JK_FALSE;
p->hard_close = JK_TRUE;
}
}
else if (err == JK_FATAL_ERROR) {
*is_error = JK_HTTP_SERVER_ERROR;
msg = "because of server error";
rc = err;
}
else if (err == JK_REPLY_TIMEOUT) {
*is_error = JK_HTTP_GATEWAY_TIME_OUT;
msg = "because of reply timeout";
aw->s->reply_timeouts++;
rc = err;
}
else if (err == JK_STATUS_ERROR || err == JK_STATUS_FATAL_ERROR) {
*is_error = JK_HTTP_SERVER_BUSY;
msg = "because of response status";
rc = err;
}
else if (err == JK_AJP_PROTOCOL_ERROR) {
*is_error = JK_HTTP_BAD_GATEWAY;
msg = "because of protocol error";
rc = err;
}
/* This should only be the cases err == JK_FALSE
*/
else {
/* if we can't get reply, check if unrecoverable flag was set
* if is_recoverable_error is cleared, we have started
* receiving upload data and we must consider that
* operation is no more recoverable
*/
*is_error = JK_HTTP_BAD_GATEWAY;
msg = "";
rc = JK_FALSE;
}
}
else {
/* XXX: this should never happen:
* ajp_send_request() never returns JK_TRUE if !op->recoverable.
* and all other return values have already been handled.
*/
e->recoverable = JK_FALSE;
*is_error = JK_HTTP_SERVER_ERROR;
msg = "because of an unknown reason";
rc = JK_FATAL_ERROR;
jk_log(l, JK_LOG_ERROR,
"(%s) unexpected condition err=%d (%srecoverable)",
aw->name, err, op->recoverable ? "" : "un");
}
/* Unrecoverable failures are logged at ERROR level unless log_error
 * was cleared above (client-caused errors); every other failed
 * attempt is logged at INFO level.
 */
if (!op->recoverable && log_error == JK_TRUE) {
jk_log(l, JK_LOG_ERROR,
"(%s) sending request to tomcat failed (unrecoverable), "
"%s "
"(attempt=%d)",
aw->name, msg, i + 1);
}
else {
jk_log(l, JK_LOG_INFO,
"(%s) sending request to tomcat failed (%srecoverable), "
"%s "
"(attempt=%d)",
aw->name,
op->recoverable ? "" : "un",
msg, i + 1);
}
/* An unrecoverable failure ends the request immediately with the
 * rc selected above.
 */
if (!op->recoverable) {
ajp_update_stats(e, aw, rc, l);
JK_TRACE_EXIT(l);
return rc;
}
/* Get another connection from the pool and try again.
* Note: All sockets are probably closed already.
*/
ajp_next_connection(p, l);
}
/* Every attempt failed, but the last failure was still recoverable;
 * rc and *is_error hold the values set by the last attempt.
 */
ajp_update_stats(e, aw, rc, l);
/* Log the error only once per failed request.
*/
jk_log(l, JK_LOG_ERROR,
"(%s) connecting to tomcat failed "
"(rc=%d, errors=%d, client_errors=%d).",
aw->name, rc, aw->s->errors, aw->s->client_errors);
JK_TRACE_EXIT(l);
return rc;
}
/*
 * Validate an ajp13/ajp14 worker.
 *
 * Reads host, port, IPv6 preference and source address from the worker
 * properties. When the worker's shared record has not been set up yet
 * (sequence number 0) the addresses are resolved and the configuration
 * is pushed; otherwise the existing configuration is pulled.
 *
 * pThis  worker to validate; its worker_private must be an ajp worker
 * props  worker properties map
 * we     worker environment, stored in the worker
 * l      log context
 * proto  AJP13_PROTO or AJP14_PROTO
 *
 * Returns JK_TRUE on success. Returns JK_FALSE for an unknown protocol,
 * for a host or source attribute rejected by
 * jk_check_attribute_length(), or when no worker data is available.
 */
int ajp_validate(jk_worker_t *pThis,
                 jk_map_t *props,
                 jk_worker_env_t *we, jk_log_context_t *l, int proto)
{
    int port;
    const char *host;
    const char *value;
    ajp_worker_t *aw;

    JK_TRACE_ENTER(l);

    aw = pThis ? pThis->worker_private : NULL;

    /* The protocol is checked before the worker pointer, so an unknown
     * protocol is reported even when no worker data is present.
     */
    if (proto != AJP13_PROTO && proto != AJP14_PROTO) {
        if (aw) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) unknown protocol %d", aw->name, proto);
        }
        else {
            jk_log(l, JK_LOG_ERROR,
                   "(unset worker) unknown protocol %d", proto);
        }
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    /* Protocol specific defaults */
    if (proto == AJP13_PROTO) {
        port = AJP13_DEF_PORT;
        host = AJP13_DEF_HOST;
    }
    else {
        port = AJP14_DEF_PORT;
        host = AJP14_DEF_HOST;
    }

    if (!aw) {
        JK_LOG_NULL_PARAMS(l);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    aw->worker.we = we;
    aw->port = jk_get_worker_port(props, aw->name, port);

    if (host == NULL) {
        host = "undefined";
    }

    /* Target host */
    value = jk_get_worker_host(props, aw->name, host);
    if (jk_check_attribute_length("host name", value, l) == JK_FALSE) {
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }
    strncpy(aw->host, value, JK_SHM_STR_SIZ);

    aw->prefer_ipv6 = jk_get_worker_prefer_ipv6(props, aw->name, JK_FALSE);

    /* Optional local source address */
    value = jk_get_worker_source(props, aw->name, "");
    if (jk_check_attribute_length("source address", value, l) == JK_FALSE) {
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }
    strncpy(aw->source, value, JK_SHM_STR_SIZ);

    if (aw->s->h.sequence != 0) {
        /* The shared record was already set up: take over its
         * configuration and force a fresh address resolution.
         */
        if (JK_IS_DEBUG_LEVEL(l)) {
            jk_log(l, JK_LOG_DEBUG,
                   "worker %s contact '%s:%d' already configured "
                   "type=%d (%d) [%d]",
                   aw->name, aw->host, aw->port, aw->s->h.type,
                   aw->s->h.sequence, aw->s->addr_sequence);
        }
        /* Force resolve */
        aw->addr_sequence = -1;
        jk_ajp_pull(aw, JK_TRUE, l);
        JK_TRACE_EXIT(l);
        return JK_TRUE;
    }

    /* First time setup of this worker */
    if (JK_IS_DEBUG_LEVEL(l)) {
        jk_log(l, JK_LOG_DEBUG,
               "worker %s target is '%s:%d'",
               aw->name, aw->host, aw->port);
    }

    /* A target that cannot be resolved disables the worker (port 0)
     * instead of failing validation.
     */
    if (aw->port > 0 &&
        !jk_resolve(aw->host, aw->port, &aw->worker_inet_addr,
                    we->pool, aw->prefer_ipv6, l)) {
        jk_log(l, JK_LOG_ERROR,
               "worker %s can't resolve tomcat address %s",
               aw->name, aw->host);
        aw->s->port = aw->port = 0;
        if (JK_IS_DEBUG_LEVEL(l)) {
            jk_log(l, JK_LOG_DEBUG,
                   "worker %s contact is disabled",
                   aw->name);
        }
    }

    /* An unresolvable source address only produces a warning */
    if (aw->source && *aw->source) {
        if (!jk_resolve(aw->source, 0, &aw->worker_source_inet_addr,
                        we->pool, aw->prefer_ipv6, l)) {
            aw->worker_source_inet_addr.ipaddr_ptr = NULL;
            jk_log(l, JK_LOG_WARNING,
                   "worker %s can't resolve source address '%s'",
                   aw->name, aw->source);
        }
    }

    /* Publish the initial state */
    aw->addr_sequence = 0;
    aw->s->addr_sequence = 0;
    aw->s->last_maintain_time = time(NULL);
    aw->s->last_reset = aw->s->last_maintain_time;
    aw->s->port = aw->port;
    strncpy(aw->s->host, aw->host, JK_SHM_STR_SIZ);
    jk_ajp_push(aw, JK_TRUE, l);

    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/*
 * Allocate and initialize the endpoint cache (connection pool) of a worker.
 *
 * Creates p->ep_cache_sz endpoint slots. Each slot starts without a
 * socket, marked available, bound to worker p and protocol proto, and
 * wired to ajp_service()/ajp_done().
 *
 * Returns JK_TRUE on success and JK_FALSE when an allocation fails.
 * On failure the slots allocated so far are left in p->ep_cache; they are
 * not released here.
 */
static int ajp_create_endpoint_cache(ajp_worker_t *p, int proto,
                                     jk_log_context_t *l)
{
    unsigned int i;
    time_t now = time(NULL);

    JK_TRACE_ENTER(l);

    /* calloc(n, size) lets the allocator detect an overflowing
     * n * size instead of silently allocating a too small array.
     */
    p->ep_cache = (ajp_endpoint_t **)calloc(p->ep_cache_sz,
                                            sizeof(ajp_endpoint_t *));
    if (!p->ep_cache) {
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG,
               "(%s) setting connection pool size to %u with min %u "
               "and acquire timeout %d",
               p->name, p->ep_cache_sz, p->ep_mincache_sz,
               p->cache_acquire_timeout);

    for (i = 0; i < p->ep_cache_sz; i++) {
        ajp_endpoint_t *ae = (ajp_endpoint_t *)calloc(1, sizeof(ajp_endpoint_t));

        p->ep_cache[i] = ae;
        if (!ae) {
            /* i is unsigned, so it is printed with %u */
            jk_log(l, JK_LOG_ERROR,
                   "(%s) allocating endpoint slot %u (errno=%d)",
                   p->name, i, errno);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }

        ae->sd = JK_INVALID_SOCKET;
        ae->reuse = JK_FALSE;
        ae->avail = JK_TRUE;
        ae->hard_close = JK_FALSE;
        ae->last_access = now;
        /* Each endpoint owns a pool backed by its embedded buffer */
        jk_open_pool(&(ae->pool), ae->buf, sizeof(ae->buf));
        ae->worker = p;
        ae->endpoint.endpoint_private = ae;
        ae->proto = proto;
        ae->endpoint.service = ajp_service;
        ae->endpoint.done = ajp_done;
        ae->last_op = JK_AJP13_END_RESPONSE;
        ae->addr_sequence = 0;
    }

    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/*
 * Initialize an ajp13/ajp14 worker from its properties and create its
 * connection pool.
 *
 * pThis  worker to initialize; its worker_private must be an ajp worker
 * props  worker properties map
 * we     worker environment, stored in the worker
 * l      log context
 * proto  protocol, used for the default pool size and for the endpoints
 *
 * Returns JK_TRUE on success, JK_FALSE when the worker data is missing or
 * the connection pool cannot be allocated.
 */
int ajp_init(jk_worker_t *pThis, jk_map_t *props, jk_worker_env_t *we,
             jk_log_context_t *l, int proto)
{
    int rc = JK_FALSE;
    int cache;
    /*
     * start the connection cache
     */
    JK_TRACE_ENTER(l);

    cache = jk_get_worker_def_cache_size(proto);

    if (pThis && pThis->worker_private) {
        ajp_worker_t *p = pThis->worker_private;
        p->worker.we = we;
        p->ep_cache_sz = jk_get_worker_cache_size(props, p->name, cache);
        /* Default minimum pool size is half of the pool, rounded up */
        p->ep_mincache_sz = jk_get_worker_cache_size_min(props, p->name,
                                                         (p->ep_cache_sz+1) / 2);
        p->socket_timeout =
            jk_get_worker_socket_timeout(props, p->name, AJP_DEF_SOCKET_TIMEOUT);
        /* Default connect timeout is the socket timeout times 1000 */
        p->socket_connect_timeout =
            jk_get_worker_socket_connect_timeout(props, p->name,
                                                 p->socket_timeout * 1000);
        p->keepalive =
            jk_get_worker_socket_keepalive(props, p->name, JK_FALSE);
        p->cache_timeout =
            jk_get_worker_cache_timeout(props, p->name,
                                        AJP_DEF_CACHE_TIMEOUT);
        p->ping_timeout =
            jk_get_worker_ping_timeout(props, p->name,
                                       AJP_DEF_PING_TIMEOUT);
        p->ping_mode =
            jk_get_worker_ping_mode(props, p->name,
                                    AJP_CPING_NONE);
        p->connect_timeout =
            jk_get_worker_connect_timeout(props, p->name,
                                          AJP_DEF_CONNECT_TIMEOUT);
        p->prepost_timeout =
            jk_get_worker_prepost_timeout(props, p->name,
                                          AJP_DEF_PREPOST_TIMEOUT);

        /* When a ping mode is enabled but its dedicated timeout was left
         * at the default, fall back to the generic ping timeout.
         */
        if ((p->ping_mode & AJP_CPING_CONNECT) &&
            p->connect_timeout == AJP_DEF_CONNECT_TIMEOUT)
            p->connect_timeout = p->ping_timeout;

        if ((p->ping_mode & AJP_CPING_PREPOST) &&
            p->prepost_timeout == AJP_DEF_PREPOST_TIMEOUT)
            p->prepost_timeout = p->ping_timeout;

        p->conn_ping_interval =
            jk_get_worker_conn_ping_interval(props, p->name, 0);
        if ((p->ping_mode & AJP_CPING_INTERVAL) &&
            p->conn_ping_interval == 0) {
            /* XXX: Ping timeout is in milliseconds
             * and ping_interval is in seconds.
             * Use 10 times larger value for ping interval
             * (ping_timeout / 1000) * 10
             */
            p->conn_ping_interval = p->ping_timeout / 100;
        }
        p->reply_timeout =
            jk_get_worker_reply_timeout(props, p->name,
                                        AJP_DEF_REPLY_TIMEOUT);
        p->recovery_opts =
            jk_get_worker_recovery_opts(props, p->name,
                                        AJP_DEF_RECOVERY_OPTS);
        p->retries =
            jk_get_worker_retries(props, p->name,
                                  JK_RETRIES);
        /* Validate retries before it is used below: it feeds the default
         * of cache_acquire_timeout, and a value below 1 would otherwise
         * produce a non-positive acquire timeout.
         */
        if (p->retries < 1) {
            jk_log(l, JK_LOG_INFO,
                   "(%s) number of retries must be at least 1. "
                   "Setting to default=%d",
                   p->name, JK_RETRIES);
            p->retries = JK_RETRIES;
        }
        p->lb_retries =
            jk_get_worker_lb_retries(props, p->name,
                                     JK_LB_RETRIES);
        p->max_packet_size =
            jk_get_max_packet_size(props, p->name);
        /* Socket buffer defaults to the maximum packet size */
        p->socket_buf =
            jk_get_worker_socket_buffer(props, p->name, p->max_packet_size);
        p->retry_interval =
            jk_get_worker_retry_interval(props, p->name,
                                         JK_SLEEP_DEF);
        /* Default acquire timeout: retries * retry_interval */
        p->cache_acquire_timeout = jk_get_worker_cache_acquire_timeout(props,
                                       p->name, p->retries * p->retry_interval);
        p->busy_limit =
            jk_get_worker_busy_limit(props, p->name, 0);
        jk_get_worker_fail_on_status(props, p->name,
                                     &(p->http_status_fail),
                                     &(p->http_status_fail_num));

        p->maintain_time = jk_get_worker_maintain_time(props);
        if (p->maintain_time < 0)
            p->maintain_time = 0;

        if (JK_IS_DEBUG_LEVEL(l)) {
            jk_log(l, JK_LOG_DEBUG,
                   "(%s) setting endpoint options:",
                   p->name);
            jk_log(l, JK_LOG_DEBUG,
                   "keepalive: %d",
                   p->keepalive);
            jk_log(l, JK_LOG_DEBUG,
                   "socket timeout: %d",
                   p->socket_timeout);
            jk_log(l, JK_LOG_DEBUG,
                   "socket connect timeout: %d",
                   p->socket_connect_timeout);
            jk_log(l, JK_LOG_DEBUG,
                   "buffer size: %d",
                   p->socket_buf);
            jk_log(l, JK_LOG_DEBUG,
                   "pool timeout: %d",
                   p->cache_timeout);
            jk_log(l, JK_LOG_DEBUG,
                   "ping timeout: %d",
                   p->ping_timeout);
            jk_log(l, JK_LOG_DEBUG,
                   "connect timeout: %d",
                   p->connect_timeout);
            jk_log(l, JK_LOG_DEBUG,
                   "reply timeout: %d",
                   p->reply_timeout);
            jk_log(l, JK_LOG_DEBUG,
                   "prepost timeout: %d",
                   p->prepost_timeout);
            jk_log(l, JK_LOG_DEBUG,
                   "recovery options: %d",
                   p->recovery_opts);
            jk_log(l, JK_LOG_DEBUG,
                   "retries: %d",
                   p->retries);
            jk_log(l, JK_LOG_DEBUG,
                   "max packet size: %d",
                   p->max_packet_size);
            jk_log(l, JK_LOG_DEBUG,
                   "retry interval: %d",
                   p->retry_interval);
            jk_log(l, JK_LOG_DEBUG,
                   "busy limit: %d",
                   p->busy_limit);
        }
        /*
         * Initialize the secret before creating the connection pool,
         * since a pool allocation failure returns early.
         */
        p->secret = jk_get_worker_secret(props, p->name);

        if (!ajp_create_endpoint_cache(p, proto, l)) {
            jk_log(l, JK_LOG_ERROR,
                   "(%s) allocating connection pool of size %u",
                   p->name, p->ep_cache_sz);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }
        rc = JK_TRUE;
    }
    else {
        JK_LOG_NULL_PARAMS(l);
    }

    JK_TRACE_EXIT(l);
    return rc;
}
/*
 * Create a new, not yet configured ajp worker.
 *
 * w     out: receives the generic worker interface of the new worker.
 *       It is written only on success.
 * name  worker name, copied into the worker (at most JK_SHM_STR_SIZ bytes)
 * l     log context
 *
 * Returns JK_TRUE on success. Returns JK_FALSE when an argument is NULL,
 * when memory or the shared worker record cannot be allocated, or when
 * the worker lock cannot be created; in these cases everything allocated
 * here is released again.
 */
int JK_METHOD ajp_worker_factory(jk_worker_t **w,
                                 const char *name, jk_log_context_t *l)
{
    int rc;
    ajp_worker_t *aw;

    JK_TRACE_ENTER(l);
    if (name == NULL || w == NULL) {
        JK_LOG_NULL_PARAMS(l);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    aw = (ajp_worker_t *) calloc(1, sizeof(ajp_worker_t));
    if (!aw) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) malloc of private_data failed", name);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    jk_open_pool(&aw->p,
                 aw->buf,
                 sizeof(jk_pool_atom_t) * TINY_POOL_SIZE);

    strncpy(aw->name, name, JK_SHM_STR_SIZ);
    aw->login = NULL;

    aw->ep_cache_sz = 0;
    aw->ep_cache = NULL;
    aw->connect_retry_attempts = AJP_DEF_RETRY_ATTEMPTS;
    aw->worker.worker_private = aw;

    aw->worker.maintain = ajp_maintain;
    aw->worker.shutdown = ajp_shutdown;

    aw->logon = NULL;

    aw->s = jk_shm_alloc_ajp_worker(&aw->p, name, l);
    if (!aw->s) {
        /* Log before releasing aw: the message reads aw->name */
        jk_log(l, JK_LOG_ERROR,
               "(%s) allocating ajp worker record from shared memory", aw->name);
        jk_close_pool(&aw->p);
        free(aw);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    JK_INIT_CS(&aw->cs, rc);
    if (!rc) {
        jk_log(l, JK_LOG_ERROR,
               "(%s) creating thread lock (errno=%d)",
               aw->name, errno);
        jk_close_pool(&aw->p);
        free(aw);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    /* Hand the worker out only once it is fully constructed, so the
     * caller never receives a pointer to freed memory.
     */
    *w = &aw->worker;

    if (JK_IS_DEBUG_LEVEL(l))
        jk_log(l, JK_LOG_DEBUG,
               "ajp worker '%s' type=%d created",
               aw->name, aw->s->h.type);
    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/*
 * Destroy an ajp worker: close all pooled endpoints and release the
 * worker's lock, login data, pool and the worker structure itself.
 *
 * pThis  address of the worker to destroy
 * l      log context
 * proto  unused
 *
 * Returns JK_TRUE when the worker was destroyed, JK_FALSE when no worker
 * data was supplied.
 */
int ajp_destroy(jk_worker_t **pThis, jk_log_context_t *l, int proto)
{
    JK_TRACE_ENTER(l);

    if (pThis && *pThis && (*pThis)->worker_private) {
        unsigned int i;
        ajp_worker_t *aw = (*pThis)->worker_private;

        if (JK_IS_DEBUG_LEVEL(l))
            jk_log(l, JK_LOG_DEBUG,
                   "(%s) up to %u endpoints to close",
                   aw->name, aw->ep_cache_sz);

        /* ep_cache can be NULL while ep_cache_sz is already non-zero,
         * e.g. when allocating the pool failed during initialization.
         */
        if (aw->ep_cache) {
            for (i = 0; i < aw->ep_cache_sz; i++) {
                if (aw->ep_cache[i])
                    ajp_close_endpoint(aw->ep_cache[i], l);
            }
            free(aw->ep_cache);
        }
        JK_DELETE_CS(&aw->cs);

        if (aw->login) {
            /* take care of removing previously allocated data */
            free(aw->login->servlet_engine_name);
            free(aw->login);
            aw->login = NULL;
        }

        jk_close_pool(&aw->p);
        free(aw);
        JK_TRACE_EXIT(l);
        return JK_TRUE;
    }

    JK_LOG_NULL_PARAMS(l);
    JK_TRACE_EXIT(l);
    return JK_FALSE;
}
/*
 * Give an endpoint back to its worker's connection pool.
 *
 * The endpoint is reset, the caller's pointer (*e) is cleared and the
 * slot is marked available again under the worker lock.
 *
 * Returns JK_TRUE on success, JK_FALSE when no endpoint was supplied.
 */
int JK_METHOD ajp_done(jk_endpoint_t **e, jk_log_context_t *l)
{
    ajp_endpoint_t *ae;
    ajp_worker_t *aw;

    JK_TRACE_ENTER(l);

    if (!e || !*e || !(*e)->endpoint_private) {
        JK_LOG_NULL_PARAMS(l);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    ae = (*e)->endpoint_private;
    aw = ae->worker;

    /* The access time only matters when pool timeouts are enabled */
    if (aw->cache_timeout > 0) {
        ae->last_access = time(NULL);
    }

    /* The worker's address sequence moved on since this endpoint last
     * synchronized with it: do not reuse the connection.
     */
    if (aw->s->addr_sequence != ae->addr_sequence) {
        ae->reuse = JK_FALSE;
        ae->addr_sequence = aw->s->addr_sequence;
    }

    ajp_reset_endpoint(ae, l);
    *e = NULL;

    /* Release the slot */
    JK_ENTER_CS(&aw->cs);
    ae->avail = JK_TRUE;
    JK_LEAVE_CS(&aw->cs);

    if (JK_IS_DEBUG_LEVEL(l)) {
        jk_log(l, JK_LOG_DEBUG,
               "recycling connection pool for worker %s and socket %d",
               ae->worker->name, (int)ae->sd);
    }

    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/*
 * Acquire a free endpoint from the worker's connection pool.
 *
 * A slot that already holds a reusable open socket is preferred; failing
 * that the first available slot is taken. When no slot is available the
 * search is repeated every JK_SLEEP_DEF ms until cache_acquire_timeout is
 * used up.
 *
 * pThis  worker owning the pool
 * je     out: acquired endpoint, NULL when none could be acquired
 * l      log context
 * proto  unused
 *
 * Returns JK_TRUE when an endpoint was acquired, JK_FALSE otherwise.
 */
int ajp_get_endpoint(jk_worker_t *pThis,
                     jk_endpoint_t **je, jk_log_context_t *l, int proto)
{
    ajp_worker_t *aw;
    ajp_endpoint_t *found = NULL;
    int attempts = 0;

    JK_TRACE_ENTER(l);

    if (!pThis || !pThis->worker_private || !je) {
        JK_LOG_NULL_PARAMS(l);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    aw = pThis->worker_private;
    *je = NULL;

    /* Keep trying until the acquire timeout has elapsed */
    while ((attempts * JK_SLEEP_DEF) < aw->cache_acquire_timeout) {
        unsigned int idx;

        JK_ENTER_CS(&aw->cs);

        /* First pass: look for a free slot with an open, reusable socket */
        for (idx = 0; idx < aw->ep_cache_sz; idx++) {
            ajp_endpoint_t *cand = aw->ep_cache[idx];

            if (!IS_SLOT_AVAIL(cand) || !IS_VALID_SOCKET(cand->sd)) {
                continue;
            }
            if (cand->reuse) {
                cand->avail = JK_FALSE;
                found = cand;
                break;
            }
            /* XXX: an open but non reusable socket should not be in
             * the cache; reset the slot and keep looking.
             */
            ajp_reset_endpoint(cand, l);
            cand->avail = JK_TRUE;
            jk_log(l, JK_LOG_WARNING,
                   "(%s) closing non reusable pool slot=%d",
                   aw->name, idx);
        }

        /* Second pass: no connected slot, take the first free one */
        if (found == NULL) {
            for (idx = 0; idx < aw->ep_cache_sz; idx++) {
                if (IS_SLOT_AVAIL(aw->ep_cache[idx])) {
                    found = aw->ep_cache[idx];
                    found->avail = JK_FALSE;
                    break;
                }
            }
        }

        JK_LEAVE_CS(&aw->cs);

        if (found != NULL) {
            if (aw->cache_timeout > 0) {
                found->last_access = time(NULL);
            }
            *je = &found->endpoint;
            if (JK_IS_DEBUG_LEVEL(l)) {
                jk_log(l, JK_LOG_DEBUG,
                       "(%s) acquired connection pool slot=%u "
                       "after %d retries",
                       aw->name, idx, attempts);
            }
            JK_TRACE_EXIT(l);
            return JK_TRUE;
        }

        /* Nothing free right now: wait and retry */
        attempts++;
        if (JK_IS_DEBUG_LEVEL(l)) {
            jk_log(l, JK_LOG_DEBUG,
                   "could not get free endpoint for worker %s"
                   " (retry %d, sleeping for %d ms)",
                   aw->name, attempts, JK_SLEEP_DEF);
        }
        jk_sleep(JK_SLEEP_DEF);
    }

    jk_log(l, JK_LOG_WARNING,
           "Unable to get the free endpoint for worker %s from %u slots",
           aw->name, aw->ep_cache_sz);

    JK_TRACE_EXIT(l);
    return JK_FALSE;
}
/*
 * Periodic connection pool maintenance for an ajp worker.
 *
 * When a pool timeout is configured, idle connections older than
 * cache_timeout are recycled until the pool is down to its minimum size.
 * When a connection ping interval is configured, idle connections older
 * than conn_ping_interval are probed with cping/cpong and recycled on
 * failure. The collected sockets are shut down after the worker lock has
 * been released.
 *
 * pThis     worker to maintain
 * mstarted  time the maintenance run started; idle times are measured
 *           against it
 * global    unused
 * l         log context
 *
 * Returns JK_TRUE on success (including "nothing to do"), JK_FALSE when
 * no worker data was supplied or the temporary socket list cannot be
 * allocated.
 */
int JK_METHOD ajp_maintain(jk_worker_t *pThis, time_t mstarted, int global,
                           jk_log_context_t *l)
{
    JK_TRACE_ENTER(l);

    if (pThis && pThis->worker_private) {
        ajp_worker_t *aw = pThis->worker_private;
        int i;
        unsigned int n = 0, k = 0, cnt = 0;
        unsigned int m, m_count = 0;
        jk_sock_t *m_sock;

        /* Do connection pool maintenance only if timeouts or keepalives are set
         */
        if (aw->cache_timeout <= 0 &&
            aw->conn_ping_interval <= 0) {
            /* Nothing to do.
             */
            JK_TRACE_EXIT(l);
            return JK_TRUE;
        }

        JK_ENTER_CS(&aw->cs);

        /* Count open slots
         */
        for (i = (int)aw->ep_cache_sz - 1; i >= 0; i--) {
            if (aw->ep_cache[i] && IS_VALID_SOCKET(aw->ep_cache[i]->sd))
                cnt++;
        }
        /* Room for every open socket; each one is collected at most once
         * because its slot is invalidated when it is collected.
         */
        m_sock = (jk_sock_t *)malloc((cnt + 1) * sizeof(jk_sock_t));
        if (!m_sock) {
            /* Without the list no socket can be recycled safely */
            JK_LEAVE_CS(&aw->cs);
            jk_log(l, JK_LOG_ERROR,
                   "(%s) failed allocating socket list for %u connections",
                   aw->name, cnt);
            JK_TRACE_EXIT(l);
            return JK_FALSE;
        }

        /* Handle worker cache timeouts
         */
        if (aw->cache_timeout > 0) {
            for (i = (int)aw->ep_cache_sz - 1;
                 i >= 0; i--) {
                /* Skip the closed sockets
                 */
                if (IS_SLOT_AVAIL(aw->ep_cache[i]) &&
                    IS_VALID_SOCKET(aw->ep_cache[i]->sd)) {
                    int elapsed = (int)difftime(mstarted,
                                                aw->ep_cache[i]->last_access);
                    if (elapsed > aw->cache_timeout) {
                        time_t rt = 0;
                        n++;
                        if (JK_IS_DEBUG_LEVEL(l))
                            rt = time(NULL);
                        /* Detach the socket from the slot; it is shut
                         * down later, outside of the lock.
                         */
                        aw->ep_cache[i]->reuse = JK_FALSE;
                        m_sock[m_count++] = aw->ep_cache[i]->sd;
                        aw->ep_cache[i]->sd = JK_INVALID_SOCKET;
                        ajp_reset_endpoint(aw->ep_cache[i], l);
                        if (JK_IS_DEBUG_LEVEL(l))
                            jk_log(l, JK_LOG_DEBUG,
                                   "(%s) cleaning pool slot=%d elapsed %d in %d",
                                   aw->name, i, elapsed,
                                   (int)(difftime(time(NULL), rt)));
                    }
                }
                /* Stop once only the minimum number of connections
                 * would remain open.
                 */
                if (cnt <= aw->ep_mincache_sz + n) {
                    if (JK_IS_DEBUG_LEVEL(l)) {
                        jk_log(l, JK_LOG_DEBUG,
                               "(%s) reached pool min size %u from %u cache slots",
                               aw->name, aw->ep_mincache_sz, aw->ep_cache_sz);
                    }
                    break;
                }
            }
        }

        /* Handle worker connection keepalive
         */
        if (aw->conn_ping_interval > 0 && aw->ping_timeout > 0) {
            for (i = (int)aw->ep_cache_sz - 1; i >= 0; i--) {
                /* Skip the closed sockets
                 */
                if (IS_SLOT_AVAIL(aw->ep_cache[i]) &&
                    IS_VALID_SOCKET(aw->ep_cache[i]->sd)) {
                    int elapsed = (int)difftime(mstarted,
                                                aw->ep_cache[i]->last_access);
                    if (elapsed > aw->conn_ping_interval) {
                        k++;
                        /* handle cping/cpong.
                         */
                        if (ajp_handle_cping_cpong(aw->ep_cache[i],
                                                   aw->ping_timeout, l) == JK_FALSE) {
                            jk_log(l, JK_LOG_INFO,
                                   "(%s) failed sending request, "
                                   "socket %d keepalive cping/cpong "
                                   "failure (errno=%d)",
                                   aw->name,
                                   aw->ep_cache[i]->sd,
                                   aw->ep_cache[i]->last_errno);
                            aw->ep_cache[i]->reuse = JK_FALSE;
                            m_sock[m_count++] = aw->ep_cache[i]->sd;
                            aw->ep_cache[i]->sd = JK_INVALID_SOCKET;
                            ajp_reset_endpoint(aw->ep_cache[i], l);
                        }
                    }
                }
            }
        }
        JK_LEAVE_CS(&aw->cs);

        /* Shutdown sockets outside of the lock.
         * This has benefits only if maintain was
         * called from the watchdog thread.
         */
        for (m = 0; m < m_count; m++) {
            if (IS_VALID_SOCKET(m_sock[m])) {
                jk_shutdown_socket(m_sock[m], l);
                JK_ATOMIC_DECREMENT(&(aw->s->connected));
            }
        }
        free(m_sock);

        if ((k + n) && JK_IS_DEBUG_LEVEL(l))
            jk_log(l, JK_LOG_DEBUG,
                   "(%s) pinged %u and recycled %u sockets in %d seconds "
                   "from %u pool slots",
                   aw->name, k, n, (int)(difftime(time(NULL), mstarted)),
                   aw->ep_cache_sz);
        JK_TRACE_EXIT(l);
        return JK_TRUE;
    }
    else {
        JK_LOG_NULL_PARAMS(l);
    }

    JK_TRACE_EXIT(l);
    return JK_FALSE;
}
/*
 * Shut down all idle connections of an ajp worker.
 *
 * Every available slot that still holds an open socket is marked for a
 * hard close, reset and left without a socket. Slots that are currently
 * in use are not touched.
 *
 * Returns JK_TRUE on success, JK_FALSE when no worker data was supplied.
 */
int JK_METHOD ajp_shutdown(jk_worker_t *pThis, jk_log_context_t *l)
{
    ajp_worker_t *aw;
    unsigned int closed = 0;
    int idx;

    JK_TRACE_ENTER(l);

    if (!pThis || !pThis->worker_private) {
        JK_LOG_NULL_PARAMS(l);
        JK_TRACE_EXIT(l);
        return JK_FALSE;
    }

    aw = pThis->worker_private;

    JK_ENTER_CS(&aw->cs);
    /* Walk the pool from the last slot down to the first */
    for (idx = (int)aw->ep_cache_sz; idx-- > 0; ) {
        ajp_endpoint_t *ep = aw->ep_cache[idx];

        /* Only idle slots with an open socket are of interest */
        if (!IS_SLOT_AVAIL(ep) || !IS_VALID_SOCKET(ep->sd)) {
            continue;
        }

        closed++;
        ep->reuse = JK_FALSE;
        ep->hard_close = JK_TRUE;
        ajp_reset_endpoint(ep, l);
        ep->sd = JK_INVALID_SOCKET;
        if (JK_IS_DEBUG_LEVEL(l)) {
            jk_log(l, JK_LOG_DEBUG,
                   "(%s) shut down pool slot=%d",
                   aw->name, idx);
        }
    }
    JK_LEAVE_CS(&aw->cs);

    if (closed && JK_IS_DEBUG_LEVEL(l)) {
        jk_log(l, JK_LOG_DEBUG,
               "(%s) shut down %u sockets from %u pool slots",
               aw->name, closed, aw->ep_cache_sz);
    }

    JK_TRACE_EXIT(l);
    return JK_TRUE;
}
/*
 * Check whether the worker's connection pool has at least one available
 * slot. The slot is only tested, not acquired.
 *
 * Returns JK_TRUE when an available slot exists, JK_FALSE when the pool
 * is fully in use or no worker data was supplied.
 */
int ajp_has_endpoint(jk_worker_t *pThis,
                     jk_log_context_t *l)
{
    int rc = JK_FALSE;

    JK_TRACE_ENTER(l);

    if (pThis && pThis->worker_private) {
        ajp_worker_t *aw = pThis->worker_private;
        unsigned int slot;

        JK_ENTER_CS(&aw->cs);
        /* Any available slot will do, connected or not */
        for (slot = 0; slot < aw->ep_cache_sz; slot++) {
            if (IS_SLOT_AVAIL(aw->ep_cache[slot])) {
                rc = JK_TRUE;
                break;
            }
        }
        JK_LEAVE_CS(&aw->cs);
    }
    else {
        JK_LOG_NULL_PARAMS(l);
    }

    /* Single exit so that every JK_TRACE_ENTER is matched by an exit */
    JK_TRACE_EXIT(l);
    return rc;
}