blob: 4f6f59b7feebc22176cbbf2f4f2c43b4b1dc06f1 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.model.adapter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Publisher;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.StateTransition;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.transport.TransactionTimeoutTicker;
import org.apache.qpid.transport.network.Ticker;
public final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter>
{
private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
// Attributes
private final AMQSessionModel _session;
private final Action _deleteModelTask;
private final AbstractAMQPConnection<?> _amqpConnection;
public SessionAdapter(final AbstractAMQPConnection<?> amqpConnection,
final AMQSessionModel session)
{
super(parentsMap(amqpConnection), createAttributes(session));
_amqpConnection = amqpConnection;
_session = session;
_session.addConsumerListener(new ConsumerListener()
{
@Override
public void consumerAdded(final Consumer<?> consumer)
{
childAdded(consumer);
}
@Override
public void consumerRemoved(final Consumer<?> consumer)
{
childRemoved(consumer);
}
});
session.setModelObject(this);
_deleteModelTask = new Action()
{
@Override
public void performAction(final Object object)
{
session.removeDeleteTask(this);
deleteAsync();
}
};
session.addDeleteTask(_deleteModelTask);
setState(State.ACTIVE);
}
private static Map<String, Object> createAttributes(final AMQSessionModel session)
{
Map<String, Object> attributes = new HashMap<>();
attributes.put(ID, UUID.randomUUID());
attributes.put(NAME, String.valueOf(session.getChannelId()));
attributes.put(DURABLE, false);
attributes.put(LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
return attributes;
}
@Override
protected void postResolveChildren()
{
super.postResolveChildren();
registerTransactionTimeoutTickers(_amqpConnection, _session);
}
@Override
public int getChannelId()
{
return _session.getChannelId();
}
@Override
public boolean isProducerFlowBlocked()
{
return _session.getBlocking();
}
public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
{
return (Collection<Consumer>) _session.getConsumers();
}
public Collection<Publisher> getPublishers()
{
return Collections.emptySet(); //TODO
}
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
if (clazz == org.apache.qpid.server.model.Consumer.class)
{
return (Collection<C>) getConsumers();
}
else if (clazz == Publisher.class)
{
return (Collection<C>) getPublishers();
}
else
{
return Collections.emptySet();
}
}
@Override
public long getConsumerCount()
{
return _session.getConsumerCount();
}
@Override
public long getLocalTransactionBegins()
{
return _session.getTxnStart();
}
@Override
public int getLocalTransactionOpen()
{
long open = _session.getTxnStart() - (_session.getTxnCommits() + _session.getTxnRejects());
return (open > 0l) ? 1 : 0;
}
@Override
public long getLocalTransactionRollbacks()
{
return _session.getTxnRejects();
}
@Override
public long getUnacknowledgedMessages()
{
return _session.getUnacknowledgedMessageCount();
}
@Override
public Date getTransactionStartTime()
{
return new Date(_session.getTransactionStartTime());
}
@Override
public Date getTransactionUpdateTime()
{
return new Date(_session.getTransactionUpdateTime());
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
private ListenableFuture<Void> doDelete()
{
deleted();
setState(State.DELETED);
_session.removeDeleteTask(_deleteModelTask);
return Futures.immediateFuture(null);
}
private void registerTransactionTimeoutTickers(AbstractAMQPConnection<?> amqpConnection,
final AMQSessionModel session)
{
NamedAddressSpace addressSpace = amqpConnection.getAddressSpace();
if (addressSpace instanceof VirtualHost)
{
final EventLogger eventLogger = amqpConnection.getEventLogger();
final VirtualHost virtualhost = (VirtualHost) addressSpace;
final List<Ticker> tickers = new ArrayList<>(4);
final Supplier<Long> transactionStartTimeSupplier = new Supplier<Long>()
{
@Override
public Long get()
{
return SessionAdapter.this._session.getTransactionStartTime();
}
};
final Supplier<Long> transactionUpdateTimeSupplier = new Supplier<Long>()
{
@Override
public Long get()
{
return SessionAdapter.this._session.getTransactionUpdateTime();
}
};
long notificationRepeatPeriod =
getContextValue(Long.class, Session.TRANSACTION_TIMEOUT_NOTIFICATION_REPEAT_PERIOD);
if (virtualhost.getStoreTransactionOpenTimeoutWarn() > 0)
{
tickers.add(new TransactionTimeoutTicker(
virtualhost.getStoreTransactionOpenTimeoutWarn(),
notificationRepeatPeriod, transactionStartTimeSupplier,
new Action<Long>()
{
@Override
public void performAction(Long age)
{
eventLogger.message(_session.getLogSubject(), ChannelMessages.OPEN_TXN(age));
}
}
));
}
if (virtualhost.getStoreTransactionOpenTimeoutClose() > 0)
{
tickers.add(new TransactionTimeoutTicker(
virtualhost.getStoreTransactionOpenTimeoutClose(),
notificationRepeatPeriod, transactionStartTimeSupplier,
new Action<Long>()
{
@Override
public void performAction(Long age)
{
_session.doTimeoutAction(OPEN_TRANSACTION_TIMEOUT_ERROR);
}
}
));
}
if (virtualhost.getStoreTransactionIdleTimeoutWarn() > 0)
{
tickers.add(new TransactionTimeoutTicker(
virtualhost.getStoreTransactionIdleTimeoutWarn(),
notificationRepeatPeriod, transactionUpdateTimeSupplier,
new Action<Long>()
{
@Override
public void performAction(Long age)
{
eventLogger.message(_session.getLogSubject(), ChannelMessages.IDLE_TXN(age));
}
}
));
}
if (virtualhost.getStoreTransactionIdleTimeoutClose() > 0)
{
tickers.add(new TransactionTimeoutTicker(
virtualhost.getStoreTransactionIdleTimeoutClose(),
notificationRepeatPeriod, transactionUpdateTimeSupplier,
new Action<Long>()
{
@Override
public void performAction(Long age)
{
_session.doTimeoutAction(IDLE_TRANSACTION_TIMEOUT_ERROR);
}
}
));
}
for (Ticker ticker : tickers)
{
session.addTicker(ticker);
}
Action deleteTickerTask = new Action()
{
@Override
public void performAction(Object o)
{
session.removeDeleteTask(this);
for (Ticker ticker : tickers)
{
session.removeTicker(ticker);
}
}
};
session.addDeleteTask(deleteTickerTask);
}
}
}