PIP-135: Added Etcd MetadataStore implementation (#13225)

* Added Etcd MetadataStore implementation

* Fixed license file

* Addressed comments

* Fixed sql plugin license

* Fixed parent notification of children changes

* Fixed sequential keys with no parent

* Fixed checkstyle

* Fixed presto license

* Addressed comments

* Fixed etcd test dependency

* Updated the prefix for get children

* Fixed triggering futures from executor

* CompletableFuture.exceptionallyAsync() is only available since Java 12
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 06c8878..4ee7e26 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -476,6 +476,7 @@
     - io.grpc-grpc-services-1.42.1.jar
     - io.grpc-grpc-xds-1.42.1.jar
     - io.grpc-grpc-rls-1.42.1.jar
+    - com.google.auto.service-auto-service-annotations-1.0.jar
   * Perfmark
     - io.perfmark-perfmark-api-0.19.0.jar
   * OpenCensus
@@ -484,6 +485,7 @@
     - io.opencensus-opencensus-proto-0.2.0.jar
   * Jodah
     - net.jodah-typetools-0.5.0.jar
+    - net.jodah-failsafe-2.4.4.jar
   * Apache Avro
     - org.apache.avro-avro-1.10.2.jar
     - org.apache.avro-avro-protobuf-1.10.2.jar
@@ -526,6 +528,9 @@
     - com.google.http-client-google-http-client-1.38.0.jar
     - com.google.auto.value-auto-value-annotations-1.7.4.jar
     - com.google.re2j-re2j-1.5.jar
+  * Jetcd
+    - io.etcd-jetcd-common-0.5.11.jar
+    - io.etcd-jetcd-core-0.5.11.jar
 
 BSD 3-clause "New" or "Revised" License
  * Google auth library
diff --git a/pom.xml b/pom.xml
index b8f40cf..e34c17b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,6 +202,7 @@
     <cron-utils.version>9.1.6</cron-utils.version>
     <spring-context.version>5.3.15</spring-context.version>
     <apache-http-client.version>4.5.13</apache-http-client.version>
+    <jetcd.version>0.5.11</jetcd.version>
     <snakeyaml.version>1.30</snakeyaml.version>
 
     <!-- test dependencies -->
@@ -863,6 +864,18 @@
       </dependency>
 
       <dependency>
+        <groupId>io.etcd</groupId>
+        <artifactId>jetcd-core</artifactId>
+        <version>${jetcd.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.etcd</groupId>
+        <artifactId>jetcd-test</artifactId>
+        <version>${jetcd.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-streaming_2.10</artifactId>
         <version>${spark-streaming_2.10.version}</version>
@@ -938,11 +951,47 @@
 
       <dependency>
         <groupId>io.grpc</groupId>
+        <artifactId>grpc-api</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.grpc</groupId>
         <artifactId>grpc-core</artifactId>
         <version>${grpc.version}</version>
       </dependency>
 
       <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-netty</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-protobuf</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-grpclb</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-alts</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>io.grpc</groupId>
+        <artifactId>grpc-netty-shaded</artifactId>
+        <version>${grpc.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>io.perfmark</groupId>
         <artifactId>perfmark-api</artifactId>
         <version>${perfmark.version}</version>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index a85f351..f4dbcaa 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -376,6 +376,11 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.etcd</groupId>
+      <artifactId>jetcd-test</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index f5b63e2..64a630c 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -69,6 +69,18 @@
     </dependency>
 
     <dependency>
+      <groupId>io.etcd</groupId>
+      <artifactId>jetcd-core</artifactId>
+    </dependency>
+
+
+    <dependency>
+      <groupId>io.etcd</groupId>
+      <artifactId>jetcd-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>com.github.ben-manes.caffeine</groupId>
       <artifactId>caffeine</artifactId>
     </dependency>
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
new file mode 100644
index 0000000..e1c26be
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.KeyValue;
+import io.etcd.jetcd.Txn;
+import io.etcd.jetcd.kv.DeleteResponse;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.kv.PutResponse;
+import io.etcd.jetcd.kv.TxnResponse;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.op.Cmp;
+import io.etcd.jetcd.op.CmpTarget;
+import io.etcd.jetcd.op.Op;
+import io.etcd.jetcd.options.DeleteOption;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
+import io.etcd.jetcd.options.WatchOption;
+import io.etcd.jetcd.support.CloseableClient;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.etcd.jetcd.watch.WatchResponse;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore;
+import org.apache.pulsar.metadata.impl.batching.MetadataOp;
+import org.apache.pulsar.metadata.impl.batching.OpDelete;
+import org.apache.pulsar.metadata.impl.batching.OpGet;
+import org.apache.pulsar.metadata.impl.batching.OpGetChildren;
+import org.apache.pulsar.metadata.impl.batching.OpPut;
+
+@Slf4j
+public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
+
+    static final String ETCD_SCHEME_IDENTIFIER = "etcd:";
+
+    private final int leaseTTLSeconds;
+    private final Client client;
+    private final KV kv;
+    private volatile long leaseId;
+    private volatile CloseableClient leaseClient;
+    private final EtcdSessionWatcher sessionWatcher;
+
+    public EtcdMetadataStore(String metadataURL, MetadataStoreConfig conf, boolean enableSessionWatcher)
+            throws MetadataStoreException {
+        super(conf);
+
+        this.leaseTTLSeconds = conf.getSessionTimeoutMillis() / 1000;
+        String etcdUrl = metadataURL.replaceFirst(ETCD_SCHEME_IDENTIFIER, "");
+
+        try {
+            this.client = Client.builder().endpoints(etcdUrl).build();
+            this.kv = client.getKVClient();
+            this.client.getWatchClient().watch(ByteSequence.from("\0", StandardCharsets.UTF_8),
+                    WatchOption.newBuilder()
+                            .withPrefix(ByteSequence.from("/", StandardCharsets.UTF_8))
+                            .build(), this::handleWatchResponse);
+            if (enableSessionWatcher) {
+                this.sessionWatcher =
+                        new EtcdSessionWatcher(client, conf.getSessionTimeoutMillis(), this::receivedSessionEvent);
+
+                // Ensure the lease is created when we start
+                this.createLease(false).join();
+            } else {
+                sessionWatcher = null;
+            }
+        } catch (Exception e) {
+            throw new MetadataStoreException(e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+
+        if (sessionWatcher != null) {
+            sessionWatcher.close();
+        }
+
+        if (leaseClient != null) {
+            leaseClient.close();
+        }
+
+        if (leaseId != 0) {
+            client.getLeaseClient().revoke(leaseId);
+        }
+
+        kv.close();
+        client.close();
+    }
+
+    private static final GetOption EXISTS_GET_OPTION = GetOption.newBuilder().withCountOnly(true).build();
+    private static final GetOption SINGLE_GET_OPTION = GetOption.newBuilder().withLimit(1).build();
+
+    @Override
+    protected CompletableFuture<Boolean> existsFromStore(String path) {
+        return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), EXISTS_GET_OPTION)
+                .thenApplyAsync(gr -> gr.getCount() == 1, executor);
+    }
+
+    @Override
+    protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
+                                               EnumSet<CreateOption> options) {
+        if (!options.contains(CreateOption.Sequential)) {
+            return super.storePut(path, data, optExpectedVersion, options);
+        } else {
+            // First get the version from parent
+            String parent = parent(path);
+            if (parent == null) {
+                parent = "/";
+            }
+            return super.storePut(parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class))
+                    // Then create the unique key with the version added in the path
+                    .thenComposeAsync(
+                            stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options),
+                            executor);
+        }
+    }
+
+    /** Sends all the given ops to etcd as a single transaction; each op's future is completed asynchronously. */
+    @Override
+    protected void batchOperation(List<MetadataOp> ops) {
+        try {
+            Txn txn = kv.txn();
+
+            // First, set all the conditions
+            ops.forEach(op -> {
+                switch (op.getType()) {
+                    case PUT: {
+                        OpPut put = op.asPut();
+                        ByteSequence key = ByteSequence.from(put.getPath(), StandardCharsets.UTF_8);
+                        if (put.getOptExpectedVersion().isPresent()) {
+                            long expectedVersion = put.getOptExpectedVersion().get();
+                            if (expectedVersion == -1L) {
+                                // Check that key does not exist
+                                txn.If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.createRevision(0)));
+                            } else {
+                                txn.If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.version(expectedVersion + 1)));
+                            }
+                        }
+                        break;
+                    }
+                    case DELETE: {
+                        OpDelete del = op.asDelete();
+                        ByteSequence key = ByteSequence.from(del.getPath(), StandardCharsets.UTF_8);
+                        if (del.getOptExpectedVersion().isPresent()) {
+                            txn.If(new Cmp(key, Cmp.Op.EQUAL,
+                                    CmpTarget.version(del.getOptExpectedVersion().get() + 1)));
+                        }
+                        break;
+                    }
+                }
+            });
+
+            // Then the requests
+            ops.forEach(op -> {
+                switch (op.getType()) {
+                    case GET: {
+                        txn.Then(
+                                Op.get(ByteSequence.from(op.asGet().getPath(), StandardCharsets.UTF_8),
+                                        SINGLE_GET_OPTION));
+                        break;
+                    }
+                    case PUT: {
+                        OpPut put = op.asPut();
+                        ByteSequence key = ByteSequence.from(put.getPath(), StandardCharsets.UTF_8);
+                        if (!put.getFuture().isDone()) {
+                            PutOption.Builder b = PutOption.newBuilder()
+                                    .withPrevKV();
+
+                            if (put.isEphemeral()) {
+                                b.withLeaseId(leaseId);
+                            }
+
+                            txn.Then(Op.put(key, ByteSequence.from(put.getData()), b.build()));
+                        }
+                        break;
+                    }
+                    case DELETE: {
+                        OpDelete del = op.asDelete();
+                        ByteSequence key = ByteSequence.from(del.getPath(), StandardCharsets.UTF_8);
+                        txn.Then(Op.delete(key, DeleteOption.DEFAULT));
+                        break;
+                    }
+                    case GET_CHILDREN: {
+                        OpGetChildren opGetChildren = op.asGetChildren();
+                        String path = opGetChildren.getPath();
+
+                        ByteSequence prefix =
+                                ByteSequence.from(path.equals("/") ? path : path + "/", StandardCharsets.UTF_8);
+
+                        txn.Then(Op.get(prefix, GetOption.newBuilder()
+                                .withKeysOnly(true)
+                                .withSortField(GetOption.SortTarget.KEY)
+                                .withSortOrder(GetOption.SortOrder.ASCEND)
+                                .withPrefix(prefix)
+                                .build()));
+                        break;
+                    }
+                }
+            });
+
+            txn.commit().thenAccept(txnResponse -> {
+                handleBatchOperationResult(txnResponse, ops);
+            }).exceptionally(ex -> {
+                Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
+                if ((cause instanceof ExecutionException || cause instanceof CompletionException)
+                        && cause.getCause() != null) {
+                    cause = cause.getCause();
+                }
+                Status.Code code = cause instanceof StatusRuntimeException
+                        ? ((StatusRuntimeException) cause).getStatus().getCode() : null;
+                // Retry individually: INVALID_ARGUMENT could be caused by having repeated keys in the batch,
+                // with RESOURCE_EXHAUSTED we might have exceeded the max-frame size for the response
+                if (ops.size() > 1
+                        && (code == Status.Code.INVALID_ARGUMENT || code == Status.Code.RESOURCE_EXHAUSTED)) {
+                    ops.forEach(o -> batchOperation(Collections.singletonList(o)));
+                } else {
+                    // Any other failure is final: fail the futures rather than leaving callers waiting
+                    log.warn("Failed to commit: {}", cause.getMessage());
+                    executor.execute(() -> {
+                        ops.forEach(o -> o.getFuture().completeExceptionally(ex));
+                    });
+                }
+                return null;
+            });
+        } catch (Throwable t) {
+            // The transaction could not be built or submitted: fail the futures so callers are not left waiting
+            log.warn("Error in committing batch: {}", t.getMessage());
+            ops.forEach(o -> o.getFuture().completeExceptionally(t));
+        }
+    }
+
+    /** Completes each op's future from the txn result; a failed multi-op batch is retried one op at a time. */
+    private void handleBatchOperationResult(TxnResponse txnResponse, List<MetadataOp> ops) {
+        executor.execute(() -> {
+            if (!txnResponse.isSucceeded()) {
+                if (ops.size() > 1) {
+                    // Retry individually
+                    ops.forEach(o -> batchOperation(Collections.singletonList(o)));
+                } else {
+                    ops.get(0).getFuture()
+                            .completeExceptionally(new MetadataStoreException.BadVersionException("Bad version"));
+                }
+                return;
+            }
+
+            int getIdx = 0;
+            int deletedIdx = 0;
+            int putIdx = 0;
+            for (MetadataOp op : ops) {
+                switch (op.getType()) {
+                    case GET: {
+                        OpGet get = op.asGet();
+                        GetResponse gr = txnResponse.getGetResponses().get(getIdx++);
+                        if (gr.getCount() == 0) {
+                            get.getFuture().complete(Optional.empty());
+                        } else {
+                            KeyValue kv = gr.getKvs().get(0);
+                            boolean isEphemeral = kv.getLease() != 0;
+                            boolean createdBySelf = kv.getLease() == leaseId;
+                            get.getFuture().complete(Optional.of(
+                                            new GetResult(
+                                                    kv.getValue().getBytes(),
+                                                    new Stat(get.getPath(), kv.getVersion() - 1, 0, 0, isEphemeral,
+                                                            createdBySelf)
+                                            )
+                                    )
+                            );
+                        }
+                        break;
+                    }
+                    case PUT: {
+                        OpPut put = op.asPut();
+                        PutResponse pr = txnResponse.getPutResponses().get(putIdx++);
+                        KeyValue prevKv = pr.getPrevKv();
+                        if (prevKv == null) {
+                            put.getFuture().complete(new Stat(put.getPath(),
+                                    0, 0, 0, put.isEphemeral(), true));
+                        } else {
+                            put.getFuture().complete(new Stat(put.getPath(),
+                                    prevKv.getVersion(), 0, 0, put.isEphemeral(), true));
+                        }
+                        break;
+                    }
+                    case DELETE: {
+                        OpDelete del = op.asDelete();
+                        DeleteResponse dr = txnResponse.getDeleteResponses().get(deletedIdx++);
+                        if (dr.getDeleted() == 0) {
+                            del.getFuture().completeExceptionally(new MetadataStoreException.NotFoundException());
+                        } else {
+                            del.getFuture().complete(null);
+                        }
+                        break;
+                    }
+                    case GET_CHILDREN: {
+                        OpGetChildren getChildren = op.asGetChildren();
+                        GetResponse gr = txnResponse.getGetResponses().get(getIdx++);
+                        // Same prefix the request used ("/" for the root); strip it from the start of each key only
+                        String basePath = "/".equals(getChildren.getPath()) ? "/" : getChildren.getPath() + "/";
+                        Set<String> children = gr.getKvs().stream()
+                                .map(kv -> kv.getKey().toString(StandardCharsets.UTF_8))
+                                .map(p -> p.startsWith(basePath) ? p.substring(basePath.length()) : p)
+                                // Only return first-level children
+                                .map(k -> k.split("/", 2)[0])
+                                // A key equal to the prefix itself is not a child
+                                .filter(k -> !k.isEmpty())
+                                .collect(Collectors.toCollection(TreeSet::new));
+                        getChildren.getFuture().complete(new ArrayList<>(children));
+                    }
+                }
+            }
+        });
+    }
+
+    private synchronized CompletableFuture<Void> createLease(boolean retryOnFailure) {
+        CompletableFuture<Void> future = client.getLeaseClient().grant(leaseTTLSeconds)
+                .thenAccept(lease -> {
+                    synchronized (this) {
+                        this.leaseId = lease.getID();
+
+                        if (leaseClient != null) {
+                            leaseClient.close();
+                        }
+                        this.leaseClient =
+                                this.client.getLeaseClient()
+                                        .keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
+                                            @Override
+                                            public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
+                                                if (log.isDebugEnabled()) {
+                                                    log.debug("On next: {}", leaseKeepAliveResponse);
+                                                }
+                                            }
+
+                                            @Override
+                                            public void onError(Throwable throwable) {
+                                                log.warn("Lease client error :", throwable);
+                                                receivedSessionEvent(SessionEvent.SessionLost);
+                                            }
+
+                                            @Override
+                                            public void onCompleted() {
+                                                log.info("Etcd lease has expired");
+                                                receivedSessionEvent(SessionEvent.SessionLost);
+                                            }
+                                        });
+                    }
+                });
+
+        if (retryOnFailure) {
+            future.exceptionally(ex -> {
+                log.warn("Failed to create Etcd lease. Retrying later", ex);
+                executor.schedule(() -> {
+                    createLease(true);
+                }, 1, TimeUnit.SECONDS);
+                return null;
+            });
+        }
+
+        return future;
+    }
+
+    private void handleWatchResponse(WatchResponse watchResponse) {
+        watchResponse.getEvents().forEach(we -> {
+            String path = we.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
+            if (we.getEventType() == WatchEvent.EventType.PUT) {
+                if (we.getKeyValue().getVersion() == 1) {
+                    receivedNotification(new Notification(NotificationType.Created, path));
+
+                    notifyParentChildrenChanged(path);
+                } else {
+                    receivedNotification(new Notification(NotificationType.Modified, path));
+                }
+            } else if (we.getEventType() == WatchEvent.EventType.DELETE) {
+                receivedNotification(new Notification(NotificationType.Deleted, path));
+                notifyParentChildrenChanged(path);
+            }
+        });
+    }
+
+    @Override
+    protected void receivedSessionEvent(SessionEvent event) {
+        if (event == SessionEvent.SessionReestablished) {
+            // Re-create the lease before notifying that we are reconnected
+            createLease(true)
+                    .thenRun(() -> {
+                        super.receivedSessionEvent(event);
+                    });
+
+        } else {
+            super.receivedSessionEvent(event);
+        }
+    }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java
new file mode 100644
index 0000000..248e01e
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+
+/**
+ * Monitor the Etcd session state every few seconds and send notifications.
+ */
+@Slf4j
+public class EtcdSessionWatcher implements AutoCloseable {
+    private final Client client;
+
+    private SessionEvent currentStatus;
+    private final Consumer<SessionEvent> sessionListener;
+
+    // Maximum time to wait for Etcd lease to be re-connected to quorum (set to 5/6 of SessionTimeout)
+    private final long monitorTimeoutMillis;
+
+    // Interval at which we check the state of the Etcd connection (set to 1/15 of SessionTimeout)
+    private final long tickTimeMillis;
+
+    private final ScheduledExecutorService scheduler;
+    private final ScheduledFuture<?> task;
+
+    private long disconnectedAt = 0;
+
+    public EtcdSessionWatcher(Client client, long sessionTimeoutMillis,
+                              Consumer<SessionEvent> sessionListener) {
+        this.client = client;
+        this.monitorTimeoutMillis = sessionTimeoutMillis * 5 / 6;
+        this.tickTimeMillis = sessionTimeoutMillis / 15;
+        this.sessionListener = sessionListener;
+
+        this.scheduler = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store-etcd-session-watcher"));
+        this.task =
+                scheduler.scheduleAtFixedRate(catchingAndLoggingThrowables(this::checkConnectionStatus), tickTimeMillis,
+                        tickTimeMillis,
+                        TimeUnit.MILLISECONDS);
+        this.currentStatus = SessionEvent.SessionReestablished;
+    }
+
+    @Override
+    public void close() throws Exception {
+        task.cancel(true);
+        scheduler.shutdownNow();
+        scheduler.awaitTermination(10, TimeUnit.SECONDS);
+    }
+
+    // Task that runs every tickTimeMillis to check the Etcd connection
+    private synchronized void checkConnectionStatus() {
+        try {
+            CompletableFuture<SessionEvent> future = new CompletableFuture<>();
+            client.getKVClient().get(ByteSequence.from("/".getBytes(StandardCharsets.UTF_8)))
+                    .thenRun(() -> {
+                        future.complete(SessionEvent.Reconnected);
+                    }).exceptionally(ex -> {
+                        future.complete(SessionEvent.ConnectionLost);
+                        return null;
+                    });
+
+            SessionEvent ectdClientState;
+            try {
+                ectdClientState = future.get(tickTimeMillis, TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                // Consider etcd disconnection if etcd operation takes more than TICK_TIME
+                ectdClientState = SessionEvent.ConnectionLost;
+            }
+
+            checkState(ectdClientState);
+        } catch (RejectedExecutionException | InterruptedException e) {
+            task.cancel(true);
+        } catch (Throwable t) {
+            log.warn("Error while checking Etcd connection status", t);
+        }
+    }
+
+    synchronized void setSessionInvalid() {
+        currentStatus = SessionEvent.SessionLost;
+    }
+
+    private void checkState(SessionEvent etcdlientState) {
+        switch (etcdlientState) {
+            case SessionLost:
+                if (currentStatus != SessionEvent.SessionLost) {
+                    log.error("Etcd lease has expired");
+                    currentStatus = SessionEvent.SessionLost;
+                    sessionListener.accept(currentStatus);
+                }
+                break;
+
+            case ConnectionLost:
+                if (disconnectedAt == 0) {
+                    // this is the first disconnect event, we should monitor the time out from now, so we record the
+                    // time of disconnect
+                    disconnectedAt = System.nanoTime();
+                }
+
+                long timeRemainingMillis = monitorTimeoutMillis
+                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt);
+                if (timeRemainingMillis <= 0 && currentStatus != SessionEvent.SessionLost) {
+                    log.error("Etcd lease keep-alive timeout. Notifying session is lost.");
+                    currentStatus = SessionEvent.SessionLost;
+                    sessionListener.accept(currentStatus);
+                } else if (currentStatus != SessionEvent.SessionLost) {
+                    log.warn("Etcd client is disconnected. Waiting to reconnect, time remaining = {} seconds",
+                            timeRemainingMillis / 1000.0);
+                    if (currentStatus == SessionEvent.SessionReestablished) {
+                        currentStatus = SessionEvent.ConnectionLost;
+                        sessionListener.accept(currentStatus);
+                    }
+                }
+                break;
+
+            default:
+                if (currentStatus != SessionEvent.SessionReestablished) {
+                    // since it reconnected to Etcd, we reset the disconnected time
+                    log.info("Etcd client reconnection with server quorum. Current status: {}", currentStatus);
+                    disconnectedAt = 0;
+
+                    sessionListener.accept(SessionEvent.Reconnected);
+                    if (currentStatus == SessionEvent.SessionLost) {
+                        sessionListener.accept(SessionEvent.SessionReestablished);
+                    }
+                    currentStatus = SessionEvent.SessionReestablished;
+                }
+                break;
+        }
+    }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
index 199698e..57c1ff6 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
@@ -48,9 +48,10 @@
 
         if (metadataURL.startsWith("memory://")) {
             return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
-        }
-        if (metadataURL.startsWith("rocksdb://")) {
+        } else if (metadataURL.startsWith("rocksdb://")) {
             return RocksdbMetadataStore.get(metadataURL, metadataStoreConfig);
+        } else if (metadataURL.startsWith(EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER)) {
+            return new EtcdMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
         } else {
             return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
         }
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 5d78282..616cac2 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -136,7 +136,7 @@
     }
 
     @Override
