blob: 253603649dfaac041017ee5c07d6b66355634067 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.transport;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import javax.security.auth.SubjectDomainCombiner;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.Outcome;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.ContextProvider;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.TaskExecutorProvider;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.network.NetworkConnection;
import org.apache.qpid.server.transport.network.Ticker;
import org.apache.qpid.server.txn.FlowToDiskTransactionObserver;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.TransactionObserver;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.FixedKeyMapCreator;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.ConnectionPrincipalStatistics;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C,T>, T>
extends AbstractConfiguredObject<C>
implements ProtocolEngine, AMQPConnection<C>, EventLoggerProvider, SaslSettings
{
public static final FixedKeyMapCreator PUBLISH_ACTION_MAP_CREATOR = new FixedKeyMapCreator("routingKey", "immediate");
private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAMQPConnection.class);
private final Broker<?> _broker;
private final ServerNetworkConnection _network;
private final AmqpPort<?> _port;
private final Transport _transport;
private final Protocol _protocol;
private final long _connectionId;
private final AggregateTicker _aggregateTicker;
private final Subject _subject = new Subject();
private final List<Action<? super C>> _connectionCloseTaskList =
new CopyOnWriteArrayList<>();
private final LogSubject _logSubject;
private volatile ContextProvider _contextProvider;
private volatile EventLoggerProvider _eventLoggerProvider;
private String _clientProduct;
private String _clientVersion;
private String _remoteProcessPid;
private String _clientId;
private volatile boolean _stopped;
private final AtomicLong _messagesIn = new AtomicLong();
private final AtomicLong _messagesOut = new AtomicLong();
private final AtomicLong _transactedMessagesIn = new AtomicLong();
private final AtomicLong _transactedMessagesOut = new AtomicLong();
private final AtomicLong _bytesIn = new AtomicLong();
private final AtomicLong _bytesOut = new AtomicLong();
private final AtomicLong _localTransactionBegins = new AtomicLong();
private final AtomicLong _localTransactionRollbacks = new AtomicLong();
private final AtomicLong _localTransactionOpens = new AtomicLong();
private final SettableFuture<Void> _transportClosedFuture = SettableFuture.create();
private final SettableFuture<Void> _modelTransportRendezvousFuture = SettableFuture.create();
private volatile NamedAddressSpace _addressSpace;
private volatile long _lastReadTime;
private volatile long _lastWriteTime;
private volatile long _lastMessageInboundTime;
private volatile long _lastMessageOutboundTime;
private volatile boolean _messagesWritten;
private volatile AccessControlContext _accessControllerContext;
private volatile Thread _ioThread;
private volatile StatisticsGatherer _statisticsGatherer;
private volatile boolean _messageAuthorizationRequired;
private final AtomicLong _maxMessageSize = new AtomicLong(Long.MAX_VALUE);
private volatile int _messageCompressionThreshold;
private volatile TransactionObserver _transactionObserver;
private long _maxUncommittedInMemorySize;
private final Map<ServerTransaction, Set<Ticker>> _transactionTickers = new ConcurrentHashMap<>();
private volatile ConnectionPrincipalStatistics _connectionPrincipalStatistics;
public AbstractAMQPConnection(Broker<?> broker,
ServerNetworkConnection network,
AmqpPort<?> port,
Transport transport,
Protocol protocol,
long connectionId,
AggregateTicker aggregateTicker)
{
super(port, createAttributes(connectionId, network));
_broker = broker;
_eventLoggerProvider = broker;
_contextProvider = broker;
_statisticsGatherer = broker;
_network = network;
_port = port;
_transport = transport;
_protocol = protocol;
_connectionId = connectionId;
_aggregateTicker = aggregateTicker;
_subject.getPrincipals().add(new ConnectionPrincipal(this));
updateAccessControllerContext();
_transportClosedFuture.addListener(
() -> {
_modelTransportRendezvousFuture.set(null);
doAfter(closeAsync(), this::logConnectionClose);
}, getTaskExecutor());
setState(State.ACTIVE);
_logSubject = new ConnectionLogSubject(this);
}
private static Map<String, Object> createAttributes(long connectionId, NetworkConnection network)
{
Map<String,Object> attributes = new HashMap<>();
attributes.put(NAME, "[" + connectionId + "] " + String.valueOf(network.getRemoteAddress()).replaceAll("/", ""));
attributes.put(DURABLE, false);
return attributes;
}
@Override
public final AccessControlContext getAccessControlContextFromSubject(final Subject subject)
{
final AccessControlContext acc = AccessController.getContext();
return AccessController.doPrivileged(
(PrivilegedAction<AccessControlContext>) () -> {
if (subject == null)
return new AccessControlContext(acc, null);
else
return new AccessControlContext
(acc,
new SubjectDomainCombiner(subject));
});
}
@Override
protected void onOpen()
{
super.onOpen();
final long maxAuthDelay = _port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
final SlowConnectionOpenTicker slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
_aggregateTicker.addTicker(slowConnectionOpenTicker);
_lastReadTime = _lastWriteTime = _lastMessageInboundTime = _lastMessageOutboundTime = getCreatedTime().getTime();
_maxUncommittedInMemorySize = getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE);
_transactionObserver = _maxUncommittedInMemorySize < 0 ? FlowToDiskTransactionObserver.NOOP_TRANSACTION_OBSERVER : new FlowToDiskTransactionObserver(_maxUncommittedInMemorySize, _logSubject, _eventLoggerProvider.getEventLogger());
}
@Override
public Broker<?> getBroker()
{
return _broker;
}
public final ServerNetworkConnection getNetwork()
{
return _network;
}
@Override
public final AmqpPort<?> getPort()
{
return _port;
}
@Override
public final Transport getTransport()
{
return _transport;
}
@Override
public String getTransportInfo()
{
return _network.getTransportInfo();
}
@Override
public Protocol getProtocol()
{
return _protocol;
}
@Override
public AggregateTicker getAggregateTicker()
{
return _aggregateTicker;
}
@Override
public final Date getLastIoTime()
{
return new Date(Math.max(getLastReadTime(), getLastWriteTime()));
}
@Override
public final long getLastReadTime()
{
return _lastReadTime;
}
private void updateLastReadTime()
{
_lastReadTime = System.currentTimeMillis();
}
@Override
public final long getLastWriteTime()
{
return _lastWriteTime;
}
public final void updateLastWriteTime()
{
final long currentTime = System.currentTimeMillis();
_lastWriteTime = currentTime;
if(_messagesWritten)
{
_messagesWritten = false;
_lastMessageOutboundTime = currentTime;
}
}
@Override
public void updateLastMessageInboundTime()
{
_lastMessageInboundTime = _lastReadTime;
}
@Override
public void updateLastMessageOutboundTime()
{
_messagesWritten = true;
}
@Override
public Date getLastInboundMessageTime()
{
return new Date(_lastMessageInboundTime);
}
@Override
public Date getLastOutboundMessageTime()
{
return new Date(_lastMessageOutboundTime);
}
@Override
public Date getLastMessageTime()
{
return new Date(Math.max(_lastMessageInboundTime, _lastMessageOutboundTime));
}
@Override
public final long getConnectionId()
{
return _connectionId;
}
@Override
public String getRemoteAddressString()
{
return String.valueOf(_network.getRemoteAddress());
}
@Override
public final void stopConnection()
{
_stopped = true;
}
@Override
public boolean isConnectionStopped()
{
return _stopped;
}
@Override
public final String getAddressSpaceName()
{
return getAddressSpace() == null ? null : getAddressSpace().getName();
}
@Override
public String getClientVersion()
{
return _clientVersion;
}
@Override
public String getRemoteProcessPid()
{
return _remoteProcessPid;
}
@Override
public void pushScheduler(final NetworkConnectionScheduler networkConnectionScheduler)
{
if(_network instanceof NonBlockingConnection)
{
((NonBlockingConnection) _network).pushScheduler(networkConnectionScheduler);
}
}
@Override
public NetworkConnectionScheduler popScheduler()
{
if(_network instanceof NonBlockingConnection)
{
return ((NonBlockingConnection) _network).popScheduler();
}
return null;
}
@Override
public String getClientProduct()
{
return _clientProduct;
}
protected void updateMaxMessageSize()
{
_maxMessageSize.set(Math.min(getMaxMessageSize(getPort()), getMaxMessageSize(_contextProvider)));
}
private long getMaxMessageSize(final ContextProvider object)
{
long maxMessageSize;
try
{
maxMessageSize = object.getContextValue(Integer.class, MAX_MESSAGE_SIZE);
}
catch (NullPointerException | IllegalArgumentException e)
{
LOGGER.warn("Context variable {} has invalid value and cannot be used to restrict maximum message size",
MAX_MESSAGE_SIZE,
e);
maxMessageSize = Long.MAX_VALUE;
}
return maxMessageSize > 0 ? maxMessageSize : Long.MAX_VALUE;
}
@Override
public long getMaxMessageSize()
{
return _maxMessageSize.get();
}
@Override
public void addDeleteTask(final Action<? super C> task)
{
_connectionCloseTaskList.add(task);
}
@Override
public void removeDeleteTask(final Action<? super C> task)
{
_connectionCloseTaskList.remove(task);
}
public void performDeleteTasks()
{
if(runningAsSubject())
{
for (Action<? super C> task : _connectionCloseTaskList)
{
task.performAction((C)this);
}
}
else
{
runAsSubject(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
performDeleteTasks();
return null;
}
});
}
}
@Override
public String getClientId()
{
return _clientId;
}
@Override
public final SocketAddress getRemoteSocketAddress()
{
return _network.getRemoteAddress();
}
@Override
public void registerMessageDelivered(long messageSize)
{
_messagesOut.incrementAndGet();
_bytesOut.addAndGet(messageSize);
_statisticsGatherer.registerMessageDelivered(messageSize);
}
@Override
public void registerMessageReceived(long messageSize)
{
updateLastMessageInboundTime();
_messagesIn.incrementAndGet();
_bytesIn.addAndGet(messageSize);
_statisticsGatherer.registerMessageReceived(messageSize);
}
@Override
public void registerTransactedMessageDelivered()
{
_transactedMessagesOut.incrementAndGet();
_statisticsGatherer.registerTransactedMessageDelivered();
}
@Override
public void registerTransactedMessageReceived()
{
_transactedMessagesIn.incrementAndGet();
_statisticsGatherer.registerTransactedMessageReceived();
}
public void setClientProduct(final String clientProduct)
{
_clientProduct = clientProduct;
}
public void setClientVersion(final String clientVersion)
{
_clientVersion = clientVersion;
}
public void setRemoteProcessPid(final String remoteProcessPid)
{
_remoteProcessPid = remoteProcessPid;
}
public void setClientId(final String clientId)
{
_clientId = clientId;
}
@Override
public void setIOThread(final Thread ioThread)
{
_ioThread = ioThread;
}
@Override
public boolean isIOThread()
{
return Thread.currentThread() == _ioThread;
}
@Override
public ListenableFuture<Void> doOnIOThreadAsync(final Runnable task)
{
if (isIOThread())
{
task.run();
return Futures.immediateFuture(null);
}
else
{
final SettableFuture<Void> future = SettableFuture.create();
addAsyncTask(
new Action<Object>()
{
@Override
public void performAction(final Object object)
{
try
{
task.run();
future.set(null);
}
catch (RuntimeException e)
{
future.setException(e);
}
}
});
return future;
}
}
@Override
public final void received(final QpidByteBuffer buf)
{
AccessController.doPrivileged((PrivilegedAction<Object>) () ->
{
updateLastReadTime();
try
{
onReceive(buf);
}
catch (StoreException e)
{
if (getAddressSpace().isActive())
{
throw new ServerScopedRuntimeException(e);
}
else
{
throw new ConnectionScopedRuntimeException(e);
}
}
return null;
}, getAccessControllerContext());
}
protected abstract void onReceive(final QpidByteBuffer msg);
protected abstract void addAsyncTask(final Action<? super T> action);
protected abstract boolean isOpeningInProgress();
protected <T> T runAsSubject(PrivilegedAction<T> action)
{
return Subject.doAs(_subject, action);
}
private boolean runningAsSubject()
{
return _subject.equals(Subject.getSubject(AccessController.getContext()));
}
@Override
public Subject getSubject()
{
return _subject;
}
@Override
public TaskExecutor getChildExecutor()
{
NamedAddressSpace addressSpace = getAddressSpace();
if (addressSpace instanceof TaskExecutorProvider)
{
return ((TaskExecutorProvider)addressSpace).getTaskExecutor();
}
else
{
return super.getChildExecutor();
}
}
@Override
public boolean isIncoming()
{
return true;
}
@Override
public String getLocalAddress()
{
return null;
}
@Override
public String getPrincipal()
{
final Principal authorizedPrincipal = getAuthorizedPrincipal();
return authorizedPrincipal == null ? null : authorizedPrincipal.getName();
}
@Override
public String getRemoteAddress()
{
return getRemoteAddressString();
}
@Override
public String getRemoteProcessName()
{
return null;
}
@Override
public Collection<Session> getSessions()
{
return getChildren(Session.class);
}
@Override
protected ListenableFuture<Void> onDelete()
{
return closeAsyncIfNotAlreadyClosing();
}
@Override
protected ListenableFuture<Void> beforeClose()
{
return closeAsyncIfNotAlreadyClosing();
}
@Override
protected ListenableFuture<Void> onClose()
{
if (_transactionObserver != null)
{
_transactionObserver.reset();
}
return Futures.immediateFuture(null);
}
private ListenableFuture<Void> closeAsyncIfNotAlreadyClosing()
{
if (!_modelTransportRendezvousFuture.isDone())
{
sendConnectionCloseAsync(CloseReason.MANAGEMENT, "Connection closed by external action");
}
return _modelTransportRendezvousFuture;
}
@Override
protected <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> childClass,
Map<String, Object> attributes)
{
if(childClass == Session.class)
{
throw new IllegalStateException();
}
else
{
throw new IllegalArgumentException("Cannot create a child of class " + childClass.getSimpleName());
}
}
@Override
public long getBytesIn()
{
return _bytesIn.get();
}
@Override
public long getBytesOut()
{
return _bytesOut.get();
}
@Override
public long getMessagesIn()
{
return _messagesIn.get();
}
@Override
public long getMessagesOut()
{
return _messagesOut.get();
}
@Override
public long getTransactedMessagesIn()
{
return _transactedMessagesIn.get();
}
@Override
public long getTransactedMessagesOut()
{
return _transactedMessagesOut.get();
}
public AccessControlContext getAccessControllerContext()
{
return _accessControllerContext;
}
public final void updateAccessControllerContext()
{
_accessControllerContext = getAccessControlContextFromSubject(
getSubject());
}
private void logConnectionOpen()
{
runAsSubject(new PrivilegedAction<Object>()
{
@Override
public Object run()
{
SocketAddress localAddress = _network.getLocalAddress();
final String localAddressStr;
if (localAddress instanceof InetSocketAddress)
{
InetSocketAddress inetAddress = (InetSocketAddress) localAddress;
localAddressStr = inetAddress.getAddress().getHostAddress() + ":" + inetAddress.getPort();
}
else
{
localAddressStr = localAddress.toString();
}
getEventLogger().message(ConnectionMessages.OPEN(getPort().getName(),
localAddressStr,
getProtocol().getProtocolVersion(),
getClientId(),
getClientVersion(),
getClientProduct(),
getTransport().isSecure(),
getClientId() != null,
getClientVersion() != null,
getClientProduct() != null));
return null;
}
});
}
private void logConnectionClose()
{
runAsSubject(new PrivilegedAction<Void>()
{
@Override
public Void run()
{
String closeCause = getCloseCause();
getEventLogger().message(isOrderlyClose()
? ConnectionMessages.CLOSE(closeCause, closeCause != null)
: ConnectionMessages.DROPPED_CONNECTION());
return null;
}
});
}
protected void initialiseHeartbeating(final long writerDelay, final long readerDelay)
{
if (writerDelay > 0)
{
_aggregateTicker.addTicker(new ServerIdleWriteTimeoutTicker(this, (int) writerDelay));
_network.setMaxWriteIdleMillis(writerDelay);
}
if (readerDelay > 0)
{
_aggregateTicker.addTicker(new ServerIdleReadTimeoutTicker(_network, this, (int) readerDelay));
_network.setMaxReadIdleMillis(readerDelay);
}
}
protected abstract boolean isOrderlyClose();
protected abstract String getCloseCause();
@Override
public int getSessionCount()
{
return getSessionModels().size();
}
protected void markTransportClosed()
{
_transportClosedFuture.set(null);
}
public LogSubject getLogSubject()
{
return _logSubject;
}
@Override
public EventLogger getEventLogger()
{
return _eventLoggerProvider.getEventLogger();
}
@Override
public final void checkAuthorizedMessagePrincipal(final String userId)
{
if(!(userId == null
|| "".equals(userId.trim())
|| !_messageAuthorizationRequired
|| getAuthorizedPrincipal().getName().equals(userId)))
{
throw new AccessControlException("The user id of the message '"
+ userId
+ "' is not valid on a connection authenticated as "
+ getAuthorizedPrincipal().getName());
}
}
@Override
public NamedAddressSpace getAddressSpace()
{
return _addressSpace;
}
public ContextProvider getContextProvider()
{
return _contextProvider;
}
public void setAddressSpace(NamedAddressSpace addressSpace)
{
_addressSpace = addressSpace;
if(addressSpace instanceof EventLoggerProvider)
{
_eventLoggerProvider = (EventLoggerProvider)addressSpace;
}
if(addressSpace instanceof ContextProvider)
{
_contextProvider = (ContextProvider) addressSpace;
}
if(addressSpace instanceof StatisticsGatherer)
{
_statisticsGatherer = (StatisticsGatherer) addressSpace;
}
updateMaxMessageSize();
_messageAuthorizationRequired = _contextProvider.getContextValue(Boolean.class, Broker.BROKER_MSG_AUTH);
_messageCompressionThreshold = _contextProvider.getContextValue(Integer.class,
Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE);
if(_messageCompressionThreshold <= 0)
{
_messageCompressionThreshold = Integer.MAX_VALUE;
}
getSubject().getPrincipals().add(addressSpace.getPrincipal());
updateAccessControllerContext();
logConnectionOpen();
}
@Override
public int getMessageCompressionThreshold()
{
return _messageCompressionThreshold;
}
@Override
public long getMaxUncommittedInMemorySize()
{
return _maxUncommittedInMemorySize;
}
@Override
public String toString()
{
return getNetwork().getRemoteAddress() + "(" + ((getAuthorizedPrincipal() == null ? "?" : getAuthorizedPrincipal().getName()) + ")");
}
@Override
public Principal getAuthorizedPrincipal()
{
return AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject());
}
public void setSubject(final Subject subject)
{
if (subject == null)
{
throw new IllegalArgumentException("subject cannot be null");
}
getSubject().getPrincipals().addAll(subject.getPrincipals());
getSubject().getPrivateCredentials().addAll(subject.getPrivateCredentials());
getSubject().getPublicCredentials().addAll(subject.getPublicCredentials());
updateAccessControllerContext();
}
@Override
public LocalTransaction createLocalTransaction()
{
_localTransactionBegins.incrementAndGet();
_localTransactionOpens.incrementAndGet();
return new LocalTransaction(getAddressSpace().getMessageStore(),
() -> getLastReadTime(),
_transactionObserver,
getProtocol() != Protocol.AMQP_1_0);
}
@Override
public void registerTransactionTickers(final ServerTransaction serverTransaction,
final Action<String> closeAction, final long notificationRepeatPeriod)
{
NamedAddressSpace addressSpace = getAddressSpace();
if (addressSpace instanceof QueueManagingVirtualHost)
{
final QueueManagingVirtualHost<?> virtualhost = (QueueManagingVirtualHost<?>) addressSpace;
EventLogger eventLogger = virtualhost.getEventLogger();
final Set<Ticker> tickers = new LinkedHashSet<>(4);
if (virtualhost.getStoreTransactionOpenTimeoutWarn() > 0)
{
tickers.add(new TransactionTimeoutTicker(
virtualhost.getStoreTransactionOpenTimeoutWarn(),
notificationRepeatPeriod, serverTransaction::getTransactionStartTime,
age -> eventLogger.message(getLogSubject(), ConnectionMessages.OPEN_TXN(age))
));
}
if (virtualhost.getStoreTransactionOpenTimeoutClose() > 0)
{
tickers.add(new TransactionTimeoutTicker(
virtualhost.getStoreTransactionOpenTimeoutClose(),
notificationRepeatPeriod, serverTransaction::getTransactionStartTime,
age -> closeAction.performAction(OPEN_TRANSACTION_TIMEOUT_ERROR)));
}
if (virtualhost.getStoreTransactionIdleTimeoutWarn() > 0)
{
tickers.add(new TransactionTimeoutTicker(
virtualhost.getStoreTransactionIdleTimeoutWarn(),
notificationRepeatPeriod, serverTransaction::getTransactionUpdateTime,
age -> eventLogger.message(getLogSubject(), ConnectionMessages.IDLE_TXN(age))
));
}
if (virtualhost.getStoreTransactionIdleTimeoutClose() > 0)
{
tickers.add(new TransactionTimeoutTicker(
virtualhost.getStoreTransactionIdleTimeoutClose(),
notificationRepeatPeriod, serverTransaction::getTransactionUpdateTime,
age -> closeAction.performAction(IDLE_TRANSACTION_TIMEOUT_ERROR)
));
}
if (!tickers.isEmpty())
{
for (Ticker ticker : tickers)
{
getAggregateTicker().addTicker(ticker);
}
notifyWork();
}
_transactionTickers.put(serverTransaction, tickers);
}
}
@Override
public void unregisterTransactionTickers(final ServerTransaction serverTransaction)
{
NamedAddressSpace addressSpace = getAddressSpace();
if (addressSpace instanceof QueueManagingVirtualHost)
{
_transactionTickers.remove(serverTransaction).forEach(t -> getAggregateTicker().removeTicker(t));
}
}
private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
{
private final long _allowedTime;
private volatile long _accumulatedSchedulingDelay;
SlowConnectionOpenTicker(long timeoutTime)
{
_allowedTime = timeoutTime;
}
@Override
public int getTimeToNextTick(final long currentTime)
{
return (int) (getCreatedTime().getTime() + _allowedTime + _accumulatedSchedulingDelay - currentTime);
}
@Override
public int tick(final long currentTime)
{
int nextTick = getTimeToNextTick(currentTime);
if(nextTick <= 0)
{
if (isOpeningInProgress())
{
LOGGER.warn("Connection has taken more than {} ms to establish. Closing as possible DoS.",
_allowedTime);
getEventLogger().message(ConnectionMessages.IDLE_CLOSE(
"Protocol connection is not established within timeout period", true));
_network.close();
}
else
{
_aggregateTicker.removeTicker(this);
_network.removeSchedulingDelayNotificationListeners(this);
}
}
return nextTick;
}
@Override
public void notifySchedulingDelay(final long schedulingDelay)
{
if (schedulingDelay > 0)
{
_accumulatedSchedulingDelay += schedulingDelay;
}
}
}
@Override
protected void logOperation(final String operation)
{
getEventLogger().message(ConnectionMessages.OPERATION(operation));
}
@Override
public String getLocalFQDN()
{
SocketAddress address = getNetwork().getLocalAddress();
if (address instanceof InetSocketAddress)
{
return ((InetSocketAddress) address).getHostName();
}
else
{
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
}
@Override
public Principal getExternalPrincipal()
{
return getNetwork().getPeerPrincipal();
}
@Override
public Date getOldestTransactionStartTime()
{
long oldest = Long.MAX_VALUE;
Iterator<ServerTransaction> iterator = getOpenTransactions();
while (iterator.hasNext())
{
final ServerTransaction value = iterator.next();
if (value instanceof LocalTransaction)
{
long transactionStartTimeLong = value.getTransactionStartTime();
if (transactionStartTimeLong > 0 && oldest > transactionStartTimeLong)
{
oldest = transactionStartTimeLong;
}
}
}
return oldest == Long.MAX_VALUE ? null : new Date(oldest);
}
@Override
public long getLocalTransactionBegins()
{
return _localTransactionBegins.get();
}
@Override
public long getLocalTransactionOpen()
{
return _localTransactionOpens.get();
}
@Override
public long getLocalTransactionRollbacks()
{
return _localTransactionRollbacks.get();
}
@Override
public void incrementTransactionRollbackCounter()
{
_localTransactionRollbacks.incrementAndGet();
}
@Override
public void decrementTransactionOpenCounter()
{
_localTransactionOpens.decrementAndGet();
}
@Override
public void incrementTransactionOpenCounter()
{
_localTransactionOpens.incrementAndGet();
}
@Override
public void incrementTransactionBeginCounter()
{
_localTransactionBegins.incrementAndGet();
}
@Override
public void registered(final ConnectionPrincipalStatistics connectionPrincipalStatistics)
{
_connectionPrincipalStatistics = connectionPrincipalStatistics;
}
@Override
public int getAuthenticatedPrincipalConnectionCount()
{
if (_connectionPrincipalStatistics == null)
{
return 0;
}
return _connectionPrincipalStatistics.getConnectionCount();
}
@Override
public int getAuthenticatedPrincipalConnectionFrequency()
{
if (_connectionPrincipalStatistics == null)
{
return 0;
}
return _connectionPrincipalStatistics.getConnectionFrequency();
}
@Override
protected void logCreated(final Map<String, Object> attributes,
final Outcome outcome)
{
logConnectionOpen();
}
@Override
protected void logDeleted(final Outcome outcome)
{
getEventLogger().message(_logSubject, ConnectionMessages.MODEL_DELETE());
}
}