[NO JIRA] Removing unused concurrentStoreAndDispatchTransactions variable from KahaDBStore. Cleaning up related unused code.
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 0d7feba..261a04b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -56,7 +56,6 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.protobuf.Buffer;
 import org.apache.activemq.store.AbstractMessageStore;
@@ -106,8 +105,8 @@
 
     protected ExecutorService queueExecutor;
     protected ExecutorService topicExecutor;
-    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
-    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
+    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<>();
+    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<>();
     final WireFormat wireFormat = new OpenWireFormat();
     private SystemUsage usageManager;
     private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
@@ -118,19 +117,13 @@
     // when true, message order may be compromised when cache is exhausted if store is out
     // or order w.r.t cache
     private boolean concurrentStoreAndDispatchTopics = false;
-    private final boolean concurrentStoreAndDispatchTransactions = false;
     private int maxAsyncJobs = MAX_ASYNC_JOBS;
     private final KahaDBTransactionStore transactionStore;
     private TransactionIdTransformer transactionIdTransformer;
 
     public KahaDBStore() {
         this.transactionStore = new KahaDBTransactionStore(this);
-        this.transactionIdTransformer = new TransactionIdTransformer() {
-            @Override
-            public TransactionId transform(TransactionId txid) {
-                return txid;
-            }
-        };
+        this.transactionIdTransformer = txid -> txid;
     }
 
     @Override
@@ -181,10 +174,6 @@
         this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
     }
 
-    public boolean isConcurrentStoreAndDispatchTransactions() {
-        return this.concurrentStoreAndDispatchTransactions;
-    }
-
     /**
      * @return the maxAsyncJobs
      */
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
index b0f5c41..021a986 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBTransactionStore.java
@@ -20,13 +20,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import org.apache.activemq.broker.ConnectionContext;
@@ -62,7 +57,6 @@
  */
 public class KahaDBTransactionStore implements TransactionStore {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBTransactionStore.class);
-    ConcurrentMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
     private final KahaDBStore theStore;
 
     public KahaDBTransactionStore(KahaDBStore theStore) {
@@ -74,9 +68,9 @@
     }
 
     public class Tx {
-        private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<AddMessageCommand>());
+        private final List<AddMessageCommand> messages = Collections.synchronizedList(new ArrayList<>());
 
