[CLEAN] Replace com.google.common.base.Optional with java.util.Optional

### Motivation

`com.google.common.base.Optional` and `java.util.Optional` exist in the code base at the same time.

### Changes
Replace `com.google.common.base.Optional` with `java.util.Optional`.


Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <None>, lamber-ken <None>

This closes #2346 from mino181295/merge-optional
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
index 85d42c6..a1334d8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java
@@ -20,8 +20,7 @@
  */
 package org.apache.bookkeeper.client;
 
-import com.google.common.base.Optional;
-
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -92,7 +91,7 @@
                                         conf.getMaxSpeculativeReadTimeout(),
                                         conf.getSpeculativeReadTimeoutBackoffMultiplier()));
         } else {
-            this.readSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
+            this.readSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>empty();
         }
         if (conf.getFirstSpeculativeReadLACTimeout() > 0) {
             this.readLACSpeculativeRequestPolicy =
@@ -101,7 +100,7 @@
                         conf.getMaxSpeculativeReadLACTimeout(),
                         conf.getSpeculativeReadLACTimeoutBackoffMultiplier()));
         } else {
-            this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>absent();
+            this.readLACSpeculativeRequestPolicy = Optional.<SpeculativeRequestExecutionPolicy>empty();
         }
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index c04ecc9..d9ab2e6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -17,12 +17,12 @@
  */
 package org.apache.bookkeeper.proto;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import io.netty.channel.Channel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
@@ -44,7 +44,7 @@
     private static final Logger logger = LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
 
     private final Long previousLAC;
-    private Optional<Long> lastAddConfirmedUpdateTime = Optional.absent();
+    private Optional<Long> lastAddConfirmedUpdateTime = Optional.empty();
 
     // long poll execution state
     private final ExecutorService longPollThreadPool;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLastConfirmedAndEntryContext.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLastConfirmedAndEntryContext.java
index 1bf3685..6c3a57f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLastConfirmedAndEntryContext.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadLastConfirmedAndEntryContext.java
@@ -17,7 +17,7 @@
  */
 package org.apache.bookkeeper.proto;
 
-import com.google.common.base.Optional;
+import java.util.Optional;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx;
@@ -30,7 +30,7 @@
     final int bookieIndex;
     final BookieSocketAddress bookie;
     long lac = LedgerHandle.INVALID_ENTRY_ID;
