DL-119: Fix the logging on closing readahead worker
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
index 6a647a9..266409e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
@@ -35,11 +35,13 @@
import com.twitter.util.Promise;
import com.twitter.util.Return;
import com.twitter.util.Throw;
+import com.twitter.util.Try;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import javax.annotation.Nullable;
@@ -384,14 +386,25 @@
if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) {
return promise;
}
- scheduler.schedule(key, new Runnable() {
+ // schedule a timeout to raise timeout exception
+ final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() {
@Override
public void run() {
- logger.info("Raise exception", cause);
- // satisfy the promise
- FutureUtils.setException(promise, cause);
+ if (!promise.isDefined() && FutureUtils.setException(promise, cause)) {
+ logger.info("Raise exception", cause);
+ }
}
}, timeout, unit);
+ // when the promise is satisfied, cancel the timeout task
+ promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Try<T> value) {
+ if (!task.cancel(true)) {
+ logger.debug("Failed to cancel the timeout task");
+ }
+ return BoxedUnit.UNIT;
+ }
+ });
return promise;
}
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
index 512a456..75223f2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
@@ -237,9 +237,9 @@
try {
((Future<?>) runnable).get();
} catch (CancellationException e) {
- LOG.info("Task {} cancelled", runnable, e.getCause());
+ LOG.debug("Task {} cancelled", runnable, e.getCause());
} catch (InterruptedException e) {
- LOG.info("Task {} was interrupted", runnable, e);
+ LOG.debug("Task {} was interrupted", runnable, e);
} catch (ExecutionException e) {
return e.getCause();
}