| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.activemq.store.amq; |
| |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| import javax.transaction.xa.XAException; |
| import org.apache.activemq.command.JournalTopicAck; |
| import org.apache.activemq.command.JournalTransaction; |
| import org.apache.activemq.command.Message; |
| import org.apache.activemq.command.MessageAck; |
| import org.apache.activemq.command.TransactionId; |
| import org.apache.activemq.command.XATransactionId; |
| import org.apache.activemq.kaha.impl.async.Location; |
| import org.apache.activemq.store.TransactionRecoveryListener; |
| import org.apache.activemq.store.TransactionStore; |
| |
| /** |
| */ |
| public class AMQTransactionStore implements TransactionStore { |
| |
| protected Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId, AMQTx>(); |
| Map<TransactionId, AMQTx> preparedTransactions = new LinkedHashMap<TransactionId, AMQTx>(); |
| |
| private final AMQPersistenceAdapter peristenceAdapter; |
| private boolean doingRecover; |
| |
| public AMQTransactionStore(AMQPersistenceAdapter adapter) { |
| this.peristenceAdapter = adapter; |
| } |
| |
| /** |
| * @throws IOException |
| * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) |
| */ |
| public void prepare(TransactionId txid) throws IOException { |
| AMQTx tx = null; |
| synchronized (inflightTransactions) { |
| tx = inflightTransactions.remove(txid); |
| } |
| if (tx == null) { |
| return; |
| } |
| peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true); |
| synchronized (preparedTransactions) { |
| preparedTransactions.put(txid, tx); |
| } |
| } |
| |
| /** |
| * @throws IOException |
| * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId) |
| */ |
| public void replayPrepare(TransactionId txid) throws IOException { |
| AMQTx tx = null; |
| synchronized (inflightTransactions) { |
| tx = inflightTransactions.remove(txid); |
| } |
| if (tx == null) { |
| return; |
| } |
| synchronized (preparedTransactions) { |
| preparedTransactions.put(txid, tx); |
| } |
| } |
| |
| public AMQTx getTx(TransactionId txid, Location location) { |
| AMQTx tx = null; |
| synchronized (inflightTransactions) { |
| tx = inflightTransactions.get(txid); |
| if (tx == null) { |
| tx = new AMQTx(location); |
| inflightTransactions.put(txid, tx); |
| } |
| } |
| return tx; |
| } |
| |
| /** |
| * @throws XAException |
| * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) |
| */ |
| public void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,Runnable postCommit) throws IOException { |
| if (preCommit != null) { |
| preCommit.run(); |
| } |
| AMQTx tx; |
| if (wasPrepared) { |
| synchronized (preparedTransactions) { |
| tx = preparedTransactions.remove(txid); |
| } |
| } else { |
| synchronized (inflightTransactions) { |
| tx = inflightTransactions.remove(txid); |
| } |
| } |
| if (tx == null) { |
| if (postCommit != null) { |
| postCommit.run(); |
| } |
| return; |
| } |
| if (txid.isXATransaction()) { |
| peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), true,true); |
| } else { |
| peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true,true); |
| } |
| if (postCommit != null) { |
| postCommit.run(); |
| } |
| } |
| |
| /** |
| * @throws XAException |
| * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction) |
| */ |
| public AMQTx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException { |
| if (wasPrepared) { |
| synchronized (preparedTransactions) { |
| return preparedTransactions.remove(txid); |
| } |
| } else { |
| synchronized (inflightTransactions) { |
| return inflightTransactions.remove(txid); |
| } |
| } |
| } |
| |
| /** |
| * @throws IOException |
| * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) |
| */ |
| public void rollback(TransactionId txid) throws IOException { |
| AMQTx tx = null; |
| synchronized (inflightTransactions) { |
| tx = inflightTransactions.remove(txid); |
| } |
| if (tx != null) { |
| synchronized (preparedTransactions) { |
| tx = preparedTransactions.remove(txid); |
| } |
| } |
| if (tx != null) { |
| if (txid.isXATransaction()) { |
| peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), true,true); |
| } else { |
| peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), true,true); |
| } |
| } |
| } |
| |
| /** |
| * @throws IOException |
| * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId) |
| */ |
| public void replayRollback(TransactionId txid) throws IOException { |
| boolean inflight = false; |
| synchronized (inflightTransactions) { |
| inflight = inflightTransactions.remove(txid) != null; |
| } |
| if (inflight) { |
| synchronized (preparedTransactions) { |
| preparedTransactions.remove(txid); |
| } |
| } |
| } |
| |
| public void start() throws Exception { |
| } |
| |
| public void stop() throws Exception { |
| } |
| |
| public synchronized void recover(TransactionRecoveryListener listener) throws IOException { |
| // All the in-flight transactions get rolled back.. |
| synchronized (inflightTransactions) { |
| inflightTransactions.clear(); |
| } |
| this.doingRecover = true; |
| try { |
| Map<TransactionId, AMQTx> txs = null; |
| synchronized (preparedTransactions) { |
| txs = new LinkedHashMap<TransactionId, AMQTx>(preparedTransactions); |
| } |
| for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) { |
| Object txid = iter.next(); |
| AMQTx tx = txs.get(txid); |
| listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks()); |
| } |
| } finally { |
| this.doingRecover = false; |
| } |
| } |
| |
| /** |
| * @param message |
| * @throws IOException |
| */ |
| void addMessage(AMQMessageStore store, Message message, Location location) throws IOException { |
| AMQTx tx = getTx(message.getTransactionId(), location); |
| tx.add(store, message, location); |
| } |
| |
| /** |
| * @param ack |
| * @throws IOException |
| */ |
| public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException { |
| AMQTx tx = getTx(ack.getTransactionId(), location); |
| tx.add(store, ack); |
| } |
| |
| public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) { |
| AMQTx tx = getTx(ack.getTransactionId(), location); |
| tx.add(store, ack); |
| } |
| |
| public Location checkpoint() throws IOException { |
| // Nothing really to checkpoint.. since, we don't |
| // checkpoint tx operations in to long term store until they are |
| // committed. |
| // But we keep track of the first location of an operation |
| // that was associated with an active tx. The journal can not |
| // roll over active tx records. |
| Location minimumLocationInUse = null; |
| synchronized (inflightTransactions) { |
| for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) { |
| AMQTx tx = iter.next(); |
| Location location = tx.getLocation(); |
| if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) { |
| minimumLocationInUse = location; |
| } |
| } |
| } |
| synchronized (preparedTransactions) { |
| for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) { |
| AMQTx tx = iter.next(); |
| Location location = tx.getLocation(); |
| if (minimumLocationInUse == null || location.compareTo(minimumLocationInUse) < 0) { |
| minimumLocationInUse = location; |
| } |
| } |
| return minimumLocationInUse; |
| } |
| } |
| |
| public boolean isDoingRecover() { |
| return doingRecover; |
| } |
| |
| /** |
| * @return the preparedTransactions |
| */ |
| public Map<TransactionId, AMQTx> getPreparedTransactions() { |
| return this.preparedTransactions; |
| } |
| |
| /** |
| * @param preparedTransactions the preparedTransactions to set |
| */ |
| public void setPreparedTransactions(Map<TransactionId, AMQTx> preparedTransactions) { |
| if (preparedTransactions != null) { |
| this.preparedTransactions.clear(); |
| this.preparedTransactions.putAll(preparedTransactions); |
| } |
| } |
| } |