-    protected final CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
+    protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
                                                EnumSet<CreateOption> options) {
         OpPut op = new OpPut(path, data, optExpectedVersion, options);
         enqueue(writeOps, op);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index fcb6b77..4582750 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -19,10 +19,13 @@
 package org.apache.pulsar.metadata;
 
 import static org.testng.Assert.assertTrue;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.launcher.EtcdClusterFactory;
 import java.io.File;
 import java.util.UUID;
 import java.util.concurrent.CompletionException;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.pulsar.tests.TestRetrySupport;
 import org.assertj.core.util.Files;
 import org.testng.annotations.AfterClass;
@@ -31,12 +34,16 @@
 
 public abstract class BaseMetadataStoreTest extends TestRetrySupport {
     protected TestZKServer zks;
+    protected EtcdCluster etcdCluster;
 
     @BeforeClass(alwaysRun = true)
     @Override
     public final void setup() throws Exception {
         incrementSetupNumber();
         zks = new TestZKServer();
+
+        etcdCluster = EtcdClusterFactory.buildCluster("test", 1, false);
+        etcdCluster.start();
     }
 
     @AfterClass(alwaysRun = true)
@@ -47,6 +54,10 @@
             zks.close();
             zks = null;
         }
+
+        if (etcdCluster != null) {
+            etcdCluster.close();
+        }
     }
 
     private static String createTempFolder() {
@@ -62,10 +73,12 @@
         // The Zookeeper test server gets restarted by TestRetrySupport before the retry.
         // The new connection string won't be available to the test method unless a
         // Supplier<String> lambda is used for providing the value.
-        return new Object[][] {
+        return new Object[][]{
                 { "ZooKeeper", stringSupplier(() -> zks.getConnectionString()) },
                 { "Memory", stringSupplier(() -> "memory://" + UUID.randomUUID()) },
                 { "RocksDB", stringSupplier(() -> "rocksdb://" + createTempFolder()) },
+                { "Etcd", stringSupplier(() -> "etcd:" + etcdCluster.getClientEndpoints().stream()
+                        .map(Object::toString).collect(Collectors.joining(","))) },
         };
     }
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index d9bc00d..c15a384 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -58,13 +58,15 @@
         @Cleanup
         LockManager<String> lockManager = coordinationService.getLockManager(String.class);
 
-        assertEquals(lockManager.listLocks("/my/path").join(), Collections.emptyList());
-        assertEquals(lockManager.readLock("/my/path/1").join(), Optional.empty());
+        String key = newKey();
 
-        ResourceLock<String> lock1 = lockManager.acquireLock("/my/path/1", "lock-1").join();
-        assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
-        assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
-        assertEquals(lock1.getPath(), "/my/path/1");
+        assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
+        assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
+
+        ResourceLock<String> lock1 = lockManager.acquireLock(key + "/1", "lock-1").join();
+        assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
+        assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
+        assertEquals(lock1.getPath(), key + "/1");
         assertEquals(lock1.getValue(), "lock-1");
 
         CountDownLatch latchLock1 = new CountDownLatch(1);
@@ -73,14 +75,14 @@
         });
         assertEquals(latchLock1.getCount(), 1);
 
