| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.coyote.http2; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Base64; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ConcurrentNavigableMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Lock; |
| |
| import javax.servlet.http.WebConnection; |
| |
| import org.apache.coyote.Adapter; |
| import org.apache.coyote.ProtocolException; |
| import org.apache.coyote.Request; |
| import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler; |
| import org.apache.coyote.http2.HpackDecoder.HeaderEmitter; |
| import org.apache.coyote.http2.HpackEncoder.State; |
| import org.apache.coyote.http2.Http2Parser.Input; |
| import org.apache.coyote.http2.Http2Parser.Output; |
| import org.apache.juli.logging.Log; |
| import org.apache.juli.logging.LogFactory; |
| import org.apache.tomcat.util.ExceptionUtils; |
| import org.apache.tomcat.util.http.MimeHeaders; |
| import org.apache.tomcat.util.http.parser.Priority; |
| import org.apache.tomcat.util.log.UserDataHelper; |
| import org.apache.tomcat.util.net.AbstractEndpoint.Handler.SocketState; |
| import org.apache.tomcat.util.net.SSLSupport; |
| import org.apache.tomcat.util.net.SendfileState; |
| import org.apache.tomcat.util.net.SocketEvent; |
| import org.apache.tomcat.util.net.SocketWrapperBase; |
| import org.apache.tomcat.util.res.StringManager; |
| |
| /** |
| * This represents an HTTP/2 connection from a client to Tomcat. It is designed on the basis that there will never be |
| * more than one thread performing I/O at a time. <br> |
| * For reading, this implementation is blocking within frames and non-blocking between frames. <br> |
| * Note: |
| * <ul> |
| * <li>You will need to nest a {@code <UpgradeProtocol className="org.apache.coyote.http2.Http2Protocol" />} element |
| * inside a TLS enabled Connector element in server.xml to enable HTTP/2 support.</li> |
| * </ul> |
| */ |
| class Http2UpgradeHandler extends AbstractStream implements InternalHttpUpgradeHandler, Input, Output { |
| |
| // Logger and localized message source shared by all instances of this handler. |
| protected static final Log log = LogFactory.getLog(Http2UpgradeHandler.class); |
| protected static final StringManager sm = StringManager.getManager(Http2UpgradeHandler.class); |
| |
| // Source of connection identifiers; the constructor takes the next value for each new handler. |
| private static final AtomicInteger connectionIdGenerator = new AtomicInteger(0); |
| private static final Integer STREAM_ID_ZERO = Integer.valueOf(0); |
| |
| // Values written to the flags byte (index 4) of a frame header. |
| protected static final int FLAG_END_OF_STREAM = 1; |
| protected static final int FLAG_END_OF_HEADERS = 4; |
| |
| // Pre-built 9 byte frame headers laid out as: 3 byte payload length, type, flags, 4 byte stream identifier. |
| protected static final byte[] PING = { 0x00, 0x00, 0x08, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00 }; |
| protected static final byte[] PING_ACK = { 0x00, 0x00, 0x08, 0x06, 0x01, 0x00, 0x00, 0x00, 0x00 }; |
| |
| protected static final byte[] SETTINGS_ACK = { 0x00, 0x00, 0x00, 0x04, 0x01, 0x00, 0x00, 0x00, 0x00 }; |
| |
| // Type, flags and stream identifier of a GOAWAY frame header. The 3 byte payload length is written separately |
| // by writeGoAwayFrame() because it depends on the size of the optional debug message. |
| protected static final byte[] GOAWAY = { 0x07, 0x00, 0x00, 0x00, 0x00, 0x00 }; |
| |
| // Request header read by init() to obtain the Base64 encoded settings sent with an HTTP upgrade request. |
| private static final String HTTP2_SETTINGS_HEADER = "HTTP2-Settings"; |
| |
| protected static final HeaderSink HEADER_SINK = new HeaderSink(); |
| |
| // Consulted by upgradeDispatch() to decide whether, and at which level, a stream error is logged. |
| protected static final UserDataHelper userDataHelper = new UserDataHelper(log); |
| |
| protected final String connectionId; |
| |
| protected final Http2Protocol protocol; |
| private final Adapter adapter; |
| protected volatile SocketWrapperBase<?> socketWrapper; |
| private volatile SSLSupport sslSupport; |
| |
| // Created by init(). |
| private volatile Http2Parser parser; |
| |
| // Simple state machine (sequence of states) |
| private final AtomicReference<ConnectionState> connectionState = new AtomicReference<>(ConnectionState.NEW); |
| private volatile long pausedNanoTime = Long.MAX_VALUE; |
| |
| /** |
| * Remote settings are settings defined by the client and sent to Tomcat that Tomcat must use when communicating |
| * with the client. |
| */ |
| private final ConnectionSettingsRemote remoteSettings; |
| /** |
| * Local settings are settings defined by Tomcat and sent to the client that the client must use when communicating |
| * with Tomcat. |
| */ |
| protected final ConnectionSettingsLocal localSettings; |
| |
| // The encoder is created on first use by getHpackEncoder(). |
| private HpackDecoder hpackDecoder; |
| private HpackEncoder hpackEncoder; |
| |
| private final ConcurrentNavigableMap<Integer,AbstractNonZeroStream> streams = new ConcurrentSkipListMap<>(); |
| protected final AtomicInteger activeRemoteStreamCount = new AtomicInteger(0); |
| private volatile int maxProcessedStreamId; |
| private final AtomicInteger nextLocalStreamId = new AtomicInteger(2); |
| private final PingManager pingManager = getPingManager(); |
| private volatile int newStreamsSinceLastPrune = 0; |
| // Streams waiting for an allocation from the connection flow control window and the total they requested. |
| private final Set<Stream> backLogStreams = new HashSet<>(); |
| private long backLogSize = 0; |
| // The time at which the connection will timeout unless data arrives before |
| // then. -1 means no timeout. |
| private volatile long connectionTimeout = -1; |
| |
| // Stream concurrency control |
| // Both fields remain null unless init() enables concurrency control. |
| private AtomicInteger streamConcurrency = null; |
| private Queue<StreamRunnable> queuedRunnable = null; |
| |
| // Track 'overhead' frames vs 'request/response' frames |
| private final AtomicLong overheadCount; |
| private volatile int lastNonFinalDataPayload; |
| private volatile int lastWindowUpdate; |
| |
| // Time between the "graceful" GOAWAY (max stream id) and the final GOAWAY (last seen stream id) |
| // NOTE(review): added to System.nanoTime() based values in checkPauseState() so presumably nanoseconds - confirm. |
| private long drainTimeout = 0; |
| |
| /** |
| * Creates the handler for a single HTTP/2 connection. |
| * |
| * @param protocol      the protocol configuration that supplies the limits and timeouts applied to this connection |
| * @param adapter       the adapter handed to the stream processors this handler creates |
| * @param coyoteRequest the request that triggered an HTTP upgrade, which is registered as stream 1, or {@code null} |
| *                          if there is no initial request |
| */ |
| Http2UpgradeHandler(Http2Protocol protocol, Adapter adapter, Request coyoteRequest) { |
| super(STREAM_ID_ZERO); |
| this.protocol = protocol; |
| this.adapter = adapter; |
| this.connectionId = Integer.toString(connectionIdGenerator.getAndIncrement()); |
| |
| /* |
| * The overhead count starts at -10 * the count factor so that, on a newly opened connection, 10 'overhead' |
| * frames in a row will cause the connection to be closed. Over time the count should be a slowly decreasing |
| * negative number, so the longer a connection is 'well-behaved' the more tolerance it has for a period of |
| * 'bad' behaviour. |
| */ |
| this.overheadCount = new AtomicLong(-10L * protocol.getOverheadCountFactor()); |
| this.lastNonFinalDataPayload = protocol.getOverheadDataThreshold() * 2; |
| this.lastWindowUpdate = protocol.getOverheadWindowUpdateThreshold() * 2; |
| |
| this.remoteSettings = new ConnectionSettingsRemote(connectionId); |
| this.localSettings = new ConnectionSettingsLocal(connectionId); |
| |
| // Force these initial limits. A well-behaved client should ACK the settings and adhere to them before it |
| // reaches the limits anyway. |
| this.localSettings.set(Setting.MAX_CONCURRENT_STREAMS, protocol.getMaxConcurrentStreams(), true); |
| this.localSettings.set(Setting.INITIAL_WINDOW_SIZE, protocol.getInitialWindowSize(), true); |
| |
| this.pingManager.initiateDisabled = protocol.getInitiatePingDisabled(); |
| this.drainTimeout = protocol.getDrainTimeout(); |
| |
| if (coyoteRequest == null) { |
| // No initial request so there is no stream to register. |
| return; |
| } |
| |
| // The initial HTTP request becomes stream 1. |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.upgrade", connectionId)); |
| } |
| Integer streamOneId = Integer.valueOf(1); |
| Stream streamOne = new Stream(streamOneId, this, coyoteRequest); |
| this.streams.put(streamOneId, streamOne); |
| this.activeRemoteStreamCount.set(1); |
| this.maxProcessedStreamId = 1; |
| } |
| |
| |
| /** |
| * Factory for the ping manager used by this connection. It is invoked from a field initializer so it runs while |
| * the handler is still being constructed. |
| * |
| * @return a new ping manager |
| */ |
| protected PingManager getPingManager() { |
| PingManager manager = new PingManager(); |
| return manager; |
| } |
| |
| |
| /** |
| * Performs the one-time start-up of the connection. The connection state is moved from NEW to CONNECTED, the |
| * parser is created, any settings supplied with an HTTP upgrade request are applied, the initial settings frame is |
| * written, the client connection preface is read and then processing of the connection starts. Calls made once the |
| * connection has left the NEW state return without doing anything. |
| * |
| * @param webConnection non-{@code null} when HTTP/2 was started via an HTTP upgrade, in which case the original |
| *                          request is available as stream 1; {@code null} otherwise |
| * |
| * @throws ProtocolException if the settings provided with the upgrade request are not valid or if the client does |
| *                               not send a valid connection preface |
| */ |
| @Override |
| public void init(WebConnection webConnection) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.init", connectionId, connectionState.get())); |
| } |
| |
| if (!connectionState.compareAndSet(ConnectionState.NEW, ConnectionState.CONNECTED)) { |
| return; |
| } |
| |
| // Init concurrency control if needed |
| if (protocol.getMaxConcurrentStreamExecution() < localSettings.getMaxConcurrentStreams()) { |
| streamConcurrency = new AtomicInteger(0); |
| queuedRunnable = new ConcurrentLinkedQueue<>(); |
| } |
| |
| parser = getParser(connectionId); |
| |
| Stream stream = null; |
| |
| socketWrapper.setReadTimeout(protocol.getReadTimeout()); |
| socketWrapper.setWriteTimeout(protocol.getWriteTimeout()); |
| |
| if (webConnection != null) { |
| // HTTP/2 started via HTTP upgrade. |
| // The initial HTTP/1.1 request is available as Stream 1. |
| |
| try { |
| // Process the initial settings frame |
| stream = getStream(1, true); |
| String base64Settings = stream.getCoyoteRequest().getHeader(HTTP2_SETTINGS_HEADER); |
| byte[] settings = Base64.getUrlDecoder().decode(base64Settings); |
| |
| // Settings are only valid on stream 0 |
| FrameType.SETTINGS.check(0, settings.length); |
| |
| // Each setting occupies 6 bytes (2 byte identifier followed by a 4 byte value) so the number of |
| // settings is the payload length divided by 6. Using the remainder here would skip every setting of a |
| // well-formed payload since its length is a multiple of 6. |
| int settingCount = settings.length / 6; |
| for (int i = 0; i < settingCount; i++) { |
| int id = ByteUtil.getTwoBytes(settings, i * 6); |
| long value = ByteUtil.getFourBytes(settings, (i * 6) + 2); |
| Setting key = Setting.valueOf(id); |
| if (key == Setting.UNKNOWN) { |
| log.warn(sm.getString("connectionSettings.unknown", connectionId, Integer.toString(id), |
| Long.toString(value))); |
| } |
| remoteSettings.set(key, value); |
| } |
| } catch (Http2Exception e) { |
| // Keep the original failure as the cause so the reason the upgrade was rejected is not lost. |
| throw new ProtocolException(sm.getString("upgradeHandler.upgrade.fail", connectionId), e); |
| } |
| } |
| |
| // Send the initial settings frame |
| writeSettings(); |
| |
| // Make sure the client has sent a valid connection preface before we |
| // send the response to the original request over HTTP/2. |
| try { |
| parser.readConnectionPreface(webConnection, stream); |
| } catch (Http2Exception e) { |
| String msg = sm.getString("upgradeHandler.invalidPreface", connectionId); |
| if (log.isDebugEnabled()) { |
| log.debug(msg, e); |
| } |
| throw new ProtocolException(msg); |
| } |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.prefaceReceived", connectionId)); |
| } |
| |
| // Allow streams and connection to determine timeouts |
| socketWrapper.setReadTimeout(-1); |
| socketWrapper.setWriteTimeout(-1); |
| |
| processConnection(webConnection, stream); |
| } |
| |
| /** |
| * Starts processing of the connection once the client connection preface has been read. |
| * |
| * @param webConnection non-{@code null} if HTTP/2 was started via an HTTP upgrade |
| * @param stream        the stream for the request that triggered the upgrade; only used when {@code webConnection} |
| *                          is not {@code null} |
| * |
| * @throws ProtocolException if the initial ping cannot be sent |
| */ |
| protected void processConnection(WebConnection webConnection, Stream stream) { |
| try { |
| // Ping as early as possible to get an idea of the round trip time. |
| pingManager.sendPing(true); |
| } catch (IOException pingFailure) { |
| throw new ProtocolException(sm.getString("upgradeHandler.pingFailed", connectionId), pingFailure); |
| } |
| |
| if (webConnection == null) { |
| return; |
| } |
| processStreamOnContainerThread(stream); |
| } |
| |
| /** |
| * Factory for the parser that reads frames for this connection. This handler is passed to the parser as both its |
| * second and third constructor argument. |
| * |
| * @param connectionId the identifier of the connection the parser is created for |
| * |
| * @return a new parser |
| */ |
| protected Http2Parser getParser(String connectionId) { |
| Http2Parser created = new Http2Parser(connectionId, this, this); |
| return created; |
| } |
| |
| |
| /** |
| * Creates a processor for the given stream and submits it for processing with an OPEN_READ event. |
| * |
| * @param stream the stream to process |
| */ |
| protected void processStreamOnContainerThread(Stream stream) { |
| StreamProcessor processor = new StreamProcessor(this, stream, adapter, socketWrapper); |
| processor.setSslSupport(sslSupport); |
| processStreamOnContainerThread(processor, SocketEvent.OPEN_READ); |
| } |
| |
| |
| /** |
| * Decrements the active remote stream count via the given stream and updates the connection timeout to match the |
| * new count. |
| * |
| * @param stream the stream that is no longer active; {@code null} is ignored |
| */ |
| protected void decrementActiveRemoteStreamCount(Stream stream) { |
| if (stream == null) { |
| return; |
| } |
| int remaining = stream.decrementAndGetActiveRemoteStreamCount(); |
| setConnectionTimeoutForStreamCount(remaining); |
| } |
| |
| |
| /** |
| * Submits the given stream processor for execution. If stream concurrency control is enabled and the number of |
| * executing streams has reached the configured maximum, the work is queued instead until |
| * {@link #executeQueuedStream()} takes it from the queue. |
| * |
| * @param streamProcessor the processor to run |
| * @param event           the event the processor is to handle |
| */ |
| void processStreamOnContainerThread(StreamProcessor streamProcessor, SocketEvent event) { |
| StreamRunnable task = new StreamRunnable(streamProcessor, event); |
| boolean concurrencyLimited = streamConcurrency != null; |
| |
| if (concurrencyLimited && getStreamConcurrency() >= protocol.getMaxConcurrentStreamExecution()) { |
| // At the limit. Park the work until a running stream completes. |
| queuedRunnable.offer(task); |
| return; |
| } |
| |
| if (concurrencyLimited) { |
| increaseStreamConcurrency(); |
| } |
| socketWrapper.execute(task); |
| } |
| |
| |
| /** |
| * Associates this handler with the socket wrapper used for all I/O on this connection. |
| * |
| * @param wrapper the socket wrapper for the connection |
| */ |
| @Override |
| public void setSocketWrapper(SocketWrapperBase<?> wrapper) { |
| socketWrapper = wrapper; |
| } |
| |
| |
| /** |
| * Stores the SSL support instance that is passed on to the stream processors created by this handler. |
| * |
| * @param sslSupport the SSL support for the connection |
| */ |
| @Override |
| public void setSslSupport(SSLSupport sslSupport) { |
| this.sslSupport = sslSupport; |
| } |
| |
| |
| /** |
| * Processes a socket event for this connection. |
| * <ul> |
| * <li>OPEN_READ: sends a ping if the socket is writable and then reads frames until the parser reports there is |
| * nothing more to read. Stream errors are logged, the affected stream is closed or reset, and reading |
| * continues. Connection errors close the connection.</li> |
| * <li>OPEN_WRITE: flushes pending writes via {@link #processWrites()}.</li> |
| * <li>TIMEOUT: closes the connection via {@code closeConnection(null)}.</li> |
| * <li>DISCONNECT, ERROR, STOP, CONNECT_FAIL: closes the connection via {@code close()}.</li> |
| * </ul> |
| * Any I/O error or unexpected throwable results in the connection being closed. |
| * |
| * @param status the event to process |
| * |
| * @return {@code UPGRADED} or {@code ASYNC_IO} (when the socket wrapper reports asynchronous I/O) if the connection |
| *             remains open, otherwise {@code CLOSED} |
| */ |
| @Override |
| public SocketState upgradeDispatch(SocketEvent status) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.upgradeDispatch.entry", connectionId, status)); |
| } |
| |
| // WebConnection is not used so passing null here is fine |
| // Might not be necessary. init() will handle that. |
| init(null); |
| |
| // Default result. It is only changed on the paths below that leave the connection open. |
| SocketState result = SocketState.CLOSED; |
| |
| try { |
| switch (status) { |
| case OPEN_READ: |
| socketWrapper.getLock().lock(); |
| try { |
| if (socketWrapper.canWrite()) { |
| // Only send a ping if there is no other data waiting to be sent. |
| // Ping manager will ensure they aren't sent too frequently. |
| pingManager.sendPing(false); |
| } |
| } finally { |
| socketWrapper.getLock().unlock(); |
| } |
| try { |
| // Disable the connection timeout while frames are processed |
| setConnectionTimeout(-1); |
| while (true) { |
| try { |
| if (!parser.readFrame()) { |
| break; |
| } |
| } catch (StreamException se) { |
| // Log the Stream error but not necessarily all of |
| // them |
| UserDataHelper.Mode logMode = userDataHelper.getNextMode(); |
| if (logMode != null) { |
| String message = sm.getString("upgradeHandler.stream.error", connectionId, |
| Integer.toString(se.getStreamId())); |
| switch (logMode) { |
| case INFO_THEN_DEBUG: |
| message += sm.getString("upgradeHandler.fallToDebug"); |
| //$FALL-THROUGH$ |
| case INFO: |
| log.info(message, se); |
| break; |
| case DEBUG: |
| log.debug(message, se); |
| } |
| } |
| // Stream errors are not fatal to the connection so |
| // continue reading frames |
| Stream stream = getStream(se.getStreamId(), false); |
| if (stream == null) { |
| sendStreamReset(null, se); |
| } else { |
| stream.close(se); |
| } |
| } finally { |
| // Runs after every read attempt, including those that ended with a stream error. |
| if (isOverheadLimitExceeded()) { |
| throw new ConnectionException( |
| sm.getString("upgradeHandler.tooMuchOverhead", connectionId), |
| Http2Error.ENHANCE_YOUR_CALM); |
| } |
| } |
| } |
| |
| // Need to know the correct timeout before starting the read |
| // but that may not be known at this time if one or more |
| // requests are currently being processed so don't set a |
| // timeout for the socket... |
| socketWrapper.setReadTimeout(-1); |
| |
| // ...set a timeout on the connection |
| setConnectionTimeoutForStreamCount(activeRemoteStreamCount.get()); |
| |
| } catch (Http2Exception ce) { |
| // Really ConnectionException |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("upgradeHandler.connectionError"), ce); |
| } |
| closeConnection(ce); |
| // Leave the switch with result still set to CLOSED. |
| break; |
| } |
| |
| if (connectionState.get() != ConnectionState.CLOSED) { |
| if (socketWrapper.hasAsyncIO()) { |
| result = SocketState.ASYNC_IO; |
| } else { |
| result = SocketState.UPGRADED; |
| } |
| } |
| break; |
| |
| case OPEN_WRITE: |
| processWrites(); |
| if (socketWrapper.hasAsyncIO()) { |
| result = SocketState.ASYNC_IO; |
| } else { |
| result = SocketState.UPGRADED; |
| } |
| break; |
| |
| case TIMEOUT: |
| // Passing null means the GOAWAY frame is sent with NO_ERROR and without a debug message. |
| closeConnection(null); |
| break; |
| |
| case DISCONNECT: |
| case ERROR: |
| case STOP: |
| case CONNECT_FAIL: |
| close(); |
| break; |
| } |
| } catch (IOException ioe) { |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("upgradeHandler.ioerror", connectionId), ioe); |
| } |
| close(); |
| } catch (Throwable t) { |
| ExceptionUtils.handleThrowable(t); |
| if (log.isDebugEnabled()) { |
| log.debug(sm.getString("upgradeHandler.throwable", connectionId), t); |
| } |
| // Unexpected errors close the connection. |
| close(); |
| } |
| |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.upgradeDispatch.exit", connectionId, result)); |
| } |
| return result; |
| } |
| |
| |
| /** |
| * Sets the connection timeout based on the current number of active streams. While any stream is active the |
| * connection timeout is disabled since the individual streams have their own timeouts. With no active streams the |
| * keep-alive timeout is applied, measured from the current time, unless the keep-alive timeout is itself disabled |
| * (-1). |
| * |
| * @param streamCount the current number of active streams |
| */ |
| protected void setConnectionTimeoutForStreamCount(int streamCount) { |
| // -1 disables the connection timeout. |
| long newTimeout = -1; |
| if (streamCount == 0) { |
| long keepAliveTimeout = protocol.getKeepAliveTimeout(); |
| if (keepAliveTimeout != -1) { |
| newTimeout = System.currentTimeMillis() + keepAliveTimeout; |
| } |
| } |
| setConnectionTimeout(newTimeout); |
| } |
| |
| |
| /** |
| * Records the time at which the connection times out. |
| * |
| * @param connectionTimeout the time at which the connection will time out, or -1 for no timeout |
| */ |
| private void setConnectionTimeout(long connectionTimeout) { |
| this.connectionTimeout = connectionTimeout; |
| } |
| |
| |
| /** |
| * Triggers TIMEOUT processing for the connection if a timeout is due. |
| * |
| * @param now the current time to compare with the connection timeout, or -1 to trigger the timeout unconditionally |
| */ |
| @Override |
| public void timeoutAsync(long now) { |
| // Read the volatile field once so both checks below use the same value. |
| long timeoutAt = this.connectionTimeout; |
| boolean forced = now == -1; |
| boolean expired = timeoutAt > -1 && now > timeoutAt; |
| if (forced || expired) { |
| // Have to dispatch as this will be executed from a non-container thread. |
| socketWrapper.processSocket(SocketEvent.TIMEOUT, true); |
| } |
| } |
| |
| |
| /** |
| * @return the settings defined by the client that Tomcat must use when communicating with the client |
| */ |
| ConnectionSettingsRemote getRemoteSettings() { |
| return this.remoteSettings; |
| } |
| |
| |
| /** |
| * @return the settings defined by Tomcat that the client must use when communicating with Tomcat |
| */ |
| ConnectionSettingsLocal getLocalSettings() { |
| return this.localSettings; |
| } |
| |
| |
| /** |
| * @return the protocol configuration this handler was created with |
| */ |
| Http2Protocol getProtocol() { |
| return this.protocol; |
| } |
| |
| |
| /** |
| * Starts the graceful shut down of a connected connection. The state moves from CONNECTED to PAUSING, the time is |
| * recorded and a GOAWAY frame with the largest possible stream identifier and NO_ERROR is sent. The final GOAWAY |
| * is sent later by {@link #checkPauseState()}. Has no effect if the connection is not in the CONNECTED state. |
| */ |
| @Override |
| public void pause() { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.pause.entry", connectionId)); |
| } |
| |
| if (!connectionState.compareAndSet(ConnectionState.CONNECTED, ConnectionState.PAUSING)) { |
| return; |
| } |
| |
| pausedNanoTime = System.nanoTime(); |
| |
| try { |
| // Integer.MAX_VALUE is the largest value that fits in the 31 bit stream identifier. |
| writeGoAwayFrame(Integer.MAX_VALUE, Http2Error.NO_ERROR.getCode(), null); |
| } catch (IOException ignore) { |
| // This is fatal for the connection. Ignore it here. There will be |
| // further attempts at I/O in upgradeDispatch() and it can better |
| // handle the IO errors. |
| } |
| } |
| |
| |
| /** |
| * Nothing to release here so this implementation does nothing. |
| */ |
| @Override |
| public void destroy() { |
| // NO-OP |
| } |
| |
| |
| /** |
| * Completes a graceful shut down that was started by {@link #pause()}. Once the round trip time reported by the |
| * ping manager plus the drain timeout has elapsed since the connection was paused, the state moves from PAUSING to |
| * PAUSED and the final GOAWAY frame, carrying the highest processed stream identifier, is sent. |
| * |
| * @throws IOException if the GOAWAY frame cannot be written |
| */ |
| void checkPauseState() throws IOException { |
| if (connectionState.get() != ConnectionState.PAUSING) { |
| return; |
| } |
| |
| long finalGoAwayDue = pausedNanoTime + pingManager.getRoundTripTimeNano() + drainTimeout; |
| if (finalGoAwayDue < System.nanoTime()) { |
| connectionState.compareAndSet(ConnectionState.PAUSING, ConnectionState.PAUSED); |
| writeGoAwayFrame(maxProcessedStreamId, Http2Error.NO_ERROR.getCode(), null); |
| } |
| } |
| |
| |
| /** |
| * Adds one to the number of streams currently executing. Must only be called when concurrency control is enabled. |
| * |
| * @return the updated count |
| */ |
| private int increaseStreamConcurrency() { |
| int updated = streamConcurrency.incrementAndGet(); |
| return updated; |
| } |
| |
| /** |
| * Removes one from the number of streams currently executing. Must only be called when concurrency control is |
| * enabled. |
| * |
| * @return the updated count |
| */ |
| private int decreaseStreamConcurrency() { |
| int updated = streamConcurrency.decrementAndGet(); |
| return updated; |
| } |
| |
| /** |
| * Must only be called when concurrency control is enabled. |
| * |
| * @return the number of streams currently executing |
| */ |
| private int getStreamConcurrency() { |
| int current = streamConcurrency.get(); |
| return current; |
| } |
| |
| /** |
| * Records that a stream has finished executing and, if that leaves capacity below the configured maximum, starts |
| * the next queued stream if there is one. Does nothing when concurrency control is not enabled. |
| */ |
| void executeQueuedStream() { |
| if (streamConcurrency == null) { |
| return; |
| } |
| |
| decreaseStreamConcurrency(); |
| if (getStreamConcurrency() >= protocol.getMaxConcurrentStreamExecution()) { |
| // Still at the limit. |
| return; |
| } |
| |
| StreamRunnable next = queuedRunnable.poll(); |
| if (next == null) { |
| // Nothing is waiting. |
| return; |
| } |
| increaseStreamConcurrency(); |
| socketWrapper.execute(next); |
| } |
| |
| |
| /** |
| * Sends a RST frame for the stream identified by the given exception. The reset is counted as overhead in the same |
| * way as a received reset. |
| * |
| * @param state the state machine of the stream being reset, or {@code null} if there is no state to update |
| * @param se    the stream error that provides the stream identifier and the error code for the frame |
| * |
| * @throws IOException if the frame cannot be written |
| */ |
| void sendStreamReset(StreamStateMachine state, StreamException se) throws IOException { |
| |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.rst.debug", connectionId, Integer.toString(se.getStreamId()), |
| se.getError(), se.getMessage())); |
| } |
| |
| // A reset that is sent is treated like a reset that is received so it adds to the overhead count. |
| increaseOverheadCount(FrameType.RST, getProtocol().getOverheadResetFactor()); |
| |
| // 9 byte header followed by a 4 byte payload. The flags byte (index 4) is left as zero. |
| byte[] frame = new byte[13]; |
| ByteUtil.setThreeBytes(frame, 0, 4); |
| frame[3] = FrameType.RST.getIdByte(); |
| ByteUtil.set31Bits(frame, 5, se.getStreamId()); |
| ByteUtil.setFourBytes(frame, 9, se.getError().getCode()); |
| |
| /* |
| * The state change and the write of the RST frame have to happen atomically. Otherwise another thread working |
| * with this stream could observe the state change and send its own RST frame ahead of the one triggered here, |
| * leaving the client with RST frames in an unexpected order. |
| */ |
| socketWrapper.getLock().lock(); |
| try { |
| if (state != null) { |
| // Capture this first since sending the reset changes the state. |
| boolean wasActive = state.isActive(); |
| state.sendReset(); |
| if (wasActive) { |
| decrementActiveRemoteStreamCount(getStream(se.getStreamId())); |
| } |
| } |
| socketWrapper.write(true, frame, 0, frame.length); |
| socketWrapper.flush(true); |
| } finally { |
| socketWrapper.getLock().unlock(); |
| } |
| } |
| |
| |
| /** |
| * Sends a GOAWAY frame on a best efforts basis and then closes the connection. |
| * |
| * @param ce the error that caused the connection to be closed, which provides the error code and debug message for |
| *               the frame, or {@code null} to send NO_ERROR without a debug message |
| */ |
| void closeConnection(Http2Exception ce) { |
| boolean hasError = ce != null; |
| long code = hasError ? ce.getError().getCode() : Http2Error.NO_ERROR.getCode(); |
| byte[] msg = hasError ? ce.getMessage().getBytes(StandardCharsets.UTF_8) : null; |
| |
| try { |
| writeGoAwayFrame(maxProcessedStreamId, code, msg); |
| } catch (IOException ignore) { |
| // Ignore. GOAWAY is sent on a best efforts basis and the original |
| // error has already been logged. |
| } |
| close(); |
| } |
| |
| |
| /** |
| * Writes the initial settings frame followed by any supporting frame that is required. If the configured initial |
| * window size is larger than the default, a WINDOW_UPDATE frame for the connection (stream 0) is written as well |
| * so the connection flow control window is increased to match. |
| * |
| * @throws ProtocolException if the frames cannot be written |
| */ |
| protected void writeSettings() { |
| try { |
| byte[] settingsFrame = localSettings.getSettingsFrameForPending(); |
| socketWrapper.write(true, settingsFrame, 0, settingsFrame.length); |
| |
| // An empty array means no WINDOW_UPDATE is required. |
| byte[] windowUpdate = createWindowUpdateForSettings(); |
| if (windowUpdate.length > 0) { |
| socketWrapper.write(true, windowUpdate, 0, windowUpdate.length); |
| } |
| |
| socketWrapper.flush(true); |
| } catch (IOException ioe) { |
| // Exception is logged further up stack |
| String msg = sm.getString("upgradeHandler.sendPrefaceFail", connectionId); |
| if (log.isDebugEnabled()) { |
| log.debug(msg); |
| } |
| throw new ProtocolException(msg, ioe); |
| } |
| } |
| |
| |
| /** |
| * Builds the WINDOW_UPDATE frame for the connection (stream 0) that accompanies the initial settings. |
| * |
| * @return The WINDOW_UPDATE frame if the configured initial window size is larger than the default or an empty |
| *             array if no WINDOW_UPDATE is required. |
| */ |
| protected byte[] createWindowUpdateForSettings() { |
| int increment = protocol.getInitialWindowSize() - ConnectionSettingsBase.DEFAULT_INITIAL_WINDOW_SIZE; |
| if (increment <= 0) { |
| // The window is not being increased so there is nothing to send. |
| return new byte[0]; |
| } |
| |
| // 9 byte header plus 4 byte payload. The stream identifier bytes are left as zero, i.e. stream 0. |
| byte[] frame = new byte[13]; |
| ByteUtil.setThreeBytes(frame, 0, 4); |
| frame[3] = FrameType.WINDOW_UPDATE.getIdByte(); |
| ByteUtil.set31Bits(frame, 9, increment); |
| return frame; |
| } |
| |
| |
| /** |
| * Writes and flushes a GOAWAY frame while holding the socket wrapper lock. |
| * |
| * @param maxStreamId the stream identifier to place in the frame |
| * @param errorCode   the error code to place in the frame |
| * @param debugMsg    optional debug data appended to the frame, may be {@code null} |
| * |
| * @throws IOException if the frame cannot be written |
| */ |
| protected void writeGoAwayFrame(int maxStreamId, long errorCode, byte[] debugMsg) throws IOException { |
| // The payload is 8 fixed bytes (stream identifier and error code) plus the optional debug data. |
| int debugLength = (debugMsg == null) ? 0 : debugMsg.length; |
| byte[] lengthBytes = new byte[3]; |
| ByteUtil.setThreeBytes(lengthBytes, 0, 8 + debugLength); |
| |
| byte[] fixedPayload = new byte[8]; |
| ByteUtil.set31Bits(fixedPayload, 0, maxStreamId); |
| ByteUtil.setFourBytes(fixedPayload, 4, errorCode); |
| |
| Lock socketLock = socketWrapper.getLock(); |
| socketLock.lock(); |
| try { |
| socketWrapper.write(true, lengthBytes, 0, lengthBytes.length); |
| socketWrapper.write(true, GOAWAY, 0, GOAWAY.length); |
| socketWrapper.write(true, fixedPayload, 0, fixedPayload.length); |
| if (debugMsg != null) { |
| socketWrapper.write(true, debugMsg, 0, debugMsg.length); |
| } |
| socketWrapper.flush(true); |
| } finally { |
| socketLock.unlock(); |
| } |
| } |
| |
| /** |
| * Writes the headers for the given stream while holding the socket wrapper lock and then updates the stream to |
| * reflect that the headers, and optionally the end of the stream, have been sent. |
| * |
| * @param stream         the stream the headers belong to |
| * @param pushedStreamId the identifier of the pushed stream, or 0 if the headers are not for a push |
| * @param mimeHeaders    the headers to write |
| * @param endOfStream    {@code true} if no further frames will be sent for the stream |
| * @param payloadSize    the initial size of the payload buffer used to encode the headers |
| * |
| * @throws IOException if the headers cannot be written |
| */ |
| void writeHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, boolean endOfStream, int payloadSize) |
| throws IOException { |
| // Holding the lock ensures the Stream processing thread has control of the socket. |
| Lock socketLock = socketWrapper.getLock(); |
| socketLock.lock(); |
| try { |
| doWriteHeaders(stream, pushedStreamId, mimeHeaders, endOfStream, payloadSize); |
| } finally { |
| socketLock.unlock(); |
| } |
| |
| stream.sentHeaders(); |
| if (!endOfStream) { |
| return; |
| } |
| sentEndOfStream(stream); |
| } |
| |
| |
| /* |
| * Encodes the given headers into one or more frames for the given stream. The first frame is a HEADERS frame, or a |
| * PUSH_PROMISE frame if pushedStreamId is greater than zero, and any further frames are CONTINUATION frames. |
| * |
| * Returns the buffers used to build the frames, or null if the stream is not able to write. |
| * |
| * Separate method to allow Http2AsyncUpgradeHandler to call this code without synchronizing on socketWrapper since |
| * it doesn't need to. |
| */ |
| protected HeaderFrameBuffers doWriteHeaders(Stream stream, int pushedStreamId, MimeHeaders mimeHeaders, |
| boolean endOfStream, int payloadSize) throws IOException { |
| |
| if (log.isTraceEnabled()) { |
| if (pushedStreamId == 0) { |
| log.trace(sm.getString("upgradeHandler.writeHeaders", connectionId, stream.getIdAsString(), |
| Boolean.valueOf(endOfStream))); |
| } else { |
| log.trace(sm.getString("upgradeHandler.writePushHeaders", connectionId, stream.getIdAsString(), |
| Integer.valueOf(pushedStreamId), Boolean.valueOf(endOfStream))); |
| } |
| } |
| |
| if (!stream.canWrite()) { |
| return null; |
| } |
| |
| HeaderFrameBuffers headerFrameBuffers = getHeaderFrameBuffers(payloadSize); |
| |
| // For a push, the payload of the first frame starts with the identifier of the pushed stream. |
| byte[] pushedStreamIdBytes = null; |
| if (pushedStreamId > 0) { |
| pushedStreamIdBytes = new byte[4]; |
| ByteUtil.set31Bits(pushedStreamIdBytes, 0, pushedStreamId); |
| } |
| |
| boolean first = true; |
| State state = null; |
| |
| // Keep producing frames until the encoder reports that all the headers have been encoded. |
| while (state != State.COMPLETE) { |
| headerFrameBuffers.startFrame(); |
| if (first && pushedStreamIdBytes != null) { |
| headerFrameBuffers.getPayload().put(pushedStreamIdBytes); |
| } |
| state = getHpackEncoder().encode(mimeHeaders, headerFrameBuffers.getPayload()); |
| // After the flip, limit() is the number of payload bytes written for this frame. |
| headerFrameBuffers.getPayload().flip(); |
| if (state == State.COMPLETE || headerFrameBuffers.getPayload().limit() > 0) { |
| ByteUtil.setThreeBytes(headerFrameBuffers.getHeader(), 0, headerFrameBuffers.getPayload().limit()); |
| if (first) { |
| first = false; |
| if (pushedStreamIdBytes == null) { |
| headerFrameBuffers.getHeader()[3] = FrameType.HEADERS.getIdByte(); |
| } else { |
| headerFrameBuffers.getHeader()[3] = FrameType.PUSH_PROMISE.getIdByte(); |
| } |
| if (endOfStream) { |
| headerFrameBuffers.getHeader()[4] = FLAG_END_OF_STREAM; |
| } |
| } else { |
| headerFrameBuffers.getHeader()[3] = FrameType.CONTINUATION.getIdByte(); |
| } |
| if (state == State.COMPLETE) { |
| // Added to, rather than assigned to, the flags byte so an end of stream flag set above is retained. |
| headerFrameBuffers.getHeader()[4] += FLAG_END_OF_HEADERS; |
| } |
| if (log.isTraceEnabled()) { |
| log.trace(headerFrameBuffers.getPayload().limit() + " bytes"); |
| } |
| ByteUtil.set31Bits(headerFrameBuffers.getHeader(), 5, stream.getIdAsInt()); |
| headerFrameBuffers.endFrame(); |
| } else if (state == State.UNDERFLOW) { |
| // Nothing was written to the payload and the encoder reported an underflow. Expand the payload buffer |
| // before the next attempt. |
| headerFrameBuffers.expandPayload(); |
| } |
| } |
| headerFrameBuffers.endHeaders(); |
| return headerFrameBuffers; |
| } |
| |
| /** |
| * Factory for the buffers used to build header frames. |
| * |
| * @param initialPayloadSize the initial size of the payload buffer |
| * |
| * @return new header frame buffers |
| */ |
| protected HeaderFrameBuffers getHeaderFrameBuffers(int initialPayloadSize) { |
| HeaderFrameBuffers buffers = new DefaultHeaderFrameBuffers(initialPayloadSize); |
| return buffers; |
| } |
| |
| |
| /** |
| * Provides the HPACK encoder for this connection, creating it on first use. The maximum table size is refreshed |
| * from the remote settings on every call so the latest agreed size is always used. |
| * |
| * @return the encoder for this connection |
| */ |
| protected HpackEncoder getHpackEncoder() { |
| if (hpackEncoder == null) { |
| hpackEncoder = new HpackEncoder(); |
| } |
| int agreedTableSize = remoteSettings.getHeaderTableSize(); |
| hpackEncoder.setMaxTableSize(agreedTableSize); |
| return hpackEncoder; |
| } |
| |
| |
| /** |
| * Writes a DATA frame for the given stream. If the stream is not able to write, no frame is written but the end of |
| * stream is still recorded when {@code finished} is {@code true}. |
| * |
| * @param stream   the stream the data belongs to |
| * @param data     the buffer that holds the data, starting at its current position |
| * @param len      the number of bytes to write from the buffer |
| * @param finished {@code true} if this is the last frame for the stream |
| * |
| * @throws IOException if the frame cannot be written, in which case the connection is closed |
| */ |
| void writeBody(Stream stream, ByteBuffer data, int len, boolean finished) throws IOException { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.writeBody", connectionId, stream.getIdAsString(), |
| Integer.toString(len), Boolean.valueOf(finished))); |
| } |
| |
| reduceOverheadCount(FrameType.DATA); |
| |
| // This has to be checked before the end of stream is recorded since that will change the answer. |
| boolean writable = stream.canWrite(); |
| |
| byte[] frameHeader = new byte[9]; |
| ByteUtil.setThreeBytes(frameHeader, 0, len); |
| frameHeader[3] = FrameType.DATA.getIdByte(); |
| if (finished) { |
| frameHeader[4] = FLAG_END_OF_STREAM; |
| sentEndOfStream(stream); |
| } |
| |
| if (!writable) { |
| return; |
| } |
| |
| ByteUtil.set31Bits(frameHeader, 5, stream.getIdAsInt()); |
| socketWrapper.getLock().lock(); |
| try { |
| socketWrapper.write(true, frameHeader, 0, frameHeader.length); |
| // Temporarily narrow the limit so exactly len bytes are written from the buffer. |
| int originalLimit = data.limit(); |
| data.limit(data.position() + len); |
| socketWrapper.write(true, data); |
| data.limit(originalLimit); |
| socketWrapper.flush(true); |
| } catch (IOException ioe) { |
| handleAppInitiatedIOException(ioe); |
| } finally { |
| socketWrapper.getLock().unlock(); |
| } |
| } |
| |
| |
| /** |
| * Records that the end of the stream has been sent and, if the stream is no longer active as a result, removes it |
| * from the count of active remote streams. |
| * |
| * @param stream the stream for which the end of stream was sent |
| */ |
| protected void sentEndOfStream(Stream stream) { |
| stream.sentEndOfStream(); |
| if (stream.isActive()) { |
| return; |
| } |
| decrementActiveRemoteStreamCount(stream); |
| } |
| |
| |
| /** |
| * Handles an I/O error on the socket underlying the HTTP/2 connection when it is triggered by application code |
| * (usually reading the request or writing the response). Such I/O errors are fatal so the connection is closed. |
| * The exception is then re-thrown to make the calling code aware of the problem. |
| * <p> |
| * Note: The connection is closed here because the exception can not be relied on to reach the socket processor. |
| * The application code may swallow it. |
| * |
| * @param ioe the I/O error that occurred |
| * |
| * @throws IOException always; the exception passed in |
| */ |
| protected void handleAppInitiatedIOException(IOException ioe) throws IOException { |
| // Close first. The caller may never let the exception propagate. |
| close(); |
| throw ioe; |
| } |
| |
| |
| /* |
| * Writes a WINDOW_UPDATE frame for the connection (stream 0) and, if the given stream is a Stream that is still |
| * able to write and reports an increment to send, a second WINDOW_UPDATE frame for that stream. All writes are |
| * made while holding the socket wrapper lock. |
| * |
| * Needs to know if this was application initiated since that affects the error handling. An I/O error while |
| * writing the stream level frame is passed to handleAppInitiatedIOException() when application initiated and is |
| * simply re-thrown otherwise. |
| */ |
| void writeWindowUpdate(AbstractNonZeroStream stream, int increment, boolean applicationInitiated) |
| throws IOException { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.windowUpdateConnection", getConnectionId(), |
| Integer.valueOf(increment))); |
| } |
| socketWrapper.getLock().lock(); |
| try { |
| // Build window update frame for stream 0 |
| // 9 byte header plus 4 byte payload. The stream identifier bytes are left as zero. |
| byte[] frame = new byte[13]; |
| ByteUtil.setThreeBytes(frame, 0, 4); |
| frame[3] = FrameType.WINDOW_UPDATE.getIdByte(); |
| ByteUtil.set31Bits(frame, 9, increment); |
| socketWrapper.write(true, frame, 0, frame.length); |
| // Cleared once the stream level update has been flushed so the data is not flushed twice. |
| boolean needFlush = true; |
| // No need to send update from closed stream |
| if (stream instanceof Stream && ((Stream) stream).canWrite()) { |
| int streamIncrement = ((Stream) stream).getWindowUpdateSizeToWrite(increment); |
| if (streamIncrement > 0) { |
| if (log.isTraceEnabled()) { |
| // NOTE(review): getIdAsString() is called on this handler rather than on the stream parameter so this |
| // presumably logs the connection's own identifier - confirm whether stream.getIdAsString() was meant. |
| log.trace(sm.getString("upgradeHandler.windowUpdateStream", getConnectionId(), getIdAsString(), |
| Integer.valueOf(streamIncrement))); |
| } |
| // Re-use buffer as connection update has already been written |
| ByteUtil.set31Bits(frame, 5, stream.getIdAsInt()); |
| ByteUtil.set31Bits(frame, 9, streamIncrement); |
| try { |
| socketWrapper.write(true, frame, 0, frame.length); |
| socketWrapper.flush(true); |
| needFlush = false; |
| } catch (IOException ioe) { |
| if (applicationInitiated) { |
| handleAppInitiatedIOException(ioe); |
| } else { |
| throw ioe; |
| } |
| } |
| } |
| } |
| if (needFlush) { |
| socketWrapper.flush(true); |
| } |
| } finally { |
| socketWrapper.getLock().unlock(); |
| } |
| } |
| |
| |
| /** |
| * Attempts a non-blocking flush of any pending data while holding the socket wrapper lock. If the flush reports |
| * that data remains, write interest is registered so the flush can continue later. Otherwise the ping manager is |
| * given the opportunity to send a ping. |
| * |
| * @throws IOException if an I/O error occurs while flushing or sending the ping |
| */ |
| protected void processWrites() throws IOException { |
| Lock socketLock = socketWrapper.getLock(); |
| socketLock.lock(); |
| try { |
| boolean dataRemaining = socketWrapper.flush(false); |
| if (!dataRemaining) { |
| // Only send a ping if there is no other data waiting to be sent. |
| // Ping manager will ensure they aren't sent too frequently. |
| pingManager.sendPing(false); |
| } else { |
| socketWrapper.registerWriteInterest(); |
| } |
| } finally { |
| socketLock.unlock(); |
| } |
| } |
| |
| |
/*
 * Requesting an allocation from the connection window for the specified stream.
 *
 * stream      - the stream that wants to write
 * reservation - the number of bytes requested from the connection window
 * block       - if true and no allocation is immediately available, wait (using the protocol write timeout)
 *               for one; if false, register for a non-blocking notification and return 0
 *
 * Returns the number of bytes allocated to the stream which may be less than the reservation.
 *
 * Throws IOException if the waiting thread is interrupted. The calls to doStreamCancel() are presumably also
 * expected to end the request by throwing - TODO confirm against Stream.doStreamCancel().
 */
