blob: 027fa5f43aeac22060cf3aa314a8450c99af824d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote.http11;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import org.apache.coyote.ActionCode;
import org.apache.coyote.ByteBufferHolder;
import org.apache.coyote.OutputBuffer;
import org.apache.coyote.Response;
import org.apache.tomcat.util.buf.ByteChunk;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.NioChannel;
import org.apache.tomcat.util.net.NioEndpoint;
import org.apache.tomcat.util.net.NioSelectorPool;
import org.apache.tomcat.util.net.SocketWrapper;
/**
 * HTTP/1.1 output buffer that writes response bytes to an {@link NioChannel}.
 *
 * <p>Data is first copied into the channel's write buffer and then pushed to
 * the socket through the endpoint's {@link NioSelectorPool}. When a
 * non-blocking write cannot accept everything, the remainder is queued in
 * {@code bufferedWrites} and drained by later calls to
 * {@link #flushBuffer(boolean)}.
 *
 * @author <a href="mailto:remm@apache.org">Remy Maucherat</a>
 */
public class InternalNioOutputBuffer extends AbstractOutputBuffer<NioChannel> {

    // ----------------------------------------------------------- Constructors

    /**
     * Creates an output buffer for the given response and installs a
     * {@link SocketOutputBuffer} as the stream-level output buffer.
     *
     * @param response         the response this buffer writes for
     * @param headerBufferSize forwarded to the superclass constructor
     */
    public InternalNioOutputBuffer(Response response, int headerBufferSize) {
        super(response, headerBufferSize);
        outputStreamOutputBuffer = new SocketOutputBuffer();
    }

    /**
     * Underlying socket.
     */
    private NioChannel socket;

    /**
     * Selector pool, for blocking reads and blocking writes
     */
    private NioSelectorPool pool;

    /**
     * Tracks whether the socket write buffer is currently flipped, i.e. in
     * draining mode ({@code true}) rather than filling mode ({@code false}).
     */
    protected volatile boolean flipped = false;

    // --------------------------------------------------------- Public Methods

    /**
     * Binds this buffer to a socket and to the endpoint's selector pool.
     *
     * @param socketWrapper wrapper supplying the channel to write to
     * @param endpoint      endpoint owning the socket; it is cast to
     *                      {@link NioEndpoint}, so any other type fails with a
     *                      {@link ClassCastException}
     * @throws IOException declared by the overridden method
     */
    @Override
    public void init(SocketWrapper<NioChannel> socketWrapper,
            AbstractEndpoint<NioChannel> endpoint) throws IOException {
        socket = socketWrapper.getSocket();
        pool = ((NioEndpoint) endpoint).getSelectorPool();
    }

    /**
     * Recycle the output buffer. This should be called when closing the
     * connection. Clears the socket's write buffer, drops the socket
     * reference and resets the flipped state.
     */
    @Override
    public void recycle() {
        super.recycle();
        if (socket != null) {
            socket.getBufHandler().getWriteBuffer().clear();
            socket = null;
        }
        flipped = false;
    }

    // ------------------------------------------------ HTTP/1.1 Output Methods

    /**
     * Send an acknowledgment. Does nothing once the response is committed;
     * otherwise the acknowledgment bytes are written with a blocking write.
     *
     * @throws IOException if the write reports a negative result or fails
     */
    @Override
    public void sendAck() throws IOException {
        if (committed) {
            return;
        }
        socket.getBufHandler().getWriteBuffer().put(
                Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length);
        int result = writeToSocket(socket.getBufHandler().getWriteBuffer(), true, true);
        if (result < 0) {
            throw new IOException(sm.getString("iob.failedwrite.ack"));
        }
    }

