[OMID-90] Implement low latency TSO capability. Instead of the TSO writing to the commit table, the clients do it. In order to ensure snapshot isolation, clients invalidate a transaction when reading from the commit table

Signed-off-by: Ohad Shacham <ohads@yahoo-inc.com>
diff --git a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
index 91f590e..52d0068 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/CommitTable.java
@@ -22,6 +22,7 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
 public interface CommitTable {
 
@@ -46,6 +47,11 @@
          * Allows to clean the write's current buffer. It is required for HA
          */
         void clearWriteBuffer();
+
+        /**
+         * Atomically adds a committed transaction unless another client already invalidated it; returns false if it was invalidated.
+         */
+        boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException;
     }
 
     interface Client extends Closeable {
diff --git a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
index 90af54a..6f9f384 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/InMemoryCommitTable.java
@@ -66,6 +66,14 @@
         }
 
         @Override
+        public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
+            // A single map slot stands for both the commit value and the invalidation, so the
+            // entry may only be stored while that slot is still empty (tx not invalidated).
+            final boolean slotWasEmpty = table.putIfAbsent(startTimestamp, commitTimestamp) == null;
+            return slotWasEmpty;
+        }
+
+        @Override
         public void close() {
         }
     }
diff --git a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
index 1cba77e..c27a238 100644
--- a/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
+++ b/commit-table/src/main/java/org/apache/omid/committable/NullCommitTable.java
@@ -51,6 +51,11 @@
         }
 
         @Override
+        public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
+            return true; // noop: nothing is recorded and the commit is always reported as added
+        }
+
+        @Override
         public void flush() throws IOException {
             // noop
         }
diff --git a/common/src/main/proto/TSOProto.proto b/common/src/main/proto/TSOProto.proto
index 749beaa..507f196 100644
--- a/common/src/main/proto/TSOProto.proto
+++ b/common/src/main/proto/TSOProto.proto
@@ -63,4 +63,5 @@
 message HandshakeResponse {
     optional bool clientCompatible = 1;
     optional Capabilities serverCapabilities = 2;
+    optional bool lowLatency = 3[default= false]; // read by TSOClient on handshake to select the low-latency commit path
 }
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
index b31d2c9..72f3463 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseTransactionManager.java
@@ -82,6 +82,7 @@
         // Optional parameters - initialized to default values
         private Optional<TSOProtocol> tsoClient = Optional.absent();
         private Optional<CommitTable.Client> commitTableClient = Optional.absent();
+        private Optional<CommitTable.Writer> commitTableWriter = Optional.absent();
         private Optional<PostCommitActions> postCommitter = Optional.absent();
 
         public Builder(HBaseOmidClientConfiguration hbaseOmidClientConf) {
@@ -106,6 +107,7 @@
         public HBaseTransactionManager build() throws IOException, InterruptedException {
 
             CommitTable.Client commitTableClient = this.commitTableClient.or(buildCommitTableClient()).get();
+            CommitTable.Writer commitTableWriter = this.commitTableWriter.or(buildCommitTableWriter()).get();
             PostCommitActions postCommitter = this.postCommitter.or(buildPostCommitter(commitTableClient)).get();
             TSOProtocol tsoClient = this.tsoClient.or(buildTSOClient()).get();
 
@@ -113,6 +115,7 @@
                                                postCommitter,
                                                tsoClient,
                                                commitTableClient,
+                                               commitTableWriter,
                                                new HBaseTransactionFactory());
         }
 
@@ -128,6 +131,13 @@
             return Optional.of(commitTable.getClient());
         }
 
+        private Optional<CommitTable.Writer> buildCommitTableWriter() throws IOException {
+            final HBaseCommitTableConfig tableConfig = new HBaseCommitTableConfig();
+            tableConfig.setTableName(hbaseOmidClientConf.getCommitTableName()); // table name comes from the client config
+            final CommitTable hbaseTable = new HBaseCommitTable(hbaseOmidClientConf.getHBaseConfiguration(), tableConfig);
+            return Optional.of(hbaseTable.getWriter());
+        }
+
         private Optional<PostCommitActions> buildPostCommitter(CommitTable.Client commitTableClient ) {
 
             PostCommitActions postCommitter;
@@ -160,14 +170,15 @@
                                     PostCommitActions postCommitter,
                                     TSOProtocol tsoClient,
                                     CommitTable.Client commitTableClient,
+                                    CommitTable.Writer commitTableWriter,
                                     HBaseTransactionFactory hBaseTransactionFactory) {
 
         super(hBaseOmidClientConfiguration.getMetrics(),
-              postCommitter,
-              tsoClient,
-              commitTableClient,
-              hBaseTransactionFactory);
-
+                postCommitter,
+                tsoClient,
+                commitTableClient,
+                commitTableWriter,
+                hBaseTransactionFactory);
     }
 
     // ----------------------------------------------------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
index 67c9cba..99f95db 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
@@ -27,18 +27,7 @@
 import org.apache.omid.metrics.NullMetricsProvider;
 import org.apache.omid.timestamp.storage.HBaseTimestampStorage;
 import org.apache.omid.timestamp.storage.TimestampStorage;
-import org.apache.omid.tso.BatchPoolModule;
-import org.apache.omid.tso.DisruptorModule;
-import org.apache.omid.tso.RuntimeExceptionPanicker;
-import org.apache.omid.tso.NetworkInterfaceUtils;
-import org.apache.omid.tso.Panicker;
-import org.apache.omid.tso.PausableTimestampOracle;
-import org.apache.omid.tso.PersistenceProcessorHandler;
-import org.apache.omid.tso.TSOChannelHandler;
-import org.apache.omid.tso.TSOServerConfig;
-import org.apache.omid.tso.TSOStateManager;
-import org.apache.omid.tso.TSOStateManagerImpl;
-import org.apache.omid.tso.TimestampOracle;
+import org.apache.omid.tso.*;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -72,6 +61,7 @@
         bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class);
         bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class);
         bind(Panicker.class).to(RuntimeExceptionPanicker.class).in(Singleton.class);
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
 
         install(new BatchPoolModule(config));
 
diff --git a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
index 89815c4..0464c39 100644
--- a/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
+++ b/hbase-commit-table/src/main/java/org/apache/omid/committable/hbase/HBaseCommitTable.java
@@ -24,14 +24,11 @@
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.CommitTable.CommitTimestamp.Location;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -129,6 +126,18 @@
         }
 
         @Override