-        private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<RemoveMessageCommand>());
+        private final List<RemoveMessageCommand> acks = Collections.synchronizedList(new ArrayList<>());
 
         public void add(AddMessageCommand msg) {
             messages.add(msg);
@@ -89,8 +83,7 @@
         public Message[] getMessages() {
             Message rc[] = new Message[messages.size()];
             int count = 0;
-            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
-                AddMessageCommand cmd = iter.next();
+            for (AddMessageCommand cmd : messages) {
                 rc[count++] = cmd.getMessage();
             }
             return rc;
@@ -99,8 +92,7 @@
         public MessageAck[] getAcks() {
             MessageAck rc[] = new MessageAck[acks.size()];
             int count = 0;
-            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
-                RemoveMessageCommand cmd = iter.next();
+            for (RemoveMessageCommand cmd : acks) {
                 rc[count++] = cmd.getMessageAck();
             }
             return rc;
@@ -111,16 +103,14 @@
          * @throws IOException
          */
         public List<Future<Object>> commit() throws IOException {
-            List<Future<Object>> results = new ArrayList<Future<Object>>();
+            List<Future<Object>> results = new ArrayList<>();
             // Do all the message adds.
-            for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
-                AddMessageCommand cmd = iter.next();
+            for (AddMessageCommand cmd : messages) {
                 results.add(cmd.run());
 
             }
             // And removes..
-            for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
-                RemoveMessageCommand cmd = iter.next();
+            for (RemoveMessageCommand cmd : acks) {
                 cmd.run();
                 results.add(cmd.run());
             }
@@ -129,7 +119,7 @@
         }
     }
 
-    public abstract class AddMessageCommand {
+    public abstract static class AddMessageCommand {
         private final ConnectionContext ctx;
         AddMessageCommand(ConnectionContext ctx) {
             this.ctx = ctx;
@@ -141,7 +131,7 @@
         abstract Future<Object> run(ConnectionContext ctx) throws IOException;
     }
 
-    public abstract class RemoveMessageCommand {
+    public abstract static class RemoveMessageCommand {
 
         private final ConnectionContext ctx;
         RemoveMessageCommand(ConnectionContext ctx) {
@@ -237,78 +227,20 @@
     @Override
     public void prepare(TransactionId txid) throws IOException {
         KahaTransactionInfo info = getTransactionInfo(txid);
-        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
-            theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
-        } else {
-            Tx tx = inflightTransactions.remove(txid);
-            if (tx != null) {
-               theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
-            }
-        }
-    }
-
-    public Tx getTx(Object txid) {
-        Tx tx = inflightTransactions.get(txid);
-        if (tx == null) {
-            synchronized (inflightTransactions) {
-                tx = inflightTransactions.get(txid);
-                if (tx == null) {
-                    tx = new Tx();
-                    inflightTransactions.put(txid, tx);
-                }
-            }
-        }
-        return tx;
+        theStore.store(new KahaPrepareCommand().setTransactionInfo(info), true, null, null);
     }
 
     @Override
     public void commit(TransactionId txid, boolean wasPrepared, final Runnable preCommit, Runnable postCommit)
             throws IOException {
         if (txid != null) {
-            if (!txid.isXATransaction() && theStore.isConcurrentStoreAndDispatchTransactions()) {
-                if (preCommit != null) {
-                    preCommit.run();
-                }
-                Tx tx = inflightTransactions.remove(txid);
-                if (tx != null) {
-                    List<Future<Object>> results = tx.commit();
-                    boolean doneSomething = false;
-                    for (Future<Object> result : results) {
-                        try {
-                            result.get();
-                        } catch (InterruptedException e) {
-                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
-                        } catch (ExecutionException e) {
-                            theStore.brokerService.handleIOException(new IOException(e.getMessage()));
-                        }catch(CancellationException e) {
-                        }
-                        if (!result.isCancelled()) {
-                            doneSomething = true;
-                        }
-                    }
-                    if (postCommit != null) {
-                        postCommit.run();
-                    }
-                    if (doneSomething) {
-                        KahaTransactionInfo info = getTransactionInfo(txid);
-                        theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
-                    }
-                }else {
-                    //The Tx will be null for failed over clients - lets run their post commits
-                    if (postCommit != null) {
-                        postCommit.run();
-                    }
-                }
-
-            } else {
-                KahaTransactionInfo info = getTransactionInfo(txid);
-                if (preCommit != null) {
-                    preCommit.run();
-                }
-                theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
-                forgetRecoveredAcks(txid, false);
+            KahaTransactionInfo info = getTransactionInfo(txid);
+            if (preCommit != null) {
+                preCommit.run();
             }
-        }else {
+            theStore.store(new KahaCommitCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, postCommit);
+            forgetRecoveredAcks(txid, false);
+        } else {
            LOG.error("Null transaction passed on commit");
         }
     }
@@ -319,13 +251,9 @@
      */
     @Override
     public void rollback(TransactionId txid) throws IOException {
-        if (txid.isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
-            KahaTransactionInfo info = getTransactionInfo(txid);
-            theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
-            forgetRecoveredAcks(txid, true);
-        } else {
-            inflightTransactions.remove(txid);
-        }
+        KahaTransactionInfo info = getTransactionInfo(txid);
+        theStore.store(new KahaRollbackCommand().setTransactionInfo(info), theStore.isEnableJournalDiskSyncs(), null, null);
+        forgetRecoveredAcks(txid, true);
     }
 
     protected void forgetRecoveredAcks(TransactionId txid, boolean isRollback) throws IOException {
@@ -347,8 +275,8 @@
     public synchronized void recover(TransactionRecoveryListener listener) throws IOException {
         for (Map.Entry<TransactionId, List<Operation>> entry : theStore.preparedTransactions.entrySet()) {
             XATransactionId xid = (XATransactionId) entry.getKey();
-            ArrayList<Message> messageList = new ArrayList<Message>();
-            ArrayList<MessageAck> ackList = new ArrayList<MessageAck>();
+            ArrayList<Message> messageList = new ArrayList<>();
+            ArrayList<MessageAck> ackList = new ArrayList<>();
 
             for (Operation op : entry.getValue()) {
                 if (op.getClass() == MessageDatabase.AddOperation.class) {
@@ -384,52 +312,14 @@
      */
     void addMessage(ConnectionContext context, final MessageStore destination, final Message message)
             throws IOException {
-
-        if (message.getTransactionId() != null) {
-            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
-                destination.addMessage(context, message);
-            } else {
-                Tx tx = getTx(message.getTransactionId());
-                tx.add(new AddMessageCommand(context) {
-                    @Override
-                    public Message getMessage() {
-                        return message;
-                    }
-                    @Override
-                    public Future<Object> run(ConnectionContext ctx) throws IOException {
-                        destination.addMessage(ctx, message);
-                        return AbstractMessageStore.FUTURE;
-                    }
-
-                });
-            }
-        } else {
-            destination.addMessage(context, message);
-        }
+        destination.addMessage(context, message);
     }
 
     ListenableFuture<Object> asyncAddQueueMessage(ConnectionContext context, final MessageStore destination, final Message message)
             throws IOException {
-
         if (message.getTransactionId() != null) {
-            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions() == false) {
-                destination.addMessage(context, message);
-                return AbstractMessageStore.FUTURE;
-            } else {
-                Tx tx = getTx(message.getTransactionId());
-                tx.add(new AddMessageCommand(context) {
-                    @Override
-                    public Message getMessage() {
-                        return message;
-                    }
-                    @Override
-                    public Future<Object> run(ConnectionContext ctx) throws IOException {
-                        return destination.asyncAddQueueMessage(ctx, message);
-                    }
-
-                });
-                return AbstractMessageStore.FUTURE;
-            }
+            destination.addMessage(context, message);
+            return AbstractMessageStore.FUTURE;
         } else {
             return destination.asyncAddQueueMessage(context, message);
         }
@@ -439,24 +329,8 @@
             throws IOException {
 
         if (message.getTransactionId() != null) {
-            if (message.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
-                destination.addMessage(context, message);
-                return AbstractMessageStore.FUTURE;
-            } else {
-                Tx tx = getTx(message.getTransactionId());
-                tx.add(new AddMessageCommand(context) {
-                    @Override
-                    public Message getMessage() {
-                        return message;
-                    }
-                    @Override
-                    public Future<Object> run(ConnectionContext ctx) throws IOException {
-                        return destination.asyncAddTopicMessage(ctx, message);
-                    }
-
-                });
-                return AbstractMessageStore.FUTURE;
-            }
+            destination.addMessage(context, message);
+            return AbstractMessageStore.FUTURE;
         } else {
             return destination.asyncAddTopicMessage(context, message);
         }
@@ -468,80 +342,17 @@
      */
     final void removeMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
             throws IOException {
-
-        if (ack.isInTransaction()) {
-            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
-                destination.removeMessage(context, ack);
-            } else {
-                Tx tx = getTx(ack.getTransactionId());
-                tx.add(new RemoveMessageCommand(context) {
-                    @Override
-                    public MessageAck getMessageAck() {
-                        return ack;
-                    }
-
-                    @Override
-                    public Future<Object> run(ConnectionContext ctx) throws IOException {
-                        destination.removeMessage(ctx, ack);
-                        return AbstractMessageStore.FUTURE;
-                    }
-                });
-            }
-        } else {
-            destination.removeMessage(context, ack);
-        }
+        destination.removeMessage(context, ack);
     }
 
     final void removeAsyncMessage(ConnectionContext context, final MessageStore destination, final MessageAck ack)
             throws IOException {
-
-        if (ack.isInTransaction()) {
-            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()==false) {
-                destination.removeAsyncMessage(context, ack);
-            } else {
-                Tx tx = getTx(ack.getTransactionId());
-                tx.add(new RemoveMessageCommand(context) {
-                    @Override
-                    public MessageAck getMessageAck() {
-                        return ack;
-                    }
-
-                    @Override
-                    public Future<Object> run(ConnectionContext ctx) throws IOException {
-                        destination.removeMessage(ctx, ack);
-                        return AbstractMessageStore.FUTURE;
-                    }
-                });
-            }
-        } else {
-            destination.removeAsyncMessage(context, ack);
-        }
+        destination.removeAsyncMessage(context, ack);
     }
 
     final void acknowledge(ConnectionContext context, final TopicMessageStore destination, final String clientId, final String subscriptionName,
                            final MessageId messageId, final MessageAck ack) throws IOException {
-
-        if (ack.isInTransaction()) {
-            if (ack.getTransactionId().isXATransaction() || theStore.isConcurrentStoreAndDispatchTransactions()== false) {
-                destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
-            } else {
-                Tx tx = getTx(ack.getTransactionId());
-                tx.add(new RemoveMessageCommand(context) {
-                    @Override
-                    public MessageAck getMessageAck() {
-                        return ack;
-                    }
-
-                    @Override
-                    public Future<Object> run(ConnectionContext ctx) throws IOException {
-                        destination.acknowledge(ctx, clientId, subscriptionName, messageId, ack);
-                        return AbstractMessageStore.FUTURE;
-                    }
-                });
-            }
-        } else {
-            destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
-        }
+        destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
     }