blob: bcd91c4a03a901869cfe5ef45f5361e2a93e5207 [file] [log] [blame]
/*
* Copyright 1999-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* @version $Id$ */
#include "pr_warp.h"
/* Initialize this provider. */
static const char *warp_init(void) {
wa_debug(WA_MARK,"WARP provider initialized");
return(NULL);
}
/* Notify this provider of its imminent startup. */
static void warp_startup(void) {
wa_chain *elem=warp_connections;
/* Open all connections having deployed applications */
while (elem!=NULL) {
wa_connection *curr=(wa_connection *)elem->curr;
warp_config *conf=(warp_config *)curr->conf;
apr_socket_t * sock = 0;
wa_debug(WA_MARK,"Opening connection \"%s\"",curr->name);
sock=n_connect(curr);
if (sock!=NULL) {
wa_debug(WA_MARK,"Connection \"%s\" opened",curr->name);
if (c_configure(curr,sock)==wa_true) {
wa_debug(WA_MARK,"Connection \"%s\" configured",curr->name);
warp_sockpool_release(conf->socket_pool, curr, sock);
} else {
wa_log(WA_MARK,"Cannot configure connection \"%s\"",curr->name);
}
} else wa_log(WA_MARK,"Cannot open connection \"%s\"",curr->name);
elem=elem->next;
}
wa_debug(WA_MARK,"WARP provider started");
}
/* Cleans up all resources allocated by this provider. */
static void warp_shutdown(void) {
wa_debug(WA_MARK,"WARP provider shut down");
}
/* Configure a connection with the parameter from the web server
configuration file. */
static const char *warp_connect(wa_connection *conn, const char *param) {
apr_status_t r=APR_SUCCESS;
warp_config *conf=NULL;
apr_port_t port=0;
char *addr=NULL;
char *scop=NULL;
/* Allocation and checking */
conf=(warp_config *)apr_palloc(wa_pool,sizeof(warp_config));
if (conf==NULL) return("Cannot allocate connection configuration");
/* Check and parse parameter */
if (param==NULL) return("Parameter for connection not specified");
if (apr_parse_addr_port(&addr,&scop,&port,param,wa_pool)!=APR_SUCCESS)
return("Invalid format for parameter");
if (addr==NULL) return("Host name not specified in parameter");
if (scop!=NULL) return("Invalid format for parameter (scope)");
if (port==0) return("Port number not specified in parameter");
/* Create and APR sockaddr structure */
r=apr_sockaddr_info_get(&(conf->addr),addr,APR_INET,port,0,wa_pool);
if (r!=APR_SUCCESS) return("Cannot get socket address information");
/* Done */
#if APR_HAS_THREADS
apr_atomic_set(&conf->serv, (unsigned) 0);
apr_atomic_set(&conf->open_socket_count, (unsigned) 0);
#else
conf->serv = 0;
conf->open_socket_count = 0;
#endif
conn->conf=conf;
/* Create the socket pool */
conf->socket_pool = warp_sockpool_create();
if (conf->socket_pool == NULL) return("Cannot create socket pool");
return(NULL);
}
/* Receive notification of the deployment of an application. */
static const char *warp_deploy(wa_application *appl) {
wa_chain *elem=warp_connections;
wa_connection *conn=appl->conn;
/* Integer configuration -1 equals application not deployed */
appl->conf=(void *)-1;
/* Check if the connection specified in this application has already
been stored in our local array of connections */
while (elem!=NULL) {
if (conn==elem->curr) break;
elem=elem->next;
}
if (elem==NULL) {
elem=(wa_chain *)apr_palloc(wa_pool,sizeof(wa_chain));
elem->curr=conn;
elem->next=warp_connections;
warp_connections=elem;
}
/* Check if this application has already been stored in our local array of
applications */
elem=warp_applications;
while (elem!=NULL) {
if (appl==elem->curr) break;
elem=elem->next;
}
if (elem==NULL) {
elem=(wa_chain *)apr_palloc(wa_pool,sizeof(wa_chain));
elem->curr=appl;
elem->next=warp_applications;
warp_applications=elem;
}
return(NULL);
}
/* Describe the configuration member found in a connection. */
static char *warp_conninfo(wa_connection *conn, apr_pool_t *pool) {
warp_config *conf=(warp_config *)conn->conf;
apr_port_t port=0;
char *addr=NULL;
char *name=NULL;
char *buff=NULL;
#if APR_HAS_THREADS
apr_uint32_t socket_count = apr_atomic_read(&conf->open_socket_count);
apr_uint32_t server_id = apr_atomic_read(&conf->serv);
#else
apr_uint32_t socket_count = conf->open_socket_count;
apr_uint32_t server_id = conf->serv;
#endif
if (conf==NULL) return("Invalid configuration member");
apr_sockaddr_port_get(&port,conf->addr);
apr_sockaddr_ip_get(&addr,conf->addr);
apr_getnameinfo(&name,conf->addr,0);
buff=apr_psprintf(pool,"Host: %s Port:%d Address:%s Socket Count: %d Server ID: %d",
name,port,addr,socket_count,server_id);
return(buff);
}
/* Describe the configuration member found in a web application. */
static char *warp_applinfo(wa_application *appl, apr_pool_t *pool) {
return(apr_psprintf(pool,"Application ID: %d",(int)(appl->conf)));
}
/* Transmit headers */
static int headers(void *d, const char *n, const char *v) {
warp_header *data=(warp_header *)d;
wa_connection *conn=data->conn;
warp_config *conf=(warp_config *)conn->conf;
warp_packet *pack=data->pack;
pack->type=TYPE_REQ_HEADER;
p_write_string(pack,(char *)n);
p_write_string(pack,(char *)v);
if (n_send(data->sock,pack)!=wa_true) {
data->fail=wa_true;
return(FALSE);
}
wa_debug(WA_MARK,"Req. header %s: %s",n,v);
return(TRUE);
}
/* Handle a connection from the web server. */
static int warp_handle(wa_request *r, wa_application *appl) {
warp_header *h=(warp_header *)apr_palloc(r->pool,sizeof(warp_header));
wa_connection *conn=appl->conn;
warp_config *conf=(warp_config *)conn->conf;
warp_packet *pack=p_create(r->pool);
apr_socket_t * sock = NULL;
int status=0;
/* Check packet */
if (pack==NULL)
return(wa_rerror(WA_MARK,r,500,"Cannot create WARP packet"));
/* Check application */
if (((int)(appl->conf))==-1)
return(wa_rerror(WA_MARK,r,404,"Application not deployed"));
sock = warp_sockpool_acquire(conf->socket_pool);
/* Attempt to reconnect if disconnected */
if (sock==NULL) {
sock=n_connect(conn);
if (sock!=NULL) {
wa_debug(WA_MARK,"Connection \"%s\" opened",conn->name);
if (c_configure(conn,sock)==wa_true) {
wa_debug(WA_MARK,"Connection \"%s\" configured",conn->name);
} else {
wa_log(WA_MARK,"Cannot configure connection %s",conn->name);
return(wa_rerror(WA_MARK,r,500,
"Cannot configure connection \"%s\"",
conn->name));
}
} else {
wa_log(WA_MARK,"Cannot open connection %s",conn->name);
return(wa_rerror(WA_MARK,r,500,"Cannot open connection %s",
conn->name));
}
}
/* Let's do it */
pack->type=TYPE_REQ_INIT;
p_write_int(pack,(int)(appl->conf));
p_write_string(pack,r->meth);
p_write_string(pack,r->ruri);
p_write_string(pack,r->args);
p_write_string(pack,r->prot);
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn, sock);
sock=n_connect(conn);
if (sock!=NULL) {
wa_debug(WA_MARK,"Connection \"%s\" reopened",conn->name);
if (c_configure(conn,sock)==wa_true) {
wa_debug(WA_MARK,"Connection \"%s\" reconfigured",conn->name);
} else {
wa_log(WA_MARK,"Cannot reconfigure connection %s",conn->name);
return(wa_rerror(WA_MARK,r,500,
"Cannot reconfigure connection \"%s\"",
conn->name));
}
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn, sock);
return(wa_rerror(WA_MARK,r,500,
"Communication broken while reconnecting"));
} else {
wa_debug(WA_MARK,"Re-Req. %s %s %s",r->meth,r->ruri,r->prot);
}
} else {
wa_log(WA_MARK,"Cannot open connection %s",conn->name);
return(wa_rerror(WA_MARK,r,500,"Cannot open connection %s",
conn->name));
}
} else {
wa_debug(WA_MARK,"Req. %s %s %s",r->meth,r->ruri,r->prot);
}
p_reset(pack);
pack->type=TYPE_REQ_CONTENT;
p_write_string(pack,r->ctyp);
p_write_int(pack,r->clen);
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn, sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
} else {
wa_debug(WA_MARK,"Req. content typ=%s len=%d",r->ctyp,r->clen);
}
if (r->schm!=NULL) {
p_reset(pack);
pack->type=TYPE_REQ_SCHEME;
p_write_string(pack,r->schm);
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
} else {
wa_debug(WA_MARK,"Req. scheme %s",r->schm);
}
}
if ((r->user!=NULL)||(r->auth!=NULL)) {
p_reset(pack);
pack->type=TYPE_REQ_AUTH;
if (r->user==NULL) r->user="\0";
if (r->auth==NULL) r->auth="\0";
p_write_string(pack,r->user);
p_write_string(pack,r->auth);
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
} else {
wa_debug(WA_MARK,"Req. user %s auth %s",r->user,r->auth);
}
}
/* The request headers */
h->conn=conn;
h->pack=pack;
h->fail=wa_false;
h->sock=sock;
apr_table_do(headers,h,r->hdrs,NULL);
if (h->fail==wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
}
/* The request client data */
if (r->clnt!=NULL) {
p_reset(pack);
pack->type=TYPE_REQ_CLIENT;
p_write_string(pack,r->clnt->host);
p_write_string(pack,r->clnt->addr);
p_write_ushort(pack,r->clnt->port);
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
} else {
wa_debug(WA_MARK,"Req. server %s:%d (%s)",r->clnt->host,
r->clnt->port,r->clnt->addr);
}
}
/* The request server data */
if (r->serv!=NULL) {
p_reset(pack);
pack->type=TYPE_REQ_SERVER;
p_write_string(pack,r->serv->host);
p_write_string(pack,r->serv->addr);
p_write_ushort(pack,r->serv->port);
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
} else {
wa_debug(WA_MARK,"Req. client %s:%d (%s)",r->serv->host,
r->serv->port,r->serv->addr);
}
}
p_reset(pack);
pack->type=TYPE_REQ_PROCEED;
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
}
while (1) {
if (n_recv(sock,pack)!=wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
}
switch (pack->type) {
case TYPE_RES_STATUS: {
char *mesg=NULL;
p_read_ushort(pack,&status);
p_read_string(pack,&mesg);
wa_debug(WA_MARK,"=== %d %s",status,mesg);
wa_rsetstatus(r,status,mesg);
break;
}
case TYPE_RES_HEADER: {
char *name=NULL;
char *valu=NULL;
p_read_string(pack,&name);
p_read_string(pack,&valu);
if (strcasecmp("content-type",name)==0)
wa_rsetctype(r,valu);
else wa_rsetheader(r,name,valu);
wa_debug(WA_MARK,"=== %s: %s",name,valu);
break;
}
case TYPE_RES_COMMIT: {
wa_rcommit(r);
wa_debug(WA_MARK,"=== ");
break;
}
case TYPE_RES_BODY: {
wa_rwrite(r,pack->buff,pack->size);
wa_rflush(r);
pack->buff[pack->size]='\0';
wa_debug(WA_MARK,"Response body bytes: %d",pack->size);
break;
}
case TYPE_RES_DONE: {
wa_debug(WA_MARK,"=== DONE ===");
if (sock != NULL) {
warp_sockpool_release(conf->socket_pool, conn, sock);
}
return(status);
break;
}
case TYPE_CBK_READ: {
int size=-1;
p_read_ushort(pack,&size);
p_reset(pack);
wa_debug(WA_MARK,"Request body bytes: (Req=%d)",size);
size=wa_rread(r,pack->buff,size);
wa_debug(WA_MARK,"Request body bytes: (Got=%d)",size);
if (size==0) {
pack->type=TYPE_CBK_DONE;
} else if (size>0) {
pack->type=TYPE_CBK_DATA;
pack->size=size;
} else {
pack->type=TYPE_ERROR;
p_write_string(pack,"Transfer interrupted");
}
wa_debug(WA_MARK,"Request body bytes: (Sent=%d)",pack->size);
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
}
break;
}
case TYPE_ASK_SSL: {
wa_debug(WA_MARK,"TYPE_ASK_SSL");
/* Request for client certificate */
if (r->ssld==NULL) {
pack->type=TYPE_REP_SSL_NO;
pack->size=0;
} else {
pack->type=TYPE_REP_SSL;
p_write_string(pack,r->ssld->ciph);
p_write_string(pack,r->ssld->sess);
p_write_int(pack,r->ssld->size);
}
wa_debug(WA_MARK,"CC bytes: (Sent=%d)",pack->size);
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
}
break;
}
case TYPE_ASK_SSL_CLIENT: {
wa_debug(WA_MARK,"TYPE_ASK_SSL_CLIENT");
/* Request for client certificate */
if (r->ssld==NULL || r->ssld->cert==NULL) {
pack->type=TYPE_REP_SSL_NO;
pack->size=0;
} else {
pack->type=TYPE_REP_SSL_CERT;
p_write_string(pack,r->ssld->cert);
}
wa_debug(WA_MARK,"CC bytes: (Sent=%d)",pack->size);
if (n_send(sock,pack)!=wa_true) {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Communication interrupted"));
}
break;
}
case TYPE_ERROR: {
char *mesg=NULL;
p_read_string(pack,&mesg);
if (sock != NULL) {
warp_sockpool_release(conf->socket_pool,
conn, sock);
}
return(wa_rerror(WA_MARK,r,500,"%s",mesg));
}
default: {
n_disconnect(conn,sock);
return(wa_rerror(WA_MARK,r,500,"Invalid packet %d",pack->type));
}
}
}
if (sock != NULL) {
warp_sockpool_release(conf->socket_pool, conn, sock);
}
return status;
}
/* The list of all configured connections */
wa_chain *warp_connections=NULL;
/* The list of all deployed connections */
wa_chain *warp_applications=NULL;
/* The warp provider structure */
wa_provider wa_provider_warp = {
"warp",
warp_init,
warp_startup,
warp_shutdown,
warp_connect,
warp_deploy,
warp_conninfo,
warp_applinfo,
warp_handle,
};