+        public boolean atomicAddCommittedTransaction(long startTimestamp, long commitTimestamp) throws IOException {
+            assert (startTimestamp < commitTimestamp);
+            final byte[] rowKey = startTimestampToKey(startTimestamp);
+            final Put commitPut = new Put(rowKey, startTimestamp);
+            commitPut.add(commitTableFamily, COMMIT_TABLE_QUALIFIER, encodeCommitTimestamp(startTimestamp, commitTimestamp));
+            // Passing null as the expected value asks HBase to apply the put only while the
+            // invalidation column of this row is absent, i.e. the tx has not been invalidated.
+            // TODO checkandput return false but still writes the put!?!
+            return table.checkAndPut(rowKey, commitTableFamily, INVALID_TX_QUALIFIER, null, commitPut);
+        }
+
+        @Override
         public void close() throws IOException {
             clearWriteBuffer();
             table.close();
@@ -254,7 +263,7 @@
             try {
                 byte[] row = startTimestampToKey(startTimestamp);
                 Put invalidationPut = new Put(row, startTimestamp);
-                invalidationPut.add(commitTableFamily, INVALID_TX_QUALIFIER, null);
+                invalidationPut.add(commitTableFamily, INVALID_TX_QUALIFIER, Bytes.toBytes(1));
 
                 // We need to write to the invalid column only if the commit timestamp
                 // is empty. This has to be done atomically. Otherwise, if we first
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
index abfe67c..0a6d29f 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
@@ -26,20 +26,7 @@
 import org.apache.omid.metrics.NullMetricsProvider;
 import org.apache.omid.timestamp.storage.HBaseTimestampStorage;
 import org.apache.omid.timestamp.storage.TimestampStorage;
-import org.apache.omid.tso.BatchPoolModule;
-import org.apache.omid.tso.DisruptorModule;
-import org.apache.omid.tso.LeaseManagement;
-import org.apache.omid.tso.MockPanicker;
-import org.apache.omid.tso.NetworkInterfaceUtils;
-import org.apache.omid.tso.Panicker;
-import org.apache.omid.tso.PersistenceProcessorHandler;
-import org.apache.omid.tso.TSOChannelHandler;
-import org.apache.omid.tso.TSOServerConfig;
-import org.apache.omid.tso.TSOStateManager;
-import org.apache.omid.tso.TSOStateManagerImpl;
-import org.apache.omid.tso.TimestampOracle;
-import org.apache.omid.tso.TimestampOracleImpl;
-import org.apache.omid.tso.VoidLeaseManager;
+import org.apache.omid.tso.*;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -76,7 +63,7 @@
         // Timestamp storage creation
         bind(TimestampStorage.class).to(HBaseTimestampStorage.class).in(Singleton.class);
         bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
-
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
         install(new BatchPoolModule(config));
         // DisruptorConfig
         install(new DisruptorModule(config));
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index 8b406b4..a41466e 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -65,6 +65,7 @@
     private final PostCommitActions postCommitter;
     protected final TSOProtocol tsoClient;
     protected final CommitTable.Client commitTableClient;
+    private final CommitTable.Writer commitTableWriter;
     private final TransactionFactory<? extends CellId> transactionFactory;
 
     // Metrics
@@ -94,11 +95,13 @@
                                       PostCommitActions postCommitter,
                                       TSOProtocol tsoClient,
                                       CommitTable.Client commitTableClient,
+                                      CommitTable.Writer commitTableWriter,
                                       TransactionFactory<? extends CellId> transactionFactory) {
 
         this.tsoClient = tsoClient;
         this.postCommitter = postCommitter;
         this.commitTableClient = commitTableClient;
+        this.commitTableWriter = commitTableWriter;
         this.transactionFactory = transactionFactory;
 
         // Metrics configuration
@@ -108,7 +111,6 @@
         this.rolledbackTxsCounter = metrics.counter(name("omid", "tm", "hbase", "rolledbackTxs"));
         this.errorTxsCounter = metrics.counter(name("omid", "tm", "hbase", "erroredTxs"));
         this.invalidatedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "invalidatedTxs"));
-
     }
 
     /**
@@ -198,7 +200,10 @@
                 if (tx.getWriteSet().isEmpty()) {
                     markReadOnlyTransaction(tx); // No need for read-only transactions to contact the TSO Server
                 } else {
-                    commitRegularTransaction(tx);
+                    if (tsoClient.isLowLatency())
+                        commitLowLatencyTransaction(tx);
+                    else
+                        commitRegularTransaction(tx);
                 }
                 committedTxsCounter.inc();
             } finally {
@@ -312,22 +317,43 @@
 
             // 2) Then check the commit table
             // If the data was written at a previous epoch, check whether the transaction was invalidated
-            Optional<CommitTimestamp> commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
+            Optional<CommitTimestamp> commitTimeStampFromCT = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
+
+            boolean invalidatedByOther = false;
+            if (commitTimeStampFromCT.isPresent()) {
+                if (tsoClient.isLowLatency() && !commitTimeStampFromCT.get().isValid())
+                    invalidatedByOther = true;
+                else
+                    return commitTimeStampFromCT.get();
+            }
+
+            // 3) Read from shadow cell
+            Optional<CommitTimestamp> commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
             if (commitTimeStamp.isPresent()) {
                 return commitTimeStamp.get();
             }
 
-            // 3) Read from shadow cell
-            commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
-            if (commitTimeStamp.isPresent()) {
-                return commitTimeStamp.get();
+            // In low-latency mode, an invalid commit table entry is returned only after the shadow cell check (step 3) found nothing
+            if (invalidatedByOther) {
+                return commitTimeStampFromCT.get();
             }
 
             // 4) Check the epoch and invalidate the entry
             // if the data was written by a transaction from a previous epoch (previous TSO)
-            if (cellStartTimestamp < epoch) {
+            if (cellStartTimestamp < epoch || tsoClient.isLowLatency()) {
                 boolean invalidated = commitTableClient.tryInvalidateTransaction(cellStartTimestamp).get();
                 if (invalidated) { // Invalid commit timestamp
+
+                    // If we are running lowLatency Omid, we could have managed to invalidate a ct entry,
+                    // but the committing client already wrote to shadow cells:
+                    if (tsoClient.isLowLatency()) {
+                        commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
+                        if (commitTimeStamp.isPresent()) {
+                            // Remove false invalidation from commit table
+                            commitTableClient.completeTransaction(cellStartTimestamp);
+                            return commitTimeStamp.get();
+                        }
+                    }
                     return new CommitTimestamp(COMMIT_TABLE, CommitTable.INVALID_TRANSACTION_MARKER, false);
                 }
             }
@@ -417,6 +443,41 @@
 
     }
 
+    // Low-latency commit: the TSO only supplies the commit timestamp and this client writes the commit table entry.
+    // Every failure path throws, so a transaction that did not commit is never reported to the caller as committed.
+    private void commitLowLatencyTransaction(AbstractTransaction<? extends CellId> tx)
+            throws RollbackException, TransactionException {
+        try {
+            long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet()).get();
+            boolean committed = commitTableWriter.atomicAddCommittedTransaction(tx.getStartTimestamp(), commitTs);
+            if (!committed) { // Transaction has been invalidated by another client
+                rollback(tx);
+                commitTableClient.completeTransaction(tx.getStartTimestamp());
+                rolledbackTxsCounter.inc();
+                throw new RollbackException("Transaction " + tx.getTransactionId() + " got invalidated");
+            }
+            certifyCommitForTx(tx, commitTs);
+            updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future
+                rollback(tx);
+                rolledbackTxsCounter.inc();
+                throw new RollbackException("Conflicts detected in tx writeset", e.getCause());
+            }
+            if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
+                errorTxsCounter.inc(); // the TSO commit call failed, so this client wrote no commit table entry
+                rollback(tx);
+                throw new RollbackException(tx + " rolled back: TSO unreachable during commit", e.getCause());
+            }
+            throw new TransactionException(tx + ": cannot determine Tx outcome", e.getCause());
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+            throw new TransactionException(tx + ": interrupted while committing", e);
+        } catch (IOException e) { // the commit table write failed; the entry may or may not have been stored
+            throw new TransactionException(tx + ": cannot determine Tx outcome", e);
+        }
+    }
+
     private void commitRegularTransaction(AbstractTransaction<? extends CellId> tx)
             throws RollbackException, TransactionException
     {
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
index 0511e0f..8486e02 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
@@ -104,6 +104,11 @@
     }
 
     @Override
+    public boolean isLowLatency() {
+        return false; // the mock never reports the low-latency protocol
+    }
+
+    @Override
     public long getEpoch() {
         return 0;
     }
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index fd92792..617b4d5 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -92,6 +92,8 @@
     private InetSocketAddress tsoAddr;
     private String zkCurrentTsoPath;
 
+    private boolean lowLatency;
+
     // ----------------------------------------------------------------------------------------------------------------
     // Construction
     // ----------------------------------------------------------------------------------------------------------------
@@ -159,6 +161,7 @@
         bootstrap.setOption("keepAlive", true);
         bootstrap.setOption("reuseAddress", true);
         bootstrap.setOption("connectTimeoutMillis", 100);
+        lowLatency = false;
     }
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -264,6 +267,11 @@
 
     }
 
+    @Override
+    public boolean isLowLatency() {
+        return lowLatency; // assigned when a handshake response from the TSO is handled
+    }
+
     // ****************************************** Finite State Machine ************************************************
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -530,6 +538,7 @@
         }
 
         public StateMachine.State handleEvent(ResponseEvent e) {
+            lowLatency = e.getParam().getHandshakeResponse().getLowLatency();
             if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
                 if (timeout != null) {
                     timeout.cancel();
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
index fae4b96..bebc67e 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
@@ -60,4 +60,10 @@
      */
     TSOFuture<Void> close();
 
