fixes #1069 close tx before notifying that tx completed (#1070)

diff --git a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
index ece178c..ffa4292 100644
--- a/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
+++ b/modules/core/src/main/java/org/apache/fluo/core/async/CommitManager.java
@@ -4,9 +4,9 @@
  * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License. You may obtain a
  * copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software distributed under the License
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
@@ -82,39 +82,42 @@
     @Override
     public void committed() {
       try {
-        aco.committed();
-      } finally {
+        // perform cleanup and close tx before notifying observer
         finish(TxResult.COMMITTED);
+      } finally {
+        aco.committed();
       }
     }
 
     @Override
     public void failed(Throwable t) {
       try {
-        aco.failed(t);
-      } finally {
+        // perform cleanup and close tx before notifying observer
         finish(TxResult.ERROR);
+      } finally {
+        aco.failed(t);
       }
     }
 
     @Override
     public void alreadyAcknowledged() {
       try {
-        aco.alreadyAcknowledged();
-      } finally {
+        // perform cleanup and close tx before notifying observer
         finish(TxResult.AACKED);
+      } finally {
+        aco.alreadyAcknowledged();
       }
     }
 
     @Override
     public void commitFailed(String msg) {
       try {
-        aco.commitFailed(msg);
-      } finally {
+        // perform cleanup and close tx before notifying observer
         finish(TxResult.COMMIT_EXCEPTION);
+      } finally {
+        aco.commitFailed(msg);
       }
     }
-
   }