DL-108: Log rate limiting more clearly
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0df2f1c..f8d347a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -1080,8 +1080,8 @@
         }
 
         // Unregister gauge to avoid GC spiral
-        ((LimitedPermitManager)this.logSegmentRollingPermitManager).unregisterGauge();
-        ((SimplePermitLimiter)this.writeLimiter).unregisterGauge();
+        this.logSegmentRollingPermitManager.close();
+        this.writeLimiter.close();
 
         // Shutdown log segment metadata stores
         Utils.close(writerSegmentMetadataStore);
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 8276125..8029f89 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -534,8 +534,9 @@
     private void closeInternal(final boolean abort,
                                final AtomicReference<Throwable> throwExc,
                                final Promise<Void> closePromise) {
-        // remove stats
+        // clean stats resources
         this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
+        this.writeLimiter.close();
 
         // Cancel the periodic keep alive schedule first
         if (null != periodicKeepAliveSchedule) {
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
index 9b5cdd0..0b24c1a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/WriteLimiter.java
@@ -54,4 +54,9 @@
         streamLimiter.release(permits);
         globalLimiter.release(permits);
     }
+
+    public void close() {
+        streamLimiter.close();
+        globalLimiter.close();
+    }
 }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
index 4b917b2..dc25023 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/LimitedPermitManager.java
@@ -146,6 +146,11 @@
     }
 
     @Override
+    public void close() {
+        unregisterGauge();
+    }
+
+    @Override
     synchronized public boolean allowObtainPermits() {
         forceSetAllowPermits(true);
         return true;
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
index 61366ac..41c28a3 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitLimiter.java
@@ -31,6 +31,11 @@
         @Override
         public void release(int permits) {
         }
+
+        @Override
+        public void close() {
+
+        }
     };
 
     /**
@@ -44,4 +49,9 @@
      * Release a permit.
      */
     void release(int permits);
+
+    /**
+     * Close the resources created by the limiter
+     */
+    void close();
 }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
index f93c7bf..6a6d574 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/PermitManager.java
@@ -50,6 +50,11 @@
             return false;
         }
 
+        @Override
+        public void close() {
+            // nop
+        }
+
     };
 
     /**
@@ -80,4 +85,9 @@
      *          permit context to disallow
      */
     boolean disallowObtainPermits(Permit permit);
+
+    /**
+     * Release the resources
+     */
+    void close();
 }
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
index 2482ece..4086a1e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/SimplePermitLimiter.java
@@ -99,6 +99,11 @@
         permits.addAndGet(-permitsToRelease);
     }
 
+    @Override
+    public void close() {
+        unregisterGauge();
+    }
+
     @VisibleForTesting
     public int getPermits() {
         return permits.get();
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
index 615ff21..69a8470 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
@@ -66,7 +66,7 @@
             .overlimit(new OverlimitFunction<StreamOp>() {
                 @Override
                 public void apply(StreamOp request) throws OverCapacityException {
-                    throw new OverCapacityException("RPS limit exceeded for the service instance");
+                    throw new OverCapacityException("Being rate limited: RPS limit exceeded for the service instance");
                 }
             });
 
@@ -80,7 +80,7 @@
             .overlimit(new OverlimitFunction<StreamOp>() {
                 @Override
                 public void apply(StreamOp request) throws OverCapacityException {
-                    throw new OverCapacityException("BPS limit exceeded for the service instance");
+                    throw new OverCapacityException("Being rate limited: BPS limit exceeded for the service instance");
                 }
             });
 
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
index b0e5ba4..b4836d1 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
@@ -52,7 +52,7 @@
             .overlimit(new OverlimitFunction<StreamOp>() {
                 @Override
                 public void apply(StreamOp op) throws OverCapacityException {
-                    throw new OverCapacityException("RPS limit exceeded for stream " + streamName);
+                    throw new OverCapacityException("Being rate limited: RPS limit exceeded for stream " + streamName);
                 }
             });
         RequestLimiterBuilder rpsSoftLimiterBuilder = RequestLimiterBuilder.newRpsLimiterBuilder()
@@ -66,7 +66,7 @@
             .overlimit(new OverlimitFunction<StreamOp>() {
                 @Override
                 public void apply(StreamOp op) throws OverCapacityException {
-                    throw new OverCapacityException("BPS limit exceeded for stream " + streamName);
+                    throw new OverCapacityException("Being rate limited: BPS limit exceeded for stream " + streamName);
                 }
             });
         RequestLimiterBuilder bpsSoftLimiterBuilder = RequestLimiterBuilder.newBpsLimiterBuilder()