+    /**
+     * Checks whether the TSO is running the low latency protocol.
+     * @return true if the low latency protocol is in use, false otherwise
+     */
+    boolean isLowLatency();
+
 }
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
similarity index 86%
rename from tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
rename to tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
index 65416bc..14e1370 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
@@ -30,7 +30,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.inject.Inject;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
@@ -42,30 +41,28 @@
 import static com.lmax.disruptor.dsl.ProducerType.MULTI;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY;
+import static org.apache.omid.tso.AbstractRequestProcessor.RequestEvent.EVENT_FACTORY;
 
-class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
+abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestProcessor.RequestEvent>, RequestProcessor, TimeoutHandler {
 
-    private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractRequestProcessor.class);
 
     // Disruptor-related attributes
     private final ExecutorService disruptorExec;
-    private final Disruptor<RequestEvent> disruptor;
-    private final RingBuffer<RequestEvent> requestRing;
+    protected final Disruptor<RequestEvent> disruptor;
+    protected RingBuffer<RequestEvent> requestRing;
 
     private final TimestampOracle timestampOracle;
     private final CommitHashMap hashmap;
     private final MetricsRegistry metrics;
-    private final PersistenceProcessor persistProc;
-
+    private final LowWatermarkWriter lowWatermarkWriter;
     private long lowWatermark = -1L;
 
-    @Inject
-    RequestProcessorImpl(MetricsRegistry metrics,
-                         TimestampOracle timestampOracle,
-                         PersistenceProcessor persistProc,
-                         Panicker panicker,
-                         TSOServerConfig config)
+
+    AbstractRequestProcessor(MetricsRegistry metrics,
+                             TimestampOracle timestampOracle,
+                             Panicker panicker,
+                             TSOServerConfig config, LowWatermarkWriter lowWatermarkWriter)
             throws IOException {
 
         // ------------------------------------------------------------------------------------------------------------
@@ -80,17 +77,18 @@
         this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, timeoutStrategy);
         disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
         disruptor.handleEventsWith(this);
-        this.requestRing = disruptor.start();
+
 
         // ------------------------------------------------------------------------------------------------------------
         // Attribute initialization
         // ------------------------------------------------------------------------------------------------------------
 
         this.metrics = metrics;
-        this.persistProc = persistProc;
         this.timestampOracle = timestampOracle;
         this.hashmap = new CommitHashMap(config.getConflictMapSize());
 
+        this.lowWatermarkWriter = lowWatermarkWriter;
+
         LOG.info("RequestProcessor initialized");
 
     }
@@ -102,7 +100,7 @@
     public void update(TSOState state) throws Exception {
         LOG.info("Initializing RequestProcessor state...");
         this.lowWatermark = state.getLowWatermark();
-        persistProc.persistLowWatermark(lowWatermark).get(); // Sync persist
+        lowWatermarkWriter.persistLowWatermark(lowWatermark).get(); // Sync persist
         LOG.info("RequestProcessor state initialized with LWMs {} and Epoch {}", lowWatermark, state.getEpoch());
     }
 
@@ -131,8 +129,7 @@
         // TODO (cont) thread the one that calls persistProc.triggerCurrentBatchFlush(); we'll incur in concurrency issues
         // TODO (cont) This is because, in the current implementation, only the request-0 thread calls the public methods
         // TODO (cont) in persistProc and it is guaranteed that access them serially.
-        persistProc.triggerCurrentBatchFlush();
-
+        onTimeout();
     }
 
     @Override
@@ -162,8 +159,7 @@
 
         long timestamp = timestampOracle.next();
         requestEvent.getMonCtx().timerStop("request.processor.timestamp.latency");
-        persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
-
+        forwardTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
     }
 
     private void handleCommit(RequestEvent event) throws Exception {
@@ -208,25 +204,33 @@
                 if (newLowWatermark != lowWatermark) {
                     LOG.trace("Setting new low Watermark to {}", newLowWatermark);
                     lowWatermark = newLowWatermark;
-                    persistProc.persistLowWatermark(newLowWatermark); // Async persist
+                    lowWatermarkWriter.persistLowWatermark(newLowWatermark); // Async persist
                 }
             }
             event.getMonCtx().timerStop("request.processor.commit.latency");
-            persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
+            forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());
 
         } else {
 
             event.getMonCtx().timerStop("request.processor.commit.latency");
             if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
-                persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
+                forwardCommitRetry(startTimestamp, c, event.getMonCtx());
             } else {
-                persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
+                forwardAbort(startTimestamp, c, event.getMonCtx());
             }
 
         }
 
     }
 