-    Optional<Long> lacUpdateTimestamp = Optional.absent();
+    Optional<Long> lacUpdateTimestamp = Optional.empty();
 
     public ReadLastConfirmedAndEntryContext(int bookieIndex, BookieSocketAddress bookie) {
         this.bookieIndex = bookieIndex;
diff --git a/site/docs/latest/api/distributedlog-api.md b/site/docs/latest/api/distributedlog-api.md
index f073b29..9002d05 100644
--- a/site/docs/latest/api/distributedlog-api.md
+++ b/site/docs/latest/api/distributedlog-api.md
@@ -222,7 +222,7 @@
 DistributedLogManager logManager = namespace.openLog(
         "test-log",
         Optional.of(logConf),
-        Optional.absent()
+        Optional.empty()
 );
 ```
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
index 3c8dfb7..4f4f957 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java
@@ -18,12 +18,12 @@
 package org.apache.distributedlog;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Ticker;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledFuture;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
index b21b210..a99e844 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogManager.java
@@ -22,11 +22,11 @@
 import static org.apache.distributedlog.namespace.NamespaceDriver.Role.WRITER;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -291,7 +291,7 @@
     // Create Read Handler
 
     synchronized BKLogReadHandler createReadHandler() {
-        Optional<String> subscriberId = Optional.absent();
+        Optional<String> subscriberId = Optional.empty();
         return createReadHandler(subscriberId, false);
     }
 
@@ -634,7 +634,7 @@
 
     @Override
     public LogReader openLogReader(DLSN fromDLSN) throws IOException {
-        return getInputStreamInternal(fromDLSN, Optional.<Long>absent());
+        return getInputStreamInternal(fromDLSN, Optional.<Long>empty());
     }
 
     @Override
@@ -704,7 +704,7 @@
 
     @Override
     public CompletableFuture<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
-        Optional<String> subscriberId = Optional.absent();
+        Optional<String> subscriberId = Optional.empty();
         AsyncLogReader reader = new BKAsyncLogReader(
                 this,
                 scheduler,
@@ -722,7 +722,7 @@
      */
     @Override
     public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(final DLSN fromDLSN) {
-        Optional<String> subscriberId = Optional.absent();
+        Optional<String> subscriberId = Optional.empty();
         return getAsyncLogReaderWithLock(Optional.of(fromDLSN), subscriberId);
     }
 
@@ -733,7 +733,7 @@
 
     @Override
     public CompletableFuture<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) {
-        Optional<DLSN> fromDLSN = Optional.absent();
+        Optional<DLSN> fromDLSN = Optional.empty();
         return getAsyncLogReaderWithLock(fromDLSN, Optional.of(subscriberId));
     }
 
@@ -1068,7 +1068,7 @@
         CompletableFuture<Void> closeResult = Utils.closeSequence(null, true,
                 readHandlerToClose,
                 pendingReaders,
-                resourcesCloseable.or(AsyncCloseable.NULL));
+                resourcesCloseable.orElse(AsyncCloseable.NULL));
         FutureUtils.proxyTo(closeResult, closeFuture);
         return closeFuture;
     }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index 02a0cb8..a55d0fc 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -154,7 +154,7 @@
             throws InvalidStreamNameException, LogNotFoundException, IOException {
         checkState();
         logName = validateAndNormalizeName(logName);
-        com.google.common.base.Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
+        Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
         if (!uri.isPresent()) {
             throw new LogNotFoundException("Log " + logName + " isn't found.");
         }
@@ -183,7 +183,7 @@
             throws InvalidStreamNameException, IOException {
         checkState();
         logName = validateAndNormalizeName(logName);
-        com.google.common.base.Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
+        Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
         if (!uri.isPresent()) {
             throw new LogNotFoundException("Log " + logName + " isn't found.");
         }
@@ -220,7 +220,7 @@
     public boolean logExists(String logName)
         throws IOException, IllegalArgumentException {
         checkState();
-        com.google.common.base.Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
+        Optional<URI> uri = Utils.ioResult(driver.getLogMetadataStore().getLogLocation(logName));
         if (uri.isPresent()) {
             try {
                 Utils.ioResult(driver.getLogStreamMetadataStore(WRITER)
@@ -309,8 +309,7 @@
                 failureInjector,                    /* Failure Injector */
                 statsLogger,                        /* Stats Logger */
                 perLogStatsLogger,                  /* Per Log Stats Logger */
-                com.google.common.base.Optional.absent()
-                                                    /* shared resources, we don't need to close any resources in dlm */
+                Optional.empty()                    /* shared resources, we don't need to close any resources in dlm */
         );
     }
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
index c865c28..d7606e2 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKLogReadHandler.java
@@ -18,11 +18,11 @@
 package org.apache.distributedlog;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArraySet;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
index 892ab06..37bb8c1 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
@@ -18,11 +18,11 @@
 package org.apache.distributedlog;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.base.Ticker;
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -70,7 +70,7 @@
                     StatsLogger statsLogger) throws IOException {
         this.bkdlm = bkdlm;
         this.readHandler = bkdlm.createReadHandler(
-                Optional.<String>absent(),
+                Optional.<String>empty(),
                 this,
                 true);
         this.maxReadAheadWaitTime = conf.getReadAheadWaitTime();
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
index a03530d..bffbde3 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
@@ -19,11 +19,11 @@
 
 import static com.google.common.base.Charsets.UTF_8;
 
-import com.google.common.base.Optional;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.client.AsyncCallback;
 import org.apache.bookkeeper.client.BKException;
@@ -117,7 +117,7 @@
                 .setStatsLogger(statsLogger)
                 .dnsResolver(dnsResolver)
                 .requestTimer(requestTimer)
-                .featureProvider(featureProvider.orNull())
+                .featureProvider(featureProvider.orElse(null))
                 .build();
         } catch (BKException bke) {
             throw new IOException(bke);
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
index b16034c..c2152a9 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java
@@ -20,9 +20,9 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.base.Optional;
 import io.netty.channel.EventLoopGroup;
 import io.netty.util.HashedWheelTimer;
+import java.util.Optional;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -59,7 +59,7 @@
     // request timer
     private HashedWheelTimer requestTimer = null;
     // feature provider
-    private Optional<FeatureProvider> featureProvider = Optional.absent();
+    private Optional<FeatureProvider> featureProvider = Optional.empty();
 
     // Cached BookKeeper Client
     private BookKeeperClient cachedClient = null;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
index 7a71ba8..3a0138b 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
@@ -19,11 +19,11 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.bookkeeper.common.util.ReflectionUtils;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -539,22 +539,8 @@
      * Load whitelisted stream configuration from another configuration object.
      *
      * @param streamConfiguration stream configuration overrides
-     * @Deprecated since 0.5.0, in favor of using {@link #loadStreamConf(java.util.Optional)}
      */
     public void loadStreamConf(Optional<DistributedLogConfiguration> streamConfiguration) {
-        if (streamConfiguration.isPresent()) {
-            loadStreamConf(java.util.Optional.of(streamConfiguration.get()));
-        } else {
-            loadStreamConf(java.util.Optional.empty());
-        }
-    }
-
-    /**
-     * Load whitelisted stream configuration from another configuration object.
-     *
-     * @param streamConfiguration stream configuration overrides
-     */
-    public void loadStreamConf(java.util.Optional<DistributedLogConfiguration> streamConfiguration) {
         if (!streamConfiguration.isPresent()) {
             return;
         }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
index ec35a16..196caeb 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java
@@ -17,13 +17,13 @@
  */
 package org.apache.distributedlog;
 
-import com.google.common.base.Optional;
 import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.conf.ServerConfiguration;
@@ -78,7 +78,7 @@
         private int zkPort = DEFAULT_ZK_PORT;
         private int initialBookiePort = DEFAULT_BOOKIE_INITIAL_PORT;
         private boolean shouldStartZK = true;
-        private Optional<ServerConfiguration> serverConf = Optional.absent();
+        private Optional<ServerConfiguration> serverConf = Optional.empty();
 
         public Builder numBookies(int numBookies) {
             this.numBookies = numBookies;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadUtils.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadUtils.java
index 3233473..db1174f 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadUtils.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadUtils.java
@@ -17,10 +17,10 @@
  */
 package org.apache.distributedlog;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -497,7 +497,7 @@
             if (segment.getLastTxId() < transactionId) {
                 // all log records whose transaction id is less than provided transactionId
                 // then return none
-                Optional<LogRecordWithDLSN> noneRecord = Optional.absent();
+                Optional<LogRecordWithDLSN> noneRecord = Optional.empty();
                 return FutureUtils.value(noneRecord);
             }
         }
@@ -513,7 +513,7 @@
                     if (lastEntryId < 0) {
                         // it means that the log segment is created but not written yet or an empty log segment.
                         //it is equivalent to 'all log records whose transaction id is less than provided transactionId'
-                        Optional<LogRecordWithDLSN> nonRecord = Optional.absent();
+                        Optional<LogRecordWithDLSN> nonRecord = Optional.empty();
                         promise.complete(nonRecord);
                         return;
                     }
@@ -550,7 +550,7 @@
                             reader,
                             Lists.newArrayList(0L, lastEntryId),
                             nWays,
-                            Optional.<LogRecordWithDLSN>absent(),
+                            Optional.<LogRecordWithDLSN>empty(),
                             promise);
                 }
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
index 1bfcd9c..2512c3b 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
@@ -19,7 +19,6 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.net.MalformedURLException;
@@ -27,6 +26,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.configuration.ConfigurationException;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
index f316986..afb3c33 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
@@ -21,7 +21,6 @@
 import static org.apache.distributedlog.util.DLUtils.validateAndNormalizeName;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -32,6 +31,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -293,7 +293,7 @@
                     bkdlConfig.getBkLedgersPath(),
                     eventLoopGroup,
                     requestTimer,
-                    Optional.<FeatureProvider>absent(),
+                    Optional.<FeatureProvider>empty(),
                     statsLogger);
         }
         this.readerBKC = this.sharedReaderBKCBuilder.build();
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
index a37484e..15277e2 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
@@ -19,11 +19,11 @@
 
 import static org.apache.distributedlog.util.DLUtils.isReservedStreamName;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import java.net.URI;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
index 280d8ea..323b795 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
@@ -21,7 +21,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
-import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -31,6 +30,7 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -641,7 +641,7 @@
             return postStateCheck(FutureUtils.value(Optional.of(location)));
         }
         if (!forceCheckLogExistence) {
-            Optional<URI> result = Optional.absent();
+            Optional<URI> result = Optional.empty();
             return FutureUtils.value(result);
         }
         return postStateCheck(fetchLogLocation(logName).thenApply((uriOptional) -> {
@@ -663,7 +663,7 @@
         FutureUtils.collect(fetchFutures).whenComplete(new FutureEventListener<List<Optional<URI>>>() {
             @Override
             public void onSuccess(List<Optional<URI>> fetchResults) {
-                Optional<URI> result = Optional.absent();
+                Optional<URI> result = Optional.empty();
                 for (Optional<URI> fetchResult : fetchResults) {
                     if (result.isPresent()) {
                         if (fetchResult.isPresent()) {
@@ -701,7 +701,7 @@
                     if (Code.OK.intValue() == rc) {
                         fetchPromise.complete(Optional.of(uri));
                     } else if (Code.NONODE.intValue() == rc) {
-                        fetchPromise.complete(Optional.<URI>absent());
+                        fetchPromise.complete(Optional.<URI>empty());
                     } else {
                         fetchPromise.completeExceptionally(KeeperException.create(Code.get(rc)));
                     }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 869a968..45368ce 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -31,13 +31,13 @@
 import static org.apache.distributedlog.metadata.LogMetadata.VERSION_PATH;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogMetadataForReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogMetadataForReader.java
index c07d5b4..0053ad3 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogMetadataForReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogMetadataForReader.java
@@ -17,8 +17,8 @@
  */
 package org.apache.distributedlog.metadata;
 
-import com.google.common.base.Optional;
 import java.net.URI;
+import java.util.Optional;
 
 /**
  * Log Metadata for Reader.
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
index 62b75a4..c09ceeb 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogMetadataStore.java
@@ -18,9 +18,9 @@
 package org.apache.distributedlog.metadata;
 
 import com.google.common.annotations.Beta;
-import com.google.common.base.Optional;
 import java.net.URI;
 import java.util.Iterator;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.callback.NamespaceListener;
 
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
index 712619a..3b184d7 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
@@ -18,9 +18,9 @@
 package org.apache.distributedlog.metadata;
 
 import com.google.common.annotations.Beta;
-import com.google.common.base.Optional;
 import java.io.Closeable;
 import java.net.URI;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.distributedlog.common.util.PermitManager;
 import org.apache.distributedlog.lock.DistributedLock;
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/CommandLineUtils.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/CommandLineUtils.java
index a999f71..d0af277 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/CommandLineUtils.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/CommandLineUtils.java
@@ -17,7 +17,7 @@
  */
 package org.apache.distributedlog.util;
 
-import com.google.common.base.Optional;
+import java.util.Optional;
 import org.apache.commons.cli.CommandLine;
 
 /**
@@ -29,7 +29,7 @@
         if (cmdline.hasOption(arg)) {
             return Optional.of(cmdline.getOptionValue(arg));
         } else {
-            return Optional.absent();
+            return Optional.empty();
         }
     }
 
@@ -37,7 +37,7 @@
         if (cmdline.hasOption(arg)) {
             return Optional.of(true);
         } else {
-            return Optional.absent();
+            return Optional.empty();
         }
     }
 
@@ -47,7 +47,7 @@
             if (cmdline.hasOption(arg)) {
                 return Optional.of(Integer.parseInt(cmdline.getOptionValue(arg)));
             } else {
-                return Optional.absent();
+                return Optional.empty();
             }
         } catch (NumberFormatException ex) {
             throw new IllegalArgumentException(arg + " is not a number");
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/Utils.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/Utils.java
index 8c4e18b..e7cd70b 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/Utils.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/util/Utils.java
@@ -18,12 +18,12 @@
 package org.apache.distributedlog.util;
 
 import com.google.common.base.Objects;
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -216,7 +216,7 @@
         final byte[] data,
         final List<ACL> acl,
         final CreateMode createMode) {
-        Optional<String> parentPathShouldNotCreate = Optional.absent();
+        Optional<String> parentPathShouldNotCreate = Optional.empty();
         return zkAsyncCreateFullPathOptimistic(
                 zkc,
                 pathToCreate,
@@ -282,7 +282,7 @@
                         return;
                     }
 
-                    Optional<String> parentPathShouldNotCreate = Optional.absent();
+                    Optional<String> parentPathShouldNotCreate = Optional.empty();
                     zkAsyncCreateFullPathOptimisticRecursive(zkc, pathToCreate, parentPathShouldNotCreate,
                             data, acl, createMode, new AsyncCallback.StringCallback() {
                         @Override
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
index a8b4f93..da4dabd 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
@@ -22,9 +22,9 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import com.google.common.base.Optional;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
index d069e72..cbba9f1 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
@@ -19,13 +19,13 @@
 
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Ticker;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.client.LedgerHandle;
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java
index 1b864cb..03a7a20 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestDistributedLogConfiguration.java
@@ -21,8 +21,8 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.base.Optional;
 import java.util.List;
+import java.util.Optional;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.commons.configuration.StrictConfigurationComparator;
 import org.apache.distributedlog.net.DNSResolverForRacks;
@@ -88,7 +88,7 @@
     public void loadStreamConfNullOverrides() throws Exception {
         DistributedLogConfiguration conf = new DistributedLogConfiguration();
         DistributedLogConfiguration confClone = new DistributedLogConfiguration();
-        Optional<DistributedLogConfiguration> streamConfiguration = Optional.absent();
+        Optional<DistributedLogConfiguration> streamConfiguration = Optional.empty();
         conf.loadStreamConf(streamConfiguration);
 
         StrictConfigurationComparator comp = new StrictConfigurationComparator();
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
index 437bb6f..7cc3453 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
@@ -24,10 +24,10 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Ticker;
 import com.google.common.collect.Lists;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
@@ -116,7 +116,7 @@
                                                    DistributedLogConfiguration conf)
             throws Exception {
         BKLogReadHandler readHandler = dlm.createReadHandler(
-                Optional.<String>absent(),
+                Optional.<String>empty(),
                 true);
         LogSegmentEntryStore entryStore = new BKLogSegmentEntryStore(
                 conf,
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadUtils.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadUtils.java
index 033065b..9adf6f6 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadUtils.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadUtils.java
@@ -21,9 +21,9 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
index cba20c2..02eea80 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
@@ -20,8 +20,8 @@
 import static org.junit.Assert.assertEquals;
 
 import com.google.common.base.Objects;
-import com.google.common.base.Optional;
 import java.io.File;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
index 2ba24db..12ad923 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
@@ -20,9 +20,9 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
 import java.net.URI;
+import java.util.Optional;
 import java.util.Set;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.distributedlog.DistributedLogConfiguration;
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
index f32f4ce..3ce838d 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/impl/federated/TestFederatedZKLogMetadataStore.java
@@ -22,13 +22,13 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.net.URI;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/util/TestUtils.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/util/TestUtils.java
index de5da8c..8abb61f 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/util/TestUtils.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/util/TestUtils.java
@@ -23,7 +23,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
-import com.google.common.base.Optional;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -64,7 +64,7 @@
     @Test(timeout = 60000)
     public void testZkAsyncCreateFulPathOptimisticRecursive() throws Exception {
         String path1 = "/a/b/c/d";
-        Optional<String> parentPathShouldNotCreate = Optional.absent();
+        Optional<String> parentPathShouldNotCreate = Optional.empty();
         final CountDownLatch doneLatch1 = new CountDownLatch(1);
         Utils.zkAsyncCreateFullPathOptimisticRecursive(zkc, path1, parentPathShouldNotCreate,
                 new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,