DL-89: add flag to enable thrift mux on DL monitor service
merge twitter's change from David Rusek.
Author: Sijie Guo <sijieg@twitter.com>
Author: Jordan Bull <jbull@twitter.com>
Author: Leigh Stewart <lstewart@twitter.com>
Author: Dave Rusek <drusek@twitter.com>
Reviewers: Leigh Stewart <lstewart@apache.org>
Closes #61 from sijie/merge/DL-89
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
index 2683b47..6b58eff 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
@@ -85,6 +85,7 @@
private int heartbeatEveryChecks = 0;
private int instanceId = -1;
private int totalInstances = -1;
+ private boolean isThriftMux = false;
// Options
private final Optional<String> uriArg;
@@ -98,6 +99,7 @@
private final Optional<Integer> heartbeatEveryChecksArg;
private final Optional<Boolean> handshakeWithClientInfoArg;
private final Optional<Boolean> watchNamespaceChangesArg;
+ private final Optional<Boolean> isThriftMuxArg;
// Stats
private final StatsProvider statsProvider;
@@ -224,6 +226,7 @@
Optional<Integer> heartbeatEveryChecksArg,
Optional<Boolean> handshakeWithClientInfoArg,
Optional<Boolean> watchNamespaceChangesArg,
+ Optional<Boolean> isThriftMuxArg,
StatsReceiver statsReceiver,
StatsProvider statsProvider) {
// options
@@ -238,6 +241,7 @@
this.heartbeatEveryChecksArg = heartbeatEveryChecksArg;
this.handshakeWithClientInfoArg = handshakeWithClientInfoArg;
this.watchNamespaceChangesArg = watchNamespaceChangesArg;
+ this.isThriftMuxArg = isThriftMuxArg;
// Stats
this.statsReceiver = statsReceiver;
@@ -275,6 +279,7 @@
}
handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent();
watchNamespaceChanges = watchNamespaceChangesArg.isPresent();
+ isThriftMux = isThriftMuxArg.isPresent();
URI uri = URI.create(uriArg.get());
DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
if (confFileArg.isPresent()) {
@@ -300,8 +305,22 @@
ServerSet[] remotes = new ServerSet[serverSets.length - 1];
System.arraycopy(serverSets, 1, remotes, 0, remotes.length);
+ ClientBuilder finagleClientBuilder = ClientBuilder.get()
+ .connectTimeout(Duration.fromSeconds(1))
+ .tcpConnectTimeout(Duration.fromSeconds(1))
+ .requestTimeout(Duration.fromSeconds(2))
+ .keepAlive(true)
+ .failFast(false);
+
+ if (!isThriftMux) {
+ finagleClientBuilder = finagleClientBuilder
+ .hostConnectionLimit(2)
+ .hostConnectionCoresize(2);
+ }
+
dlClient = DistributedLogClientBuilder.newBuilder()
.name("monitor")
+ .thriftmux(isThriftMux)
.clientId(ClientId$.MODULE$.apply("monitor"))
.redirectBackoffMaxMs(50)
.redirectBackoffStartMs(100)
@@ -310,14 +329,7 @@
.serverSets(local, remotes)
.streamNameRegex(streamRegex)
.handshakeWithClientInfo(handshakeWithClientInfo)
- .clientBuilder(ClientBuilder.get()
- .connectTimeout(Duration.fromSeconds(1))
- .tcpConnectTimeout(Duration.fromSeconds(1))
- .requestTimeout(Duration.fromSeconds(2))
- .hostConnectionLimit(2)
- .hostConnectionCoresize(2)
- .keepAlive(true)
- .failFast(false))
+ .clientBuilder(finagleClientBuilder)
.statsReceiver(monitorReceiver.scope("client"))
.buildMonitorClient();
runMonitor(dlConf, uri);
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
index 90d3566..a51a6a9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
@@ -99,6 +99,7 @@
getOptionalIntegerArg(cmdline, "hck"),
getOptionalBooleanArg(cmdline, "hsci"),
getOptionalBooleanArg(cmdline, "w"),
+ getOptionalBooleanArg(cmdline, "mx"),
statsReceiver,
statsProvider);