+    // Extension points: this processor calls one of these once a request has been handled; subclasses decide what happens next.
+    public abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+    public abstract void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+    public abstract void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+    public abstract void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+    // Invoked from this class's timeout callback so that subclasses can react to an idle period.
+    public abstract void onTimeout() throws Exception;
+
     @Override
     public void close() throws IOException {
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java b/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
index 2584629..032f3a3 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
@@ -51,8 +51,15 @@
              bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(YieldingWaitStrategy.class);
              break;
         }
-        bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class);
-        bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class);
+
+        if (config.getLowLatency()) {
+            bind(RequestProcessor.class).to(RequestProcessorSkipCT.class).in(Singleton.class);
+            bind(PersistenceProcessor.class).to(PersitenceProcessorNullImpl.class).in(Singleton.class);
+        } else {
+            bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class);
+            bind(RequestProcessor.class).to(RequestProcessorPersistCT.class).in(Singleton.class);
+        }
+
         bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class);
         bind(RetryProcessor.class).to(RetryProcessorImpl.class).in(Singleton.class);
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriter.java b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriter.java
new file mode 100644
index 0000000..ddd0623
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriter.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import java.util.concurrent.Future;
+/** Persists the low watermark; callers may block on the returned future or ignore it for an asynchronous write. */
+public interface LowWatermarkWriter {
+    Future<Void> persistLowWatermark(final long lowWatermark);
+}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java
new file mode 100644
index 0000000..8de1b20
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/LowWatermarkWriterImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.Timer;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.omid.metrics.MetricsUtils.name;
+
+/** Writes low watermarks to the commit table from a dedicated single-threaded executor. */
+public class LowWatermarkWriterImpl implements LowWatermarkWriter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(LowWatermarkWriterImpl.class);
+
+    private final Timer lwmWriteTimer;
+    private final CommitTable.Writer lowWatermarkWriter;
+    private final ExecutorService lowWatermarkWriterExecutor;
+    private final MetricsRegistry metrics;
+
+    @Inject
+    LowWatermarkWriterImpl(TSOServerConfig config,
+                           CommitTable commitTable,
+                           MetricsRegistry metrics)
+            throws Exception {
+        this.metrics = metrics;
+        this.lowWatermarkWriter = commitTable.getWriter();
+        // Low watermark writes run on one named thread, so they execute one at a time
+        ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
+        this.lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
+        // Metrics config
+        this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));
+        LOG.info("LowWatermarkWriter initialized");
+    }
+
+    /** Queues an update-and-flush of {@code lowWatermark}; an IOException surfaces through the future. */
+    @Override
+    public Future<Void> persistLowWatermark(final long lowWatermark) {
+        return lowWatermarkWriterExecutor.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws IOException {
+                lwmWriteTimer.start();
+                try {
+                    lowWatermarkWriter.updateLowWatermark(lowWatermark);
+                    lowWatermarkWriter.flush();
+                } finally {
+                    lwmWriteTimer.stop();
+                }
+                return null;
+            }
+        });
+    }
+}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
index 426df27..ea183a8 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContext.java
@@ -15,62 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.omid.tso;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
-import org.apache.omid.metrics.MetricsRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public interface MonitoringContext {
 
-import javax.annotation.concurrent.NotThreadSafe;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
+    public void timerStart(String name);
 
-import static org.apache.omid.metrics.MetricsUtils.name;
+    public void timerStop(String name);
 
-@NotThreadSafe
-public class MonitoringContext {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MonitoringContext.class);
-
-    private volatile boolean flag;
-    private Map<String, Long> elapsedTimeMsMap = new ConcurrentHashMap<>();
-    private Map<String, Stopwatch> timers = new ConcurrentHashMap<>();
-    private MetricsRegistry metrics;
-
-    public MonitoringContext(MetricsRegistry metrics) {
-        this.metrics = metrics;
-    }
-
-    public void timerStart(String name) {
-        Stopwatch stopwatch = new Stopwatch();
-        stopwatch.start();
-        timers.put(name, stopwatch);
-    }
-
-    public void timerStop(String name) {
-        if (flag) {
-            LOG.warn("timerStop({}) called after publish. Measurement was ignored. {}", name, Throwables.getStackTraceAsString(new Exception()));
-            return;
-        }
-        Stopwatch activeStopwatch = timers.get(name);
-        if (activeStopwatch == null) {
-            throw new IllegalStateException(
-                    String.format("There is no %s timer in the %s monitoring context.", name, this));
-        }
-        activeStopwatch.stop();
-        elapsedTimeMsMap.put(name, activeStopwatch.elapsedTime(TimeUnit.NANOSECONDS));
-        timers.remove(name);
-    }
-
-    public void publish() {
-        flag = true;
-        for (Map.Entry<String, Long> entry : elapsedTimeMsMap.entrySet()) {
-            metrics.timer(name("tso", entry.getKey())).update(entry.getValue());
-        }
-    }
+    public void publish();
 
 }
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java
new file mode 100644
index 0000000..4280abc
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.metrics.MetricsRegistry;
+
+/** Picks the monitoring context implementation according to the TSO server configuration. */
+public class MonitoringContextFactory {
+    private MonitoringContextFactory() {} // static factory, never instantiated
+
+    public static MonitoringContext getInstance(TSOServerConfig config, MetricsRegistry metrics) {
+        return config.getMonitorContext()
+                ? new MonitoringContextImpl(metrics)
+                : new MonitoringContextNullImpl();
+    }
+}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java
new file mode 100644
index 0000000..5792a77
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextImpl.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Throwables;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.omid.metrics.MetricsUtils.name;
+import java.util.concurrent.TimeUnit;
+
+/** Monitoring context that times named sections and reports the elapsed times to the metrics registry. */
+@NotThreadSafe
+public class MonitoringContextImpl implements MonitoringContext {
+    private static final Logger LOG = LoggerFactory.getLogger(MonitoringContextImpl.class);
+
+    private volatile boolean flag; // true once publish() has run
+    private final Map<String, Long> elapsedNanosByTimer = new ConcurrentHashMap<>();
+    private final Map<String, Stopwatch> timers = new ConcurrentHashMap<>();
+    private final MetricsRegistry metrics;
+
+    public MonitoringContextImpl(MetricsRegistry metrics) {
+        this.metrics = metrics;
+    }
+
+    @Override
+    public void timerStart(String name) {
+        timers.put(name, new Stopwatch().start());
+    }
+
+    @Override
+    public void timerStop(String name) {
+        if (flag) {
+            LOG.warn("timerStop({}) called after publish. Measurement was ignored. {}", name, Throwables.getStackTraceAsString(new Exception()));
+            return;
+        }
+        Stopwatch running = timers.get(name);
+        if (running == null) {
+            throw new IllegalStateException(
+                    String.format("There is no %s timer in the %s monitoring context.", name, this));
+        }
+        running.stop();
+        elapsedNanosByTimer.put(name, running.elapsedTime(TimeUnit.NANOSECONDS));
+        timers.remove(name);
+    }
+
+    @Override
+    public void publish() {
+        flag = true;
+        for (Map.Entry<String, Long> elapsed : elapsedNanosByTimer.entrySet()) {
+            metrics.timer(name("tso", elapsed.getKey())).update(elapsed.getValue());
+        }
+    }
+}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java
new file mode 100644
index 0000000..f88123f
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/MonitoringContextNullImpl.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.omid.tso;
+
+public class MonitoringContextNullImpl implements MonitoringContext {
+    @Override
+    public void timerStart(String name) {
+        // Intentionally empty: this null implementation keeps no timers.
+    }
+
+    @Override
+    public void timerStop(String name) {
+        // Intentionally empty: there is never a timer to stop.
+    }
+
+    @Override
+    public void publish() {
+        // Intentionally empty: nothing was measured, so nothing is published.
+    }
+}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index b96945d..40f203f 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -20,7 +20,6 @@
 import org.jboss.netty.channel.Channel;
 
 import java.io.Closeable;