-        assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
-        assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
+        assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
+        assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
 
         assertEquals(latchLock1.getCount(), 1);
 
         lock1.release().join();
-        assertEquals(lockManager.listLocks("/my/path").join(), Collections.emptyList());
-        assertEquals(lockManager.readLock("/my/path/1").join(), Optional.empty());
+        assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
+        assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
 
         // The future should have been triggered before the release is complete
         latchLock1.await(0, TimeUnit.SECONDS);
@@ -88,10 +90,10 @@
         // Double release shoud be a no-op
         lock1.release().join();
 
-        ResourceLock<String> lock2 = lockManager.acquireLock("/my/path/1", "lock-1").join();
-        assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
-        assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
-        assertEquals(lock2.getPath(), "/my/path/1");
+        ResourceLock<String> lock2 = lockManager.acquireLock(key + "/1", "lock-1").join();
+        assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
+        assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
+        assertEquals(lock2.getPath(), key + "/1");
         assertEquals(lock2.getValue(), "lock-1");
     }
 
@@ -106,23 +108,24 @@
 
         LockManager<String> lockManager = coordinationService.getLockManager(String.class);
 
-        assertEquals(lockManager.listLocks("/my/path").join(), Collections.emptyList());
-        assertEquals(lockManager.readLock("/my/path/1").join(), Optional.empty());
+        String key = newKey();
+        assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
+        assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
 
