| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.coyote.http11; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.Selector; |
| import java.util.Iterator; |
| |
| import org.apache.coyote.ActionCode; |
| import org.apache.coyote.ByteBufferHolder; |
| import org.apache.coyote.OutputBuffer; |
| import org.apache.coyote.Response; |
| import org.apache.tomcat.util.buf.ByteChunk; |
| import org.apache.tomcat.util.net.AbstractEndpoint; |
| import org.apache.tomcat.util.net.NioChannel; |
| import org.apache.tomcat.util.net.NioEndpoint; |
| import org.apache.tomcat.util.net.NioSelectorPool; |
| import org.apache.tomcat.util.net.SocketWrapper; |
| |
| /** |
| * Output buffer. |
| * |
| * @author <a href="mailto:remm@apache.org">Remy Maucherat</a> |
| */ |
| public class InternalNioOutputBuffer extends AbstractOutputBuffer<NioChannel> { |
| |
| // ----------------------------------------------------------- Constructors |
| |
| /** |
| * Default constructor. |
| */ |
| public InternalNioOutputBuffer(Response response, int headerBufferSize) { |
| |
| super(response, headerBufferSize); |
| |
| outputStreamOutputBuffer = new SocketOutputBuffer(); |
| } |
| |
| |
| /** |
| * Underlying socket. |
| */ |
| private NioChannel socket; |
| |
| /** |
| * Selector pool, for blocking reads and blocking writes |
| */ |
| private NioSelectorPool pool; |
| |
| /** |
| * Track if the byte buffer is flipped |
| */ |
| protected volatile boolean flipped = false; |
| |
| |
| // --------------------------------------------------------- Public Methods |
| |
| @Override |
| public void init(SocketWrapper<NioChannel> socketWrapper, |
| AbstractEndpoint<NioChannel> endpoint) throws IOException { |
| |
| socket = socketWrapper.getSocket(); |
| pool = ((NioEndpoint)endpoint).getSelectorPool(); |
| } |
| |
| |
| /** |
| * Recycle the output buffer. This should be called when closing the |
| * connection. |
| */ |
| @Override |
| public void recycle() { |
| super.recycle(); |
| if (socket != null) { |
| socket.getBufHandler().getWriteBuffer().clear(); |
| socket = null; |
| } |
| flipped = false; |
| } |
| |
| |
| // ------------------------------------------------ HTTP/1.1 Output Methods |
| |
| /** |
| * Send an acknowledgment. |
| */ |
| @Override |
| public void sendAck() throws IOException { |
| if (!committed) { |
| socket.getBufHandler().getWriteBuffer().put( |
| Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length); |
| int result = writeToSocket(socket.getBufHandler().getWriteBuffer(), true, true); |
| if (result < 0) { |
| throw new IOException(sm.getString("iob.failedwrite.ack")); |
| } |
| } |
| } |
| |
| /** |
| * |
| * @param bytebuffer ByteBuffer |
| * @param flip boolean |
| * @return int |
| * @throws IOException |
| */ |
| private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException { |
| if ( flip ) { |
| bytebuffer.flip(); |
| flipped = true; |
| } |
| |
| int written = 0; |
| NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(); |
| if ( att == null ) throw new IOException("Key must be cancelled"); |
| long writeTimeout = att.getWriteTimeout(); |
| Selector selector = null; |
| try { |
| selector = pool.get(); |
| } catch ( IOException x ) { |
| //ignore |
| } |
| try { |
| written = pool.write(bytebuffer, socket, selector, writeTimeout, block); |
| //make sure we are flushed |
| do { |
| if (socket.flush(true,selector,writeTimeout)) break; |
| }while ( true ); |
| } finally { |
| if ( selector != null ) pool.put(selector); |
| } |
| if ( block || bytebuffer.remaining()==0) { |
| //blocking writes must empty the buffer |
| //and if remaining==0 then we did empty it |
| bytebuffer.clear(); |
| flipped = false; |
| } |
| // If there is data left in the buffer the socket will be registered for |
| // write further up the stack. This is to ensure the socket is only |
| // registered for write once as both container and user code can trigger |
| // write registration. |
| return written; |
| } |
| |
| |
| // ------------------------------------------------------ Protected Methods |
| |
| /** |
| * Commit the response. |
| * |
| * @throws IOException an underlying I/O error occurred |
| */ |
| @Override |
| protected void commit() throws IOException { |
| |
| // The response is now committed |
| committed = true; |
| response.setCommitted(true); |
| |
| if (pos > 0) { |
| // Sending the response header buffer |
| addToBB(headerBuffer, 0, pos); |
| } |
| |
| } |
| |
| |
| private synchronized void addToBB(byte[] buf, int offset, int length) |
| throws IOException { |
| |
| if (length == 0) return; |
| |
| // Try to flush any data in the socket's write buffer first |
| boolean dataLeft = flushBuffer(isBlocking()); |
| |
| // Keep writing until all the data is written or a non-blocking write |
| // leaves data in the buffer |
| while (!dataLeft && length > 0) { |
| int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer()); |
| length = length - thisTime; |
| offset = offset + thisTime; |
| int written = writeToSocket(socket.getBufHandler().getWriteBuffer(), |
| isBlocking(), true); |
| if (written == 0) { |
| dataLeft = true; |
| } else { |
| dataLeft = flushBuffer(isBlocking()); |
| } |
| } |
| |
| NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(); |
| if (ka != null) ka.access();//prevent timeouts for just doing client writes |
| |
| if (!isBlocking() && length > 0) { |
| // Remaining data must be buffered |
| addToBuffers(buf, offset, length); |
| } |
| } |
| |
| |
| private void addToBuffers(byte[] buf, int offset, int length) { |
| ByteBufferHolder holder = bufferedWrites.peekLast(); |
| if (holder==null || holder.isFlipped() || holder.getBuf().remaining()<length) { |
| ByteBuffer buffer = ByteBuffer.allocate(Math.max(bufferedWriteSize,length)); |
| holder = new ByteBufferHolder(buffer,false); |
| bufferedWrites.add(holder); |
| } |
| holder.getBuf().put(buf,offset,length); |
| } |
| |
| |
| /** |
| * Callback to write data from the buffer. |
| */ |
| @Override |
| protected boolean flushBuffer(boolean block) throws IOException { |
| |
| //prevent timeout for async, |
| SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); |
| if (key != null) { |
| NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment(); |
| attach.access(); |
| } |
| |
| boolean dataLeft = hasMoreDataToFlush(); |
| |
| //write to the socket, if there is anything to write |
| if (dataLeft) { |
| writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped); |
| } |
| |
| dataLeft = hasMoreDataToFlush(); |
| |
| if (!dataLeft && bufferedWrites.size() > 0) { |
| Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); |
| while (!hasMoreDataToFlush() && bufIter.hasNext()) { |
| ByteBufferHolder buffer = bufIter.next(); |
| buffer.flip(); |
| while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) { |
| transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer()); |
| if (buffer.getBuf().remaining() == 0) { |
| bufIter.remove(); |
| } |
| writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true); |
| //here we must break if we didn't finish the write |
| } |
| } |
| } |
| |
| return hasMoreDataToFlush(); |
| } |
| |
| |
| @Override |
| protected boolean hasMoreDataToFlush() { |
| return (flipped && socket.getBufHandler().getWriteBuffer().remaining()>0) || |
| (!flipped && socket.getBufHandler().getWriteBuffer().position() > 0); |
| } |
| |
| |
| @Override |
| protected void registerWriteInterest() throws IOException { |
| NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(); |
| if (att == null) { |
| throw new IOException("Key must be cancelled"); |
| } |
| att.getPoller().add(socket, SelectionKey.OP_WRITE); |
| } |
| |
| |
| private int transfer(byte[] from, int offset, int length, ByteBuffer to) { |
| int max = Math.min(length, to.remaining()); |
| to.put(from, offset, max); |
| return max; |
| } |
| |
| |
| private void transfer(ByteBuffer from, ByteBuffer to) { |
| int max = Math.min(from.remaining(), to.remaining()); |
| ByteBuffer tmp = from.duplicate (); |
| tmp.limit (tmp.position() + max); |
| to.put (tmp); |
| from.position(from.position() + max); |
| } |
| |
| |
| // ----------------------------------- OutputStreamOutputBuffer Inner Class |
| |
| /** |
| * This class is an output buffer which will write data to an output |
| * stream. |
| */ |
| protected class SocketOutputBuffer implements OutputBuffer { |
| |
| |
| /** |
| * Write chunk. |
| */ |
| @Override |
| public int doWrite(ByteChunk chunk, Response res) throws IOException { |
| try { |
| int len = chunk.getLength(); |
| int start = chunk.getStart(); |
| byte[] b = chunk.getBuffer(); |
| addToBB(b, start, len); |
| byteCount += chunk.getLength(); |
| return chunk.getLength(); |
| } catch (IOException ioe) { |
| response.action(ActionCode.CLOSE_NOW, ioe); |
| // Re-throw |
| throw ioe; |
| } |
| } |
| |
| @Override |
| public long getBytesWritten() { |
| return byteCount; |
| } |
| } |
| } |