-import java.util.concurrent.Future;
 
 interface PersistenceProcessor extends Closeable {
 
@@ -35,5 +34,5 @@
 
     void triggerCurrentBatchFlush() throws Exception;
 
-    Future<Void> persistLowWatermark(long lowWatermark);
+
 }
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 95d77ba..e4d2eba 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -29,7 +29,6 @@
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
-import org.apache.omid.metrics.Timer;
 import org.jboss.netty.channel.Channel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,10 +36,8 @@
 import javax.inject.Inject;
 
 import java.io.IOException;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -63,12 +60,7 @@
 
     // TODO Next two need to be either int or AtomicLong
     volatile private long batchSequence;
-
-    private CommitTable.Writer lowWatermarkWriter;
-    private ExecutorService lowWatermarkWriterExecutor;
-
     private MetricsRegistry metrics;
-    private final Timer lwmWriteTimer;
 
     @Inject
     PersistenceProcessorImpl(TSOServerConfig config,
@@ -97,19 +89,11 @@
         // ------------------------------------------------------------------------------------------------------------
 
         this.metrics = metrics;
-        this.lowWatermarkWriter = commitTable.getWriter();
         this.batchSequence = 0L;
         this.batchPool = batchPool;
         this.currentBatch = batchPool.borrowObject();
-        // Low Watermark writer
-        ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
-        this.lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
-
-        // Metrics config
-        this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));
 
         LOG.info("PersistentProcessor initialized");
-
     }
 
     @Override
@@ -167,25 +151,6 @@
     }
 
     @Override
-    public Future<Void> persistLowWatermark(final long lowWatermark) {
-
-        return lowWatermarkWriterExecutor.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws IOException {
-                try {
-                    lwmWriteTimer.start();
-                    lowWatermarkWriter.updateLowWatermark(lowWatermark);
-                    lowWatermarkWriter.flush();
-                } finally {
-                    lwmWriteTimer.stop();
-                }
-                return null;
-            }
-        });
-
-    }
-
-    @Override
     public void close() throws IOException {
 
         LOG.info("Terminating Persistence Processor...");
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
new file mode 100644
index 0000000..55b5068
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+public class PersitenceProcessorNullImpl implements PersistenceProcessor {
+    // Null-object PersistenceProcessor: every method below is intentionally empty.
+    @Override
+    public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        // Intentionally empty: the commit is not added to any batch.
+    }
+
+    @Override
+    public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        // Intentionally empty: the commit retry is not added to any batch.
+    }
+
+    @Override
+    public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        // Intentionally empty: the abort is not added to any batch.
+    }
+
+    @Override
+    public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        // Intentionally empty: the timestamp is not added to any batch.
+    }
+
+    @Override
+    public void triggerCurrentBatchFlush() throws Exception {
+        // Intentionally empty: there is no batch to flush.
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Intentionally empty: this implementation holds nothing to release.
+    }
+}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index f196c42..60e3e06 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -44,7 +44,7 @@
      * @param channel
      *            the channel used to send the response back to the client
      */
-    void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel);
+    void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx);
 
     /**
      * Allows to send an abort response back to the client.
@@ -54,7 +54,7 @@
      * @param channel
      *            the channel used to send the response back to the client
      */
-    void sendAbortResponse(long startTimestamp, Channel channel);
+    void sendAbortResponse(long startTimestamp, Channel channel, MonitoringContext monCtx);
 
     /**
      * Allow to send a timestamp response back to the client.
@@ -65,7 +65,7 @@
      *            the channel used to send the response back to the client
      */
 
-    void sendTimestampResponse(long startTimestamp, Channel channel);
+    void sendTimestampResponse(long startTimestamp, Channel channel, MonitoringContext monCtx);
 
 }
 
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 8e50323..6681fab 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -114,19 +114,13 @@
 
             switch (event.getType()) {
                 case COMMIT:
-                    sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
-                    event.getMonCtx().timerStop("reply.processor.commit.latency");
-                    commitMeter.mark();
+                    sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx());
                     break;
                 case ABORT:
-                    sendAbortResponse(event.getStartTimestamp(), event.getChannel());
-                    event.getMonCtx().timerStop("reply.processor.abort.latency");
-                    abortMeter.mark();
+                    sendAbortResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
                     break;
                 case TIMESTAMP:
-                    sendTimestampResponse(event.getStartTimestamp(), event.getChannel());
-                    event.getMonCtx().timerStop("reply.processor.timestamp.latency");
-                    timestampMeter.mark();
+                    sendTimestampResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
                     break;
                 case COMMIT_RETRY:
                     throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
@@ -182,7 +176,7 @@
     }
 
     @Override
-    public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c) {
+    public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
@@ -191,11 +185,12 @@
                 .setCommitTimestamp(commitTimestamp);
         builder.setCommitResponse(commitBuilder.build());
         c.write(builder.build());
-
+        commitMeter.mark();
+        monCtx.timerStop("reply.processor.commit.latency");
     }
 
     @Override
-    public void sendAbortResponse(long startTimestamp, Channel c) {
+    public void sendAbortResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
@@ -203,18 +198,20 @@
         commitBuilder.setStartTimestamp(startTimestamp);
         builder.setCommitResponse(commitBuilder.build());
         c.write(builder.build());
-
+        abortMeter.mark();
+        monCtx.timerStop("reply.processor.abort.latency");
     }
 
     @Override
