Add metrics around write-ahead-log appends
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java b/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
index aa67628..5e0b4a9 100644
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
@@ -297,6 +297,7 @@
       public void doRun() {
         txMetricsCollector.gauge("committing.size", committingChangeSets.size());
         txMetricsCollector.gauge("committed.size", committedChangeSets.size());
+        txMetricsCollector.gauge("inprogress.size", inProgress.size());
         txMetricsCollector.gauge("invalid.size", invalidArray.length);
       }
 
@@ -305,6 +306,7 @@
         // perform a final metrics emit
         txMetricsCollector.gauge("committing.size", committingChangeSets.size());
         txMetricsCollector.gauge("committed.size", committedChangeSets.size());
+        txMetricsCollector.gauge("inprogress.size", inProgress.size());
         txMetricsCollector.gauge("invalid.size", invalidArray.length);
       }
 
@@ -1127,7 +1129,8 @@
     try {
       Stopwatch timer = new Stopwatch().start();
       currentLog.append(edit);
-      txMetricsCollector.histogram("append.edit", (int) timer.elapsedMillis());
+      txMetricsCollector.rate("wal.append.count");
+      txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis());
     } catch (IOException ioe) {
       abortService("Error appending to transaction log", ioe);
     }
@@ -1137,7 +1140,8 @@
     try {
       Stopwatch timer = new Stopwatch().start();
       currentLog.append(edits);
-      txMetricsCollector.histogram("append.edit", (int) timer.elapsedMillis());
+      txMetricsCollector.rate("wal.append.count", edits.size());
+      txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis());
     } catch (IOException ioe) {
       abortService("Error appending to transaction log", ioe);
     }
diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
index 1748178..86d29ce 100644
--- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
+++ b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
@@ -17,6 +17,7 @@
 package co.cask.tephra.coprocessor;
 
 import co.cask.tephra.TxConstants;
+import co.cask.tephra.metrics.TxMetricsCollector;
 import co.cask.tephra.persist.HDFSTransactionStateStorage;
 import co.cask.tephra.persist.TransactionSnapshot;
 import co.cask.tephra.persist.TransactionStateStorage;
@@ -85,7 +86,8 @@
     try {
       Configuration conf = getSnapshotConfiguration();
       if (conf != null) {
-        this.storage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+        this.storage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf),
+            new TxMetricsCollector());
         this.storage.startAndWait();
         this.snapshotRefreshFrequency = conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL,
                                                      TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL) * 1000;
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
index 4314c97..b9c8a4b 100644
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
+++ b/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
@@ -93,6 +93,11 @@
   }
 
   @Override
+  public void rate(String metricName, int count) {
+    metrics.meter(metricName).mark(count);
+  }
+
+  @Override
   protected void startUp() throws Exception {
     jmxReporter.start();
     reporter.start(reportPeriod, TimeUnit.SECONDS);
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
index 1bf2032..ae5c98c 100644
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
+++ b/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
@@ -31,6 +31,8 @@
 
   void rate(String metricName);
 
+  void rate(String metricName, int count);
+
   void histogram(String metricName, int value);
 
   void configure(Configuration conf);
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
index b6bec91..b75f38b 100644
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
+++ b/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
@@ -39,6 +39,11 @@
   }
 
   @Override
+  public void rate(String metricName, int count) {
+    // no-op
+  }
+
+  @Override
   public void histogram(String metricName, int value) {
     // no-op
   }
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
index e5d07df..eb719c2 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
@@ -16,6 +16,7 @@
 
 package co.cask.tephra.persist;
 
+import co.cask.tephra.metrics.MetricsCollector;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
@@ -40,6 +41,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
 
   private final AtomicLong logSequence = new AtomicLong();
+  private final MetricsCollector metricsCollector;
   protected long timestamp;
   private volatile boolean initialized;
   private volatile boolean closed;
@@ -47,8 +49,9 @@
   private List<Entry> pendingWrites = Lists.newLinkedList();
   private TransactionLogWriter writer;
 