int reserveWindowSize(Stream stream, int reservation, boolean block) throws IOException {
    /*
     * Need to be holding the stream lock so releaseBacklog() can't notify this thread until after this thread
     * enters wait().
     */
    int allocation = 0;
    stream.windowAllocationLock.lock();
    try {
        // The connection level lock is nested inside the stream level lock and only held while the window and
        // backlog state are examined / updated.
        windowAllocationLock.lock();
        try {
            if (!stream.canWrite()) {
                stream.doStreamCancel(
                        sm.getString("upgradeHandler.stream.notWritable", stream.getConnectionId(),
                                stream.getIdAsString(), stream.state.getCurrentStateName()),
                        Http2Error.STREAM_CLOSED);
            }
            long windowSize = getWindowSize();
            if (stream.getConnectionAllocationMade() > 0) {
                // The stream is/was in the backlog and has been granted an allocation - use it.
                allocation = stream.getConnectionAllocationMade();
                stream.setConnectionAllocationMade(0);
            } else if (windowSize < 1) {
                /*
                 * The connection window has no capacity. If the stream has not been granted an allocation, and the
                 * stream was not already added to the backlog due to a partial reservation (see next else if block)
                 * add it to the backlog so it can obtain an allocation when capacity is available.
                 */
                if (stream.getConnectionAllocationMade() == 0 && stream.getConnectionAllocationRequested() == 0) {
                    stream.setConnectionAllocationRequested(reservation);
                    backLogSize += reservation;
                    backLogStreams.add(stream);
                }
            } else if (windowSize < reservation) {
                /*
                 * The connection window has some capacity but not enough to fill this reservation. Allocate what
                 * capacity is available and add the stream to the backlog so it can obtain a further allocation
                 * when capacity is available.
                 */
                // Cast is safe: windowSize is less than reservation which is an int
                allocation = (int) windowSize;
                decrementWindowSize(allocation);
                int reservationRemaining = reservation - allocation;
                stream.setConnectionAllocationRequested(reservationRemaining);
                backLogSize += reservationRemaining;
                backLogStreams.add(stream);

            } else {
                // The connection window has sufficient capacity for this reservation. Allocate the full amount.
                allocation = reservation;
                decrementWindowSize(allocation);
            }
        } finally {
            windowAllocationLock.unlock();
        }
        // Nothing could be allocated immediately. Either wait for an allocation or register for notification.
        if (allocation == 0) {
            if (block) {
                try {
                    // Connection level window is empty. Although this
                    // request is for a stream, use the connection
                    // timeout
                    long writeTimeout = protocol.getWriteTimeout();
                    stream.waitForConnectionAllocation(writeTimeout);
                    // Has this stream been granted an allocation
                    if (stream.getConnectionAllocationMade() == 0) {
                        String msg;
                        Http2Error error;
                        if (stream.isActive()) {
                            if (log.isDebugEnabled()) {
                                log.debug(sm.getString("upgradeHandler.noAllocation", connectionId,
                                        stream.getIdAsString()));
                            }
                            // No allocation
                            // Close the connection. Do this first since
                            // closing the stream will raise an exception.
                            close();
                            msg = sm.getString("stream.writeTimeout");
                            error = Http2Error.ENHANCE_YOUR_CALM;
                        } else {
                            // The stream is no longer active so report this as a client cancellation
                            msg = sm.getString("upgradeHandler.clientCancel");
                            error = Http2Error.STREAM_CLOSED;
                        }
                        // Close the stream
                        // This thread is in application code so need
                        // to signal to the application that the
                        // stream is closing
                        stream.doStreamCancel(msg, error);
                    } else {
                        // An allocation was granted while waiting. Consume it.
                        allocation = stream.getConnectionAllocationMade();
                        stream.setConnectionAllocationMade(0);
                    }
                } catch (InterruptedException e) {
                    // The interruption is reported to the caller as an IOException with the original as cause
                    throw new IOException(sm.getString("upgradeHandler.windowSizeReservationInterrupted",
                            connectionId, stream.getIdAsString(), Integer.toString(reservation)), e);
                }
            } else {
                stream.waitForConnectionAllocationNonBlocking();
                return 0;
            }
        }
    } finally {
        stream.windowAllocationLock.unlock();
    }
    return allocation;
}
| |
| |
| @Override |
| protected void incrementWindowSize(int increment) throws Http2Exception { |
| Set<AbstractStream> streamsToNotify = null; |
| |
| windowAllocationLock.lock(); |
| try { |
| long windowSize = getWindowSize(); |
| if (windowSize < 1 && windowSize + increment > 0) { |
| // Connection window is exhausted. Assume there will be streams |
| // to notify. The overhead is minimal if there are none. |
| streamsToNotify = releaseBackLog((int) (windowSize + increment)); |
| } else { |
| super.incrementWindowSize(increment); |
| } |
| } finally { |
| windowAllocationLock.unlock(); |
| } |
| |
| if (streamsToNotify != null) { |
| for (AbstractStream stream : streamsToNotify) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.releaseBacklog", connectionId, stream.getIdAsString())); |
| } |
| // There is never any O/P on stream zero but it is included in |
| // the backlog as it simplifies the code. Skip it if it appears |
| // here. |
| if (this == stream) { |
| continue; |
| } |
| ((Stream) stream).notifyConnection(); |
| } |
| } |
| } |
| |
| |
| /** |
| * Process send file (if supported) for the given stream. The appropriate request attributes should be set before |
| * calling this method. |
| * |
| * @param sendfileData The stream and associated data to process |
| * |
| * @return The result of the send file processing |
| */ |
| protected SendfileState processSendfile(SendfileData sendfileData) { |
| return SendfileState.DONE; |
| } |
| |
| |
/*
 * Distributes newly available connection window capacity to the streams waiting in the backlog.
 *
 * increment - the capacity, in bytes, available for distribution
 *
 * Returns the set of streams that received an allocation (full or partial) so the caller can notify them.
 *
 * If the increment exceeds the recorded backlog size every waiting stream is granted its full request and the
 * surplus is added to the connection window. Otherwise capacity is handed out in priority order: ascending
 * urgency, then non-incremental before incremental, then ascending stream ID.
 */