-    public void sendTimestampResponse(long startTimestamp, Channel c) {
+    public void sendTimestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
 
         TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
         TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
         respBuilder.setStartTimestamp(startTimestamp);
         builder.setTimestampResponse(respBuilder.build());
         c.write(builder.build());
-
+        timestampMeter.mark();
+        monCtx.timerStop("reply.processor.timestamp.latency");
     }
 
     @Override
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
new file mode 100644
index 0000000..62d8c5d
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.inject.Inject;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+/**
+ * Request processor that forwards every outcome to the {@link PersistenceProcessor} batch.
+ */
+public class RequestProcessorPersistCT extends AbstractRequestProcessor {
+
+    PersistenceProcessor persistenceProcessor;
+
+    @Inject
+    RequestProcessorPersistCT(MetricsRegistry metrics,
+                              TimestampOracle timestampOracle,
+                              PersistenceProcessor persistenceProcessor,
+                              Panicker panicker,
+                              TSOServerConfig config,
+                              LowWatermarkWriter lowWatermarkWriter) throws IOException {
+        super(metrics, timestampOracle, panicker, config, lowWatermarkWriter);
+        this.persistenceProcessor = persistenceProcessor;
+        requestRing = disruptor.start();
+    }
+
+    @Override
+    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addCommitToBatch(startTimestamp, commitTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addCommitRetryToBatch(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addAbortToBatch(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        persistenceProcessor.addTimestampToBatch(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void onTimeout() throws Exception {
+        persistenceProcessor.triggerCurrentBatchFlush();
+    }
+}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
new file mode 100644
index 0000000..9ce4908
--- /dev/null
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.inject.Inject;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.jboss.netty.channel.Channel;
+
+import java.io.IOException;
+
+/** Request processor that answers clients directly through the {@link ReplyProcessor}. */
+public class RequestProcessorSkipCT extends AbstractRequestProcessor {
+    private final ReplyProcessor replyProcessor;
+    private final LeaseManagement leaseManager;
+    private final Panicker panicker;
+    private final String tsoHostAndPort;
+
+    @Inject
+    RequestProcessorSkipCT(MetricsRegistry metrics,
+                           TimestampOracle timestampOracle,
+                           ReplyProcessor replyProcessor,
+                           Panicker panicker,
+                           LeaseManagement leaseManager,
+                           TSOServerConfig config,
+                           LowWatermarkWriter lowWatermarkWriter,
+                           String tsoHostAndPort) throws IOException {
+        super(metrics, timestampOracle, panicker, config, lowWatermarkWriter);
+        this.replyProcessor = replyProcessor;
+        this.leaseManager = leaseManager;
+        this.panicker = panicker;
+        this.tsoHostAndPort = tsoHostAndPort;
+        // Started last: every field must be assigned before the disruptor can call back into this object
+        requestRing = disruptor.start();
+    }
+
+    // Panics through the Panicker when the lease manager reports the lease period is over
+    private void commitSuicideIfNotMaster() {
+        if (!leaseManager.stillInLeasePeriod()) {
+            panicker.panic("Replica " + tsoHostAndPort + " lost mastership whilst flushing data. Committing suicide");
+        }
+    }
+
+    @Override
+    public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+        commitSuicideIfNotMaster();
+        replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
+        replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) {
+        replyProcessor.sendAbortResponse(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
+        replyProcessor.sendTimestampResponse(startTimestamp, c, monCtx);
+    }
+
+    @Override
+    public void onTimeout() {
+        // This processor keeps no batch, so a timeout requires no action
+    }
+}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 6d923be..610e760 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -133,16 +133,16 @@
             if (commitTimestamp.isPresent()) {
                 if (commitTimestamp.get().isValid()) {
                     LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
-                    replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());
+                    replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx());
                     txAlreadyCommittedMeter.mark();
                 } else {
                     LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp);
-                    replyProc.sendAbortResponse(startTimestamp, event.getChannel());
+                    replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
                     invalidTxMeter.mark();
                 }
             } else {
                 LOG.trace("Tx {}: No Commit TS found in Commit Table. Sending Abort to client.", startTimestamp);
-                replyProc.sendAbortResponse(startTimestamp, event.getChannel());
+                replyProc.sendAbortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
                 noCTFoundMeter.mark();
             }
         } catch (InterruptedException e) {
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
index fe99880..f6fb273 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
@@ -165,14 +165,14 @@
             }
 
             if (request.hasTimestampRequest()) {
-                requestProcessor.timestampRequest(ctx.getChannel(), new MonitoringContext(metrics));
+                requestProcessor.timestampRequest(ctx.getChannel(), MonitoringContextFactory.getInstance(config,metrics));
             } else if (request.hasCommitRequest()) {
                 TSOProto.CommitRequest cr = request.getCommitRequest();
                 requestProcessor.commitRequest(cr.getStartTimestamp(),
                                                cr.getCellIdList(),
                                                cr.getIsRetry(),
                                                ctx.getChannel(),
-                                               new MonitoringContext(metrics));
+                                               MonitoringContextFactory.getInstance(config,metrics));
             } else {
                 LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
                 ctx.getChannel().close();
@@ -240,6 +240,7 @@
         } else {
             response.setClientCompatible(false);
         }
+        response.setLowLatency(config.getLowLatency());
         ctx.getChannel().write(TSOProto.Response.newBuilder().setHandshakeResponse(response.build()).build());
 
     }
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
index a7aec27..ca30d25 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
@@ -44,6 +44,7 @@
         bind(TSOChannelHandler.class).in(Singleton.class);
         bind(TSOStateManager.class).to(TSOStateManagerImpl.class).in(Singleton.class);
         bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
         bind(Panicker.class).to(SystemExitPanicker.class).in(Singleton.class);
 
         install(new BatchPoolModule(config));
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
index 19d9f01..98ee6cd 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServer.java
@@ -52,7 +52,8 @@
     private RetryProcessor retryProcessor;
     @Inject
     private ReplyProcessor replyProcessor;
-
+    @Inject
+    private LowWatermarkWriter lowWatermarkWriter;
     // ----------------------------------------------------------------------------------------------------------------
     // High availability related variables
     // ----------------------------------------------------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index 3292211..71d5280 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -82,6 +82,26 @@
 
     private String networkIfaceName = NetworkUtils.getDefaultNetworkInterface();
 
+    // Low latency mode: clients are expected to update the commit table.
+    private Boolean lowLatency = false; // non-null default so callers can unbox it safely
+    private boolean monitorContext; // private: reached only through the accessors below
+
+    public Boolean getLowLatency() {
+        return lowLatency;
+    }
+
+    public void setLowLatency(Boolean lowLatency) {
+        this.lowLatency = lowLatency;
+    }
+
+    public boolean getMonitorContext() {
+        return monitorContext;
+    }
+
+    public void setMonitorContext(boolean monitorContext) {
+        this.monitorContext = monitorContext;
+    }
+
     public int getPort() {
         return port;
     }
diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml
index da0c531..b7f39bb 100644
--- a/tso-server/src/main/resources/default-omid-server-configuration.yml
+++ b/tso-server/src/main/resources/default-omid-server-configuration.yml
@@ -41,6 +41,9 @@
 # When this timeout expires, the contents of the batch are flushed to the datastore
 batchPersistTimeoutInMs: 10
 
