blob: b7e5e2e2e047a61b11b9e025013d0432a69a1487 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.coyote.http11.upgrade;
import java.io.EOFException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.Nio2Channel;
import org.apache.tomcat.util.net.Nio2Endpoint;
import org.apache.tomcat.util.net.SocketStatus;
import org.apache.tomcat.util.net.SocketWrapper;
public class Nio2ServletInputStream extends AbstractServletInputStream {
private final AbstractEndpoint<Nio2Channel> endpoint;
private final SocketWrapper<Nio2Channel> wrapper;
private final Nio2Channel channel;
private final CompletionHandler<Integer, SocketWrapper<Nio2Channel>> completionHandler;
private boolean flipped = false;
private volatile boolean readPending = false;
private volatile boolean interest = true;
public Nio2ServletInputStream(SocketWrapper<Nio2Channel> wrapper, AbstractEndpoint<Nio2Channel> endpoint0) {
this.endpoint = endpoint0;
this.wrapper = wrapper;
this.channel = wrapper.getSocket();
this.completionHandler = new CompletionHandler<Integer, SocketWrapper<Nio2Channel>>() {
@Override
public void completed(Integer nBytes, SocketWrapper<Nio2Channel> attachment) {
boolean notify = false;
synchronized (completionHandler) {
if (nBytes.intValue() < 0) {
failed(new EOFException(), attachment);
} else {
readPending = false;
if (interest && !Nio2Endpoint.isInline()) {
interest = false;
notify = true;
}
}
}
if (notify) {
endpoint.processSocket(attachment, SocketStatus.OPEN_READ, false);
}
}
@Override
public void failed(Throwable exc, SocketWrapper<Nio2Channel> attachment) {
attachment.setError(true);
readPending = false;
if (exc instanceof AsynchronousCloseException) {
// If already closed, don't call onError and close again
return;
}
onError(exc);
endpoint.processSocket(attachment, SocketStatus.ERROR, true);
}
};
}
@Override
protected boolean doIsReady() throws IOException {
synchronized (completionHandler) {
if (readPending) {
interest = true;
return false;
}
ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
if (!flipped) {
readBuffer.flip();
flipped = true;
}
if (readBuffer.remaining() > 0) {
return true;
}
readBuffer.clear();
flipped = false;
int nRead = fillReadBuffer(false);
boolean isReady = nRead > 0;
if (isReady) {
if (!flipped) {
readBuffer.flip();
flipped = true;
}
} else {
interest = true;
}
return isReady;
}
}
@Override
protected int doRead(boolean block, byte[] b, int off, int len)
throws IOException {
synchronized (completionHandler) {
if (readPending) {
return 0;
}
ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
if (!flipped) {
readBuffer.flip();
flipped = true;
}
int remaining = readBuffer.remaining();
// Is there enough data in the read buffer to satisfy this request?
if (remaining >= len) {
readBuffer.get(b, off, len);
return len;
}
// Copy what data there is in the read buffer to the byte array
int leftToWrite = len;
int newOffset = off;
if (remaining > 0) {
readBuffer.get(b, off, remaining);
leftToWrite -= remaining;
newOffset += remaining;
}
// Fill the read buffer as best we can
readBuffer.clear();
flipped = false;
int nRead = fillReadBuffer(block);
// Full as much of the remaining byte array as possible with the data
// that was just read
if (nRead > 0) {
if (!flipped) {
readBuffer.flip();
flipped = true;
}
if (nRead > leftToWrite) {
readBuffer.get(b, newOffset, leftToWrite);
leftToWrite = 0;
} else {
readBuffer.get(b, newOffset, nRead);
leftToWrite -= nRead;
}
} else if (nRead == 0) {
if (block) {
if (!flipped) {
readBuffer.flip();
flipped = true;
}
}
} else if (nRead == -1) {
throw new EOFException();
}
return len - leftToWrite;
}
}
@Override
protected void doClose() throws IOException {
channel.close();
}
private int fillReadBuffer(boolean block) throws IOException {
ByteBuffer readBuffer = channel.getBufHandler().getReadBuffer();
int nRead = 0;
if (block) {
readPending = true;
readBuffer.clear();
flipped = false;
try {
nRead = channel.read(readBuffer)
.get(wrapper.getTimeout(), TimeUnit.MILLISECONDS).intValue();
readPending = false;
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
onError(e.getCause());
throw (IOException) e.getCause();
} else {
onError(e);
throw new IOException(e);
}
} catch (InterruptedException e) {
onError(e);
throw new IOException(e);
} catch (TimeoutException e) {
SocketTimeoutException ex = new SocketTimeoutException();
onError(ex);
throw ex;
}
} else {
readPending = true;
readBuffer.clear();
flipped = false;
Nio2Endpoint.startInline();
channel.read(readBuffer,
wrapper.getTimeout(), TimeUnit.MILLISECONDS, wrapper, completionHandler);
Nio2Endpoint.endInline();
if (!readPending) {
nRead = readBuffer.position();
}
}
return nRead;
}
}