    /**
     * Writes the given buffer to the socket through the selector pool and
     * then flushes the channel until the flush reports completion.
     *
     * @param bytebuffer the buffer to write from
     * @param block      forwarded to the selector pool's write; when
     *                   {@code true} the buffer is cleared afterwards
     *                   unconditionally
     * @param flip       {@code true} to flip the buffer into draining mode
     *                   before writing
     * @return the value returned by the selector pool's write
     * @throws IOException if the socket has no attachment, or the write or
     *                     flush fails
     */
    private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip)
            throws IOException {
        if (flip) {
            bytebuffer.flip();
            flipped = true;
        }

        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment) socket.getAttachment();
        if (att == null) {
            throw new IOException("Key must be cancelled");
        }
        long writeTimeout = att.getWriteTimeout();

        Selector selector = null;
        try {
            selector = pool.get();
        } catch (IOException ignored) {
            // Deliberately best effort: the write below is still attempted with
            // a null selector.
            // NOTE(review): presumably the pool tolerates a null selector - confirm.
        }

        int written;
        try {
            written = pool.write(bytebuffer, socket, selector, writeTimeout, block);
            // Make sure we are flushed
            while (!socket.flush(true, selector, writeTimeout)) {
                // Keep flushing until the channel reports it is done
            }
        } finally {
            if (selector != null) {
                pool.put(selector);
            }
        }

        if (block || bytebuffer.remaining() == 0) {
            // Blocking writes must empty the buffer
            // and if remaining==0 then we did empty it
            bytebuffer.clear();
            flipped = false;
        }
        // If there is data left in the buffer the socket will be registered for
        // write further up the stack. This is to ensure the socket is only
        // registered for write once as both container and user code can trigger
        // write registration.
        return written;
    }

    // ------------------------------------------------------ Protected Methods

    /**
     * Commit the response. Marks the response as committed and, if any header
     * bytes have been accumulated, sends the header buffer.
     *
     * @throws IOException an underlying I/O error occurred
     */
    @Override
    protected void commit() throws IOException {
        // The response is now committed
        committed = true;
        response.setCommitted(true);

        if (pos > 0) {
            // Sending the response header buffer
            addToBB(headerBuffer, 0, pos);
        }
    }

    /**
     * Sends the given bytes through the socket's write buffer.
     *
     * <p>Any data already pending is flushed first. The bytes are then copied
     * into the write buffer and written, repeating until everything has been
     * handed over or a write leaves data behind. In non-blocking mode whatever
     * could not be handed over is queued via
     * {@link #addToBuffers(byte[], int, int)}.
     *
     * @param buf    source array
     * @param offset index of the first byte to send
     * @param length number of bytes to send; zero is a no-op
     * @throws IOException if writing to the socket fails
     */
    private synchronized void addToBB(byte[] buf, int offset, int length)
            throws IOException {
        if (length == 0) {
            return;
        }

        // Try to flush any data in the socket's write buffer first
        boolean dataLeft = flushBuffer(isBlocking());

        // Keep writing until all the data is written or a non-blocking write
        // leaves data in the buffer
        while (!dataLeft && length > 0) {
            int thisTime = transfer(buf, offset, length, socket.getBufHandler().getWriteBuffer());
            length = length - thisTime;
            offset = offset + thisTime;
            int written = writeToSocket(socket.getBufHandler().getWriteBuffer(),
                    isBlocking(), true);
            if (written == 0) {
                dataLeft = true;
            } else {
                dataLeft = flushBuffer(isBlocking());
            }
        }

        NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment) socket.getAttachment();
        if (ka != null) {
            // Prevent timeouts for just doing client writes
            ka.access();
        }

        if (!isBlocking() && length > 0) {
            // Remaining data must be buffered
            addToBuffers(buf, offset, length);
        }
    }

    /**
     * Queues bytes that could not be written yet. The bytes are appended to
     * the last queued holder when it is not flipped and has room; otherwise a
     * new holder is allocated, sized to the larger of
     * {@code bufferedWriteSize} and {@code length}.
     *
     * @param buf    source array
     * @param offset index of the first byte to queue
     * @param length number of bytes to queue
     */
    private void addToBuffers(byte[] buf, int offset, int length) {
        ByteBufferHolder holder = bufferedWrites.peekLast();
        if (holder == null || holder.isFlipped() || holder.getBuf().remaining() < length) {
            ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize, length));
            holder = new ByteBufferHolder(buffer, false);
            bufferedWrites.add(holder);
        }
        holder.getBuf().put(buf, offset, length);
    }

    /**
     * Callback to write data from the buffer. Writes whatever is pending in
     * the socket's write buffer and then, while that buffer keeps emptying,
     * drains the queued {@code bufferedWrites} through it.
     *
     * @param block whether the underlying writes should block
     * @return {@code true} if the socket's write buffer still holds unwritten
     *         data afterwards
     * @throws IOException if writing to the socket fails
     */
    @Override
    protected boolean flushBuffer(boolean block) throws IOException {
        // Prevent timeout for async
        SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        if (key != null) {
            NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
            // A key may carry no attachment; skip the access update rather
            // than fail with a NullPointerException.
            if (attach != null) {
                attach.access();
            }
        }

        // Write to the socket, if there is anything to write. The buffer is
        // only flipped if it is not already in draining mode.
        if (hasMoreDataToFlush()) {
            writeToSocket(socket.getBufHandler().getWriteBuffer(), block, !flipped);
        }

        if (!hasMoreDataToFlush() && bufferedWrites.size() > 0) {
            Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator();
            while (!hasMoreDataToFlush() && bufIter.hasNext()) {
                ByteBufferHolder buffer = bufIter.next();
                buffer.flip();
                // The loop conditions stop the drain as soon as a write leaves
                // data behind in the socket's write buffer.
                while (!hasMoreDataToFlush() && buffer.getBuf().remaining() > 0) {
                    transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer());
                    if (buffer.getBuf().remaining() == 0) {
                        bufIter.remove();
                    }
                    writeToSocket(socket.getBufHandler().getWriteBuffer(), block, true);
                }
            }
        }

        return hasMoreDataToFlush();
    }

    /**
     * Reports whether the socket's write buffer holds data not yet written.
     * When flipped, that means bytes remaining to drain; otherwise it means
     * bytes have been put into the buffer (position greater than zero).
     *
     * @return {@code true} if unwritten data is present
     */
    @Override
    protected boolean hasMoreDataToFlush() {
        // Read the volatile flag once so both halves of the check agree
        boolean draining = flipped;
        ByteBuffer writeBuffer = socket.getBufHandler().getWriteBuffer();
        return draining ? writeBuffer.remaining() > 0 : writeBuffer.position() > 0;
    }

    /**
     * Registers the socket with its poller for write events.
     *
     * @throws IOException if the socket has no attachment
     */
    @Override
    protected void registerWriteInterest() throws IOException {
        NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment) socket.getAttachment();
        if (att == null) {
            throw new IOException("Key must be cancelled");
        }
        att.getPoller().add(socket, SelectionKey.OP_WRITE);
    }

    /**
     * Copies as many bytes as fit from an array into a buffer.
     *
     * @param from   source array
     * @param offset index of the first byte to copy
     * @param length maximum number of bytes to copy
     * @param to     destination buffer
     * @return the number of bytes copied: the smaller of {@code length} and
     *         the space remaining in {@code to}
     */
    private int transfer(byte[] from, int offset, int length, ByteBuffer to) {
        int max = Math.min(length, to.remaining());
        to.put(from, offset, max);
        return max;
    }

    /**
     * Copies as many bytes as fit from one buffer into another, advancing the
     * position of both by the number of bytes copied.
     *
     * @param from source buffer
     * @param to   destination buffer
     */
    private void transfer(ByteBuffer from, ByteBuffer to) {
        int max = Math.min(from.remaining(), to.remaining());
        // Copy through a duplicate with a reduced limit so that the source's
        // own limit is left untouched.
        ByteBuffer tmp = from.duplicate();
        tmp.limit(tmp.position() + max);
        to.put(tmp);
        from.position(from.position() + max);
    }

    // ----------------------------------- OutputStreamOutputBuffer Inner Class

    /**
     * This class is an output buffer which will write data to the socket of
     * the enclosing {@link InternalNioOutputBuffer}.
     */
    protected class SocketOutputBuffer implements OutputBuffer {

        /**
         * Write chunk.
         *
         * @param chunk the bytes to write
         * @param res   not used; the enclosing buffer's response is used
         *              for error reporting
         * @return the length of the chunk
         * @throws IOException if the write fails; the enclosing response is
         *                     sent a {@code CLOSE_NOW} action before the
         *                     exception is re-thrown
         */
        @Override
        public int doWrite(ByteChunk chunk, Response res) throws IOException {
            try {
                int len = chunk.getLength();
                int start = chunk.getStart();
                byte[] b = chunk.getBuffer();
                addToBB(b, start, len);
                byteCount += len;
                return len;
            } catch (IOException ioe) {
                response.action(ActionCode.CLOSE_NOW, ioe);
                // Re-throw
                throw ioe;
            }
        }

        /**
         * @return the running total of chunk lengths passed to
         *         {@link #doWrite(ByteChunk, Response)}
         */
        @Override
        public long getBytesWritten() {
            return byteCount;
        }
    }
}