blob: cb4b2ada8d73328df541000f2fb08c5892902309 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tomcat.lite.http;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.apache.tomcat.lite.http.HttpConnector.HttpConnection;
import org.apache.tomcat.lite.io.BBucket;
import org.apache.tomcat.lite.io.BBuffer;
import org.apache.tomcat.lite.io.FutureCallbacks;
import org.apache.tomcat.lite.io.IOBuffer;
import org.apache.tomcat.lite.io.IOChannel;
import org.apache.tomcat.lite.io.IOConnector;
/**
* HTTP async client and server, based on tomcat NIO/APR connectors
*
* 'Input', 'read', 'Recv' refers to information we get from the remote side -
* the request body for server-mode or response body for client.
*
* 'Output', 'write', 'Send' is for info we send - the post in client mode
* and the response body for server mode.
*
* @author Costin Manolache
*/
public class HttpChannel extends IOChannel {
static final int HEADER_SIZE = 8192;
static AtomicInteger serCnt = new AtomicInteger();
public static final String CONTENT_LENGTH= "Content-Length";
public static final String HTTP_10 = "HTTP/1.0";
public static final String HTTP_11 = "HTTP/1.1";
/**
* SEMI_COLON.
*/
public static final byte SEMI_COLON = (byte) ';';
public static final byte QUESTION = (byte) '?';
protected static Logger log = Logger.getLogger("HttpChannel");
boolean debug = false;
// ---- Callbacks and locks
FutureCallbacks<HttpChannel> doneLock = new FutureCallbacks<HttpChannel>();
FutureCallbacks<HttpChannel> headersReceivedLock =
new FutureCallbacks<HttpChannel>();
/**
* Called when the incoming headers have been received.
* ( response for client mode, request for server mode )
* @throws IOException
*/
protected HttpService httpService;
/**
* Called when:
* - body sent
* - body received
* - release() called - either service() done or client done with the
* buffers.
*
* After this callback:
* - socket closed if closeOnEndSend, or put in keep-alive
* - AsyncHttp.recycle()
* - returned to the pool.
*/
private RequestCompleted doneAllCallback;
protected boolean sendReceiveDone = false;
// Will be signalled (open) when the buffer is empty.
FutureCallbacks<IOChannel> flushLock = new FutureCallbacks<IOChannel>();
FutureCallbacks<HttpChannel> doneFuture;
boolean doneCallbackCalled = false;
// ----------
// Set if Exect: 100-continue was set on reqest.
// If this is the case - body won't be sent until
// server responds ( client ) and server will only
// read body after ack() - or skip to next request
// without swallowing the body.
protected boolean expectation = false;
/** Ready for recycle, if send/receive are done */
protected boolean release = false;
// -----------
protected boolean headersDone = false;
protected boolean error = false;
protected boolean abortDone = false;
protected int ser; // id - for jmx registration and logs
protected int channelId;
/**
* Null after endSendReceive and before sending the request
*/
HttpConnection conn;
HttpConnector httpConnector;
// Different ways to point to request response (server/client)
HttpRequest httpReq;
HttpResponse httpRes;
HttpMessage inMessage;
HttpMessage outMessage;
// receive can be for request ( server mode ) or response ( client )
IOBuffer receiveBody = new IOBuffer();
// notify us that user called close()
IOBuffer sendBody = new IOBuffer() {
public void close() throws IOException {
if (isAppendClosed()) {
return;
}
super.close();
outClosed();
}
};
// Server side only
protected String serverHeader = "TomcatLite";
long ioTimeout = 30 * 60000; // 30 min seems high enough
public HttpChannel() {
ser = serCnt.incrementAndGet();
httpReq = new HttpRequest(this);
httpRes = new HttpResponse(this);
init();
serverMode(false);
}
/**
* Close the connection, return to pool. Called if a
* framing error happens, or if we want to force the connection
* to close, without waiting for all data to be sent/received.
* @param t
*
* @throws IOException
*/
public void abort(Throwable t) {
abort(t.toString());
}
public void abort(String t) {
synchronized (this) {
if (abortDone) {
return;
}
abortDone = true;
}
try {
checkRelease();
trace("abort " + t);
if (conn != null) {
conn.abort(this, t);
}
inMessage.state = HttpMessage.State.DONE;
outMessage.state = HttpMessage.State.DONE;
sendReceiveDone = true;
error = true;
handleEndSendReceive();
} catch (Throwable ex) {
log.severe("Exception in abort " + ex);
}
}
/**
* If release was called - throw exception, you shouldn't use
* the object again.
* @throws IOException
*/
private void checkRelease() throws IOException {
if (release && sendReceiveDone) {
throw new IOException("Object released");
}
}
public IOChannel getSink() {
if (conn == null) {
return null;
}
return conn.getSink();
}
/**
* Called when the request is done. Need to send remaining byte.
*
*/
public void complete() throws IOException {
checkRelease();
if (!getOut().isAppendClosed()) {
getOut().close();
}
if (!getIn().isAppendClosed()) {
getIn().close();
}
startSending();
}
public int doRead(BBuffer chunk)
throws IOException {
checkRelease();
BBucket next = null;
while (true) {
getIn().waitData(0);
next = (BBucket) getIn().popFirst();
if (next != null) {
break;
} else if (getIn().isAppendClosed()) {
return -1;
} else {
System.err.println("Spurious waitData signal, no data");
}
}
chunk.append(next.array(), next.position(), next.remaining());
int read = next.remaining();
next.release();
return read;
}
public HttpConnector getConnector() {
return httpConnector;
}
public boolean getError() {
return error;
}
// ---------------- Writting -------------------------------
public String getId() {
return Integer.toString(ser);
}
public IOBuffer getIn() {
return receiveBody;
}
public long getIOTimeout() {
return ioTimeout;
}
// TODO: replace with getSocketChannel - used for remote addr, etc
public IOChannel getNet() {
if (conn == null) {
return null;
}
return conn.getSink();
}
public IOBuffer getOut() {
return sendBody;
}
public HttpRequest getRequest() {
return httpReq;
}
public HttpResponse getResponse() {
return httpRes;
}
public String getState() {
return
conn +
"RCV=[" + inMessage.state.toString() + " " +
receiveBody.toString()
+ "] SND=[" + outMessage.state.toString()
+ " " + sendBody.toString() + "]";
}
public String getStatus() {
return getResponse().getStatus() + " " + getResponse().getMessage();
}
public String getTarget() {
if (target == null) {
return ":0"; // server mode ?
}
return target.toString();
}
/**
* Called from IO thread, after the request body
* is completed ( or if there is no req body )
* @throws IOException
*/
protected void handleEndReceive() throws IOException {
if (inMessage.state == HttpMessage.State.DONE) {
return;
}
if (debug) {
trace("END_RECV");
}
getIn().close();
inMessage.state = HttpMessage.State.DONE;
handleEndSendReceive();
}
/*
* Called when sending, receiving and processing is done.
* Can be called:
* - from IO thread, if this is a result of a read/write event that
* finished the send/recev pair.
* - from an arbitrary thread, if read was complete and the last write
* was a success and done in that thread ( write is not bound to IO thr)
*
*/
protected void handleEndSendReceive() throws IOException {
// make sure the callback was called ( needed for abort )
handleHeadersReceived(inMessage);
this.doneLock.signal(this);
synchronized (this) {
if (doneCallbackCalled) {
return;
}
if (outMessage.state != HttpMessage.State.DONE ||
inMessage.state != HttpMessage.State.DONE) {
return;
}
doneCallbackCalled = true;
}
getIn().close();
if (doneAllCallback != null) {
doneAllCallback.handle(this, error ? new Throwable() : null);
}
if (conn != null) {
conn.endSendReceive(this);
}
conn = null;
if (debug) {
trace("END_SEND_RECEIVE"
+ (release ? " REL" : ""));
}
synchronized(this) {
sendReceiveDone = true;
maybeRelease();
}
}
/**
* called from IO thread OR servlet thread when last block has been sent.
* If not using the socket ( net.getOut().flushCallback ) - this must
* be called explicitely after flushing the body.
*/
void handleEndSent() throws IOException {
if (outMessage.state == HttpMessage.State.DONE) {
// Only once.
if (debug) {
trace("Duplicate END SEND");
}
return;
}
outMessage.state = HttpMessage.State.DONE;
getOut().close();
// Make sure the send/receive callback is called once
if (debug) {
trace("END_SEND");
}
handleEndSendReceive();
}
// ----- End Selector thread callbacks ----
public void handleError(String type) {
System.err.println("Error " + type + " " + outMessage.state);
}
void handleHeadersReceived(HttpMessage in) throws IOException {
if (!headersDone) {
headersDone = true;
headersReceivedLock.signal(this);
if (httpService != null) {
try {
httpService.service(getRequest(), getResponse());
} catch (Throwable t) {
t.printStackTrace();
abort(t);
}
}
}
}
private void init() {
headersDone = false;
sendReceiveDone = false;
receiveBody.recycle();
sendBody.recycle();
expectation = false;
error = false;
abortDone = false;
getRequest().recycle();
getResponse().recycle();
target = null;
doneLock.recycle();
headersReceivedLock.recycle();
flushLock.recycle();
doneCallbackCalled = false;
// Will be set again after pool
setHttpService(null);
doneAllCallback = null;
release = false;
}
public boolean isDone() {
return outMessage.state == HttpMessage.State.DONE && inMessage.state == HttpMessage.State.DONE;
}
/**
* Called when all done:
* - service finished ( endService was called )
* - output written
* - input read
*
* or by abort().
*
* @throws IOException
*/
private void maybeRelease() throws IOException {
synchronized (this) {
if (release && sendReceiveDone) {
if (debug) {
trace("RELEASE");
}
if (getConnector() != null) {
getConnector().returnToPool(this);
} else {
log.severe("Attempt to release with no pool");
}
}
}
}
/*
The field-content does not include any leading or trailing LWS:
linear white space occurring before the first non-whitespace
character of the field-value or after the last non-whitespace
character of the field-value. Such leading or trailing LWS MAY
be removed without changing the semantics of the field value.
Any LWS that occurs between field-content MAY be replaced with
a single Http11Parser.SP before interpreting the field value or forwarding
the message downstream.
*/
int normalizeHeader(BBuffer value) {
byte[] buf = value.array();
int cstart = value.position();
int end = value.limit();
int realPos = cstart;
int lastChar = cstart;
byte chr = 0;
boolean gotSpace = true;
for (int i = cstart; i < end; i++) {
chr = buf[i];
if (chr == BBuffer.CR) {
// skip
} else if(chr == BBuffer.LF) {
// skip
} else if (chr == BBuffer.SP || chr == BBuffer.HT) {
if (gotSpace) {
// skip
} else {
buf[realPos++] = BBuffer.SP;
gotSpace = true;
}
} else {
buf[realPos++] = chr;
lastChar = realPos; // to skip trailing spaces
gotSpace = false;
}
}
realPos = lastChar;
// so buffer is clean
for (int i = realPos; i < end; i++) {
buf[i] = BBuffer.SP;
}
value.setEnd(realPos);
return realPos;
}
protected void recycle() {
if (debug) {
trace("RECYCLE");
}
init();
}
/**
* Finalize sending and receiving.
* Indicates client is no longer interested, some IO may still be in flight.
* If in a POST and you're not interested in the body - it may be
* better to call abort().
*
* MUST be called to allow connection reuse and pooling.
*
* @throws IOException
*/
public void release() throws IOException {
synchronized(this) {
if (release) {
return;
}
trace("RELEASE");
release = true;
// If send/receive is done - we can reuse this object
maybeRelease();
}
}
public void send() throws IOException {
checkRelease();
if (httpReq == inMessage) {
conn.sendResponseHeaders(this);
} else {
if (getRequest().isCommitted()) {
return;
}
getRequest().setCommitted(true);
outMessage.state = HttpMessage.State.HEAD;
getConnector().connectAndSend(this);
}
}
/** Called when the outgoing stream is closed:
* - by an explicit call to close()
* - when all content has been sent.
*/
protected void outClosed() throws IOException {
if (conn != null) {
conn.outClosed(this);
}
}
public HttpChannel serverMode(boolean enabled) {
if (enabled) {
httpReq.setBody(receiveBody);
httpRes.setBody(sendBody);
inMessage = httpReq;
outMessage = httpRes;
} else {
httpReq.setBody(sendBody);
httpRes.setBody(receiveBody);
inMessage = httpRes;
outMessage = httpReq;
}
if (debug) {
}
return this;
}
public void setCompletedCallback(RequestCompleted doneAllCallback)
throws IOException {
this.doneAllCallback = doneAllCallback;
synchronized (this) {
if (doneCallbackCalled) {
return;
}
if (outMessage.state != HttpMessage.State.DONE || inMessage.state != HttpMessage.State.DONE) {
return;
}
}
doneCallbackCalled = true;
if (doneAllCallback != null) {
doneAllCallback.handle(this, error ? new Throwable() : null);
}
}
public void setConnector(HttpConnector pool) {
this.httpConnector = pool;
}
public void setHttpService(HttpService headersReceivedCallback) {
this.httpService = headersReceivedCallback;
}
public void setIOTimeout(long timeout) {
ioTimeout = timeout;
}
public void setTarget(String host) {
this.target = host;
}
public void startSending() throws IOException {
checkRelease();
if (conn != null) {
conn.startSending(this);
}
}
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("id=").append(ser)
.append(",rs=").append(getState())
.append(")");
return sb.toString();
}
void trace(String msg) {
if(debug) {
log.info(this.toString() + " " + msg + " done=" + doneCallbackCalled);
}
}
@Override
public void waitFlush(long timeMs) throws IOException {
if (getOut().getBufferCount() == 0) {
return;
}
flushLock.waitSignal(timeMs);
}
public HttpChannel setConnection(HttpConnection conn) {
this.conn = conn;
return this;
}
/**
* Normalize URI.
* <p>
* This method normalizes "\", "//", "/./" and "/../". This method will
* return false when trying to go above the root, or if the URI contains
* a null byte.
*
* @param uriMB URI to be normalized, will be modified
*/
public static boolean normalize(BBuffer uriBC) {
byte[] b = uriBC.array();
int start = uriBC.getStart();
int end = uriBC.getEnd();
// URL * is acceptable
if ((end - start == 1) && b[start] == (byte) '*')
return true;
if (b[start] != '/') {
// TODO: http://.... URLs
return true;
}
int pos = 0;
int index = 0;
// Replace '\' with '/'
// Check for null byte
for (pos = start; pos < end; pos++) {
if (b[pos] == (byte) '\\')
b[pos] = (byte) '/';
if (b[pos] == (byte) 0)
return false;
}
// The URL must start with '/'
if (b[start] != (byte) '/') {
return false;
}
// Replace "//" with "/"
for (pos = start; pos < (end - 1); pos++) {
if (b[pos] == (byte) '/') {
while ((pos + 1 < end) && (b[pos + 1] == (byte) '/')) {
copyBytes(b, pos, pos + 1, end - pos - 1);
end--;
}
}
}
// If the URI ends with "/." or "/..", then we append an extra "/"
// Note: It is possible to extend the URI by 1 without any side effect
// as the next character is a non-significant WS.
if (((end - start) >= 2) && (b[end - 1] == (byte) '.')) {
if ((b[end - 2] == (byte) '/')
|| ((b[end - 2] == (byte) '.')
&& (b[end - 3] == (byte) '/'))) {
b[end] = (byte) '/';
end++;
}
}
uriBC.setEnd(end);
index = 0;
// Resolve occurrences of "/./" in the normalized path
while (true) {
index = uriBC.indexOf("/./", 0, 3, index);
if (index < 0)
break;
copyBytes(b, start + index, start + index + 2,
end - start - index - 2);
end = end - 2;
uriBC.setEnd(end);
}
index = 0;
// Resolve occurrences of "/../" in the normalized path
while (true) {
index = uriBC.indexOf("/../", 0, 4, index);
if (index < 0)
break;
// Prevent from going outside our context
if (index == 0)
return false;
int index2 = -1;
for (pos = start + index - 1; (pos >= 0) && (index2 < 0); pos --) {
if (b[pos] == (byte) '/') {
index2 = pos;
}
}
copyBytes(b, start + index2, start + index + 3,
end - start - index - 3);
end = end + index2 - index - 3;
uriBC.setEnd(end);
index = index2;
}
//uriBC.setBytes(b, start, end);
uriBC.setEnd(end);
return true;
}
/**
* Copy an array of bytes to a different position. Used during
* normalization.
*/
private static void copyBytes(byte[] b, int dest, int src, int len) {
for (int pos = 0; pos < len; pos++) {
b[pos + dest] = b[pos + src];
}
}
/**
* This method will be called when the http headers have been received -
* the body may or may not be available.
*
* In server mode this is equivalent with a servlet request.
* This is also called for http client, when the response headers
* are received.
*
* TODO: rename it to HttMessageReceived or something similar.
*/
public static interface HttpService {
void service(HttpRequest httpReq, HttpResponse httpRes) throws IOException;
}
/**
* Called when both request and response bodies have been sent/
* received. After this call the HttpChannel will be disconnected
* from the http connection, which can be used for other requests.
*/
public static interface RequestCompleted {
void handle(HttpChannel data, Object extraData) throws IOException;
}
Runnable dispatcherRunnable = new Runnable() {
@Override
public void run() {
getConnector().getDispatcher().runService(HttpChannel.this);
}
};
}