blob: 9c4478d1af506814a78060893148ee7d28141ac5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote.http11;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.coyote.ActionCode;
import org.apache.coyote.ByteBufferHolder;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
import org.apache.tomcat.jni.Socket;
import org.apache.tomcat.jni.Status;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.SocketWrapper;
/**
* Output buffer.
*
* @author <a href="mailto:remm@apache.org">Remy Maucherat</a>
*/
public class InternalAprOutputBuffer extends AbstractOutputBuffer<Long> {
// ----------------------------------------------------------- Constructors
/**
* Default constructor.
*/
public InternalAprOutputBuffer(Response response, int headerBufferSize) {
super(response, headerBufferSize);
if (headerBufferSize < (8 * 1024)) {
bbuf = ByteBuffer.allocateDirect(6 * 1500);
} else {
bbuf = ByteBuffer.allocateDirect((headerBufferSize / 1500 + 1) * 1500);
}
outputStreamOutputBuffer = new SocketOutputBuffer();
}
// ----------------------------------------------------- Instance Variables
/**
* Underlying socket.
*/
private long socket;
private SocketWrapper<Long> wrapper;
/**
* Direct byte buffer used for writing.
*/
private final ByteBuffer bbuf;
/**
* <code>false</code> if bbuf is ready to be written to and
* <code>true</code> is ready to be read from.
*/
private volatile boolean flipped = false;
private AbstractEndpoint<Long> endpoint;
// --------------------------------------------------------- Public Methods
@Override
public void init(SocketWrapper<Long> socketWrapper,
AbstractEndpoint<Long> endpoint) throws IOException {
wrapper = socketWrapper;
socket = socketWrapper.getSocket().longValue();
this.endpoint = endpoint;
Socket.setsbb(this.socket, bbuf);
}
/**
* Recycle the output buffer. This should be called when closing the
* connection.
*/
@Override
public void recycle() {
super.recycle();
bbuf.clear();
flipped = false;
socket = 0;
wrapper = null;
}
// ------------------------------------------------ HTTP/1.1 Output Methods
/**
* Send an acknowledgment.
*/
@Override
public void sendAck() throws IOException {
if (!committed) {
if (Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0)
throw new IOException(sm.getString("iob.failedwrite.ack"));
}
}
// ------------------------------------------------------ Protected Methods
/**
* Commit the response.
*
* @throws IOException an underlying I/O error occurred
*/
@Override
protected void commit() throws IOException {
// The response is now committed
committed = true;
response.setCommitted(true);
if (pos > 0) {
// Sending the response header buffer
bbuf.put(headerBuffer, 0, pos);
}
}
private synchronized void addToBB(byte[] buf, int offset, int length)
throws IOException {
if (length == 0) return;
// If bbuf is currently being used for writes, add this data to the
// write buffer
if (flipped) {
addToBuffers(buf, offset, length);
return;
}
// Keep writing until all the data is written or a non-blocking write
// leaves data in the buffer
while (length > 0) {
int thisTime = length;
if (bbuf.position() == bbuf.capacity()) {
if (flushBuffer(isBlocking())) {
break;
}
}
if (thisTime > bbuf.capacity() - bbuf.position()) {
thisTime = bbuf.capacity() - bbuf.position();
}
bbuf.put(buf, offset, thisTime);
length = length - thisTime;
offset = offset + thisTime;
}
wrapper.access();
if (!isBlocking() && length>0) {
// Buffer the remaining data
addToBuffers(buf, offset, length);
}
}
private void addToBuffers(byte[] buf, int offset, int length) {
ByteBufferHolder holder = bufferedWrites.peekLast();
if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) {
ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length));
holder = new ByteBufferHolder(buffer,false);
bufferedWrites.add(holder);
}
holder.getBuf().put(buf,offset,length);
}
@Override
protected synchronized boolean flushBuffer(boolean block)
throws IOException {
wrapper.access();
if (hasMoreDataToFlush()) {
writeToSocket(block);
}
if (bufferedWrites.size() > 0) {
Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
while (!hasMoreDataToFlush() && bufIter.hasNext()) {
ByteBufferHolder buffer = bufIter.next();
buffer.flip();
while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
transfer(buffer.getBuf(), bbuf);
if (buffer.getBuf().remaining() == 0) {
bufIter.remove();
}
writeToSocket(block);
//here we must break if we didn't finish the write
}
}
}
return hasMoreDataToFlush();
}
private synchronized void writeToSocket(boolean block) throws IOException {
Lock readLock = wrapper.getBlockingStatusReadLock();
WriteLock writeLock = wrapper.getBlockingStatusWriteLock();
readLock.lock();
try {
if (wrapper.getBlockingStatus() == block) {
writeToSocket();
return;
}
} finally {
readLock.unlock();
}
writeLock.lock();
try {
// Set the current settings for this socket
wrapper.setBlockingStatus(block);
if (block) {
Socket.timeoutSet(socket, endpoint.getSoTimeout() * 1000);
} else {
Socket.timeoutSet(socket, 0);
}
// Downgrade the lock
readLock.lock();
try {
writeLock.unlock();
writeToSocket();
} finally {
readLock.unlock();
}
} finally {
// Should have been released above but may not have been on some
// exception paths
if (writeLock.isHeldByCurrentThread()) {
writeLock.unlock();
}
}
}
private synchronized void writeToSocket() throws IOException {
if (!flipped) {
flipped = true;
bbuf.flip();
}
int written;
do {
written = Socket.sendbb(socket, bbuf.position(), bbuf.remaining());
if (Status.APR_STATUS_IS_EAGAIN(-written)) {
written = 0;
} else if (written < 0) {
throw new IOException("APR error: " + written);
}
bbuf.position(bbuf.position() + written);
} while (written > 0 && bbuf.hasRemaining());
if (bbuf.remaining() == 0) {
bbuf.clear();
flipped = false;
}
// If there is data left in the buffer the socket will be registered for
// write further up the stack. This is to ensure the socket is only
// registered for write once as both container and user code can trigger
// write registration.
}
private void transfer(ByteBuffer from, ByteBuffer to) {
int max = Math.min(from.remaining(), to.remaining());
int fromLimit = from.limit();
from.limit(from.position() + max);
to.put(from);
from.limit(fromLimit);
}
//-------------------------------------------------- Non-blocking IO methods
@Override
protected synchronized boolean hasMoreDataToFlush() {
return (flipped && bbuf.remaining() > 0) ||
(!flipped && bbuf.position() > 0);
}
@Override
protected void registerWriteInterest() {
wrapper.registerforEvent(-1, false, true);
}
// ----------------------------------- OutputStreamOutputBuffer Inner Class
/**
* This class is an output buffer which will write data to an output
* stream.
*/
protected class SocketOutputBuffer implements OutputBuffer {
/**
* Write chunk.
*/
@Override
public int doWrite(ByteChunk chunk, Response res) throws IOException {
try {
int len = chunk.getLength();
int start = chunk.getStart();
byte[] b = chunk.getBuffer();
addToBB(b, start, len);
byteCount += chunk.getLength();
return chunk.getLength();
} catch (IOException ioe) {
response.action(ActionCode.CLOSE_NOW, ioe);
// Re-throw
throw ioe;
}
}
@Override
public long getBytesWritten() {
return byteCount;
}
}
}