blob: 7eaa721606881fdc77944583a90e6cb7d8fd0043 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.transport;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
import org.apache.qpid.util.SystemUtils;
public class NonBlockingConnection implements ServerNetworkConnection, ByteBufferSender
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
private final SocketChannel _socketChannel;
private NonBlockingConnectionDelegate _delegate;
private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private final String _remoteSocketAddress;
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final ProtocolEngine _protocolEngine;
private final Runnable _onTransportEncryptionAction;
private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
private final long _outboundMessageBufferLimit;
private volatile boolean _fullyWritten = true;
private boolean _partialRead = false;
private final AmqpPort _port;
private final AtomicBoolean _scheduled = new AtomicBoolean();
private volatile long _scheduledTime;
private volatile boolean _unexpectedByteBufferSizeReported;
private final String _threadName;
private volatile SelectorThread.SelectionTask _selectionTask;
private Iterator<Runnable> _pendingIterator;
private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
private final AtomicLong _maxReadIdleMillis = new AtomicLong();
private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
private final AtomicBoolean _hasShutdown = new AtomicBoolean();
public NonBlockingConnection(SocketChannel socketChannel,
ProtocolEngine protocolEngine,
final Set<TransportEncryption> encryptionSet,
final Runnable onTransportEncryptionAction,
final NetworkConnectionScheduler scheduler,
final AmqpPort port)
{
_socketChannel = socketChannel;
pushScheduler(scheduler);
_protocolEngine = protocolEngine;
_onTransportEncryptionAction = onTransportEncryptionAction;
_remoteSocketAddress = _socketChannel.socket().getRemoteSocketAddress().toString();
_port = port;
_threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteSocketAddress.toString();
_outboundMessageBufferLimit = (long) _port.getContextValue(Long.class,
AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
protocolEngine.setWorkListener(new Action<ProtocolEngine>()
{
@Override
public void performAction(final ProtocolEngine object)
{
if(!_scheduled.get())
{
getScheduler().schedule(NonBlockingConnection.this);
}
}
});
if(encryptionSet.size() == 1)
{
setTransportEncryption(encryptionSet.iterator().next());
}
else
{
_delegate = new NonBlockingConnectionUndecidedDelegate(this);
}
}
String getThreadName()
{
return _threadName;
}
public boolean isPartialRead()
{
return _partialRead;
}
Ticker getTicker()
{
return _protocolEngine.getAggregateTicker();
}
SocketChannel getSocketChannel()
{
return _socketChannel;
}
@Override
public void start()
{
}
@Override
public ByteBufferSender getSender()
{
return this;
}
@Override
public void close()
{
LOGGER.debug("Closing " + _remoteSocketAddress);
if(_closed.compareAndSet(false,true))
{
_protocolEngine.notifyWork();
_selectionTask.wakeup();
}
}
@Override
public SocketAddress getRemoteAddress()
{
return _socketChannel.socket().getRemoteSocketAddress();
}
@Override
public SocketAddress getLocalAddress()
{
return _socketChannel.socket().getLocalSocketAddress();
}
@Override
public void setMaxWriteIdleMillis(final long millis)
{
_maxWriteIdleMillis.set(millis);
}
@Override
public void setMaxReadIdleMillis(final long millis)
{
_maxReadIdleMillis.set(millis);
}
@Override
public Principal getPeerPrincipal()
{
return _delegate.getPeerPrincipal();
}
@Override
public Certificate getPeerCertificate()
{
return _delegate.getPeerCertificate();
}
@Override
public long getMaxReadIdleMillis()
{
return _maxReadIdleMillis.get();
}
@Override
public long getMaxWriteIdleMillis()
{
return _maxWriteIdleMillis.get();
}
@Override
public void reserveOutboundMessageSpace(long size)
{
if (_usedOutboundMessageSpace.addAndGet(size) > _outboundMessageBufferLimit)
{
_protocolEngine.setMessageAssignmentSuspended(true, false);
}
}
@Override
public String getTransportInfo()
{
return _delegate.getTransportInfo();
}
boolean wantsRead()
{
return _fullyWritten;
}
boolean wantsWrite()
{
return !_fullyWritten;
}
public boolean isStateChanged()
{
return _protocolEngine.hasWork();
}
public void doPreWork()
{
if (!_closed.get())
{
long currentTime = System.currentTimeMillis();
long schedulingDelay = currentTime - getScheduledTime();
if (!_schedulingDelayNotificationListeners.isEmpty())
{
for (SchedulingDelayNotificationListener listener : _schedulingDelayNotificationListeners)
{
listener.notifySchedulingDelay(schedulingDelay);
}
}
}
}
public boolean doWork()
{
_protocolEngine.clearWork();
if (!_closed.get())
{
try
{
long currentTime = System.currentTimeMillis();
int tick = getTicker().getTimeToNextTick(currentTime);
if (tick <= 0)
{
getTicker().tick(currentTime);
}
_protocolEngine.setIOThread(Thread.currentThread());
_protocolEngine.setMessageAssignmentSuspended(true, true);
boolean processPendingComplete = processPending();
if(processPendingComplete)
{
_pendingIterator = null;
_protocolEngine.setTransportBlockedForWriting(false);
boolean dataRead = doRead();
_protocolEngine.setTransportBlockedForWriting(!doWrite());
if (!_fullyWritten || dataRead || (_delegate.needsWork() && _delegate.getNetInputBuffer().position() != 0))
{
_protocolEngine.notifyWork();
}
if (_fullyWritten)
{
_protocolEngine.setMessageAssignmentSuspended(false, true);
}
}
else
{
_protocolEngine.notifyWork();
}
}
catch (IOException |
ConnectionScopedRuntimeException e)
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Exception performing I/O for connection '{}'",
_remoteSocketAddress, e);
}
else
{
LOGGER.info("Exception performing I/O for connection '{}' : {}",
_remoteSocketAddress, e.getMessage());
}
if(_closed.compareAndSet(false,true))
{
_protocolEngine.notifyWork();
}
}
finally
{
_protocolEngine.setIOThread(null);
}
}
final boolean closed = _closed.get();
if (closed)
{
shutdown();
}
return closed;
}
@Override
public void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
{
_schedulingDelayNotificationListeners.add(listener);
}
@Override
public void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
{
_schedulingDelayNotificationListeners.remove(listener);
}
private boolean processPending() throws IOException
{
if(_pendingIterator == null)
{
_pendingIterator = _protocolEngine.processPendingIterator();
}
final int networkBufferSize = _port.getNetworkBufferSize();
while(_pendingIterator.hasNext())
{
long size = getBufferedSize();
if(size >= networkBufferSize)
{
doWrite();
long bytesWritten = size - getBufferedSize();
if(bytesWritten < (networkBufferSize / 2))
{
break;
}
}
else
{
final Runnable task = _pendingIterator.next();
task.run();
}
}
boolean complete = !_pendingIterator.hasNext();
if (getBufferedSize() >= networkBufferSize)
{
doWrite();
complete &= getBufferedSize() < networkBufferSize /2;
}
return complete;
}
private long getBufferedSize()
{
// Avoids iterator garbage if empty
if (_buffers.isEmpty())
{
return 0L;
}
long totalSize = 0L;
for(QpidByteBuffer buf : _buffers)
{
totalSize += buf.remaining();
}
return totalSize;
}
private void shutdown()
{
if (_hasShutdown.getAndSet(true))
{
return;
}
try
{
shutdownInput();
shutdownFinalWrite();
_protocolEngine.closed();
shutdownOutput();
}
finally
{
try
{
try
{
NetworkConnectionScheduler scheduler = getScheduler();
if (scheduler != null)
{
scheduler.removeConnection(this);
}
}
finally
{
_socketChannel.close();
}
}
catch (IOException e)
{
LOGGER.info("Exception closing socket '{}': {}", _remoteSocketAddress, e.getMessage());
}
if (SystemUtils.isWindows())
{
_delegate.shutdownInput();
_delegate.shutdownOutput();
}
}
}
private void shutdownFinalWrite()
{
try
{
while(!doWrite())
{
}
}
catch (IOException e)
{
LOGGER.info("Exception performing final write/close for '{}': {}", _remoteSocketAddress, e.getMessage());
}
}
private void shutdownOutput()
{
if(!SystemUtils.isWindows())
{
try
{
_socketChannel.shutdownOutput();
}
catch (IOException e)
{
LOGGER.info("Exception closing socket '{}': {}", _remoteSocketAddress, e.getMessage());
}
finally
{
_delegate.shutdownOutput();
}
}
}
private void shutdownInput()
{
if(!SystemUtils.isWindows())
{
try
{
_socketChannel.shutdownInput();
}
catch (IOException e)
{
LOGGER.info("Exception shutting down input for '{}': {}", _remoteSocketAddress, e.getMessage());
}
finally
{
_delegate.shutdownInput();
}
}
}
/**
* doRead is not reentrant.
*/
boolean doRead() throws IOException
{
_partialRead = false;
if(!_closed.get() && _delegate.readyForRead())
{
int readData = readFromNetwork();
if (readData > 0)
{
return _delegate.processData();
}
else
{
return false;
}
}
else
{
return false;
}
}
long writeToTransport(Collection<QpidByteBuffer> buffers) throws IOException
{
long written = QpidByteBuffer.write(_socketChannel, buffers);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Written " + written + " bytes");
}
return written;
}
private boolean doWrite() throws IOException
{
_fullyWritten = _delegate.doWrite(_buffers);
while(!_buffers.isEmpty())
{
QpidByteBuffer buf = _buffers.peek();
if(buf.hasRemaining())
{
break;
}
_buffers.poll();
buf.dispose();
}
if (_fullyWritten)
{
_usedOutboundMessageSpace.set(0);
}
return _fullyWritten;
}
protected int readFromNetwork() throws IOException
{
QpidByteBuffer buffer = _delegate.getNetInputBuffer();
int read = buffer.read(_socketChannel);
if (read == -1)
{
_closed.set(true);
}
_partialRead = read != 0;
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Read " + read + " byte(s)");
}
return read;
}
@Override
public boolean isDirectBufferPreferred()
{
return true;
}
@Override
public void send(final QpidByteBuffer msg)
{
if (_closed.get())
{
LOGGER.warn("Send ignored as the connection is already closed");
}
else if (msg.remaining() > 0)
{
_buffers.add(msg.duplicate());
}
msg.position(msg.limit());
}
@Override
public void flush()
{
}
public final void pushScheduler(NetworkConnectionScheduler scheduler)
{
_schedulerDeque.addFirst(scheduler);
}
public final NetworkConnectionScheduler popScheduler()
{
return _schedulerDeque.removeFirst();
}
public final NetworkConnectionScheduler getScheduler()
{
return _schedulerDeque.peekFirst();
}
@Override
public String toString()
{
return "[NonBlockingConnection " + _remoteSocketAddress + "]";
}
public void processAmqpData(QpidByteBuffer applicationData)
{
_protocolEngine.received(applicationData);
}
public void setTransportEncryption(TransportEncryption transportEncryption)
{
NonBlockingConnectionDelegate oldDelegate = _delegate;
switch (transportEncryption)
{
case TLS:
_onTransportEncryptionAction.run();
_delegate = new NonBlockingConnectionTLSDelegate(this, _port);
break;
case NONE:
_delegate = new NonBlockingConnectionPlainDelegate(this, _port);
break;
default:
throw new IllegalArgumentException("unknown TransportEncryption " + transportEncryption);
}
if(oldDelegate != null)
{
QpidByteBuffer src = oldDelegate.getNetInputBuffer().duplicate();
src.flip();
_delegate.getNetInputBuffer().put(src);
src.dispose();
}
LOGGER.debug("Identified transport encryption as " + transportEncryption);
}
public boolean setScheduled()
{
final boolean scheduled = _scheduled.compareAndSet(false, true);
if (scheduled)
{
_scheduledTime = System.currentTimeMillis();
}
return scheduled;
}
public void clearScheduled()
{
_scheduled.set(false);
_scheduledTime = 0;
}
@Override
public long getScheduledTime()
{
return _scheduledTime;
}
void reportUnexpectedByteBufferSizeUsage()
{
if (!_unexpectedByteBufferSizeReported)
{
LOGGER.info("At least one frame unexpectedly does not fit into default byte buffer size ({}B) on a connection {}.",
_port.getNetworkBufferSize(), this.toString());
_unexpectedByteBufferSizeReported = true;
}
}
public SelectorThread.SelectionTask getSelectionTask()
{
return _selectionTask;
}
public void setSelectionTask(final SelectorThread.SelectionTask selectionTask)
{
_selectionTask = selectionTask;
}
}