+# Low latency mode - clients are expected to update the commit table
+lowLatency: false
+
 # Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables)
 timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ]
 commitTableStoreModule: !!org.apache.omid.tso.InMemoryCommitTableStorageModule [ ]
@@ -49,6 +52,8 @@
 # Default stats/metrics configuration
 metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
 
+monitorContext: false
+
 # ---------------------------------------------------------------------------------------------------------------------
 # Timestamp storage configuration options
 # ---------------------------------------------------------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
index 17fd2e0..2e5aa7d 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
@@ -53,6 +53,7 @@
         bind(TimestampStorage.class).to(InMemoryTimestampStorage.class).in(Singleton.class);
         bind(TimestampOracle.class).to(PausableTimestampOracle.class).in(Singleton.class);
         bind(Panicker.class).to(MockPanicker.class).in(Singleton.class);
+        bind(LowWatermarkWriter.class).to(LowWatermarkWriterImpl.class).in(Singleton.class);
 
         install(new BatchPoolModule(config));
         install(config.getLeaseModule());
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index 573cd89..c286f85 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -43,7 +43,7 @@
     @Mock
     private Channel channel;
     @Mock
-    private MonitoringContext monCtx;
+    private MonitoringContextImpl monCtx;
 
     @BeforeMethod
     void setup() {
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index ae89f01..5e1613c 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -137,9 +137,11 @@
                                                                  handlers,
                                                                  metrics);
 
-        proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
 
-        new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
+        LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
+
+        new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class), lowWatermarkWriter);
 
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
 
@@ -189,9 +191,11 @@
                                                                  panicker,
                                                                  handlers,
                                                                  metrics);
-        proc.addCommitToBatch(1, 2, null, new MonitoringContext(metrics));
+        proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
 
-        new RequestProcessorImpl(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class));
+        LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
+
+        new RequestProcessorPersistCT(metrics, mock(TimestampOracle.class), proc, panicker, mock(TSOServerConfig.class), lowWatermarkWriter);
 
         verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
 
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 4779608..94e8f88 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -68,6 +68,7 @@
 
     private MetricsRegistry metrics;
     private CommitTable commitTable;
+    private LowWatermarkWriter lowWatermarkWriter;
 
     @BeforeMethod(alwaysRun = true, timeOut = 30_000)
     public void initMocksAndComponents() throws Exception {
@@ -101,29 +102,9 @@
     public void testLowWatermarkIsPersisted() throws Exception {
 
         TSOServerConfig tsoConfig = new TSOServerConfig();
+        lowWatermarkWriter = new LowWatermarkWriterImpl(tsoConfig, commitTable, metrics);
 
-        PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
-        for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
-            handlers[i] = new PersistenceProcessorHandler(metrics,
-                                                          "localhost:1234",
-                                                          mock(LeaseManager.class),
-                                                          commitTable,
-                                                          mock(ReplyProcessor.class),
-                                                          retryProcessor,
-                                                          panicker);
-        }
-
-        // Component under test
-        PersistenceProcessorImpl persistenceProcessor =
-                new PersistenceProcessorImpl(tsoConfig,
-                                             new BlockingWaitStrategy(),
-                                             commitTable,
-                                             mock(ObjectPool.class),
-                                             panicker,
-                                             handlers,
-                                             metrics);
-
-        persistenceProcessor.persistLowWatermark(ANY_LWM).get();
+        lowWatermarkWriter.persistLowWatermark(ANY_LWM).get();
 
         ArgumentCaptor<Long> lwmCapture = ArgumentCaptor.forClass(Long.class);
         CommitTable.Writer lwmWriter = commitTable.getWriter();
@@ -166,10 +147,10 @@
 
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Flush: batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
 
         verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
 
@@ -211,11 +192,11 @@
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
 
         // Fill 1st handler Batches completely
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
         verify(batchPool, times(2)).borrowObject();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full
         verify(batchPool, times(3)).borrowObject();
 
         // Test empty flush does not trigger response in getting a new currentBatch
@@ -223,14 +204,14 @@
         verify(batchPool, times(3)).borrowObject();
 
         // Fill 2nd handler Batches completely
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 1st batch full
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full
         verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
 
         // Start filling a new currentBatch and flush it immediately
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // Batch not full
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Batch not full
         verify(batchPool, times(5)).borrowObject();
         proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
         verify(batchPool, times(6)).borrowObject();
@@ -281,7 +262,7 @@
 
         // The non-ha lease manager always return true for
         // stillInLeasePeriod(), so verify the currentBatch sends replies as master
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -336,7 +317,7 @@
 
         // Test: Configure the lease manager to return true always
         doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -357,7 +338,7 @@
 
         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -378,7 +359,7 @@
 
         // Test: Configure the lease manager to return false for stillInLeasePeriod
         doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -402,7 +383,7 @@
         // Configure mock writer to flush unsuccessfully
         doThrow(new IOException("Unable to write")).when(mockWriter).flush();
         doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
-        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
+        proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
         proc.triggerCurrentBatchFlush();
         verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
         verify(batchPool, times(2)).borrowObject();
@@ -452,7 +433,7 @@
         PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
-        MonitoringContext monCtx = new MonitoringContext(metrics);
+        MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
 
         // Configure lease manager to work normally
         doReturn(true).when(leaseManager).stillInLeasePeriod();
@@ -492,7 +473,7 @@
 
         // Configure writer to explode with a runtime exception
         doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
-        MonitoringContext monCtx = new MonitoringContext(metrics);
+        MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
 
         // Check the panic is extended!
         proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index 43f354f..4f190f9 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -167,7 +167,7 @@
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertTrue(batch.isEmpty());
 
@@ -178,14 +178,14 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -197,14 +197,14 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
 
         verify(persistenceHandler, times(1)).flush(eq(1));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -217,14 +217,14 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -236,7 +236,7 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -245,7 +245,7 @@
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 0);
 
@@ -256,8 +256,8 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -269,7 +269,7 @@
 
         verify(persistenceHandler, times(1)).flush(eq(1));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -285,8 +285,8 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -298,7 +298,7 @@
 
         verify(persistenceHandler, times(1)).flush(eq(1));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 1);
         assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST);
@@ -311,8 +311,8 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -324,8 +324,8 @@
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContextImpl.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 0);
 
@@ -336,8 +336,8 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addAbort(SECOND_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -349,7 +349,7 @@
 
         verify(persistenceHandler, times(1)).flush(eq(0));
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 2);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -364,12 +364,12 @@
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
 
-        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
-        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
-        batch.addAbort(FOURTH_ST, null, mock(MonitoringContext.class));
-        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
-        batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContext.class));
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class));
+        batch.addAbort(FOURTH_ST, null, mock(MonitoringContextImpl.class));
+        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class));
+        batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -381,7 +381,7 @@
 
         verify(persistenceHandler, times(1)).flush(2); // 2 commits to flush
         verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
-        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContextImpl.class));
         verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
         assertEquals(batch.getNumEvents(), 4);
         assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