-        lockManager.acquireLock("/my/path/1", "lock-1").join();
-        assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
-        assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
+        lockManager.acquireLock(key + "/1", "lock-1").join();
+        assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
+        assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
 
-        lockManager.acquireLock("/my/path/2", "lock-2").join();
-        assertEquals(lockManager.listLocks("/my/path").join(), new ArrayList<>(Arrays.asList("1", "2")));
-        assertEquals(lockManager.readLock("/my/path/2").join(), Optional.of("lock-2"));
+        lockManager.acquireLock(key + "/2", "lock-2").join();
+        assertEquals(lockManager.listLocks(key).join(), new ArrayList<>(Arrays.asList("1", "2")));
+        assertEquals(lockManager.readLock(key + "/2").join(), Optional.of("lock-2"));
 
         lockManager.close();
 
         lockManager = coordinationService.getLockManager(String.class);
-        assertEquals(lockManager.listLocks("/my/path").join(), Collections.emptyList());
-        assertEquals(lockManager.readLock("/my/path/1").join(), Optional.empty());
-        assertEquals(lockManager.readLock("/my/path/2").join(), Optional.empty());
+        assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
+        assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
+        assertEquals(lockManager.readLock(key + "/2").join(), Optional.empty());
     }
 
     @Test(dataProvider = "impl")
