blob: b81cee1be213fd44cdfbeb0a43e024d377ca12f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.provider.amqp;
import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsProducerId;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.meta.JmsTransactionInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.ProviderException;
import org.apache.qpid.jms.provider.amqp.builders.AmqpTransactionCoordinatorBuilder;
import org.apache.qpid.jms.provider.exceptions.ProviderIllegalStateException;
import org.apache.qpid.jms.provider.exceptions.ProviderTransactionRolledBackException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Handles the operations surrounding AMQP transaction control.
*
* The Transaction will carry a JmsTransactionId while the Transaction is open, once a
* transaction has been committed or rolled back the Transaction Id is cleared.
*/
public class AmqpTransactionContext implements AmqpResourceParent {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
private final AmqpSession session;
private final Map<JmsConsumerId, AmqpConsumer> txConsumers = new HashMap<>();
private final Map<JmsProducerId, AmqpProducer> txProducers = new HashMap<>();
private JmsTransactionId current;
private TransactionalState cachedAcceptedState;
private TransactionalState cachedTransactedState;
private AmqpTransactionCoordinator coordinator;
/**
* Creates a new AmqpTransactionContext instance.
*
* @param session
* The session that owns this transaction context.
* @param resourceInfo
* The resourceInfo that defines this transaction context.
*/
public AmqpTransactionContext(AmqpSession session, JmsSessionInfo resourceInfo) {
this.session = session;
}
public void begin(final JmsTransactionId txId, final AsyncResult request) throws ProviderException {
if (current != null) {
throw new ProviderIllegalStateException("Begin called while a TX is still Active.");
}
final AsyncResult declareCompletion = new AsyncResult() {
@Override
public void onSuccess() {
current = txId;
cachedAcceptedState = new TransactionalState();
cachedAcceptedState.setOutcome(Accepted.getInstance());
cachedAcceptedState.setTxnId(getAmqpTransactionId());
cachedTransactedState = new TransactionalState();
cachedTransactedState.setTxnId(getAmqpTransactionId());
request.onSuccess();
}
@Override
public void onFailure(ProviderException result) {
current = null;
cachedAcceptedState = null;
cachedTransactedState = null;
request.onFailure(result);
}
@Override
public boolean isComplete() {
return current != null;
}
};
if (coordinator == null || coordinator.isClosed()) {
AmqpTransactionCoordinatorBuilder builder =
new AmqpTransactionCoordinatorBuilder(this, session.getResourceInfo());
builder.buildResource(new AsyncResult() {
@Override
public void onSuccess() {
try {
coordinator.declare(txId, declareCompletion);
} catch (ProviderException e) {
request.onFailure(e);
}
}
@Override
public void onFailure(ProviderException result) {
request.onFailure(result);
}
@Override
public boolean isComplete() {
return request.isComplete();
}
});
} else {
coordinator.declare(txId, declareCompletion);
}
}
public void commit(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionInfo, final AsyncResult request) throws ProviderException {
if (!transactionInfo.getId().equals(current)) {
if (!transactionInfo.isInDoubt() && current == null) {
throw new ProviderIllegalStateException("Commit called with no active Transaction.");
} else if (!transactionInfo.isInDoubt() && current != null) {
throw new ProviderIllegalStateException("Attempt to Commit a transaction other than the current one");
} else {
throw new ProviderTransactionRolledBackException("Transaction in doubt and cannot be committed.");
}
}
preCommit();
LOG.trace("TX Context[{}] committing current TX[[]]", this, current);
DischargeCompletion completion = new DischargeCompletion(request, nextTransactionInfo, true);
coordinator.discharge(current, completion);
current = null;
if (completion.isPipelined()) {
// If the discharge completed abnormally then we don't bother creating a new TX as the
// caller will determine how to recover.
if (!completion.isComplete()) {
begin(nextTransactionInfo.getId(), completion.getDeclareCompletion());
} else {
completion.getDeclareCompletion().onFailure(completion.getFailureCause());
}
}
}
public void rollback(JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionInfo, final AsyncResult request) throws ProviderException {
if (!transactionInfo.getId().equals(current)) {
if (!transactionInfo.isInDoubt() && current == null) {
throw new ProviderIllegalStateException("Rollback called with no active Transaction.");
} else if (!transactionInfo.isInDoubt() && current != null) {
throw new ProviderIllegalStateException("Attempt to rollback a transaction other than the current one");
} else {
request.onSuccess();
return;
}
}
preRollback();
LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current);
DischargeCompletion completion = new DischargeCompletion(request, nextTransactionInfo, false);
coordinator.discharge(current, completion);
current = null;
if (completion.isPipelined()) {
// If the discharge completed abnormally then we don't bother creating a new TX as the
// caller will determine how to recover.
if (!completion.isComplete()) {
begin(nextTransactionInfo.getId(), completion.getDeclareCompletion());
} else {
completion.getDeclareCompletion().onFailure(completion.getFailureCause());
}
}
}
//----- Context utility methods ------------------------------------------//
public void registerTxConsumer(AmqpConsumer consumer) {
txConsumers.put(consumer.getConsumerId(), consumer);
}
public boolean isInTransaction(JmsConsumerId consumerId) {
return txConsumers.containsKey(consumerId);
}
public void registerTxProducer(AmqpProducer producer) {
txProducers.put(producer.getProducerId(), producer);
}
public boolean isInTransaction(JmsProducerId producerId) {
return txProducers.containsKey(producerId);
}
public AmqpSession getSession() {
return session;
}
public TransactionalState getTxnAcceptState() {
return cachedAcceptedState;
}
public TransactionalState getTxnEnrolledState() {
return cachedTransactedState;
}
public JmsTransactionId getTransactionId() {
return current;
}
public boolean isTransactionFailed() {
return coordinator == null ? false : coordinator.isClosed();
}
public Binary getAmqpTransactionId() {
Binary result = null;
if (current != null) {
result = (Binary) current.getProviderHint();
}
return result;
}
@Override
public String toString() {
return this.session.getSessionId() + ": txContext";
}
//----- Transaction pre / post completion --------------------------------//
private void preCommit() {
for (AmqpConsumer consumer : txConsumers.values()) {
consumer.preCommit();
}
}
private void preRollback() {
for (AmqpConsumer consumer : txConsumers.values()) {
consumer.preRollback();
}
}
private void postCommit() {
for (AmqpConsumer consumer : txConsumers.values()) {
consumer.postCommit();
}
txConsumers.clear();
txProducers.clear();
}
private void postRollback() {
for (AmqpConsumer consumer : txConsumers.values()) {
consumer.postRollback();
}
txConsumers.clear();
txProducers.clear();
}
//----- Resource Parent implementation -----------------------------------//
@Override
public void addChildResource(AmqpResource resource) {
if (resource instanceof AmqpTransactionCoordinator) {
coordinator = (AmqpTransactionCoordinator) resource;
}
}
@Override
public void removeChildResource(AmqpResource resource) {
// We don't clear the coordinator link so that we can refer to it
// to check if the current TX has failed due to link closed during
// normal operations.
}
@Override
public AmqpProvider getProvider() {
return session.getProvider();
}
//----- Completion for Commit or Rollback operation ----------------------//
private abstract class Completion implements AsyncResult {
protected boolean complete;
protected ProviderException failure;
@Override
public boolean isComplete() {
return complete;
}
public ProviderException getFailureCause() {
return failure;
}
}
private class DeclareCompletion extends Completion {
protected final DischargeCompletion parent;
public DeclareCompletion(DischargeCompletion parent) {
this.parent = parent;
}
@Override
public void onFailure(ProviderException result) {
complete = true;
failure = result;
parent.onDeclareFailure(result);
}
@Override
public void onSuccess() {
complete = true;
parent.onDeclareSuccess();
}
}
public class DischargeCompletion extends Completion {
private final DeclareCompletion declare;
private final AsyncResult request;
private final boolean commit;
public DischargeCompletion(AsyncResult request, JmsTransactionInfo nextTx, boolean commit) {
this.request = request;
this.commit = commit;
if (nextTx != null) {
this.declare = new DeclareCompletion(this);
} else {
this.declare = null;
}
}
public DeclareCompletion getDeclareCompletion() {
return declare;
}
public boolean isCommit() {
return commit;
}
public boolean isPipelined() {
return declare != null;
}
@Override
public void onFailure(ProviderException result) {
complete = true;
failure = result;
onDischargeFailure(result);
}
@Override
public void onSuccess() {
complete = true;
onDischargeSuccess();
}
public void onDeclareSuccess() {
// If the discharge has not completed yet we wait until it does
// so we end up with the correct result.
if (isComplete()) {
if (getFailureCause() == null) {
request.onSuccess();
} else {
request.onFailure(getFailureCause());
}
}
}
public void onDischargeSuccess() {
cleanup();
// If the declare already returned a result we can proceed otherwise
// we need to wait for it finish in order to get the correct outcome.
if (declare == null) {
request.onSuccess();
} else if (declare.isComplete()) {
if (declare.getFailureCause() == null) {
request.onSuccess();
} else {
request.onFailure(declare.getFailureCause());
}
}
}
public void onDeclareFailure(ProviderException failure) {
// If the discharge has not completed yet we wait until it does
// so we end up with the correct result.
if (isComplete()) {
if (getFailureCause() == null) {
request.onFailure(failure);
} else {
request.onFailure(getFailureCause());
}
}
}
public void onDischargeFailure(ProviderException failure) {
cleanup();
// If the declare already returned a result we can proceed otherwise
// we need to wait for it finish in order to get the correct outcome.
if (declare == null) {
request.onFailure(failure);
} else if (declare.isComplete()) {
request.onFailure(failure);
}
}
private void cleanup() {
if (commit) {
postCommit();
} else {
postRollback();
}
}
}
}