DL-105: Make compression stats available per stream
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0f2c222..281c637 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -512,7 +512,8 @@
logName,
ClientSharingOption.SharedClients,
Optional.<DistributedLogConfiguration>absent(),
- Optional.<DynamicDistributedLogConfiguration>absent());
+ Optional.<DynamicDistributedLogConfiguration>absent(),
+ Optional.<StatsLogger>absent());
dlm.delete();
}
@@ -521,13 +522,15 @@
throws InvalidStreamNameException, IOException {
return openLog(logName,
Optional.<DistributedLogConfiguration>absent(),
- Optional.<DynamicDistributedLogConfiguration>absent());
+ Optional.<DynamicDistributedLogConfiguration>absent(),
+ Optional.<StatsLogger>absent());
}
@Override
public DistributedLogManager openLog(String logName,
Optional<DistributedLogConfiguration> logConf,
- Optional<DynamicDistributedLogConfiguration> dynamicLogConf)
+ Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+ Optional<StatsLogger> perStreamStatsLogger)
throws InvalidStreamNameException, IOException {
validateName(logName);
Optional<URI> uri = FutureUtils.result(metadataStore.getLogLocation(logName));
@@ -539,7 +542,8 @@
logName,
ClientSharingOption.SharedClients,
logConf,
- dynamicLogConf);
+ dynamicLogConf,
+ perStreamStatsLogger);
}
@Override
@@ -780,7 +784,8 @@
nameOfLogStream,
clientSharingOption,
logConfiguration,
- dynamicLogConfiguration
+ dynamicLogConfiguration,
+ Optional.<StatsLogger>absent()
);
}
@@ -806,7 +811,8 @@
String nameOfLogStream,
ClientSharingOption clientSharingOption,
Optional<DistributedLogConfiguration> logConfiguration,
- Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration)
+ Optional<DynamicDistributedLogConfiguration> dynamicLogConfiguration,
+ Optional<StatsLogger> perStreamStatsLogger)
throws InvalidStreamNameException, IOException {
// Make sure the name is well formed
validateName(nameOfLogStream);
@@ -872,6 +878,8 @@
dlmLedgerAlloctor = this.allocator;
dlmLogSegmentRollingPermitManager = this.logSegmentRollingPermitManager;
}
+ // if there's a specified perStreamStatsLogger, user it, otherwise use the default one.
+ StatsLogger perLogStatsLogger = perStreamStatsLogger.or(this.perLogStatsLogger);
return new BKDistributedLogManager(
nameOfLogStream, /* Log Name */
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
index d42b5f2..b5abe9f 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/namespace/DistributedLogNamespace.java
@@ -30,6 +30,8 @@
import java.io.IOException;
import java.util.Iterator;
+import org.apache.bookkeeper.stats.StatsLogger;
+
/**
* A namespace is the basic unit for managing a set of distributedlogs.
*
@@ -128,7 +130,8 @@
*/
DistributedLogManager openLog(String logName,
Optional<DistributedLogConfiguration> logConf,
- Optional<DynamicDistributedLogConfiguration> dynamicLogConf)
+ Optional<DynamicDistributedLogConfiguration> dynamicLogConf,
+ Optional<StatsLogger> perStreamStatsLogger)
throws InvalidStreamNameException, IOException;
/**
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 0c7f346..e5063cc 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -45,6 +45,7 @@
import org.apache.bookkeeper.client.LedgerMetadata;
import org.apache.bookkeeper.feature.FixedValueFeature;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -1979,7 +1980,8 @@
dlm = namespace.openLog(
name + "-custom",
Optional.<DistributedLogConfiguration>absent(),
- Optional.of(dynConf));
+ Optional.of(dynConf),
+ Optional.<StatsLogger>absent());
writer = dlm.startAsyncLogSegmentNonPartitioned();
FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
segments = dlm.getLogSegments();
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 3d5b9e7..e74ebbe 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -231,7 +231,8 @@
private DistributedLogManager openLog(String name) throws IOException {
Optional<DistributedLogConfiguration> dlConf = Optional.<DistributedLogConfiguration>absent();
Optional<DynamicDistributedLogConfiguration> dynDlConf = Optional.of(dynConf);
- return dlNamespace.openLog(name, dlConf, dynDlConf);
+ Optional<StatsLogger> perStreamStatsLogger = Optional.of(streamLogger);
+ return dlNamespace.openLog(name, dlConf, dynDlConf, perStreamStatsLogger);
}
// Expensive initialization, only called once per stream.