Merge pull request #904 from amazon-mq/remove-dead-code
[NO JIRA] Removing the always-false flag `concurrentStoreAndDispatchTransactions` from `KahaDBStore`, along with the unreachable code paths in `KahaDBTransactionStore` that depended on it
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 0d7feba..261a04b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -56,7 +56,6 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.command.TransactionId;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.protobuf.Buffer;
import org.apache.activemq.store.AbstractMessageStore;
@@ -106,8 +105,8 @@
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
- protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
- protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
+ protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<>();
+ protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<>();
final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
@@ -118,19 +117,13 @@
// when true, message order may be compromised when cache is exhausted if store is out
// of order w.r.t cache
private boolean concurrentStoreAndDispatchTopics = false;
- private final boolean concurrentStoreAndDispatchTransactions = false;
private int maxAsyncJobs = MAX_ASYNC_JOBS;
private final KahaDBTransactionStore transactionStore;
private TransactionIdTransformer transactionIdTransformer;
public KahaDBStore() {
this.transactionStore = new KahaDBTransactionStore(this);
- this.transactionIdTransformer = new TransactionIdTransformer() {
- @Override
- public TransactionId transform(TransactionId txid) {
- return txid;
- }
- };
+ this.transactionIdTransformer = txid -> txid;
}
@Override
@@ -181,10 +174,6 @@
this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
}
- public boolean isConcurrentStoreAndDispatchTransactions() {
- return this.concurrentStoreAndDispatchTransactions;
- }
-
/**
* @return the maxAsyncJobs
*/
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
index b0f5c41..021a986 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
@@ -20,13 +20,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.activemq.broker.ConnectionContext;
@@ -62,7 +57,6 @@
*/
public class KahaDBTransactionStore implements TransactionStore {
static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
- ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
private final KahaDBStore theStore;
public KahaDBTransactionStore(KahaDBStore theStore) {
@@ -74,9 +68,9 @@
}
public class Tx {
- private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>());
+ private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<>());
- private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>());
+ private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<>());
public void add(AddMessageCommand msg) {
messages.add(msg);
@@ -89,8 +83,7 @@
public Message[] getMessages() {
Message rc[] = new Message[messages.size()];
int count = 0;
- for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
- AddMessageCommand cmd = iter.next();
+ for (AddMessageCommand cmd : messages) {
rc[count++] = cmd.getMessage();
}
return rc;
@@ -99,8 +92,7 @@
public MessageAck[] getAcks() {
MessageAck rc[] = new MessageAck[acks.size()];
int count = 0;
- for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
- RemoveMessageCommand cmd = iter.next();
+ for (RemoveMessageCommand cmd : acks) {
rc[count++] = cmd.getMessageAck();
}
return rc;
@@ -111,16 +103,14 @@
* @throws IOException
*/
public List<Future<Object>> commit() throws IOException {
- List<Future<Object>> results = new ArrayList<Future<Object>>();
+ List<Future<Object>> results = new ArrayList<>();
// Do all the message adds.
- for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
- AddMessageCommand cmd = iter.next();
+ for (AddMessageCommand cmd : messages) {
results.add(cmd.run());
}
// And removes..
- for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
- RemoveMessageCommand cmd = iter.next();
+ for (RemoveMessageCommand cmd : acks) {
cmd.run();
results.add(cmd.run());
}
@@ -129,7 +119,7 @@
}
}
- public abstract class AddMessageCommand {
+ public abstract static class AddMessageCommand {
private final ConnectionContext ctx;
AddMessageCommand(ConnectionContext ctx) {
this.ctx = ctx;
@@ -141,7 +131,7 @@
abstract Future<Object> run(ConnectionContext ctx) throws IOException;
}
- public abstract class RemoveMessageCommand {
+ public abstract static class RemoveMessageCommand {
private final ConnectionContext ctx;
RemoveMessageCommand(ConnectionContext ctx) {
@@ -237,78 +227,20 @@
@Override
public void prepare(TransactionId txid) throws IOException {
KahaTransactionInfo info = getTransactionInfo(txid);
- if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
- theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
- } else {
- Tx tx = inflightTransactions.remove(txid);
- if (tx != null) {
- theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
- }
- }
- }
-
- public Tx getTx(Object txid) {
- Tx tx = inflightTransactions.get(txid);
- if (tx == null) {
- synchronized (inflightTransactions) {
- tx = inflightTransactions.get(txid);
- if (tx == null) {
- tx = new Tx();
- inflightTransactions.put(txid, tx);
- }
- }
- }
- return tx;
+ theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
}
@Override
public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit)
throws IOException {
if (txid != null) {
- if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
- if (preCommit != null) {
- preCommit.run();
- }
- Tx tx = inflightTransactions.remove(txid);
- if (tx != null) {
- List<Future<Object>> results = tx.commit();
- boolean doneSomething = false;
- for (Future<Object> result : results) {
- try {
- result.get();
- } catch (InterruptedException e) {
- theStore.brokerService.handleIOException(new IOException(e.getMessage()));
- } catch (ExecutionException e) {
- theStore.brokerService.handleIOException(new IOException(e.getMessage()));
- }catch(CancellationException e) {
- }
- if (!result.isCancelled()) {
- doneSomething = true;
- }
- }
- if (postCommit != null) {
- postCommit.run();
- }
- if (doneSomething) {
- KahaTransactionInfo info = getTransactionInfo(txid);
- theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
- }
- }else {
- //The Tx will be null for failed over clients - lets run their post commits
- if (postCommit != null) {
- postCommit.run();
- }
- }
-
- } else {
- KahaTransactionInfo info = getTransactionInfo(txid);
- if (preCommit != null) {
- preCommit.run();
- }
- theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
- forgetRecoveredAcks(txid, false);
+ KahaTransactionInfo info = getTransactionInfo(txid);
+ if (preCommit != null) {
+ preCommit.run();
}
- }else {
+ theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
+ forgetRecoveredAcks(txid, false);
+ } else {
LOG.error("Null transaction passed on commit");
}
}
@@ -319,13 +251,9 @@
*/
@Override
public void rollback(TransactionId txid) throws IOException {
- if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
- KahaTransactionInfo info = getTransactionInfo(txid);
- theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
- forgetRecoveredAcks(txid, true);
- } else {
- inflightTransactions.remove(txid);
- }
+ KahaTransactionInfo info = getTransactionInfo(txid);
+ theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
+ forgetRecoveredAcks(txid, true);
}
protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException {
@@ -347,8 +275,8 @@
public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
XATransactionId xid = (XATransactionId) entry.getKey();
- ArrayList<Message> messageList = new ArrayList<Message>();
- ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
+ ArrayList<Message> messageList = new ArrayList<>();
+ ArrayList<MessageAck> ackList = new ArrayList<>();
for (Operation op : entry.getValue()) {
if (op.getClass() == MessageDatabase.AddOperation.class) {
@@ -384,52 +312,14 @@
*/
void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
-
- if (message.getTransactionId() != null) {
- if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
- destination.addMessage(context, message);
- } else {
- Tx tx = getTx(message.getTransactionId());
- tx.add(new AddMessageCommand(context) {
- @Override
- public Message getMessage() {
- return message;
- }
- @Override
- public Future<Object> run(ConnectionContext ctx) throws IOException {
- destination.addMessage(ctx, message);
- return AbstractMessageStore.FUTURE;
- }
-
- });
- }
- } else {
- destination.addMessage(context, message);
- }
+ destination.addMessage(context, message);
}
ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
throws IOException {
-
if (message.getTransactionId() != null) {
- if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
- destination.addMessage(context, message);
- return AbstractMessageStore.FUTURE;
- } else {
- Tx tx = getTx(message.getTransactionId());
- tx.add(new AddMessageCommand(context) {
- @Override
- public Message getMessage() {
- return message;
- }
- @Override
- public Future<Object> run(ConnectionContext ctx) throws IOException {
- return destination.asyncAddQueueMessage(ctx, message);
- }
-
- });
- return AbstractMessageStore.FUTURE;
- }
+ destination.addMessage(context, message);
+ return AbstractMessageStore.FUTURE;
} else {
return destination.asyncAddQueueMessage(context, message);
}
@@ -439,24 +329,8 @@
throws IOException {
if (message.getTransactionId() != null) {
- if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
- destination.addMessage(context, message);
- return AbstractMessageStore.FUTURE;
- } else {
- Tx tx = getTx(message.getTransactionId());
- tx.add(new AddMessageCommand(context) {
- @Override
- public Message getMessage() {
- return message;
- }
- @Override
- public Future<Object> run(ConnectionContext ctx) throws IOException {
- return destination.asyncAddTopicMessage(ctx, message);
- }
-
- });
- return AbstractMessageStore.FUTURE;
- }
+ destination.addMessage(context, message);
+ return AbstractMessageStore.FUTURE;
} else {
return destination.asyncAddTopicMessage(context, message);
}
@@ -468,80 +342,17 @@
*/
final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
throws IOException {
-
- if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
- destination.removeMessage(context, ack);
- } else {
- Tx tx = getTx(ack.getTransactionId());
- tx.add(new RemoveMessageCommand(context) {
- @Override
- public MessageAck getMessageAck() {
- return ack;
- }
-
- @Override
- public Future<Object> run(ConnectionContext ctx) throws IOException {
- destination.removeMessage(ctx, ack);
- return AbstractMessageStore.FUTURE;
- }
- });
- }
- } else {
- destination.removeMessage(context, ack);
- }
+ destination.removeMessage(context, ack);
}
final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
throws IOException {
-
- if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
- destination.removeAsyncMessage(context, ack);
- } else {
- Tx tx = getTx(ack.getTransactionId());
- tx.add(new RemoveMessageCommand(context) {
- @Override
- public MessageAck getMessageAck() {
- return ack;
- }
-
- @Override
- public Future<Object> run(ConnectionContext ctx) throws IOException {
- destination.removeMessage(ctx, ack);
- return AbstractMessageStore.FUTURE;
- }
- });
- }
- } else {
- destination.removeAsyncMessage(context, ack);
- }
+ destination.removeAsyncMessage(context, ack);
}
final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
final MessageId messageId, final MessageAck ack) throws IOException {
-
- if (ack.isInTransaction()) {
- if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
- destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
- } else {
- Tx tx = getTx(ack.getTransactionId());
- tx.add(new RemoveMessageCommand(context) {
- @Override
- public MessageAck getMessageAck() {
- return ack;
- }
-
- @Override
- public Future<Object> run(ConnectionContext ctx) throws IOException {
- destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
- return AbstractMessageStore.FUTURE;
- }
- });
- }
- } else {
- destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
- }
+ destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
}