-  public AbstractTransactionLog(long timestamp) {
+  public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
     this.timestamp = timestamp;
+    this.metricsCollector = metricsCollector;
   }
 
   /**
@@ -151,6 +154,7 @@
     // writes out pending entries to the HLog
     TransactionLogWriter tmpWriter = null;
     long latestSeq = 0;
+    int entryCount = 0;
     synchronized (this) {
       if (closed) {
         return;
@@ -162,6 +166,7 @@
       // write out all accumulated Entries to hdfs.
       for (Entry e : currentPending) {
         tmpWriter.append(e);
+        entryCount++;
         latestSeq = Math.max(latestSeq, e.getKey().get());
       }
     }
@@ -169,6 +174,7 @@
     // someone else might have already synced our edits, avoid double syncing
     if (lastSynced < latestSeq) {
       tmpWriter.sync();
+      metricsCollector.histogram("wal.sync.size", entryCount);
       syncedUpTo.compareAndSet(lastSynced, latestSeq);
     }
   }
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
index a94ab98..a3bdbfb 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
@@ -16,6 +16,7 @@
 
 package co.cask.tephra.persist;
 
+import co.cask.tephra.metrics.MetricsCollector;
 import com.google.common.base.Throwables;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -46,8 +47,8 @@
    * @param logPath Path to the log file.
    */
   public HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
-                            final Path logPath, long timestamp) {
-    super(timestamp);
+                            final Path logPath, long timestamp, MetricsCollector metricsCollector) {
+    super(timestamp, metricsCollector);
     this.fs = fs;
     this.hConf = hConf;
     this.logPath = logPath;
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
index 08359fc..22d8c5c 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
@@ -17,6 +17,8 @@
 package co.cask.tephra.persist;
 
 import co.cask.tephra.TxConstants;
+import co.cask.tephra.metrics.MetricsCollector;
+import co.cask.tephra.metrics.TxMetricsCollector;
 import co.cask.tephra.snapshot.SnapshotCodecProvider;
 import co.cask.tephra.util.ConfigurationFactory;
 import com.google.common.base.Function;
@@ -68,17 +70,19 @@
   // buffer size used for HDFS reads and writes
   private static final int BUFFER_SIZE = 16384;
 
+  private final Configuration hConf;
+  private final String configuredSnapshotDir;
+  private final MetricsCollector metricsCollector;
   private FileSystem fs;
-  private Configuration hConf;
-  private String configuredSnapshotDir;
   private Path snapshotDir;
 
   @Inject
-  public HDFSTransactionStateStorage(Configuration hConf,
-                                     SnapshotCodecProvider codecProvider) {
+  public HDFSTransactionStateStorage(Configuration hConf, SnapshotCodecProvider codecProvider,
+                                     MetricsCollector metricsCollector) {
     super(codecProvider);
     this.hConf = hConf;
-    configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+    this.configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+    this.metricsCollector = metricsCollector;
   }
 
   @Override
@@ -237,7 +241,7 @@
   }
 
   private TransactionLog openLog(Path path, long timestamp) {
-    return new HDFSTransactionLog(fs, hConf, path, timestamp);
+    return new HDFSTransactionLog(fs, hConf, path, timestamp, metricsCollector);
   }
 
   @Override
@@ -375,7 +379,7 @@
     Configuration config = new ConfigurationFactory().get();
 
     HDFSTransactionStateStorage storage =
-      new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config));
+      new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector());
     storage.startAndWait();
     try {
       switch (mode) {
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
index 93fb373..e4d74e2 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
@@ -16,6 +16,8 @@
 
 package co.cask.tephra.persist;
 
+import co.cask.tephra.metrics.MetricsCollector;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -36,8 +38,8 @@
    * Creates a new transaction log using the given file instance.
    * @param logFile The log file to use.
    */
-  public LocalFileTransactionLog(File logFile, long timestamp) {
-    super(timestamp);
+  public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) {
+    super(timestamp, metricsCollector);
     this.logFile = logFile;
   }
 
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
index fd2294f..f52c644 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
@@ -17,6 +17,7 @@
 package co.cask.tephra.persist;
 
 import co.cask.tephra.TxConstants;
+import co.cask.tephra.metrics.MetricsCollector;
 import co.cask.tephra.snapshot.SnapshotCodecProvider;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -58,12 +59,15 @@
   };
 
   private final String configuredSnapshotDir;
+  private final MetricsCollector metricsCollector;
   private File snapshotDir;
 
   @Inject
-  public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider) {
+  public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider,
+                                          MetricsCollector metricsCollector) {
     super(codecProvider);
     this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR);
+    this.metricsCollector = metricsCollector;
   }
 
   @Override
@@ -213,7 +217,7 @@
       @Nullable
       @Override
       public TransactionLog apply(@Nullable TimestampedFilename input) {
-        return new LocalFileTransactionLog(input.getFile(), input.getTimestamp());
+        return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector);
       }
     });
   }