private Set<AbstractStream> releaseBackLog(int increment) throws Http2Exception {
    windowAllocationLock.lock();
    try {
        Set<AbstractStream> result = new HashSet<>();
        if (backLogSize < increment) {
            // Can clear the whole backlog
            for (AbstractStream stream : backLogStreams) {
                if (stream.getConnectionAllocationRequested() > 0) {
                    stream.setConnectionAllocationMade(stream.getConnectionAllocationRequested());
                    stream.setConnectionAllocationRequested(0);
                    result.add(stream);
                }
            }
            // Cast is safe due to test above
            int remaining = increment - (int) backLogSize;
            backLogSize = 0;
            // Whatever the backlog did not consume becomes general connection window capacity
            super.incrementWindowSize(remaining);

            backLogStreams.clear();
        } else {
            // NOTE(review): backLogSize is not reduced anywhere on this path in the code visible here - confirm
            // whether it is adjusted elsewhere or is intentionally left as an upper bound.

            // Can't clear the whole backlog.
            // Need streams in priority order
            Set<Stream> orderedStreams = new ConcurrentSkipListSet<>(Comparator.comparingInt(Stream::getUrgency)
                    .thenComparing(Stream::getIncremental).thenComparing(Stream::getIdAsInt));
            orderedStreams.addAll(backLogStreams);

            // Iteration 1. Need to work out how much we can clear.
            // On exit from the loop, urgencyWhereAllocationIsExhausted holds the urgency of the last stream
            // examined and requestedAllocationForIncrementalStreams holds the total requested by incremental
            // streams seen at that urgency (zero if none were seen).
            long urgencyWhereAllocationIsExhausted = 0;
            long requestedAllocationForIncrementalStreams = 0;
            int remaining = increment;
            Iterator<Stream> orderedStreamsIterator = orderedStreams.iterator();
            while (orderedStreamsIterator.hasNext()) {
                Stream s = orderedStreamsIterator.next();
                if (urgencyWhereAllocationIsExhausted < s.getUrgency()) {
                    // Moving to a less urgent level. Stop if the previous levels used everything.
                    if (remaining < 1) {
                        break;
                    }
                    requestedAllocationForIncrementalStreams = 0;
                }
                urgencyWhereAllocationIsExhausted = s.getUrgency();
                if (s.getIncremental()) {
                    // Keep going through the incremental streams at this urgency so the total they requested
                    // is known even if remaining goes negative.
                    requestedAllocationForIncrementalStreams += s.getConnectionAllocationRequested();
                    remaining -= s.getConnectionAllocationRequested();
                } else {
                    remaining -= s.getConnectionAllocationRequested();
                    if (remaining < 1) {
                        break;
                    }
                }
            }

            // Iteration 2. Allocate.
            // Reset for second iteration
            remaining = increment;
            orderedStreamsIterator = orderedStreams.iterator();
            while (orderedStreamsIterator.hasNext()) {
                Stream s = orderedStreamsIterator.next();
                if (s.getUrgency() < urgencyWhereAllocationIsExhausted) {
                    // Can fully allocate
                    remaining = allocate(s, remaining);
                    result.add(s);
                    orderedStreamsIterator.remove();
                    backLogStreams.remove(s);
                } else if (requestedAllocationForIncrementalStreams == 0) {
                    // Allocation ran out in non-incremental streams so fully
                    // allocate in iterator order until allocation is exhausted
                    remaining = allocate(s, remaining);
                    result.add(s);
                    if (s.getConnectionAllocationRequested() == 0) {
                        // Fully allocated
                        orderedStreamsIterator.remove();
                        backLogStreams.remove(s);
                    }
                    if (remaining < 1) {
                        break;
                    }
                } else {
                    // Allocation ran out in incremental streams. Distribute
                    // remaining allocation between the incremental streams at
                    // this urgency level.
                    if (s.getUrgency() != urgencyWhereAllocationIsExhausted) {
                        break;
                    }

                    // Cast to long during the intermediate calculation to avoid integer overflow in multiplication
                    int share = (int) ((long) s.getConnectionAllocationRequested() * remaining /
                            requestedAllocationForIncrementalStreams);
                    // Integer division may round the share down to nothing. Always offer at least one byte.
                    if (share == 0) {
                        share = 1;
                    }
                    // The value returned by allocate() is deliberately ignored here: remaining is left unchanged
                    // so every stream's share is calculated from the same total.
                    allocate(s, share);
                    result.add(s);
                    if (s.getConnectionAllocationRequested() == 0) {
                        // Fully allocated (unlikely but possible due to
                        // rounding if only a few bytes required).
                        orderedStreamsIterator.remove();
                        backLogStreams.remove(s);
                    }
                }
            }
        }
        return result;
    } finally {
        windowAllocationLock.unlock();
    }
}
| |
| |
| private int allocate(AbstractStream stream, int allocation) { |
| windowAllocationLock.lock(); |
| try { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.allocate.debug", getConnectionId(), stream.getIdAsString(), |
| Integer.toString(allocation))); |
| } |
| |
| int leftToAllocate = allocation; |
| |
| if (stream.getConnectionAllocationRequested() > 0) { |
| int allocatedThisTime = Math.min(allocation, stream.getConnectionAllocationRequested()); |
| stream.setConnectionAllocationRequested(stream.getConnectionAllocationRequested() - allocatedThisTime); |
| stream.setConnectionAllocationMade(stream.getConnectionAllocationMade() + allocatedThisTime); |
| leftToAllocate = leftToAllocate - allocatedThisTime; |
| } |
| |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.allocate.left", getConnectionId(), stream.getIdAsString(), |
| Integer.toString(leftToAllocate))); |
| } |
| |
| return leftToAllocate; |
| } finally { |
| windowAllocationLock.unlock(); |
| } |
| } |
| |
| |
| Stream getStream(int streamId) { |
| Integer key = Integer.valueOf(streamId); |
| AbstractStream result = streams.get(key); |
| if (result instanceof Stream) { |
| return (Stream) result; |
| } |
| return null; |
| } |
| |
| |
| private Stream getStream(int streamId, boolean unknownIsError) throws ConnectionException { |
| Stream result = getStream(streamId); |
| if (result == null && unknownIsError) { |
| // Stream has been closed and removed from the map |
| throw new ConnectionException(sm.getString("upgradeHandler.stream.closed", Integer.toString(streamId)), |
| Http2Error.PROTOCOL_ERROR); |
| } |
| return result; |
| } |
| |
| |
| private AbstractNonZeroStream getAbstractNonZeroStream(int streamId) { |
| Integer key = Integer.valueOf(streamId); |
| return streams.get(key); |
| } |
| |
| |
| private AbstractNonZeroStream getAbstractNonZeroStream(int streamId, boolean unknownIsError) |
| throws ConnectionException { |
| AbstractNonZeroStream result = getAbstractNonZeroStream(streamId); |
| if (result == null && unknownIsError) { |
| // Stream has been closed and removed from the map |
| throw new ConnectionException(sm.getString("upgradeHandler.stream.closed", Integer.toString(streamId)), |
| Http2Error.PROTOCOL_ERROR); |
| } |
| return result; |
| } |
| |
| |
| private Stream createRemoteStream(int streamId) throws ConnectionException { |
| Integer key = Integer.valueOf(streamId); |
| |
| if (streamId % 2 != 1) { |
| throw new ConnectionException(sm.getString("upgradeHandler.stream.even", key), Http2Error.PROTOCOL_ERROR); |
| } |
| |
| pruneClosedStreams(streamId); |
| |
| Stream result = new Stream(key, this); |
| streams.put(key, result); |
| return result; |
| } |
| |
| |
| private Stream createLocalStream(Request request) { |
| int streamId = nextLocalStreamId.getAndAdd(2); |
| |
| Integer key = Integer.valueOf(streamId); |
| |
| Stream result = new Stream(key, this, request); |
| streams.put(key, result); |
| return result; |
| } |
| |
| |
| private void close() { |
| ConnectionState previous = connectionState.getAndSet(ConnectionState.CLOSED); |
| if (previous == ConnectionState.CLOSED) { |
| // Already closed |
| return; |
| } |
| |
| for (AbstractNonZeroStream stream : streams.values()) { |
| if (stream instanceof Stream) { |
| // The connection is closing. Close the associated streams as no |
| // longer required (also notifies any threads waiting for allocations). |
| ((Stream) stream).receiveReset(Http2Error.CANCEL.getCode()); |
| } |
| } |
| try { |
| socketWrapper.close(); |
| } catch (Exception e) { |
| log.debug(sm.getString("upgradeHandler.socketCloseFailed"), e); |
| } |
| } |
| |
| |
| private void pruneClosedStreams(int streamId) { |
| // Only prune every 10 new streams |
| if (newStreamsSinceLastPrune < 9) { |
| // Not atomic. Increments may be lost. Not a problem. |
| newStreamsSinceLastPrune++; |
| return; |
| } |
| // Reset counter |
| newStreamsSinceLastPrune = 0; |
| |
| // RFC 7540, 5.3.4 endpoints should maintain state for at least the |
| // maximum number of concurrent streams. |
| long max = localSettings.getMaxConcurrentStreams(); |
| |
| // Ideally need to retain information for a "significant" amount of time |
| // after sending END_STREAM (RFC 7540, page 20) so we detect potential |
| // connection error. 5x seems reasonable. The client will have had |
| // plenty of opportunity to process the END_STREAM if another 5x max |
| // concurrent streams have been processed. |
| max = max * 5; |
| if (max > Integer.MAX_VALUE) { |
| max = Integer.MAX_VALUE; |
| } |
| |
| final int size = streams.size(); |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.pruneStart", connectionId, Long.toString(max), |
| Integer.toString(size))); |
| } |
| |
| int toClose = size - (int) max; |
| |
| // Need to try and prune some streams. Prune streams starting with the |
| // oldest. Pruning stops as soon as enough streams have been pruned. |
| // Iterator is in key order. |
| for (AbstractNonZeroStream stream : streams.values()) { |
| if (toClose < 1) { |
| return; |
| } |
| if (stream instanceof Stream && ((Stream) stream).isActive()) { |
| continue; |
| } |
| streams.remove(stream.getIdentifier()); |
| toClose--; |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.pruned", connectionId, stream.getIdAsString())); |
| } |
| |
| } |
| |
| if (toClose > 0) { |
| log.warn(sm.getString("upgradeHandler.pruneIncomplete", connectionId, Integer.toString(streamId), |
| Integer.toString(toClose))); |
| } |
| } |
| |
| |
| void push(Request request, Stream associatedStream) throws IOException { |
| if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.incrementAndGet()) { |
| // If there are too many open streams, simply ignore the push |
| // request. |
| setConnectionTimeoutForStreamCount(activeRemoteStreamCount.decrementAndGet()); |
| return; |
| } |
| |
| Stream pushStream; |
| |
| /* |
| * Uses SocketWrapper lock since PUSH_PROMISE frames have to be sent in order. Once the stream has been created |
| * we need to ensure that the PUSH_PROMISE is sent before the next stream is created for a PUSH_PROMISE. |
| */ |
| Lock lock = socketWrapper.getLock(); |
| lock.lock(); |
| try { |
| pushStream = createLocalStream(request); |
| writeHeaders(associatedStream, pushStream.getIdAsInt(), request.getMimeHeaders(), false, |
| Constants.DEFAULT_HEADERS_FRAME_SIZE); |
| } finally { |
| lock.unlock(); |
| } |
| |
| pushStream.sentPushPromise(); |
| |
| processStreamOnContainerThread(pushStream); |
| } |
| |
| |
| @Override |
| protected final String getConnectionId() { |
| return connectionId; |
| } |
| |
| |
| void reduceOverheadCount(FrameType frameType) { |
| /* |
| * A non-overhead frame reduces the overhead count by {@code Http2Protocol.DEFAULT_OVERHEAD_REDUCTION_FACTOR}. |
| * |
| * A simple browser request is likely to have one non-overhead frame (HEADERS) that results in a response with |
| * one further non-overhead frame (DATA). With the default settings, the overhead count will reduce by 40 for |
| * each simple request. |
| * |
| * Requests and responses with bodies will create additional non-overhead frames, further reducing the overhead |
| * count. |
| */ |
| updateOverheadCount(frameType, Http2Protocol.DEFAULT_OVERHEAD_REDUCTION_FACTOR); |
| } |
| |
| |
| @Override |
| public void increaseOverheadCount(FrameType frameType) { |
| /* |
| * An overhead frame (SETTINGS, PRIORITY, PING) increases the overhead count by overheadCountFactor. By default, |
| * this means an overhead frame increases the overhead count by 10. |
| * |
| * If the client ignores maxConcurrentStreams then any HEADERS frame received will also increase the overhead |
| * count by overheadCountFactor. |
| * |
| * A simple browser request should not trigger any overhead frames. |
| */ |
| updateOverheadCount(frameType, getProtocol().getOverheadCountFactor()); |
| } |
| |
| |
| /** |
| * Used to increase the overhead for frames that don't use the {@code overheadCountFactor} ({@code CONTINUATION}, |
| * {@code DATA}, {@code WINDOW_UPDATE} and {@code RESET}). |
| * |
| * @param frameType The frame type triggering the overhead increase |
| * @param increment The amount by which the overhead is increased |
| */ |
| protected void increaseOverheadCount(FrameType frameType, int increment) { |
| /* |
| * Three types of frame are susceptible to inefficient (and potentially malicious) use of small frames. These |
| * trigger an increase in overhead that is inversely proportional to size. The default threshold for all three |
| * potential areas for abuse (CONTINUATION, DATA, WINDOW_UPDATE) is 1024 bytes. Frames with sizes smaller than |
| * this will trigger an increase of threshold/size. |
| * |
| * The check for DATA and WINDOW_UPDATE frames takes an average over the last two frames to allow for client |
| * buffering schemes that can result in some small DATA payloads. |
| * |
| * The CONTINUATION and DATA frames checks are skipped for end of headers (CONTINUATION) and end of stream |
| * (DATA) as those frames may be small for legitimate reasons. |
| * |
| * RESET frames (received or sent) trigger an increase of overheadResetFactor. |
| * |
| * In all cases, the calling method determines the extent to which the overhead count is increased. |
| */ |
| updateOverheadCount(frameType, increment); |
| } |
| |
| |
| private void updateOverheadCount(FrameType frameType, int increment) { |
| long newOverheadCount = overheadCount.addAndGet(increment); |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.overheadChange", connectionId, getIdAsString(), frameType.name(), |
| Long.valueOf(newOverheadCount))); |
| } |
| } |
| |
| |
| boolean isOverheadLimitExceeded() { |
| return overheadCount.get() > 0; |
| } |
| |
| |
| // ----------------------------------------------- Http2Parser.Input methods |
| |
| @Override |
| public boolean fill(boolean block, byte[] data, int offset, int length) throws IOException { |
| int len = length; |
| int pos = offset; |
| boolean nextReadBlock = block; |
| int thisRead; |
| |
| while (len > 0) { |
| // Blocking reads use the protocol level read timeout. Non-blocking |
| // reads do not timeout. The intention is that once a frame has |
| // started to be read, the read timeout applies until it is |
| // completely read. |
| if (nextReadBlock) { |
| socketWrapper.setReadTimeout(protocol.getReadTimeout()); |
| } else { |
| socketWrapper.setReadTimeout(-1); |
| } |
| thisRead = socketWrapper.read(nextReadBlock, data, pos, len); |
| if (thisRead == 0) { |
| if (nextReadBlock) { |
| // Should never happen |
| throw new IllegalStateException(); |
| } else { |
| return false; |
| } |
| } else if (thisRead == -1) { |
| if (connectionState.get().isNewStreamAllowed()) { |
| throw new EOFException(); |
| } else { |
| return false; |
| } |
| } else { |
| pos += thisRead; |
| len -= thisRead; |
| nextReadBlock = true; |
| } |
| } |
| |
| return true; |
| } |
| |
| |
| @Override |
| public int getMaxFrameSize() { |
| return localSettings.getMaxFrameSize(); |
| } |
| |
| |
| // ---------------------------------------------- Http2Parser.Output methods |
| |
| @Override |
| public HpackDecoder getHpackDecoder() { |
| if (hpackDecoder == null) { |
| hpackDecoder = new HpackDecoder(localSettings.getHeaderTableSize()); |
| } |
| return hpackDecoder; |
| } |
| |
| |
| @Override |
| public ByteBuffer startRequestBodyFrame(int streamId, int dataLength, boolean endOfStream) throws Http2Exception { |
| // DATA frames reduce the overhead count ... |
| reduceOverheadCount(FrameType.DATA); |
| |
| // .. but lots of small payloads are inefficient so that will increase |
| // the overhead count unless it is the final DATA frame where small |
| // payloads are expected. |
| |
| // See also https://bz.apache.org/bugzilla/show_bug.cgi?id=63690 |
| // The buffering behaviour of some clients means that small data frames |
| // are much more frequent (roughly 1 in 20) than expected. Use an |
| // average over two frames to avoid false positives. |
| if (!endOfStream) { |
| int overheadThreshold = protocol.getOverheadDataThreshold(); |
| int average = (lastNonFinalDataPayload >> 1) + (dataLength >> 1); |
| lastNonFinalDataPayload = dataLength; |
| // Avoid division by zero |
| if (average == 0) { |
| average = 1; |
| } |
| if (average < overheadThreshold) { |
| increaseOverheadCount(FrameType.DATA, overheadThreshold / average); |
| } |
| } |
| |
| AbstractNonZeroStream abstractNonZeroStream = getAbstractNonZeroStream(streamId, true); |
| abstractNonZeroStream.checkState(FrameType.DATA); |
| abstractNonZeroStream.receivedData(dataLength); |
| ByteBuffer result = abstractNonZeroStream.getInputByteBuffer(true); |
| |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.startRequestBodyFrame.result", getConnectionId(), |
| abstractNonZeroStream.getIdAsString(), result)); |
| } |
| |
| return result; |
| } |
| |
| |
| @Override |
| public void endRequestBodyFrame(int streamId, int dataLength) throws Http2Exception, IOException { |
| AbstractNonZeroStream abstractNonZeroStream = getAbstractNonZeroStream(streamId, true); |
| if (abstractNonZeroStream instanceof Stream) { |
| ((Stream) abstractNonZeroStream).getInputBuffer().onDataAvailable(); |
| } else { |
| // The Stream was recycled between the call in Http2Parser to |
| // startRequestBodyFrame() and the synchronized block that contains |
| // the call to this method. This means the bytes read will have been |
| // written to the original stream and, effectively, swallowed. |
| // Therefore, need to notify that those bytes were swallowed here. |
| if (dataLength > 0) { |
| onSwallowedDataFramePayload(streamId, dataLength); |
| } |
| } |
| } |
| |
| |
| @Override |
| public void onSwallowedDataFramePayload(int streamId, int swallowedDataBytesCount) throws IOException { |
| AbstractNonZeroStream abstractNonZeroStream = getAbstractNonZeroStream(streamId); |
| writeWindowUpdate(abstractNonZeroStream, swallowedDataBytesCount, false); |
| } |
| |
| |
| @Override |
| public HeaderEmitter headersStart(int streamId, boolean headersEndStream) throws Http2Exception, IOException { |
| |
| Stream stream = getStream(streamId, false); |
| if (stream == null) { |
| // New stream |
| |
| // Check the pause state before processing headers since the pause state |
| // determines if a new stream is created or if this stream is ignored. |
| checkPauseState(); |
| |
| if (connectionState.get().isNewStreamAllowed()) { |
| if (streamId > maxProcessedStreamId) { |
| stream = createRemoteStream(streamId); |
| activeRemoteStreamCount.incrementAndGet(); |
| } else { |
| // ID for new stream must always be greater than any previous stream |
| throw new ConnectionException(sm.getString("upgradeHandler.stream.old", Integer.valueOf(streamId), |
| Integer.valueOf(maxProcessedStreamId)), Http2Error.PROTOCOL_ERROR); |
| } |
| } else { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.noNewStreams", connectionId, Integer.toString(streamId))); |
| } |
| reduceOverheadCount(FrameType.HEADERS); |
| // Stateless so a static can be used to save on GC |
| return HEADER_SINK; |
| } |
| } |
| |
| stream.checkState(FrameType.HEADERS); |
| stream.receivedStartOfHeaders(headersEndStream); |
| return stream; |
| } |
| |
| |
| /** |
| * Unused - NO-OP. |
| * |
| * @param streamId Unused |
| * @param parentStreamId Unused |
| * @param exclusive Unused |
| * @param weight Unused |
| * |
| * @throws Http2Exception Never thrown |
| * |
| * @deprecated Unused. Will be removed in Tomcat 11 onwards. |
| */ |
| @Deprecated |
| public void reprioritise(int streamId, int parentStreamId, boolean exclusive, int weight) throws Http2Exception { |
| // NO-OP |
| } |
| |
| |
| @Override |
| public void headersContinue(int payloadSize, boolean endOfHeaders) { |
| // Generally, continuation frames don't impact the overhead count but if |
| // they are small and the frame isn't the final header frame then that |
| // is indicative of an abusive client |
| if (!endOfHeaders) { |
| int overheadThreshold = getProtocol().getOverheadContinuationThreshold(); |
| if (payloadSize < overheadThreshold) { |
| if (payloadSize == 0) { |
| // Avoid division by zero |
| increaseOverheadCount(FrameType.CONTINUATION, overheadThreshold); |
| } else { |
| increaseOverheadCount(FrameType.CONTINUATION, overheadThreshold / payloadSize); |
| } |
| } |
| } |
| } |
| |
| |
| @Override |
| public void headersEnd(int streamId, boolean endOfStream) throws Http2Exception { |
| AbstractNonZeroStream abstractNonZeroStream = |
| getAbstractNonZeroStream(streamId, connectionState.get().isNewStreamAllowed()); |
| if (abstractNonZeroStream instanceof Stream) { |
| boolean processStream = false; |
| setMaxProcessedStream(streamId); |
| Stream stream = (Stream) abstractNonZeroStream; |
| if (stream.isActive()) { |
| if (stream.receivedEndOfHeaders()) { |
| if (localSettings.getMaxConcurrentStreams() < activeRemoteStreamCount.get()) { |
| decrementActiveRemoteStreamCount(stream); |
| // Ignoring maxConcurrentStreams increases the overhead count |
| increaseOverheadCount(FrameType.HEADERS); |
| throw new StreamException( |
| sm.getString("upgradeHandler.tooManyRemoteStreams", |
| Long.toString(localSettings.getMaxConcurrentStreams())), |
| Http2Error.REFUSED_STREAM, streamId); |
| } |
| // Valid new stream reduces the overhead count |
| reduceOverheadCount(FrameType.HEADERS); |
| |
| processStream = true; |
| } |
| } |
| /* |
| * Need to process end of stream before calling processStreamOnContainerThread to avoid a race condition |
| * where the container thread finishes before end of stream is processed, thinks the request hasn't been |
| * fully read so issues a RST with error code 0 (NO_ERROR) to tell the client not to send the request body, |
| * if any. This breaks tests and generates unnecessary RST messages for standard clients. |
| */ |
| if (endOfStream) { |
| receivedEndOfStream(stream); |
| } |
| if (processStream) { |
| processStreamOnContainerThread(stream); |
| } |
| } |
| } |
| |
| |
| @Override |
| public void receivedEndOfStream(int streamId) throws ConnectionException { |
| AbstractNonZeroStream abstractNonZeroStream = |
| getAbstractNonZeroStream(streamId, connectionState.get().isNewStreamAllowed()); |
| if (abstractNonZeroStream instanceof Stream) { |
| Stream stream = (Stream) abstractNonZeroStream; |
| receivedEndOfStream(stream); |
| } |
| } |
| |
| |
| private void receivedEndOfStream(Stream stream) throws ConnectionException { |
| stream.receivedEndOfStream(); |
| if (!stream.isActive()) { |
| decrementActiveRemoteStreamCount(stream); |
| } |
| } |
| |
| |
| private void setMaxProcessedStream(int streamId) { |
| if (maxProcessedStreamId < streamId) { |
| maxProcessedStreamId = streamId; |
| } |
| } |
| |
| |
| @Override |
| public void reset(int streamId, long errorCode) throws Http2Exception { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.reset.receive", getConnectionId(), Integer.toString(streamId), |
| Long.toString(errorCode))); |
| } |
| increaseOverheadCount(FrameType.RST, getProtocol().getOverheadResetFactor()); |
| AbstractNonZeroStream abstractNonZeroStream = getAbstractNonZeroStream(streamId, true); |
| abstractNonZeroStream.checkState(FrameType.RST); |
| if (abstractNonZeroStream instanceof Stream) { |
| Stream stream = (Stream) abstractNonZeroStream; |
| boolean active = stream.isActive(); |
| stream.receiveReset(errorCode); |
| if (active) { |
| decrementActiveRemoteStreamCount(stream); |
| } |
| } |
| } |
| |
| |
| @Override |
| public void setting(Setting setting, long value) throws ConnectionException { |
| |
| increaseOverheadCount(FrameType.SETTINGS); |
| |
| // Possible with empty settings frame |
| if (setting == null) { |
| return; |
| } |
| |
| // Special handling required |
| if (setting == Setting.INITIAL_WINDOW_SIZE) { |
| long oldValue = remoteSettings.getInitialWindowSize(); |
| // Do this first in case new value is invalid |
| remoteSettings.set(setting, value); |
| int diff = (int) (value - oldValue); |
| for (AbstractNonZeroStream stream : streams.values()) { |
| try { |
| stream.incrementWindowSize(diff); |
| } catch (Http2Exception h2e) { |
| ((Stream) stream).close(new StreamException( |
| sm.getString("upgradeHandler.windowSizeTooBig", connectionId, stream.getIdAsString()), |
| h2e.getError(), stream.getIdAsInt())); |
| } |
| } |
| } else if (setting == Setting.NO_RFC7540_PRIORITIES) { |
| // This should not be changed after the initial setting |
| if (value != ConnectionSettingsBase.DEFAULT_NO_RFC7540_PRIORITIES) { |
| throw new ConnectionException(sm.getString("upgradeHandler.enableRfc7450Priorities", connectionId), |
| Http2Error.PROTOCOL_ERROR); |
| } |
| } else { |
| remoteSettings.set(setting, value); |
| } |
| } |
| |
| |
| @Override |
| public void settingsEnd(boolean ack) throws IOException { |
| if (ack) { |
| if (!localSettings.ack()) { |
| // Ack was unexpected |
| log.warn(sm.getString("upgradeHandler.unexpectedAck", connectionId, getIdAsString())); |
| } |
| } else { |
| socketWrapper.getLock().lock(); |
| try { |
| socketWrapper.write(true, SETTINGS_ACK, 0, SETTINGS_ACK.length); |
| socketWrapper.flush(true); |
| } finally { |
| socketWrapper.getLock().unlock(); |
| } |
| } |
| } |
| |
| |
| @Override |
| public void pingReceive(byte[] payload, boolean ack) throws IOException { |
| if (!ack) { |
| increaseOverheadCount(FrameType.PING); |
| } |
| pingManager.receivePing(payload, ack); |
| } |
| |
| |
| @Override |
| public void goaway(int lastStreamId, long errorCode, String debugData) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.goaway.debug", connectionId, Integer.toString(lastStreamId), |
| Long.toHexString(errorCode), debugData)); |
| } |
| close(); |
| } |
| |
| |
| @Override |
| public void incrementWindowSize(int streamId, int increment) throws Http2Exception { |
| // See also https://bz.apache.org/bugzilla/show_bug.cgi?id=63690 |
| // The buffering behaviour of some clients means that small data frames |
| // are much more frequent (roughly 1 in 20) than expected. Some clients |
| // issue a Window update for every DATA frame so a similar pattern may |
| // be observed. Use an average over two frames to avoid false positives. |
| |
| int average = (lastWindowUpdate >> 1) + (increment >> 1); |
| int overheadThreshold = protocol.getOverheadWindowUpdateThreshold(); |
| lastWindowUpdate = increment; |
| // Avoid division by zero |
| if (average == 0) { |
| average = 1; |
| } |
| |
| if (streamId == 0) { |
| // Check for small increments which are inefficient |
| if (average < overheadThreshold) { |
| // The smaller the increment, the larger the overhead |
| increaseOverheadCount(FrameType.WINDOW_UPDATE, overheadThreshold / average); |
| } |
| |
| incrementWindowSize(increment); |
| } else { |
| AbstractNonZeroStream stream = getAbstractNonZeroStream(streamId, true); |
| |
| // Check for small increments which are inefficient |
| if (average < overheadThreshold) { |
| // For Streams, client might only release the minimum so check |
| // against current demand |
| if (increment < stream.getConnectionAllocationRequested()) { |
| // The smaller the increment, the larger the overhead |
| increaseOverheadCount(FrameType.WINDOW_UPDATE, overheadThreshold / average); |
| } |
| } |
| |
| stream.checkState(FrameType.WINDOW_UPDATE); |
| stream.incrementWindowSize(increment); |
| } |
| } |
| |
| |
| @Override |
| public void priorityUpdate(int prioritizedStreamID, Priority p) throws Http2Exception { |
| increaseOverheadCount(FrameType.PRIORITY_UPDATE); |
| AbstractNonZeroStream abstractNonZeroStream = getAbstractNonZeroStream(prioritizedStreamID, true); |
| if (abstractNonZeroStream instanceof Stream) { |
| Stream stream = (Stream) abstractNonZeroStream; |
| stream.setUrgency(p.getUrgency()); |
| stream.setIncremental(p.getIncremental()); |
| } |
| } |
| |
| |
| @Override |
| public void onSwallowedUnknownFrame(int streamId, int frameTypeId, int flags, int size) throws IOException { |
| // NO-OP. |
| } |
| |
| |
| void replaceStream(AbstractNonZeroStream original, AbstractNonZeroStream replacement) { |
| AbstractNonZeroStream current = streams.get(original.getIdentifier()); |
| /* |
| * Only replace the Stream once. No point replacing one RecycledStream instance with another. |
| * |
| * This method is called from both StreamProcessor and Http2UpgradeHandler which may be operating on the Stream |
| * concurrently. It is therefore expected that there will be duplicate calls to this method - primarily |
| * triggered by stream errors when processing incoming frames. |
| */ |
| if (current instanceof Stream) { |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("upgradeHandler.replace.first", getConnectionId(), original.getIdAsString())); |
| } |
| streams.put(original.getIdentifier(), replacement); |
| } else { |
| if (log.isTraceEnabled()) { |
| log.trace( |
| sm.getString("upgradeHandler.replace.duplicate", getConnectionId(), original.getIdAsString())); |
| } |
| } |
| } |
| |
| |
| String getSniHostName() { |
| return socketWrapper.getSniHostName(); |
| } |
| |
| protected class PingManager { |
| |
| protected boolean initiateDisabled = false; |
| |
| // 10 seconds |
| protected final long pingIntervalNano = 10000000000L; |
| |
| protected volatile int sequence = 0; |
| protected long lastPingNanoTime = Long.MIN_VALUE; |
| |
| protected Queue<PingRecord> inflightPings = new ConcurrentLinkedQueue<>(); |
| protected Queue<Long> roundTripTimes = new ConcurrentLinkedQueue<>(); |
| |
| /** |
| * Check to see if a ping was sent recently and, if not, send one. |
| * |
| * @param force Send a ping, even if one was sent recently |
| * |
| * @throws IOException If an I/O issue prevents the ping from being sent |
| */ |
| public void sendPing(boolean force) throws IOException { |
| if (initiateDisabled) { |
| return; |
| } |
| long now = System.nanoTime(); |
| if (force || now - lastPingNanoTime > pingIntervalNano) { |
| lastPingNanoTime = now; |
| byte[] payload = new byte[8]; |
| int sentSequence = ++sequence; |
| PingRecord pingRecord = new PingRecord(sentSequence, now); |
| inflightPings.add(pingRecord); |
| ByteUtil.set31Bits(payload, 4, sentSequence); |
| socketWrapper.write(true, PING, 0, PING.length); |
| socketWrapper.write(true, payload, 0, payload.length); |
| socketWrapper.flush(true); |
| } |
| } |
| |
| public void receivePing(byte[] payload, boolean ack) throws IOException { |
| if (ack) { |
| // Extract the sequence from the payload |
| int receivedSequence = ByteUtil.get31Bits(payload, 4); |
| PingRecord pingRecord = inflightPings.poll(); |
| while (pingRecord != null && pingRecord.getSequence() < receivedSequence) { |
| pingRecord = inflightPings.poll(); |
| } |
| if (pingRecord == null) { |
| // Unexpected ACK. Log it. |
| } else { |
| long roundTripTime = System.nanoTime() - pingRecord.getSentNanoTime(); |
| roundTripTimes.add(Long.valueOf(roundTripTime)); |
| while (roundTripTimes.size() > 3) { |
| // Ignore the returned value as we just want to reduce |
| // the queue to 3 entries to use for the rolling average. |
| roundTripTimes.poll(); |
| } |
| if (log.isTraceEnabled()) { |
| log.trace(sm.getString("pingManager.roundTripTime", connectionId, Long.valueOf(roundTripTime))); |
| } |
| } |
| |
| } else { |
| // Client originated ping. Echo it back. |
| socketWrapper.getLock().lock(); |
| try { |
| socketWrapper.write(true, PING_ACK, 0, PING_ACK.length); |
| socketWrapper.write(true, payload, 0, payload.length); |
| socketWrapper.flush(true); |
| } finally { |
| socketWrapper.getLock().unlock(); |
| } |
| } |
| } |
| |
| public long getRoundTripTimeNano() { |
| return (long) roundTripTimes.stream().mapToLong(Long::longValue).average().orElse(0); |
| } |
| } |
| |
| |
| protected static class PingRecord { |
| |
| private final int sequence; |
| private final long sentNanoTime; |
| |
| public PingRecord(int sequence, long sentNanoTime) { |
| this.sequence = sequence; |
| this.sentNanoTime = sentNanoTime; |
| } |
| |
| public int getSequence() { |
| return sequence; |
| } |
| |
| public long getSentNanoTime() { |
| return sentNanoTime; |
| } |
| } |
| |
| |
| private enum ConnectionState { |
| |
| NEW(true), |
| CONNECTED(true), |
| PAUSING(true), |
| PAUSED(false), |
| CLOSED(false); |
| |
| private final boolean newStreamsAllowed; |
| |
| ConnectionState(boolean newStreamsAllowed) { |
| this.newStreamsAllowed = newStreamsAllowed; |
| } |
| |
| public boolean isNewStreamAllowed() { |
| return newStreamsAllowed; |
| } |
| } |
| |
| |
| protected interface HeaderFrameBuffers { |
| void startFrame(); |
| |
| void endFrame() throws IOException; |
| |
| void endHeaders() throws IOException; |
| |
| byte[] getHeader(); |
| |
| ByteBuffer getPayload(); |
| |
| void expandPayload(); |
| } |
| |
| |
| private class DefaultHeaderFrameBuffers implements HeaderFrameBuffers { |
| |
| private final byte[] header; |
| private ByteBuffer payload; |
| |
| DefaultHeaderFrameBuffers(int initialPayloadSize) { |
| header = new byte[9]; |
| payload = ByteBuffer.allocate(initialPayloadSize); |
| } |
| |
| @Override |
| public void startFrame() { |
| // NO-OP |
| } |
| |
| |
| @Override |
| public void endFrame() throws IOException { |
| try { |
| socketWrapper.write(true, header, 0, header.length); |
| socketWrapper.write(true, payload); |
| socketWrapper.flush(true); |
| } catch (IOException ioe) { |
| handleAppInitiatedIOException(ioe); |
| } |
| payload.clear(); |
| } |
| |
| @Override |
| public void endHeaders() { |
| // NO-OP |
| } |
| |
| @Override |
| public byte[] getHeader() { |
| return header; |
| } |
| |
| @Override |
| public ByteBuffer getPayload() { |
| return payload; |
| } |
| |
| @Override |
| public void expandPayload() { |
| payload = ByteBuffer.allocate(payload.capacity() * 2); |
| } |
| } |
| } |