@@ -408,7 +408,7 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -450,7 +450,7 @@
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -485,7 +485,7 @@
 
         // Prepare test batch
         batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
         batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 3ead24b..54d1e70 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -70,7 +70,7 @@
     private Panicker panicker;
 
     @Mock
-    private MonitoringContext monCtx;
+    private MonitoringContextImpl monCtx;
 
     private MetricsRegistry metrics;
 
@@ -247,11 +247,11 @@
         inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent));
 
         InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
-        inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class));
-        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class));
+        inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx));
 
     }
 
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 405102a..01844ae 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -56,6 +56,8 @@
     // Request processor under test
     private RequestProcessor requestProc;
 
+    private LowWatermarkWriter lowWatermarkWriter;
+
     @BeforeMethod
     public void beforeMethod() throws Exception {
 
@@ -66,16 +68,16 @@
                 new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
 
         stateManager = new TSOStateManagerImpl(timestampOracle);
-
+        lowWatermarkWriter = mock(LowWatermarkWriter.class);
         persist = mock(PersistenceProcessor.class);
         SettableFuture<Void> f = SettableFuture.create();
         f.set(null);
-        doReturn(f).when(persist).persistLowWatermark(any(Long.class));
+        doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
 
         TSOServerConfig config = new TSOServerConfig();
         config.setConflictMapSize(CONFLICT_MAP_SIZE);
 
-        requestProc = new RequestProcessorImpl(metrics, timestampOracle, persist, new MockPanicker(), config);
+        requestProc = new RequestProcessorPersistCT(metrics, timestampOracle, persist, new MockPanicker(), config, lowWatermarkWriter);
 
         // Initialize the state for the experiment
         stateManager.register(requestProc);
@@ -86,16 +88,16 @@
     @Test(timeOut = 30_000)
     public void testTimestamp() throws Exception {
 
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(1)).addTimestampToBatch(
-                firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                firstTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
 
         long firstTS = firstTScapture.getValue();
         // verify that timestamps increase monotonically
         for (int i = 0; i < 100; i++) {
-            requestProc.timestampRequest(null, new MonitoringContext(metrics));
-            verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
+            requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
+            verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContextImpl.class));
         }
 
     }
@@ -103,39 +105,39 @@
     @Test(timeOut = 30_000)
     public void testCommit() throws Exception {
 
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(1)).addTimestampToBatch(
-                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long firstTS = TScapture.getValue();
 
         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
-        requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContext.class));
+        requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContextImpl.class));
 
-        requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
+        requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
 
-        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
 
         // test conflict
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         TScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(2)).addTimestampToBatch(
-                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long secondTS = TScapture.getValue();
 
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         TScapture = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(3)).addTimestampToBatch(
-                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+                TScapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long thirdTS = TScapture.getValue();
 
-        requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
-        requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
+        requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
+        requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class));
 
     }
 
@@ -145,11 +147,11 @@
         List<Long> writeSet = Collections.emptyList();
 
         // Start a transaction...
-        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        requestProc.timestampRequest(null, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
         verify(persist, timeout(100).times(1)).addTimestampToBatch(capturedTS.capture(),
                                                                    any(Channel.class),
-                                                                   any(MonitoringContext.class));
+                                                                   any(MonitoringContextImpl.class));
         long startTS = capturedTS.getValue();
 
         // ... simulate the reset of the RequestProcessor state (e.g. due to
@@ -157,8 +159,8 @@
         stateManager.initialize();
 
         // ...check that the transaction is aborted when trying to commit
-        requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
+        requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContextImpl(metrics));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContextImpl.class));
 
     }
 
@@ -173,17 +175,17 @@
         for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
             long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
             List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
-            requestProc.commitRequest(ANY_START_TS, writeSet, false, null, new MonitoringContext(metrics));
+            requestProc.commitRequest(ANY_START_TS, writeSet, false, null, new MonitoringContextImpl(metrics));
         }
 
         Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
 
         // Check that first time its called is on init
-        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
+        verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L));
         // Then, check it is called when cache is full and the first element is evicted (should be a 1)
-        verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
+        verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
         // Finally it should never be called with the next element
-        verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
+        verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
 
     }
 
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index 54302d0..9206187 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -25,7 +25,6 @@
 import org.apache.omid.committable.CommitTable.CommitTimestamp;
 import org.apache.omid.committable.InMemoryCommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
-import org.apache.omid.metrics.NullMetricsProvider;
 import org.jboss.netty.channel.Channel;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
@@ -37,6 +36,7 @@
 import org.testng.annotations.Test;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
@@ -57,6 +57,8 @@
     private Panicker panicker;
     @Mock
     private MetricsRegistry metrics;
+    @Mock
+    private MonitoringContextImpl monCtx;
 
     private CommitTable commitTable;
 
@@ -75,10 +77,10 @@
         RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
-        retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
+        retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, monCtx);
         ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
 
-        verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class));
+        verify(replyProc, timeout(100).times(1)).sendAbortResponse(firstTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long startTS = firstTSCapture.getValue();
         assertEquals(startTS, NON_EXISTING_ST_TX, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
     }
@@ -92,13 +94,13 @@
 
         // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
         commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
-        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> firstTSCapture = ArgumentCaptor.forClass(Long.class);
         ArgumentCaptor<Long> secondTSCapture = ArgumentCaptor.forClass(Long.class);
 
         verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
                                                                     secondTSCapture.capture(),
-                                                                    any(Channel.class));
+                                                                    any(Channel.class), any(MonitoringContextImpl.class));
 
         long startTS = firstTSCapture.getValue();
         long commitTS = secondTSCapture.getValue();
@@ -125,9 +127,9 @@
         RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
-        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContextImpl(metrics));
         ArgumentCaptor<Long> startTSCapture = ArgumentCaptor.forClass(Long.class);
-        verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class));
+        verify(replyProc, timeout(100).times(1)).sendAbortResponse(startTSCapture.capture(), any(Channel.class), any(MonitoringContextImpl.class));
         long startTS = startTSCapture.getValue();
         Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
 
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
index 968f4a9..bbc8132 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -258,9 +258,9 @@
         tsBuilder.setTimestampRequest(tsRequestBuilder.build());
         // Write into the channel
         channel.write(tsBuilder.build()).await();
-        verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).never())
-                .commitRequest(anyLong(), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+                .commitRequest(anyLong(), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContextImpl.class));
     }
 
     private void testWritingCommitRequest(Channel channel) throws InterruptedException {
@@ -275,9 +275,9 @@
         assertTrue(r.hasCommitRequest());
         // Write into the channel
         channel.write(commitBuilder.build()).await();
-        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContextImpl.class));
         verify(requestProcessor, timeout(100).times(1))
-                .commitRequest(eq(666L), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContext.class));
+                .commitRequest(eq(666L), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContextImpl.class));
     }
 
     // ----------------------------------------------------------------------------------------------------------------