@@ -222,7 +226,7 @@
   public TransactionLog createLog(long timestamp) throws IOException {
     File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp);
     LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath());
-    return new LocalFileTransactionLog(newLogFile, timestamp);
+    return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector);
   }
 
   @Override
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
index fee9935..ae65d07 100644
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
+++ b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
@@ -46,7 +46,7 @@
 
     bind(TransactionManager.class).in(Singleton.class);
     bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class);
-    bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
+    bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class);
 
     install(new FactoryModuleBuilder()
         .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
index b9df3ca..b7ee86f 100644
--- a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
@@ -17,6 +17,7 @@
 package co.cask.tephra.persist;
 
 import co.cask.tephra.TxConstants;
+import co.cask.tephra.metrics.TxMetricsCollector;
 import co.cask.tephra.snapshot.SnapshotCodecProvider;
 import co.cask.tephra.snapshot.SnapshotCodecV2;
 import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,6 @@
 
   @Override
   protected AbstractTransactionStateStorage getStorage(Configuration conf) {
-    return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+    return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
   }
 }
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java
index bb47f3a..5ae2eea 100644
--- a/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java
@@ -63,7 +63,7 @@
 
   @Override
   protected AbstractTransactionStateStorage getStorage(Configuration conf) {
-    return new LocalFileTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+    return new LocalFileTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
   }
 
   // v2 TransactionEdit
diff --git a/tephra-hbase-compat-0.94/src/test/java/co/cask/tephra/hbase94/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.94/src/test/java/co/cask/tephra/hbase94/coprocessor/TransactionProcessorTest.java
index 677307b..94b64a5 100644
--- a/tephra-hbase-compat-0.94/src/test/java/co/cask/tephra/hbase94/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.94/src/test/java/co/cask/tephra/hbase94/coprocessor/TransactionProcessorTest.java
@@ -22,6 +22,7 @@
 import co.cask.tephra.TxConstants;
 import co.cask.tephra.coprocessor.TransactionStateCache;
 import co.cask.tephra.coprocessor.TransactionStateCacheSupplier;
+import co.cask.tephra.metrics.TxMetricsCollector;
 import co.cask.tephra.persist.HDFSTransactionStateStorage;
 import co.cask.tephra.persist.TransactionSnapshot;
 import co.cask.tephra.snapshot.DefaultSnapshotCodec;
@@ -118,7 +119,7 @@
                                                                                         TransactionType.SHORT))),
         new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
     HDFSTransactionStateStorage tmpStorage =
-      new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+      new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
     tmpStorage.startAndWait();
     tmpStorage.writeSnapshot(txSnapshot);
     tmpStorage.stopAndWait();
diff --git a/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java
index 85b591a..f6035cf 100644
--- a/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java
@@ -22,6 +22,7 @@
 import co.cask.tephra.TxConstants;
 import co.cask.tephra.coprocessor.TransactionStateCache;
 import co.cask.tephra.coprocessor.TransactionStateCacheSupplier;
+import co.cask.tephra.metrics.TxMetricsCollector;
 import co.cask.tephra.persist.HDFSTransactionStateStorage;
 import co.cask.tephra.persist.TransactionSnapshot;
 import co.cask.tephra.snapshot.DefaultSnapshotCodec;
@@ -138,7 +139,7 @@
                                                                                         TransactionType.SHORT))),
         new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
     HDFSTransactionStateStorage tmpStorage =
-      new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+      new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
     tmpStorage.startAndWait();
     tmpStorage.writeSnapshot(txSnapshot);
     tmpStorage.stopAndWait();
diff --git a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java
index 6f16654..8a7c819 100644
--- a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java
@@ -22,6 +22,7 @@
 import co.cask.tephra.TxConstants;
 import co.cask.tephra.coprocessor.TransactionStateCache;
 import co.cask.tephra.coprocessor.TransactionStateCacheSupplier;
+import co.cask.tephra.metrics.TxMetricsCollector;
 import co.cask.tephra.persist.HDFSTransactionStateStorage;
 import co.cask.tephra.persist.TransactionSnapshot;
 import co.cask.tephra.snapshot.DefaultSnapshotCodec;
@@ -140,7 +141,7 @@
                                                                                         TransactionType.SHORT))),
         new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
     HDFSTransactionStateStorage tmpStorage =
-      new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+      new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
     tmpStorage.startAndWait();
     tmpStorage.writeSnapshot(txSnapshot);
     tmpStorage.stopAndWait();