DL-108: Log rate limiting more clearly
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0df2f1c..f8d347a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -1080,8 +1080,8 @@
}
// Unregister gauge to avoid GC spiral
- ((LimitedPermitManager)this.logSegmentRollingPermitManager).unregisterGauge();
- ((SimplePermitLimiter)this.writeLimiter).unregisterGauge();
+ this.logSegmentRollingPermitManager.close();
+ this.writeLimiter.close();
// Shutdown log segment metadata stores
Utils.close(writerSegmentMetadataStore);
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 8276125..8029f89 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -534,8 +534,9 @@
private void closeInternal(final boolean abort,
final AtomicReference<Throwable> throwExc,
final Promise<Void> closePromise) {
- // remove stats
+ // clean stats resources
this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
+ this.writeLimiter.close();
// Cancel the periodic keep alive schedule first
if (null != periodicKeepAliveSchedule) {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
index 9b5cdd0..0b24c1a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
@@ -54,4 +54,9 @@
streamLimiter.release(permits);
globalLimiter.release(permits);
}
+
+ public void close() {
+ streamLimiter.close();
+ globalLimiter.close();
+ }
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
index 4b917b2..dc25023 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
@@ -146,6 +146,11 @@
}
@Override
+ public void close() {
+ unregisterGauge();
+ }
+
+ @Override
synchronized public boolean allowObtainPermits() {
forceSetAllowPermits(true);
return true;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
index 61366ac..41c28a3 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
@@ -31,6 +31,11 @@
@Override
public void release(int permits) {
}
+
+ @Override
+ public void close() {
+
+ }
};
/**
@@ -44,4 +49,9 @@
* Release a permit.
*/
void release(int permits);
+
+ /**
+ * Close the resources created by the limiter
+ */
+ void close();
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
index f93c7bf..6a6d574 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
@@ -50,6 +50,11 @@
return false;
}
+ @Override
+ public void close() {
+ // nop
+ }
+
};
/**
@@ -80,4 +85,9 @@
* permit context to disallow
*/
boolean disallowObtainPermits(Permit permit);
+
+ /**
+ * Release the resources
+ */
+ void close();
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
index 2482ece..4086a1e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
@@ -99,6 +99,11 @@
permits.addAndGet(-permitsToRelease);
}
+ @Override
+ public void close() {
+ unregisterGauge();
+ }
+
@VisibleForTesting
public int getPermits() {
return permits.get();
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
index 615ff21..69a8470 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
@@ -66,7 +66,7 @@
.overlimit(new OverlimitFunction<StreamOp>() {
@Override
public void apply(StreamOp request) throws OverCapacityException {
- throw new OverCapacityException("RPS limit exceeded for the service instance");
+ throw new OverCapacityException("Being rate limited: RPS limit exceeded for the service instance");
}
});
@@ -80,7 +80,7 @@
.overlimit(new OverlimitFunction<StreamOp>() {
@Override
public void apply(StreamOp request) throws OverCapacityException {
- throw new OverCapacityException("BPS limit exceeded for the service instance");
+ throw new OverCapacityException("Being rate limited: BPS limit exceeded for the service instance");
}
});
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
index b0e5ba4..b4836d1 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
@@ -52,7 +52,7 @@
.overlimit(new OverlimitFunction<StreamOp>() {
@Override
public void apply(StreamOp op) throws OverCapacityException {
- throw new OverCapacityException("RPS limit exceeded for stream " + streamName);
+ throw new OverCapacityException("Being rate limited: RPS limit exceeded for stream " + streamName);
}
});
RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
@@ -66,7 +66,7 @@
.overlimit(new OverlimitFunction<StreamOp>() {
@Override
public void apply(StreamOp op) throws OverCapacityException {
- throw new OverCapacityException("BPS limit exceeded for stream " + streamName);
+ throw new OverCapacityException("Being rate limited: BPS limit exceeded for stream " + streamName);
}
});
RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()