@@ -139,13 +142,14 @@
         @Cleanup
         LockManager<String> lockManager = coordinationService.getLockManager(String.class);
 
-        ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+        String key = newKey();
+        ResourceLock<String> lock = lockManager.acquireLock(key + "/1", "lock-1").join();
         assertEquals(lock.getValue(), "lock-1");
-        assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+        assertEquals(cache.get(key + "/1").join().get(), "lock-1");
 
         lock.updateValue("value-2").join();
         assertEquals(lock.getValue(), "value-2");
-        assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+        assertEquals(cache.get(key + "/1").join().get(), "value-2");
     }
 
     @Test(dataProvider = "impl")
@@ -162,17 +166,18 @@
         @Cleanup
         LockManager<String> lockManager = coordinationService.getLockManager(String.class);
 
-        ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+        String key = newKey();
+        ResourceLock<String> lock = lockManager.acquireLock(key + "/1", "lock-1").join();
         assertEquals(lock.getValue(), "lock-1");
-        assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+        assertEquals(cache.get(key + "/1").join().get(), "lock-1");
 
-        store.put("/my/path/1",
+        store.put(key + "/1",
                 ObjectMapperFactory.getThreadLocal().writeValueAsBytes("value-2"),
                 Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
 
         lock.updateValue("value-2").join();
         assertEquals(lock.getValue(), "value-2");
-        assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+        assertEquals(cache.get(key + "/1").join().get(), "value-2");
     }
 
     @Test(dataProvider = "impl")
@@ -189,15 +194,16 @@
         @Cleanup
         LockManager<String> lockManager = coordinationService.getLockManager(String.class);
 
-        ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+        String key = newKey();
+        ResourceLock<String> lock = lockManager.acquireLock(key + "/1", "lock-1").join();
         assertEquals(lock.getValue(), "lock-1");
-        assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+        assertEquals(cache.get(key + "/1").join().get(), "lock-1");
 
-        store.delete("/my/path/1", Optional.empty()).join();
+        store.delete(key + "/1", Optional.empty()).join();
 
         lock.updateValue("value-2").join();
         assertEquals(lock.getValue(), "value-2");
-        assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+        assertEquals(cache.get(key + "/1").join().get(), "value-2");
     }
 
     @Test(dataProvider = "impl")
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
index 7c8160c..5766aa4 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
@@ -36,7 +36,7 @@
 @Slf4j
 public class MetadataBenchmark extends MetadataStoreTest {
 
-    @Test(dataProvider = "impl")
+    @Test(dataProvider = "impl", enabled = false)
     public void testGet(String provider, Supplier<String> urlSupplier) throws Exception {
         @Cleanup
         MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
@@ -71,7 +71,7 @@
         log.info("[{}] Get Throughput: {} Kops/s", provider, throughput / 1_000);
     }
 
-    @Test(dataProvider = "impl")
+    @Test(dataProvider = "impl", enabled = false)
     public void testGetChildren(String provider, Supplier<String> urlSupplier) throws Exception {
         @Cleanup
         MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
@@ -105,13 +105,13 @@
         log.info("[{}] Get Children Throughput: {} Kops/s", provider, throughput / 1_000);
     }
 
-    @Test(dataProvider = "impl")
+    @Test(dataProvider = "impl", enabled = false)
     public void testPut(String provider, Supplier<String> urlSupplier) throws Exception {
         @Cleanup
         MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
 
-        final int N_KEYS = 128;
-        final int N_PUTS = 1_000_000;
+        final int N_KEYS = 10_000;
+        final int N_PUTS = 100_000;
 
         String key = newKey();
 
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
index cd31c30..8f159e6 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.CompletionException;
 import java.util.function.Supplier;
 import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -42,9 +43,30 @@
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.testng.annotations.Test;
 
+@Slf4j
 public class MetadataStoreBatchingTest extends BaseMetadataStoreTest {
 
     @Test(dataProvider = "impl")
+    public void testBatchWrite(String provider, Supplier<String> urlSupplier) throws Exception {
+        @Cleanup
+        MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
+                .batchingEnabled(true)
+                .batchingMaxDelayMillis(1_000)
+                .build());
+
+        String key1 = newKey();
+        CompletableFuture<Stat> f1 = store.put(key1, new byte[0], Optional.empty());
+
+        String key2 = newKey();
+        CompletableFuture<Stat> f2 = store.put(key2, new byte[0], Optional.empty());
+
+        Stat s1 = f1.join();
+        Stat s2 = f2.join();
+        log.info("s1: {}", s1);
+        log.info("s2: {}", s2);
+    }
+
+    @Test(dataProvider = "impl")
     public void testBatching(String provider, Supplier<String> urlSupplier) throws Exception {
         @Cleanup
         MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
index d79bea5..14cd594 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
@@ -36,7 +36,7 @@
 
     @Test(dataProvider = "impl")
     public void sequentialKeys(String provider, Supplier<String> urlSupplier) throws Exception {
-        final String basePath = "/my/path";
+        final String basePath = newKey();
 
         @Cleanup
         MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 672322b..856c789 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -78,7 +78,8 @@
             store.delete("/non-existing-key", Optional.of(1L)).join();
             fail("Should have failed");
         } catch (CompletionException e) {
-            assertException(e, NotFoundException.class);
+            assertTrue(e.getCause() instanceof NotFoundException
+                    || e.getCause() instanceof BadVersionException);
         }
     }
 
@@ -388,18 +389,16 @@
 
     @Test(dataProvider = "impl")
     public void testPersistent(String provider, Supplier<String> urlSupplier) throws Exception {
-        if (provider.equals("Memory")) {
-            // Memory is not persistent.
-            return;
-        }
         String metadataUrl = urlSupplier.get();
         MetadataStore store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
         byte[] data = "testPersistent".getBytes(StandardCharsets.UTF_8);
-        store.put("/a/b/c", data, Optional.of(-1L)).join();
+
+        String key = newKey() + "/a/b/c";
+        store.put(key, data, Optional.of(-1L)).join();
         store.close();
 
         store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
-        Optional<GetResult> result = store.get("/a/b/c").get();
+        Optional<GetResult> result = store.get(key).get();
         assertTrue(result.isPresent());
         assertEquals(result.get().getValue(), data);
         store.close();
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index bd7b0f3..e377593 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -252,8 +252,23 @@
     - netty-transport-native-epoll-4.1.72.Final-linux-x86_64.jar
     - netty-transport-native-unix-common-4.1.72.Final.jar
     - netty-transport-native-unix-common-4.1.72.Final-linux-x86_64.jar
+    - netty-codec-http2-4.1.72.Final.jar
+ * GRPC
+    - grpc-api-1.42.1.jar
+    - grpc-context-1.42.1.jar
+    - grpc-core-1.42.1.jar
+    - grpc-grpclb-1.42.1.jar
+    - grpc-netty-1.42.1.jar
+    - grpc-protobuf-1.42.1.jar
+    - grpc-protobuf-lite-1.42.1.jar
+    - grpc-stub-1.42.1.jar
+  * JEtcd
+    - jetcd-common-0.5.11.jar
+    - jetcd-core-0.5.11.jar
  * Joda Time
     - joda-time-2.10.5.jar
+  * Jodah
+    - failsafe-2.4.4.jar
   * Jetty
     - http2-client-9.4.44.v20210927.jar
     - http2-common-9.4.44.v20210927.jar
@@ -457,10 +472,16 @@
     - audience-annotations-0.5.0.jar
   * Swagger
     - swagger-annotations-1.6.2.jar
+  * Perfmark
+    - perfmark-api-0.19.0.jar
+  * Annotations
+    - auto-service-annotations-1.0.jar
 
 Protocol Buffers License
  * Protocol Buffers
    - protobuf-java-3.16.1.jar
+   - protobuf-java-util-3.16.1.jar
+   - proto-google-common-protos-2.0.1.jar
 
 BSD 3-clause "New" or "Revised" License
   *  RE2J TD -- re2j-td-1.4.jar
@@ -491,6 +512,9 @@
    - jul-to-slf4j-1.7.32.jar
  * Checker Qual
    - checker-qual-3.12.0.jar 
+ * Annotations
+   - animal-sniffer-annotations-1.19.jar
+   - annotations-4.1.1.4.jar
 
 CDDL - 1.0
  * OSGi Resource Locator