Add metrics around write-ahead-log appends
diff --git a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java b/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
index aa67628..5e0b4a9 100644
--- a/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/co/cask/tephra/TransactionManager.java
@@ -297,6 +297,7 @@
public void doRun() {
txMetricsCollector.gauge("committing.size", committingChangeSets.size());
txMetricsCollector.gauge("committed.size", committedChangeSets.size());
+ txMetricsCollector.gauge("inprogress.size", inProgress.size());
txMetricsCollector.gauge("invalid.size", invalidArray.length);
}
@@ -305,6 +306,7 @@
// perform a final metrics emit
txMetricsCollector.gauge("committing.size", committingChangeSets.size());
txMetricsCollector.gauge("committed.size", committedChangeSets.size());
+ txMetricsCollector.gauge("inprogress.size", inProgress.size());
txMetricsCollector.gauge("invalid.size", invalidArray.length);
}
@@ -1127,7 +1129,8 @@
try {
Stopwatch timer = new Stopwatch().start();
currentLog.append(edit);
- txMetricsCollector.histogram("append.edit", (int) timer.elapsedMillis());
+ txMetricsCollector.rate("wal.append.count");
+ txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis());
} catch (IOException ioe) {
abortService("Error appending to transaction log", ioe);
}
@@ -1137,7 +1140,8 @@
try {
Stopwatch timer = new Stopwatch().start();
currentLog.append(edits);
- txMetricsCollector.histogram("append.edit", (int) timer.elapsedMillis());
+ txMetricsCollector.rate("wal.append.count", edits.size());
+ txMetricsCollector.histogram("wal.append.latency", (int) timer.elapsedMillis());
} catch (IOException ioe) {
abortService("Error appending to transaction log", ioe);
}
diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
index 1748178..86d29ce 100644
--- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
+++ b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
@@ -17,6 +17,7 @@
package co.cask.tephra.coprocessor;
import co.cask.tephra.TxConstants;
+import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.persist.HDFSTransactionStateStorage;
import co.cask.tephra.persist.TransactionSnapshot;
import co.cask.tephra.persist.TransactionStateStorage;
@@ -85,7 +86,8 @@
try {
Configuration conf = getSnapshotConfiguration();
if (conf != null) {
- this.storage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+ this.storage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf),
+ new TxMetricsCollector());
this.storage.startAndWait();
this.snapshotRefreshFrequency = conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL,
TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL) * 1000;
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
index 4314c97..b9c8a4b 100644
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
+++ b/tephra-core/src/main/java/co/cask/tephra/metrics/DefaultMetricsCollector.java
@@ -93,6 +93,11 @@
}
@Override
+ public void rate(String metricName, int count) {
+ metrics.meter(metricName).mark(count);
+ }
+
+ @Override
protected void startUp() throws Exception {
jmxReporter.start();
reporter.start(reportPeriod, TimeUnit.SECONDS);
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
index 1bf2032..ae5c98c 100644
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
+++ b/tephra-core/src/main/java/co/cask/tephra/metrics/MetricsCollector.java
@@ -31,6 +31,8 @@
void rate(String metricName);
+ void rate(String metricName, int count);
+
void histogram(String metricName, int value);
void configure(Configuration conf);
diff --git a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java b/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
index b6bec91..b75f38b 100644
--- a/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
+++ b/tephra-core/src/main/java/co/cask/tephra/metrics/TxMetricsCollector.java
@@ -39,6 +39,11 @@
}
@Override
+ public void rate(String metricName, int count) {
+ // no-op
+ }
+
+ @Override
public void histogram(String metricName, int value) {
// no-op
}
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
index e5d07df..eb719c2 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/AbstractTransactionLog.java
@@ -16,6 +16,7 @@
package co.cask.tephra.persist;
+import co.cask.tephra.metrics.MetricsCollector;
import com.google.common.collect.Lists;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
@@ -40,6 +41,7 @@
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionLog.class);
private final AtomicLong logSequence = new AtomicLong();
+ private final MetricsCollector metricsCollector;
protected long timestamp;
private volatile boolean initialized;
private volatile boolean closed;
@@ -47,8 +49,9 @@
private List<Entry> pendingWrites = Lists.newLinkedList();
private TransactionLogWriter writer;
- public AbstractTransactionLog(long timestamp) {
+ public AbstractTransactionLog(long timestamp, MetricsCollector metricsCollector) {
this.timestamp = timestamp;
+ this.metricsCollector = metricsCollector;
}
/**
@@ -151,6 +154,7 @@
// writes out pending entries to the HLog
TransactionLogWriter tmpWriter = null;
long latestSeq = 0;
+ int entryCount = 0;
synchronized (this) {
if (closed) {
return;
@@ -162,6 +166,7 @@
// write out all accumulated Entries to hdfs.
for (Entry e : currentPending) {
tmpWriter.append(e);
+ entryCount++;
latestSeq = Math.max(latestSeq, e.getKey().get());
}
}
@@ -169,6 +174,7 @@
// someone else might have already synced our edits, avoid double syncing
if (lastSynced < latestSeq) {
tmpWriter.sync();
+ metricsCollector.histogram("wal.sync.size", entryCount);
syncedUpTo.compareAndSet(lastSynced, latestSeq);
}
}
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
index a94ab98..a3bdbfb 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionLog.java
@@ -16,6 +16,7 @@
package co.cask.tephra.persist;
+import co.cask.tephra.metrics.MetricsCollector;
import com.google.common.base.Throwables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -46,8 +47,8 @@
* @param logPath Path to the log file.
*/
public HDFSTransactionLog(final FileSystem fs, final Configuration hConf,
- final Path logPath, long timestamp) {
- super(timestamp);
+ final Path logPath, long timestamp, MetricsCollector metricsCollector) {
+ super(timestamp, metricsCollector);
this.fs = fs;
this.hConf = hConf;
this.logPath = logPath;
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
index 08359fc..22d8c5c 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/HDFSTransactionStateStorage.java
@@ -17,6 +17,8 @@
package co.cask.tephra.persist;
import co.cask.tephra.TxConstants;
+import co.cask.tephra.metrics.MetricsCollector;
+import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.snapshot.SnapshotCodecProvider;
import co.cask.tephra.util.ConfigurationFactory;
import com.google.common.base.Function;
@@ -68,17 +70,19 @@
// buffer size used for HDFS reads and writes
private static final int BUFFER_SIZE = 16384;
+ private final Configuration hConf;
+ private final String configuredSnapshotDir;
+ private final MetricsCollector metricsCollector;
private FileSystem fs;
- private Configuration hConf;
- private String configuredSnapshotDir;
private Path snapshotDir;
@Inject
- public HDFSTransactionStateStorage(Configuration hConf,
- SnapshotCodecProvider codecProvider) {
+ public HDFSTransactionStateStorage(Configuration hConf, SnapshotCodecProvider codecProvider,
+ MetricsCollector metricsCollector) {
super(codecProvider);
this.hConf = hConf;
- configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+ this.configuredSnapshotDir = hConf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR);
+ this.metricsCollector = metricsCollector;
}
@Override
@@ -237,7 +241,7 @@
}
private TransactionLog openLog(Path path, long timestamp) {
- return new HDFSTransactionLog(fs, hConf, path, timestamp);
+ return new HDFSTransactionLog(fs, hConf, path, timestamp, metricsCollector);
}
@Override
@@ -375,7 +379,7 @@
Configuration config = new ConfigurationFactory().get();
HDFSTransactionStateStorage storage =
- new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config));
+ new HDFSTransactionStateStorage(config, new SnapshotCodecProvider(config), new TxMetricsCollector());
storage.startAndWait();
try {
switch (mode) {
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
index 93fb373..e4d74e2 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionLog.java
@@ -16,6 +16,8 @@
package co.cask.tephra.persist;
+import co.cask.tephra.metrics.MetricsCollector;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -36,8 +38,8 @@
* Creates a new transaction log using the given file instance.
* @param logFile The log file to use.
*/
- public LocalFileTransactionLog(File logFile, long timestamp) {
- super(timestamp);
+ public LocalFileTransactionLog(File logFile, long timestamp, MetricsCollector metricsCollector) {
+ super(timestamp, metricsCollector);
this.logFile = logFile;
}
diff --git a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
index fd2294f..f52c644 100644
--- a/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
+++ b/tephra-core/src/main/java/co/cask/tephra/persist/LocalFileTransactionStateStorage.java
@@ -17,6 +17,7 @@
package co.cask.tephra.persist;
import co.cask.tephra.TxConstants;
+import co.cask.tephra.metrics.MetricsCollector;
import co.cask.tephra.snapshot.SnapshotCodecProvider;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@@ -58,12 +59,15 @@
};
private final String configuredSnapshotDir;
+ private final MetricsCollector metricsCollector;
private File snapshotDir;
@Inject
- public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider) {
+ public LocalFileTransactionStateStorage(Configuration conf, SnapshotCodecProvider codecProvider,
+ MetricsCollector metricsCollector) {
super(codecProvider);
this.configuredSnapshotDir = conf.get(TxConstants.Manager.CFG_TX_SNAPSHOT_LOCAL_DIR);
+ this.metricsCollector = metricsCollector;
}
@Override
@@ -213,7 +217,7 @@
@Nullable
@Override
public TransactionLog apply(@Nullable TimestampedFilename input) {
- return new LocalFileTransactionLog(input.getFile(), input.getTimestamp());
+ return new LocalFileTransactionLog(input.getFile(), input.getTimestamp(), metricsCollector);
}
});
}
@@ -222,7 +226,7 @@
public TransactionLog createLog(long timestamp) throws IOException {
File newLogFile = new File(snapshotDir, LOG_FILE_PREFIX + timestamp);
LOG.info("Creating new transaction log at {}", newLogFile.getAbsolutePath());
- return new LocalFileTransactionLog(newLogFile, timestamp);
+ return new LocalFileTransactionLog(newLogFile, timestamp, metricsCollector);
}
@Override
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
index fee9935..ae65d07 100644
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
+++ b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
@@ -46,7 +46,7 @@
bind(TransactionManager.class).in(Singleton.class);
bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class);
- bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
+ bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class);
install(new FactoryModuleBuilder()
.implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
index b9df3ca..b7ee86f 100644
--- a/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/co/cask/tephra/persist/HDFSTransactionStateStorageTest.java
@@ -17,6 +17,7 @@
package co.cask.tephra.persist;
import co.cask.tephra.TxConstants;
+import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.snapshot.SnapshotCodecProvider;
import co.cask.tephra.snapshot.SnapshotCodecV2;
import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,6 @@
@Override
protected AbstractTransactionStateStorage getStorage(Configuration conf) {
- return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+ return new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
}
}
diff --git a/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java b/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java
index bb47f3a..5ae2eea 100644
--- a/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/co/cask/tephra/persist/LocalTransactionStateStorageTest.java
@@ -63,7 +63,7 @@
@Override
protected AbstractTransactionStateStorage getStorage(Configuration conf) {
- return new LocalFileTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+ return new LocalFileTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
}
// v2 TransactionEdit
diff --git a/tephra-hbase-compat-0.94/src/test/java/co/cask/tephra/hbase94/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.94/src/test/java/co/cask/tephra/hbase94/coprocessor/TransactionProcessorTest.java
index 677307b..94b64a5 100644
--- a/tephra-hbase-compat-0.94/src/test/java/co/cask/tephra/hbase94/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.94/src/test/java/co/cask/tephra/hbase94/coprocessor/TransactionProcessorTest.java
@@ -22,6 +22,7 @@
import co.cask.tephra.TxConstants;
import co.cask.tephra.coprocessor.TransactionStateCache;
import co.cask.tephra.coprocessor.TransactionStateCacheSupplier;
+import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.persist.HDFSTransactionStateStorage;
import co.cask.tephra.persist.TransactionSnapshot;
import co.cask.tephra.snapshot.DefaultSnapshotCodec;
@@ -118,7 +119,7 @@
TransactionType.SHORT))),
new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
HDFSTransactionStateStorage tmpStorage =
- new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+ new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
tmpStorage.startAndWait();
tmpStorage.writeSnapshot(txSnapshot);
tmpStorage.stopAndWait();
diff --git a/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java
index 85b591a..f6035cf 100644
--- a/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.96/src/test/java/co/cask/tephra/hbase96/coprocessor/TransactionProcessorTest.java
@@ -22,6 +22,7 @@
import co.cask.tephra.TxConstants;
import co.cask.tephra.coprocessor.TransactionStateCache;
import co.cask.tephra.coprocessor.TransactionStateCacheSupplier;
+import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.persist.HDFSTransactionStateStorage;
import co.cask.tephra.persist.TransactionSnapshot;
import co.cask.tephra.snapshot.DefaultSnapshotCodec;
@@ -138,7 +139,7 @@
TransactionType.SHORT))),
new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
HDFSTransactionStateStorage tmpStorage =
- new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+ new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
tmpStorage.startAndWait();
tmpStorage.writeSnapshot(txSnapshot);
tmpStorage.stopAndWait();
diff --git a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java
index 6f16654..8a7c819 100644
--- a/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java
+++ b/tephra-hbase-compat-0.98/src/test/java/co/cask/tephra/hbase98/coprocessor/TransactionProcessorTest.java
@@ -22,6 +22,7 @@
import co.cask.tephra.TxConstants;
import co.cask.tephra.coprocessor.TransactionStateCache;
import co.cask.tephra.coprocessor.TransactionStateCacheSupplier;
+import co.cask.tephra.metrics.TxMetricsCollector;
import co.cask.tephra.persist.HDFSTransactionStateStorage;
import co.cask.tephra.persist.TransactionSnapshot;
import co.cask.tephra.snapshot.DefaultSnapshotCodec;
@@ -140,7 +141,7 @@
TransactionType.SHORT))),
new HashMap<Long, Set<ChangeId>>(), new TreeMap<Long, Set<ChangeId>>());
HDFSTransactionStateStorage tmpStorage =
- new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf));
+ new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf), new TxMetricsCollector());
tmpStorage.startAndWait();
tmpStorage.writeSnapshot(txSnapshot);
tmpStorage.stopAndWait();