blob: 183e162bedacb6e24ecde5fb4354f0cefeb3d580 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.txn;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.store.MessageEnqueueRecord;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
public class DtxBranch
{
private static final Logger LOGGER = LoggerFactory.getLogger(DtxBranch.class);
private final Xid _xid;
private final List<ServerTransaction.Action> _postTransactionActions = new ArrayList<ServerTransaction.Action>();
private State _state = State.ACTIVE;
private long _timeout;
private Map<AMQPSession<?,?>, State> _associatedSessions = new HashMap<>();
private final List<EnqueueRecord> _enqueueRecords = new ArrayList<>();
private final List<DequeueRecord> _dequeueRecords = new ArrayList<>();
private Transaction _transaction;
private long _expiration;
private ScheduledFuture<?> _timeoutFuture;
private final DtxRegistry _dtxRegistry;
private Transaction.StoredXidRecord _storedXidRecord;
public enum State
{
ACTIVE,
PREPARED,
TIMEDOUT,
SUSPENDED,
FORGOTTEN,
HEUR_COM,
HEUR_RB,
ROLLBACK_ONLY
}
public DtxBranch(Xid xid, DtxRegistry dtxRegistry)
{
_xid = xid;
_dtxRegistry = dtxRegistry;
}
public DtxBranch(Transaction.StoredXidRecord storedXidRecord, DtxRegistry dtxRegistry)
{
this(new Xid(storedXidRecord.getFormat(), storedXidRecord.getGlobalId(), storedXidRecord.getBranchId()), dtxRegistry);
_storedXidRecord = storedXidRecord;
}
public Xid getXid()
{
return _xid;
}
public State getState()
{
return _state;
}
public void setState(State state)
{
_state = state;
}
public long getTimeout()
{
return _timeout;
}
public void setTimeout(long timeout)
{
LOGGER.debug("Setting timeout to {}s for DtxBranch {}", timeout, _xid);
if(_timeoutFuture != null)
{
LOGGER.debug("Attempting to cancel previous timeout task future for DtxBranch {}", _xid);
boolean succeeded = _timeoutFuture.cancel(false);
LOGGER.debug("Cancelling previous timeout task {} for DtxBranch {}", (succeeded ? "succeeded" : "failed"), _xid);
}
_timeout = timeout;
_expiration = timeout == 0 ? 0 : System.currentTimeMillis() + (1000 * timeout);
if(_timeout == 0)
{
_timeoutFuture = null;
}
else
{
long delay = 1000*_timeout;
LOGGER.debug("Scheduling timeout and rollback after {}s for DtxBranch {}", delay/1000, _xid);
_timeoutFuture = _dtxRegistry.scheduleTask(delay, new Runnable()
{
@Override
public void run()
{
LOGGER.debug("Timing out DtxBranch {}", _xid);
setState(State.TIMEDOUT);
rollback();
}
});
}
}
public boolean expired()
{
return _timeout != 0 && _expiration < System.currentTimeMillis();
}
public synchronized boolean isAssociated(AMQPSession<?,?> session)
{
return _associatedSessions.containsKey(session);
}
public synchronized boolean hasAssociatedSessions()
{
return !_associatedSessions.isEmpty();
}
public synchronized boolean hasAssociatedActiveSessions()
{
if(hasAssociatedSessions())
{
for(State state : _associatedSessions.values())
{
if(state != State.SUSPENDED)
{
return true;
}
}
}
return false;
}
public synchronized void clearAssociations()
{
_associatedSessions.clear();
}
synchronized boolean associateSession(AMQPSession<?,?> associatedSession)
{
return _associatedSessions.put(associatedSession, State.ACTIVE) != null;
}
synchronized boolean disassociateSession(AMQPSession<?,?> associatedSession)
{
return _associatedSessions.remove(associatedSession) != null;
}
public synchronized boolean resumeSession(AMQPSession<?,?> session)
{
if(_associatedSessions.containsKey(session) && _associatedSessions.get(session) == State.SUSPENDED)
{
_associatedSessions.put(session, State.ACTIVE);
return true;
}
return false;
}
public synchronized boolean suspendSession(AMQPSession<?,?> session)
{
if(_associatedSessions.containsKey(session) && _associatedSessions.get(session) == State.ACTIVE)
{
_associatedSessions.put(session, State.SUSPENDED);
return true;
}
return false;
}
public void prepare() throws StoreException
{
LOGGER.debug("Performing prepare for DtxBranch {}", _xid);
Transaction txn = _dtxRegistry.getMessageStore().newTransaction();
_storedXidRecord = txn.recordXid(_xid.getFormat(),
_xid.getGlobalId(),
_xid.getBranchId(),
_enqueueRecords.toArray(new EnqueueRecord[_enqueueRecords.size()]),
_dequeueRecords.toArray(new DequeueRecord[_dequeueRecords.size()]));
txn.commitTran();
prePrepareTransaction();
}
public synchronized void rollback() throws StoreException
{
LOGGER.debug("Performing rollback for DtxBranch {}", _xid);
if(_timeoutFuture != null)
{
LOGGER.debug("Attempting to cancel previous timeout task future for DtxBranch {}", _xid);
boolean succeeded = _timeoutFuture.cancel(false);
_timeoutFuture = null;
LOGGER.debug("Cancelling previous timeout task {} for DtxBranch {}", (succeeded ? "succeeded" : "failed"), _xid);
}
if(_transaction != null)
{
// prepare has previously been called
Transaction txn = _dtxRegistry.getMessageStore().newTransaction();
txn.removeXid(_storedXidRecord);
txn.commitTran();
_transaction.abortTran();
}
for(ServerTransaction.Action action : _postTransactionActions)
{
action.onRollback();
}
_postTransactionActions.clear();
}
public void commit() throws StoreException
{
LOGGER.debug("Performing commit for DtxBranch {}", _xid);
if(_timeoutFuture != null)
{
LOGGER.debug("Attempting to cancel previous timeout task future for DtxBranch {}", _xid);
boolean succeeded = _timeoutFuture.cancel(false);
_timeoutFuture = null;
LOGGER.debug("Cancelling previous timeout task {} for DtxBranch {}", (succeeded ? "succeeded" : "failed"), _xid);
}
if(_transaction == null)
{
prePrepareTransaction();
}
else
{
_transaction.removeXid(_storedXidRecord);
}
_transaction.commitTran();
for(ServerTransaction.Action action : _postTransactionActions)
{
action.postCommit();
}
_postTransactionActions.clear();
}
public void prePrepareTransaction() throws StoreException
{
_transaction = _dtxRegistry.getMessageStore().newTransaction();
for(final EnqueueRecord enqueue : _enqueueRecords)
{
final MessageEnqueueRecord record;
if(enqueue.isDurable())
{
record = _transaction.enqueueMessage(enqueue.getResource(), enqueue.getMessage());
}
else
{
record = null;
}
enqueue.getEnqueueAction().performAction(record);
}
for(DequeueRecord dequeue : _dequeueRecords)
{
_transaction.dequeueMessage(dequeue.getEnqueueRecord());
}
}
public void addPostTransactionAction(ServerTransaction.Action postTransactionAction)
{
_postTransactionActions.add(postTransactionAction);
}
public void dequeue(MessageEnqueueRecord record)
{
if(record != null)
{
_dequeueRecords.add(new DequeueRecord(record));
}
}
public void enqueue(TransactionLogResource queue,
EnqueueableMessage message,
final Action<MessageEnqueueRecord> enqueueAction)
{
_enqueueRecords.add(new EnqueueRecord(queue, message, enqueueAction));
}
private static class DequeueRecord implements Transaction.DequeueRecord
{
private final MessageEnqueueRecord _enqueueRecord;
public DequeueRecord(MessageEnqueueRecord enqueueRecord)
{
_enqueueRecord = enqueueRecord;
}
@Override
public MessageEnqueueRecord getEnqueueRecord()
{
return _enqueueRecord;
}
}
private static class EnqueueRecord implements Transaction.EnqueueRecord
{
private final TransactionLogResource _resource;
private final EnqueueableMessage _message;
private final Action<MessageEnqueueRecord> _enqueueAction;
public EnqueueRecord(final TransactionLogResource resource,
final EnqueueableMessage message,
final Action<MessageEnqueueRecord> enqueueAction)
{
_resource = resource;
_message = message;
_enqueueAction = enqueueAction;
}
public Action<MessageEnqueueRecord> getEnqueueAction()
{
return _enqueueAction;
}
@Override
public TransactionLogResource getResource()
{
return _resource;
}
@Override
public EnqueueableMessage getMessage()
{
return _message;
}
public boolean isDurable()
{
return _resource.getMessageDurability().persist(_message.isPersistent());
}
}
public void close()
{
if(_transaction != null)
{
_state = null;
_transaction.abortTran();
}
}
}