blob: ff64fe836dd9e5ec936ccb4dd56fe787d9052446 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.transport;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.util.SystemUtils;
public abstract class NonBlockingConnection implements SchedulableConnection
{
private static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingConnection.class);
final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
private final SocketChannel _socketChannel;
private final Deque<NetworkConnectionScheduler> _schedulerDeque = new ConcurrentLinkedDeque<>();
private final String _remoteAddressString;
private final AtomicBoolean _scheduled = new AtomicBoolean();
private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
private final AtomicLong _maxReadIdleMillis = new AtomicLong();
private final List<SchedulingDelayNotificationListener>
_schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
private final String _threadName;
private final ConcurrentLinkedQueue<QpidByteBuffer> _buffers = new ConcurrentLinkedQueue<>();
private NonBlockingConnectionDelegate _delegate;
private final AtomicBoolean _closed = new AtomicBoolean(false);
private volatile SelectorThread.SelectionTask _selectionTask;
private volatile long _scheduledTime;
private final AtomicBoolean _hasShutdown = new AtomicBoolean();
private final AtomicBoolean _unexpectedByteBufferSizeReported = new AtomicBoolean();
private final ProtocolEngine _protocolEngine;
private volatile Iterator<Runnable> _pendingIterator;
private volatile boolean _fullyWritten = true;
private volatile boolean _partialRead = false;
protected NonBlockingConnection(final SocketChannel socketChannel, ProtocolEngine protocolEngine,
final NetworkConnectionScheduler scheduler,
String remoteAddressString)
{
_socketChannel = socketChannel;
_protocolEngine = protocolEngine;
_remoteAddressString = remoteAddressString;
_threadName = SelectorThread.IO_THREAD_NAME_PREFIX + _remoteAddressString;
pushScheduler(scheduler);
protocolEngine.setWorkListener(new Action<ProtocolEngine>()
{
@Override
public void performAction(final ProtocolEngine object)
{
NetworkConnectionScheduler scheduler = getScheduler();
if(scheduler != null && !_scheduled.get())
{
getScheduler().schedule(NonBlockingConnection.this);
}
}
});
}
@Override
public final String getThreadName()
{
return _threadName;
}
protected final String getRemoteAddressString()
{
return _remoteAddressString;
}
protected final boolean isClosed()
{
return _closed.get();
}
protected final boolean setClosed()
{
return _closed.compareAndSet(false,true);
}
@Override
public final SocketChannel getSocketChannel()
{
return _socketChannel;
}
@Override
public final SocketAddress getRemoteAddress()
{
return getSocketChannel().socket().getRemoteSocketAddress();
}
@Override
public final SocketAddress getLocalAddress()
{
return getSocketChannel().socket().getLocalSocketAddress();
}
public final void pushScheduler(NetworkConnectionScheduler scheduler)
{
_schedulerDeque.addFirst(scheduler);
}
public final NetworkConnectionScheduler popScheduler()
{
return _schedulerDeque.removeFirst();
}
@Override
public final NetworkConnectionScheduler getScheduler()
{
return _schedulerDeque.peekFirst();
}
@Override
public final Principal getPeerPrincipal()
{
return _delegate.getPeerPrincipal();
}
@Override
public final Certificate getPeerCertificate()
{
return _delegate.getPeerCertificate();
}
@Override
public final String getTransportInfo()
{
return _delegate.getTransportInfo();
}
@Override
public final SelectorThread.SelectionTask getSelectionTask()
{
return _selectionTask;
}
@Override
public final void setSelectionTask(final SelectorThread.SelectionTask selectionTask)
{
_selectionTask = selectionTask;
}
@Override
public final boolean setScheduled()
{
final boolean scheduled = _scheduled.compareAndSet(false, true);
if (scheduled)
{
_scheduledTime = System.currentTimeMillis();
}
return scheduled;
}
@Override
public final void clearScheduled()
{
_scheduled.set(false);
_scheduledTime = 0;
}
@Override
public final long getScheduledTime()
{
return _scheduledTime;
}
@Override
public final void setMaxWriteIdleMillis(final long millis)
{
_maxWriteIdleMillis.set(millis);
}
@Override
public final void setMaxReadIdleMillis(final long millis)
{
_maxReadIdleMillis.set(millis);
}
@Override
public final long getMaxReadIdleMillis()
{
return _maxReadIdleMillis.get();
}
@Override
public final long getMaxWriteIdleMillis()
{
return _maxWriteIdleMillis.get();
}
@Override
public final boolean isStateChanged()
{
return _protocolEngine.hasWork();
}
@Override
public final boolean wantsRead()
{
return _fullyWritten;
}
@Override
public final boolean wantsWrite()
{
return !_fullyWritten;
}
@Override
public final boolean isPartialRead()
{
return _partialRead;
}
@Override
public boolean doWork()
{
_protocolEngine.clearWork();
try
{
if (!isClosed() && (!wantsConnect() || completeConnection()))
{
long currentTime = System.currentTimeMillis();
int tick = getTicker().getTimeToNextTick(currentTime);
if (tick <= 0)
{
getTicker().tick(currentTime);
}
_protocolEngine.setIOThread(Thread.currentThread());
_protocolEngine.setMessageAssignmentSuspended(true, true);
boolean processPendingComplete = processPending();
if (processPendingComplete)
{
_pendingIterator = null;
_protocolEngine.setTransportBlockedForWriting(false);
boolean dataRead = doRead();
_fullyWritten = doWrite();
_protocolEngine.setTransportBlockedForWriting(!_fullyWritten);
if (!_fullyWritten || dataRead || (_delegate.needsWork()
&& _delegate.getNetInputBuffer().position() != 0))
{
_protocolEngine.notifyWork();
}
if (_fullyWritten)
{
_protocolEngine.setMessageAssignmentSuspended(false, true);
}
}
else
{
_protocolEngine.notifyWork();
}
}
}
catch (IOException |
ConnectionScopedRuntimeException e)
{
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Exception performing I/O for connection '{}'",
getRemoteAddressString(), e);
}
else
{
LOGGER.info("Exception performing I/O for connection '{}' : {}",
getRemoteAddressString(), e.getMessage());
}
if(setClosed())
{
_protocolEngine.notifyWork();
}
}
finally
{
_protocolEngine.setIOThread(null);
}
final boolean closed = isClosed();
if (closed)
{
shutdown();
}
return closed;
}
private boolean completeConnection() throws IOException
{
boolean finishConnect = getSocketChannel().finishConnect();
return finishConnect && !wantsConnect();
}
private boolean processPending() throws IOException
{
if(_pendingIterator == null)
{
_pendingIterator = _protocolEngine.processPendingIterator();
}
final int networkBufferSize = getNetworkBufferSize();
while(_pendingIterator.hasNext())
{
long size = getBufferedSize();
if(size >= networkBufferSize)
{
_fullyWritten = doWrite();
long bytesWritten = size - getBufferedSize();
if(bytesWritten < (networkBufferSize / 2))
{
break;
}
}
else
{
final Runnable task = _pendingIterator.next();
task.run();
}
}
boolean complete = !_pendingIterator.hasNext();
if (getBufferedSize() >= networkBufferSize)
{
_fullyWritten = doWrite();
complete &= getBufferedSize() < networkBufferSize /2;
}
return complete;
}
protected abstract int getNetworkBufferSize();
@Override
public final void doPreWork()
{
if (!isClosed())
{
long currentTime = System.currentTimeMillis();
long schedulingDelay = currentTime - getScheduledTime();
if (!_schedulingDelayNotificationListeners.isEmpty())
{
for (SchedulingDelayNotificationListener listener : _schedulingDelayNotificationListeners)
{
listener.notifySchedulingDelay(schedulingDelay);
}
}
}
}
/**
* doRead is not reentrant.
*/
boolean doRead() throws IOException
{
_partialRead = false;
if(!isClosed() && _delegate.readyForRead())
{
int readData = readFromNetwork();
if (readData > 0)
{
return _delegate.processData();
}
else
{
return false;
}
}
else
{
return false;
}
}
@Override
public long writeToTransport(Collection<QpidByteBuffer> buffers) throws IOException
{
long written = QpidByteBuffer.write(getSocketChannel(), buffers);
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Written " + written + " bytes");
}
return written;
}
protected int readFromNetwork() throws IOException
{
QpidByteBuffer buffer = _delegate.getNetInputBuffer();
int read = buffer.read(getSocketChannel());
if (read == -1)
{
setClosed();
}
_partialRead = read != 0;
if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Read " + read + " byte(s)");
}
return read;
}
@Override
public final void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
{
_schedulingDelayNotificationListeners.add(listener);
}
@Override
public final void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
{
_schedulingDelayNotificationListeners.remove(listener);
}
@Override
public final ByteBufferSender getSender()
{
return this;
}
@Override
public final boolean isDirectBufferPreferred()
{
return true;
}
private final long getBufferedSize()
{
// Avoids iterator garbage if empty
if (_buffers.isEmpty())
{
return 0L;
}
long totalSize = 0L;
for(QpidByteBuffer buf : _buffers)
{
totalSize += buf.remaining();
}
return totalSize;
}
final boolean doWrite() throws IOException
{
boolean fullyWritten = _delegate.doWrite(_buffers);
while(!_buffers.isEmpty())
{
QpidByteBuffer buf = _buffers.peek();
if(buf.hasRemaining())
{
break;
}
_buffers.poll();
buf.dispose();
}
if (fullyWritten)
{
_usedOutboundMessageSpace.set(0);
}
return fullyWritten;
}
@Override
public final void send(final QpidByteBuffer msg)
{
if (isClosed())
{
LOGGER.warn("Send ignored as the connection is already closed");
}
else if (msg.remaining() > 0)
{
_buffers.add(msg.duplicate());
}
msg.position(msg.limit());
}
protected final void shutdown()
{
if (_hasShutdown.getAndSet(true))
{
return;
}
try
{
shutdownInput();
shutdownFinalWrite();
_protocolEngine.closed();
shutdownOutput();
}
finally
{
try
{
try
{
NetworkConnectionScheduler scheduler = getScheduler();
if (scheduler != null)
{
scheduler.removeConnection(this);
}
}
finally
{
getSocketChannel().close();
}
}
catch (IOException e)
{
LOGGER.info("Exception closing socket '{}': {}", getRemoteAddressString(), e.getMessage());
}
if (SystemUtils.isWindows())
{
_delegate.shutdownInput();
_delegate.shutdownOutput();
}
}
}
private void shutdownFinalWrite()
{
try
{
while(!doWrite())
{
}
}
catch (IOException e)
{
LOGGER.info("Exception performing final write/close for '{}': {}", getRemoteAddressString(), e.getMessage());
}
}
private void shutdownOutput()
{
if(!SystemUtils.isWindows())
{
try
{
getSocketChannel().shutdownOutput();
}
catch (IOException e)
{
LOGGER.info("Exception closing socket '{}': {}", getRemoteAddressString(), e.getMessage());
}
finally
{
_delegate.shutdownOutput();
}
}
}
private void shutdownInput()
{
if(!SystemUtils.isWindows())
{
try
{
getSocketChannel().shutdownInput();
}
catch (IOException e)
{
LOGGER.info("Exception shutting down input for '{}': {}", getRemoteAddressString(), e.getMessage());
}
finally
{
_delegate.shutdownInput();
}
}
}
@Override
public final void start()
{
}
@Override
public final void flush()
{
}
@Override
public final void close()
{
LOGGER.debug("Closing " + getRemoteAddressString());
if(setClosed())
{
_protocolEngine.notifyWork();
getSelectionTask().wakeup();
}
}
@Override
public void processAmqpData(QpidByteBuffer applicationData)
{
_protocolEngine.received(applicationData);
}
@Override
public final void reserveOutboundMessageSpace(long size)
{
if (_usedOutboundMessageSpace.addAndGet(size) > getOutboundMessageBufferLimit())
{
_protocolEngine.setMessageAssignmentSuspended(true, false);
}
}
@Override
public final Ticker getTicker()
{
return _protocolEngine.getAggregateTicker();
}
protected abstract long getOutboundMessageBufferLimit();
public final void reportUnexpectedByteBufferSizeUsage()
{
if (_unexpectedByteBufferSizeReported.compareAndSet(false,true))
{
LOGGER.info("At least one frame unexpectedly does not fit into default byte buffer size ({}B) on a connection {}.",
getNetworkBufferSize(), this.toString());
}
}
protected NonBlockingConnectionDelegate getDelegate()
{
return _delegate;
}
protected void setDelegate(NonBlockingConnectionDelegate delegate)
{
_delegate = delegate;
}
public final ProtocolEngine getProtocolEngine()
{
return _protocolEngine;
}
}