blob: 05dd46e57b38adacb0f545ade96aa2c06ef227c2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.session;
import java.security.AccessControlContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.security.auth.Subject;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.ScheduledConsumerTargetSet;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.Outcome;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.PublishAuthorisationCache;
import org.apache.qpid.server.security.SecurityToken;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.network.Ticker;
import org.apache.qpid.server.util.Action;
public abstract class AbstractAMQPSession<S extends AbstractAMQPSession<S, X>,
X extends ConsumerTarget<X>>
extends AbstractConfiguredObject<S>
implements AMQPSession<S, X>, EventLoggerProvider
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAMQPSession.class);
private final Action _deleteModelTask;
private final AMQPConnection<?> _connection;
private final int _sessionId;
protected final AccessControlContext _accessControllerContext;
protected final Subject _subject;
protected final SecurityToken _token;
protected final PublishAuthorisationCache _publishAuthCache;
protected final List<Action<? super S>> _taskList = new CopyOnWriteArrayList<>();
private final AtomicInteger _consumerCount = new AtomicInteger();
protected final Set<AbstractConsumerTarget> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
private final LogSubject _logSubject;
private Iterator<AbstractConsumerTarget> _processPendingIterator;
private final Set<Consumer<?,X>> _consumers = ConcurrentHashMap.newKeySet();
private final AtomicLong _messagesIn = new AtomicLong();
private final AtomicLong _messagesOut = new AtomicLong();
private final AtomicLong _transactedMessagesIn = new AtomicLong();
private final AtomicLong _transactedMessagesOut = new AtomicLong();
private final AtomicLong _bytesIn = new AtomicLong();
private final AtomicLong _bytesOut = new AtomicLong();
protected AbstractAMQPSession(final Connection<?> parent, final int sessionId)
{
this(parent, sessionId, new ChannelLogSubject((AMQPConnection) parent, sessionId));
}
protected AbstractAMQPSession(final Connection<?> parent, final int sessionId, final LogSubject logSubject)
{
super(parent, createAttributes(sessionId));
_connection = (AMQPConnection) parent;
_sessionId = sessionId;
_deleteModelTask = new Action<S>()
{
@Override
public void performAction(final S object)
{
removeDeleteTask(this);
deleteNoChecks();
}
};
_subject = new Subject(false, _connection.getSubject().getPrincipals(),
_connection.getSubject().getPublicCredentials(),
_connection.getSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
if (_connection.getAddressSpace() instanceof ConfiguredObject)
{
_token = ((ConfiguredObject) _connection.getAddressSpace()).newToken(_subject);
}
else
{
final Broker<?> broker = (Broker<?>) _connection.getBroker();
_token = broker.newToken(_subject);
}
_accessControllerContext = _connection.getAccessControlContextFromSubject(_subject);
final long authCacheTimeout = _connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT);
final int authCacheSize = _connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE);
_publishAuthCache = new PublishAuthorisationCache(_token, authCacheTimeout, authCacheSize);
_logSubject = logSubject;
setState(State.ACTIVE);
}
@Override
protected void onCreate()
{
super.onCreate();
addDeleteTask(_deleteModelTask);
}
private static Map<String, Object> createAttributes(final long sessionId)
{
Map<String, Object> attributes = new HashMap<>();
attributes.put(NAME, sessionId);
attributes.put(DURABLE, false);
attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
return attributes;
}
@Override
public int getChannelId()
{
return _sessionId;
}
@Override
public AMQPConnection<?> getAMQPConnection()
{
return _connection;
}
@Override
public boolean isProducerFlowBlocked()
{
return getBlocking();
}
@Override
public long getUnacknowledgedMessages()
{
return getUnacknowledgedMessageCount();
}
@Override
public void addDeleteTask(final Action<? super S> task)
{
_taskList.add(task);
}
@Override
public void removeDeleteTask(final Action<? super S> task)
{
_taskList.remove(task);
}
@Override
protected ListenableFuture<Void> onDelete()
{
removeDeleteTask(_deleteModelTask);
return super.onDelete();
}
@Override
public EventLogger getEventLogger()
{
return _connection.getEventLogger();
}
@Override
public void addTicker(final Ticker ticker)
{
_connection.getAggregateTicker().addTicker(ticker);
// trigger a wakeup to ensure the ticker will be taken into account
getAMQPConnection().notifyWork();
}
@Override
public void removeTicker(final Ticker ticker)
{
_connection.getAggregateTicker().removeTicker(ticker);
}
@Override
public LogSubject getLogSubject()
{
return _logSubject;
}
@Override
protected void logOperation(final String operation)
{
getEventLogger().message(ChannelMessages.OPERATION(operation));
}
@Override
public boolean processPending()
{
if (!getAMQPConnection().isIOThread() || isClosing())
{
return false;
}
updateBlockedStateIfNecessary();
if(!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
{
if (_processPendingIterator == null || !_processPendingIterator.hasNext())
{
_processPendingIterator = _consumersWithPendingWork.iterator();
}
if(_processPendingIterator.hasNext())
{
AbstractConsumerTarget target = _processPendingIterator.next();
_processPendingIterator.remove();
if (target.processPending())
{
_consumersWithPendingWork.add(target);
}
}
}
return !_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting();
}
@Override
public void notifyWork(final X target)
{
if(_consumersWithPendingWork.add((AbstractConsumerTarget) target))
{
getAMQPConnection().notifyWork(this);
}
}
@Override
public final long getConsumerCount()
{
return _consumerCount.get();
}
@Override
public final void consumerAdded(Consumer<?, X> consumer)
{
_consumerCount.incrementAndGet();
_consumers.add(consumer);
}
@Override
public final void consumerRemoved(Consumer<?, X> consumer)
{
_consumerCount.decrementAndGet();
_consumers.remove(consumer);
}
@Override
public Set<? extends Consumer<?, ?>> getConsumers()
{
return Collections.unmodifiableSet(_consumers);
}
protected abstract void updateBlockedStateIfNecessary();
public abstract boolean isClosing();
@Override
public ListenableFuture<Void> doOnIOThreadAsync(final Runnable task)
{
final ListenableFuture<Void> future = getAMQPConnection().doOnIOThreadAsync(task);
return doAfter(MoreExecutors.directExecutor(), future, new Runnable()
{
@Override
public void run()
{
getAMQPConnection().notifyWork(AbstractAMQPSession.this);
}
});
}
@Override
public long getBytesIn()
{
return _bytesIn.get();
}
@Override
public long getBytesOut()
{
return _bytesOut.get();
}
@Override
public long getMessagesIn()
{
return _messagesIn.get();
}
@Override
public long getMessagesOut()
{
return _messagesOut.get();
}
@Override
public long getTransactedMessagesIn()
{
return _transactedMessagesIn.get();
}
@Override
public long getTransactedMessagesOut()
{
return _transactedMessagesOut.get();
}
@Override
public void registerMessageDelivered(long messageSize)
{
_messagesOut.incrementAndGet();
_bytesOut.addAndGet(messageSize);
_connection.registerMessageDelivered(messageSize);
}
@Override
public void registerMessageReceived(long messageSize)
{
_messagesIn.incrementAndGet();
_bytesIn.addAndGet(messageSize);
_connection.registerMessageReceived(messageSize);
}
@Override
public void registerTransactedMessageDelivered()
{
_transactedMessagesOut.incrementAndGet();
_connection.registerTransactedMessageDelivered();
}
@Override
public void registerTransactedMessageReceived()
{
_transactedMessagesIn.incrementAndGet();
_connection.registerTransactedMessageReceived();
}
@Override
protected void logCreated(final Map<String, Object> attributes,
final Outcome outcome)
{
LOGGER.debug("{} : {} ({}) : Create : {}",
LogMessage.getActor(),
getCategoryClass().getSimpleName(),
getName(),
outcome);
}
@Override
protected void logDeleted(final Outcome outcome)
{
LOGGER.debug("{} : {} ({}) : Delete : {}",
LogMessage.getActor(),
getCategoryClass().getSimpleName(),
getName(),
outcome);
}
}