/* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*/
#include <apr_pools.h>
#include <apr_poll.h>
#include <apr_version.h>
#include <apr_portable.h>
#include <apr_strings.h>
#include "serf.h"
#include "serf_bucket_util.h"
#include "serf_private.h"
static apr_status_t pump_cleanup(void *baton)
{
serf_pump_t *pump = baton;
if (pump->ostream_head != NULL) {
#ifdef SERF_DEBUG_BUCKET_USE
serf__bucket_drain(pump->ostream_head);
#endif
serf_bucket_destroy(pump->ostream_head);
pump->ostream_head = NULL;
pump->ostream_tail = NULL;
}
pump->pool = NULL; /* Don't run again */
pump->allocator = NULL;
pump->skt = NULL;
pump->vec_len = 0;
return APR_SUCCESS;
}
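/* Initializes PUMP for socket SKT, using ALLOCATOR for the outgoing
bucket chain and POOL for the pump's lifetime. Registers a pool
cleanup so the buckets are destroyed together with POOL. */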
void serf_pump__init(serf_pump_t *pump,
serf_io_baton_t *io,
apr_socket_t *skt,
serf_config_t *config,
serf_bucket_alloc_t *allocator,
apr_pool_t *pool)
{
memset(pump, 0, sizeof(*pump));
pump->io = io;
pump->allocator = allocator;
pump->config = config;
pump->skt = skt;
pump->pool = pool;
apr_pool_cleanup_register(pool, pump, pump_cleanup,
apr_pool_cleanup_null);
}
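/* Undoes serf_pump__init(): runs the pool cleanup (destroying the
outgoing bucket chain) and resets the pump's fields. The incoming
stream is intentionally left alone; it is owned by the current
reader. */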
void serf_pump__done(serf_pump_t *pump)
{
if (pump->pool) {
apr_pool_cleanup_run(pump->pool, pump, pump_cleanup);
}
pump->io = NULL;
pump->allocator = NULL;
pump->config = NULL;
/* pump->stream is managed by the current reader! */
pump->ostream_head = NULL;
pump->ostream_tail = NULL;
pump->done_writing = false;
pump->stop_writing = false;
pump->hit_eof = false;
pump->pool = NULL;
}
/* Safely check if there is still data pending on the connection, being
careful not to accidentally make it invalid. */
bool serf_pump__data_pending(serf_pump_t *pump)
{
if (pump->vec_len > 0)
return true; /* Unwritten data is waiting in our iovecs; we can't poll right now! */
if (pump->ostream_head) {
const char *data;
apr_size_t len;
apr_status_t status;
status = serf_bucket_peek(pump->ostream_head, &data, &len);
if (!SERF_BUCKET_READ_ERROR(status)) {
if (len > 0) {
return true;
}
}
else
return true; /* Sure, we have data (an error) */
}
return false;
}
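/* hold_open callback of ostream_tail: called when the tail aggregate
runs out of data. Reports EOF once we are done writing; until then
the chain is kept open by returning APR_EAGAIN. */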
static apr_status_t detect_eof(void *baton, serf_bucket_t *aggregate_bucket)
{
serf_pump_t *pump = baton;
pump->hit_eof = true;
if (pump->done_writing) {
pump->ostream_tail = NULL;
return APR_EOF;
}
else
return APR_EAGAIN;
}
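/* Creates the head and tail aggregate buckets of the outgoing chain if
they don't exist yet, so the caller can build the buckets that will
sit between them before calling serf_pump__complete_setup(). */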
void serf_pump__prepare_setup(serf_pump_t *pump)
{
if (pump->ostream_head == NULL) {
pump->ostream_head = serf_bucket_aggregate_create(pump->allocator);
}
if (pump->ostream_tail == NULL) {
pump->ostream_tail = serf_bucket_aggregate_create(pump->allocator);
serf_bucket_aggregate_hold_open(pump->ostream_tail, detect_eof, pump);
}
}
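/* Completes the setup started by serf_pump__prepare_setup(): installs
STREAM as the incoming bucket, appends OSTREAM (or ostream_tail
itself when OSTREAM is NULL) to the outgoing chain and shares the
pump's configuration with both chains. */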
void serf_pump__complete_setup(serf_pump_t *pump,
serf_bucket_t *stream,
serf_bucket_t *ostream)
{
pump->stream = stream;
if (ostream)
serf_bucket_aggregate_append(pump->ostream_head, ostream);
else
serf_bucket_aggregate_append(pump->ostream_head, pump->ostream_tail);
/* Share the configuration with all the buckets in the newly created output
chain (see the PLAIN and ENCRYPTED scenarios below), including the request
buckets created by the application (ostream_tail will handle this for us). */
serf_bucket_set_config(pump->ostream_head, pump->config);
/* Share the configuration with the ssl_decrypt and socket buckets. The
response buckets wrapping the ssl_decrypt/socket buckets won't get the
config automatically because they are upstream. */
if (stream != NULL) {
serf_bucket_set_config(pump->stream, pump->config);
}
/* We typically have one of two scenarios, based on whether the
application decided to encrypt this connection:
PLAIN:
pump->stream = SOCKET(skt)
pump->ostream_head = AGGREGATE(ostream_tail)
pump->ostream_tail = STREAM(<detect_eof>, REQ1, REQ2, ...)
ENCRYPTED:
pump->stream = DECRYPT(SOCKET(skt))
pump->ostream_head = AGGREGATE(ENCRYPT(ostream_tail))
pump->ostream_tail = STREAM(<detect_eof>, REQ1, REQ2, ...)
where STREAM is an internal variant of AGGREGATE.
*/
}
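/* Stores the local and remote address of the pump's socket in the
configuration as "IP:port" strings. */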
void serf_pump__store_ipaddresses_in_config(serf_pump_t *pump)
{
apr_sockaddr_t *sa;
if (apr_socket_addr_get(&sa, APR_LOCAL, pump->skt) == APR_SUCCESS) {
char buf[48];
if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_LOCALIP,
pump->pool, "%s:%d", buf, (int)sa->port);
}
if (apr_socket_addr_get(&sa, APR_REMOTE, pump->skt) == APR_SUCCESS) {
char buf[48];
if (!apr_sockaddr_ip_getbuf(buf, sizeof(buf), sa))
serf_config_set_stringf(pump->config, SERF_CONFIG_CONN_REMOTEIP,
pump->pool, "%s:%d", buf, (int)sa->port);
}
}
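/* Stops all further writes on this pump after the peer closed the
connection on us: drops any data still queued in the iovecs and
updates the pollset so we no longer ask for write events. */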
static apr_status_t no_more_writes(serf_pump_t *pump)
{
/* Note that we should hold new requests until we open our new socket. */
pump->done_writing = true;
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
"stop writing on 0x%p\n", pump->io->u.v);
/* Clear our iovec. */
pump->vec_len = 0;
/* Update the pollset to know we don't want to write on this socket any
* more.
*/
serf_io__set_pollset_dirty(pump->io);
return APR_SUCCESS;
}
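/* Writes the data collected in pump->vec to the socket and logs what
was sent. On a partial write the unwritten data is shifted to the
front of the iovec array so the next call continues where this one
stopped. */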
static apr_status_t socket_writev(serf_pump_t *pump)
{
apr_size_t written;
apr_status_t status;
status = apr_socket_sendv(pump->skt, pump->vec,
pump->vec_len, &written);
if (status && !APR_STATUS_IS_EAGAIN(status))
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
"socket_sendv error %d on 0x%p\n", status, pump->io->u.v);
/* did we write everything? */
if (written) {
apr_size_t len = 0;
int i;
serf__log(LOGLVL_DEBUG, LOGCOMP_CONN, __FILE__, pump->config,
"--- socket_sendv: %d bytes on 0x%p. --\n",
(int)written, pump->io->u.v);
for (i = 0; i < pump->vec_len; i++) {
len += pump->vec[i].iov_len;
if (written < len) {
serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, pump->config,
"%.*s",
(int)(pump->vec[i].iov_len - (len - written)),
(const char *)pump->vec[i].iov_base);
if (i) {
memmove(pump->vec, &pump->vec[i],
sizeof(struct iovec) * (pump->vec_len - i));
pump->vec_len -= i;
}
pump->vec[0].iov_base = (char *)pump->vec[0].iov_base
+ pump->vec[0].iov_len
- (len - written);
pump->vec[0].iov_len = len - written;
break;
} else {
serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, pump->config,
"%.*s",
(int)pump->vec[i].iov_len,
(const char*)pump->vec[i].iov_base);
}
}
if (len == written) {
pump->vec_len = 0;
}
serf__log_nopref(LOGLVL_DEBUG, LOGCOMP_RAWMSG, pump->config, "\n");
/* Log progress information */
serf__context_progress_delta(pump->io->ctx, 0, written);
}
return status;
}
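/* Writes pending output to the socket. First drains the data already
stored in the iovecs; when FETCH_NEW is true it then keeps reading
more data from the outgoing bucket chain until the socket would
block, the chain runs dry, or an error occurs. */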
apr_status_t serf_pump__write(serf_pump_t *pump,
bool fetch_new)
{
apr_status_t status = APR_SUCCESS;
apr_status_t read_status = APR_SUCCESS;
pump->hit_eof = false;
while (status == APR_SUCCESS) {
/* First try to write out what is already stored in the
connection vecs. */
while (pump->vec_len && !status) {
status = socket_writev(pump);
/* If the connection was closed on us, stop writing to this
* socket entirely. (A write that would merely block ends this
* loop and is handled below.)
*/
if (APR_STATUS_IS_EPIPE(status)
|| APR_STATUS_IS_ECONNRESET(status)
|| APR_STATUS_IS_ECONNABORTED(status))
return no_more_writes(pump);
}
if (status || !fetch_new) {
/* If we couldn't write everything that we tried,
make sure that we will receive a write event next time */
if (APR_STATUS_IS_EAGAIN(status)
&& !pump->io->dirty_conn
&& !(pump->io->reqevents & APR_POLLOUT))
{
serf_io__set_pollset_dirty(pump->io);
}
return status;
}
else if (read_status || pump->vec_len || pump->hit_eof)
return read_status;
/* ### optimize at some point by using read_for_sendfile */
/* TODO: now that read_iovec will effectively try to return as much
data as available, we probably don't want to read ALL_AVAIL, but
a lower number, like the size of one or a few TCP packets, the
available TCP buffer size ... */
pump->hit_eof = false;
read_status = serf_bucket_read_iovec(pump->ostream_head,
SERF_READ_ALL_AVAIL,
COUNT_OF(pump->vec),
pump->vec,
&pump->vec_len);
if (read_status == SERF_ERROR_WAIT_CONN) {
/* The bucket told us that it can't provide more data until
more data is read from the socket. This normally happens
during an SSL handshake.
We should avoid looking for writability for a while so
that (hopefully) something will appear in the bucket so
we can actually write something. Otherwise, we could
end up in a CPU spin: socket wants something, but we
don't have anything (and keep returning EAGAIN) */
serf__log(LOGLVL_INFO, LOGCOMP_CONN, __FILE__, pump->config,
"Output stream requested temporary write delay "
"on 0x%p\n", pump->io->u.v);
pump->stop_writing = true;
serf_io__set_pollset_dirty(pump->io);
read_status = APR_EAGAIN;
}
else if (APR_STATUS_IS_EAGAIN(read_status)) {
/* We read some stuff, but did we read everything? */
if (pump->hit_eof)
read_status = APR_SUCCESS;
}
else if (SERF_BUCKET_READ_ERROR(read_status)) {
/* Something bad happened. Propagate any errors. */
return read_status;
}
}
return status;
}
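/* Appends BUCKET to the outgoing chain. If the pump no longer has an
output chain, the bucket is destroyed and APR_ENOSOCKET returned.
With FLUSH false the data is only queued and the pollset marked
dirty (if needed) so the event loop will write it later; with FLUSH
true an immediate write is attempted. Only real write errors are
returned, not EAGAIN-style statuses. */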
apr_status_t serf_pump__add_output(serf_pump_t *pump,
serf_bucket_t *bucket,
bool flush)
{
apr_status_t status;
if (!pump->ostream_tail) {
serf_bucket_destroy(bucket);
return APR_ENOSOCKET;
}
if (!flush
&& !pump->io->dirty_conn
&& !pump->stop_writing
&& !(pump->io->reqevents & APR_POLLOUT)
&& !serf_pump__data_pending(pump))
{
/* We are not waiting for POLLOUT yet,
* the pollset is not already marked dirty,
* we are not told to stop writing,
* and nothing is pending yet:
* mark the pollset dirty so a write event will be requested. */
serf_io__set_pollset_dirty(pump->io);
}
serf_bucket_aggregate_append(pump->ostream_tail, bucket);
if (!flush)
return APR_SUCCESS;
/* Flush final output buffer (after ssl, etc.) */
status = serf_pump__write(pump, true);
if (SERF_BUCKET_READ_ERROR(status))
return status;
else
return APR_SUCCESS;
}
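/* Example (a sketch, not code used by this library): once a pump has
* been set up, a caller could queue a constant string for the peer
* roughly like this:
*
*   serf_bucket_t *b = SERF_BUCKET_SIMPLE_STRING("ping\r\n",
*                                                pump->allocator);
*   apr_status_t status = serf_pump__add_output(pump, b, true);
*
* With flush=true the pump tries to write right away; with flush=false
* the data is only queued and written later from the event loop.
*/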