PIP-135: Added Etcd MetadataStore implementation (#13225)
* Added Etcd MetadataStore implementation
* Fixed license file
* Addressed comments
* Fixed sql plugin license
* Fixed parent notification of children changes
* Fixed sequenetial keys with no parent
* Fixed checkstyle
* Fixed presto license
* Addressed comments
* Fixed etcd test dependency
* Updated the prefix for get children
* Fixed triggering futures from executor
* ExceptionallyAsync is only available in java12
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 06c8878..4ee7e26 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -476,6 +476,7 @@
- io.grpc-grpc-services-1.42.1.jar
- io.grpc-grpc-xds-1.42.1.jar
- io.grpc-grpc-rls-1.42.1.jar
+ - com.google.auto.service-auto-service-annotations-1.0.jar
* Perfmark
- io.perfmark-perfmark-api-0.19.0.jar
* OpenCensus
@@ -484,6 +485,7 @@
- io.opencensus-opencensus-proto-0.2.0.jar
* Jodah
- net.jodah-typetools-0.5.0.jar
+ - net.jodah-failsafe-2.4.4.jar
* Apache Avro
- org.apache.avro-avro-1.10.2.jar
- org.apache.avro-avro-protobuf-1.10.2.jar
@@ -526,6 +528,9 @@
- com.google.http-client-google-http-client-1.38.0.jar
- com.google.auto.value-auto-value-annotations-1.7.4.jar
- com.google.re2j-re2j-1.5.jar
+ * Jetcd
+ - io.etcd-jetcd-common-0.5.11.jar
+ - io.etcd-jetcd-core-0.5.11.jar
BSD 3-clause "New" or "Revised" License
* Google auth library
diff --git a/pom.xml b/pom.xml
index b8f40cf..e34c17b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -202,6 +202,7 @@
<cron-utils.version>9.1.6</cron-utils.version>
<spring-context.version>5.3.15</spring-context.version>
<apache-http-client.version>4.5.13</apache-http-client.version>
+ <jetcd.version>0.5.11</jetcd.version>
<snakeyaml.version>1.30</snakeyaml.version>
<!-- test dependencies -->
@@ -863,6 +864,18 @@
</dependency>
<dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ <version>${jetcd.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-test</artifactId>
+ <version>${jetcd.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark-streaming_2.10.version}</version>
@@ -938,11 +951,47 @@
<dependency>
<groupId>io.grpc</groupId>
+ <artifactId>grpc-api</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-grpclb</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-alts</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <version>${grpc.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>io.perfmark</groupId>
<artifactId>perfmark-api</artifactId>
<version>${perfmark.version}</version>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index a85f351..f4dbcaa 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -376,6 +376,11 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-test</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index f5b63e2..64a630c 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -69,6 +69,18 @@
</dependency>
<dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-core</artifactId>
+ </dependency>
+
+
+ <dependency>
+ <groupId>io.etcd</groupId>
+ <artifactId>jetcd-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
new file mode 100644
index 0000000..e1c26be
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdMetadataStore.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.etcd.jetcd.KV;
+import io.etcd.jetcd.KeyValue;
+import io.etcd.jetcd.Txn;
+import io.etcd.jetcd.kv.DeleteResponse;
+import io.etcd.jetcd.kv.GetResponse;
+import io.etcd.jetcd.kv.PutResponse;
+import io.etcd.jetcd.kv.TxnResponse;
+import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
+import io.etcd.jetcd.op.Cmp;
+import io.etcd.jetcd.op.CmpTarget;
+import io.etcd.jetcd.op.Op;
+import io.etcd.jetcd.options.DeleteOption;
+import io.etcd.jetcd.options.GetOption;
+import io.etcd.jetcd.options.PutOption;
+import io.etcd.jetcd.options.WatchOption;
+import io.etcd.jetcd.support.CloseableClient;
+import io.etcd.jetcd.watch.WatchEvent;
+import io.etcd.jetcd.watch.WatchResponse;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.GetResult;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.apache.pulsar.metadata.api.Stat;
+import org.apache.pulsar.metadata.api.extended.CreateOption;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+import org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore;
+import org.apache.pulsar.metadata.impl.batching.MetadataOp;
+import org.apache.pulsar.metadata.impl.batching.OpDelete;
+import org.apache.pulsar.metadata.impl.batching.OpGet;
+import org.apache.pulsar.metadata.impl.batching.OpGetChildren;
+import org.apache.pulsar.metadata.impl.batching.OpPut;
+
+@Slf4j
+public class EtcdMetadataStore extends AbstractBatchedMetadataStore {
+
+ static final String ETCD_SCHEME_IDENTIFIER = "etcd:";
+
+ private final int leaseTTLSeconds;
+ private final Client client;
+ private final KV kv;
+ private volatile long leaseId;
+ private volatile CloseableClient leaseClient;
+ private final EtcdSessionWatcher sessionWatcher;
+
+ public EtcdMetadataStore(String metadataURL, MetadataStoreConfig conf, boolean enableSessionWatcher)
+ throws MetadataStoreException {
+ super(conf);
+
+ this.leaseTTLSeconds = conf.getSessionTimeoutMillis() / 1000;
+ String etcdUrl = metadataURL.replaceFirst(ETCD_SCHEME_IDENTIFIER, "");
+
+ try {
+ this.client = Client.builder().endpoints(etcdUrl).build();
+ this.kv = client.getKVClient();
+ this.client.getWatchClient().watch(ByteSequence.from("\0", StandardCharsets.UTF_8),
+ WatchOption.newBuilder()
+ .withPrefix(ByteSequence.from("/", StandardCharsets.UTF_8))
+ .build(), this::handleWatchResponse);
+ if (enableSessionWatcher) {
+ this.sessionWatcher =
+ new EtcdSessionWatcher(client, conf.getSessionTimeoutMillis(), this::receivedSessionEvent);
+
+ // Ensure the lease is created when we start
+ this.createLease(false).join();
+ } else {
+ sessionWatcher = null;
+ }
+ } catch (Exception e) {
+ throw new MetadataStoreException(e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+
+ if (sessionWatcher != null) {
+ sessionWatcher.close();
+ }
+
+ if (leaseClient != null) {
+ leaseClient.close();
+ }
+
+ if (leaseId != 0) {
+ client.getLeaseClient().revoke(leaseId);
+ }
+
+ kv.close();
+ client.close();
+ }
+
+ private static final GetOption EXISTS_GET_OPTION = GetOption.newBuilder().withCountOnly(true).build();
+ private static final GetOption SINGLE_GET_OPTION = GetOption.newBuilder().withLimit(1).build();
+
+ @Override
+ protected CompletableFuture<Boolean> existsFromStore(String path) {
+ return kv.get(ByteSequence.from(path, StandardCharsets.UTF_8), EXISTS_GET_OPTION)
+ .thenApplyAsync(gr -> gr.getCount() == 1, executor);
+ }
+
+ @Override
+ protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
+ EnumSet<CreateOption> options) {
+ if (!options.contains(CreateOption.Sequential)) {
+ return super.storePut(path, data, optExpectedVersion, options);
+ } else {
+ // First get the version from parent
+ String parent = parent(path);
+ if (parent == null) {
+ parent = "/";
+ }
+ return super.storePut(parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class))
+ // Then create the unique key with the version added in the path
+ .thenComposeAsync(
+ stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options),
+ executor);
+ }
+ }
+
+ @Override
+ protected void batchOperation(List<MetadataOp> ops) {
+ try {
+ Txn txn = kv.txn();
+
+ // First, set all the conditions
+ ops.forEach(op -> {
+ switch (op.getType()) {
+ case PUT: {
+ OpPut put = op.asPut();
+ ByteSequence key = ByteSequence.from(put.getPath(), StandardCharsets.UTF_8);
+ if (put.getOptExpectedVersion().isPresent()) {
+ long expectedVersion = put.getOptExpectedVersion().get();
+ if (expectedVersion == -1L) {
+ // Check that key does not exist
+ txn.If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.createRevision(0)));
+ } else {
+ txn.If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.version(expectedVersion + 1)));
+ }
+ }
+ break;
+ }
+ case DELETE: {
+ OpDelete del = op.asDelete();
+ ByteSequence key = ByteSequence.from(del.getPath(), StandardCharsets.UTF_8);
+ if (del.getOptExpectedVersion().isPresent()) {
+ txn.If(new Cmp(key, Cmp.Op.EQUAL,
+ CmpTarget.version(del.getOptExpectedVersion().get() + 1)));
+ }
+ break;
+ }
+ }
+ });
+
+ // Then the requests
+ ops.forEach(op -> {
+ switch (op.getType()) {
+ case GET: {
+ txn.Then(
+ Op.get(ByteSequence.from(op.asGet().getPath(), StandardCharsets.UTF_8),
+ SINGLE_GET_OPTION));
+ break;
+ }
+ case PUT: {
+ OpPut put = op.asPut();
+ ByteSequence key = ByteSequence.from(put.getPath(), StandardCharsets.UTF_8);
+ if (!put.getFuture().isDone()) {
+ PutOption.Builder b = PutOption.newBuilder()
+ .withPrevKV();
+
+ if (put.isEphemeral()) {
+ b.withLeaseId(leaseId);
+ }
+
+ txn.Then(Op.put(key, ByteSequence.from(put.getData()), b.build()));
+ }
+ break;
+ }
+ case DELETE: {
+ OpDelete del = op.asDelete();
+ ByteSequence key = ByteSequence.from(del.getPath(), StandardCharsets.UTF_8);
+ txn.Then(Op.delete(key, DeleteOption.DEFAULT));
+ break;
+ }
+ case GET_CHILDREN: {
+ OpGetChildren opGetChildren = op.asGetChildren();
+ String path = opGetChildren.getPath();
+
+ ByteSequence prefix =
+ ByteSequence.from(path.equals("/") ? path : path + "/", StandardCharsets.UTF_8);
+
+ txn.Then(Op.get(prefix, GetOption.newBuilder()
+ .withKeysOnly(true)
+ .withSortField(GetOption.SortTarget.KEY)
+ .withSortOrder(GetOption.SortOrder.ASCEND)
+ .withPrefix(prefix)
+ .build()));
+ break;
+ }
+ }
+ });
+
+ txn.commit().thenAccept(txnResponse -> {
+ handleBatchOperationResult(txnResponse, ops);
+ }).exceptionally(ex -> {
+ Throwable cause = ex.getCause();
+ if (cause instanceof ExecutionException || cause instanceof CompletionException) {
+ cause = cause.getCause();
+ }
+ if (ops.size() > 1 && cause instanceof StatusRuntimeException) {
+ Status.Code code = ((StatusRuntimeException) cause).getStatus().getCode();
+ if (
+ code == Status.Code.INVALID_ARGUMENT /* This could be caused by having repeated keys
+ in the batch, retry individually */
+ ||
+ code
+ == Status.Code.RESOURCE_EXHAUSTED /* We might have exceeded the max-frame
+ size for the response */
+ ) {
+ ops.forEach(o -> batchOperation(Collections.singletonList(o)));
+ }
+ } else {
+ log.warn("Failed to commit: {}", cause.getMessage());
+ executor.execute(() -> {
+ ops.forEach(o -> o.getFuture().completeExceptionally(ex));
+ });
+ }
+ return null;
+ });
+ } catch (Throwable t) {
+ log.warn("Error in committing batch: {}", t.getMessage());
+ }
+ }
+
+ private void handleBatchOperationResult(TxnResponse txnResponse,
+ List<MetadataOp> ops) {
+ executor.execute(() -> {
+ if (!txnResponse.isSucceeded()) {
+ if (ops.size() > 1) {
+ // Retry individually
+ ops.forEach(o -> batchOperation(Collections.singletonList(o)));
+ } else {
+ ops.get(0).getFuture()
+ .completeExceptionally(new MetadataStoreException.BadVersionException("Bad version"));
+ }
+ return;
+ }
+
+ int getIdx = 0;
+ int deletedIdx = 0;
+ int putIdx = 0;
+ for (MetadataOp op : ops) {
+ switch (op.getType()) {
+ case GET: {
+ OpGet get = op.asGet();
+ GetResponse gr = txnResponse.getGetResponses().get(getIdx++);
+ if (gr.getCount() == 0) {
+ get.getFuture().complete(Optional.empty());
+ } else {
+ KeyValue kv = gr.getKvs().get(0);
+ boolean isEphemeral = kv.getLease() != 0;
+ boolean createdBySelf = kv.getLease() == leaseId;
+ get.getFuture().complete(Optional.of(
+ new GetResult(
+ kv.getValue().getBytes(),
+ new Stat(get.getPath(), kv.getVersion() - 1, 0, 0, isEphemeral,
+ createdBySelf)
+ )
+ )
+ );
+ }
+ break;
+ }
+ case PUT: {
+ OpPut put = op.asPut();
+ PutResponse pr = txnResponse.getPutResponses().get(putIdx++);
+ KeyValue prevKv = pr.getPrevKv();
+ if (prevKv == null) {
+ put.getFuture().complete(new Stat(put.getPath(),
+ 0, 0, 0, put.isEphemeral(), true));
+ } else {
+ put.getFuture().complete(new Stat(put.getPath(),
+ prevKv.getVersion(), 0, 0, put.isEphemeral(), true));
+ }
+
+ break;
+ }
+ case DELETE: {
+ OpDelete del = op.asDelete();
+ DeleteResponse dr = txnResponse.getDeleteResponses().get(deletedIdx++);
+ if (dr.getDeleted() == 0) {
+ del.getFuture().completeExceptionally(new MetadataStoreException.NotFoundException());
+ } else {
+ del.getFuture().complete(null);
+ }
+ break;
+ }
+ case GET_CHILDREN: {
+ OpGetChildren getChildren = op.asGetChildren();
+ GetResponse gr = txnResponse.getGetResponses().get(getIdx++);
+ String basePath = getChildren.getPath() + "/";
+
+ Set<String> children = gr.getKvs().stream()
+ .map(kv -> kv.getKey().toString(StandardCharsets.UTF_8))
+ .map(p -> p.replace(basePath, ""))
+ // Only return first-level children
+ .map(k -> k.split("/", 2)[0])
+ .collect(Collectors.toCollection(TreeSet::new));
+
+ getChildren.getFuture().complete(new ArrayList<>(children));
+ }
+ }
+ }
+ });
+ }
+
+ private synchronized CompletableFuture<Void> createLease(boolean retryOnFailure) {
+ CompletableFuture<Void> future = client.getLeaseClient().grant(leaseTTLSeconds)
+ .thenAccept(lease -> {
+ synchronized (this) {
+ this.leaseId = lease.getID();
+
+ if (leaseClient != null) {
+ leaseClient.close();
+ }
+ this.leaseClient =
+ this.client.getLeaseClient()
+ .keepAlive(leaseId, new StreamObserver<LeaseKeepAliveResponse>() {
+ @Override
+ public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
+ if (log.isDebugEnabled()) {
+ log.debug("On next: {}", leaseKeepAliveResponse);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ log.warn("Lease client error :", throwable);
+ receivedSessionEvent(SessionEvent.SessionLost);
+ }
+
+ @Override
+ public void onCompleted() {
+ log.info("Etcd lease has expired");
+ receivedSessionEvent(SessionEvent.SessionLost);
+ }
+ });
+ }
+ });
+
+ if (retryOnFailure) {
+ future.exceptionally(ex -> {
+ log.warn("Failed to create Etcd lease. Retrying later", ex);
+ executor.schedule(() -> {
+ createLease(true);
+ }, 1, TimeUnit.SECONDS);
+ return null;
+ });
+ }
+
+ return future;
+ }
+
+ private void handleWatchResponse(WatchResponse watchResponse) {
+ watchResponse.getEvents().forEach(we -> {
+ String path = we.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
+ if (we.getEventType() == WatchEvent.EventType.PUT) {
+ if (we.getKeyValue().getVersion() == 1) {
+ receivedNotification(new Notification(NotificationType.Created, path));
+
+ notifyParentChildrenChanged(path);
+ } else {
+ receivedNotification(new Notification(NotificationType.Modified, path));
+ }
+ } else if (we.getEventType() == WatchEvent.EventType.DELETE) {
+ receivedNotification(new Notification(NotificationType.Deleted, path));
+ notifyParentChildrenChanged(path);
+ }
+ });
+ }
+
+ @Override
+ protected void receivedSessionEvent(SessionEvent event) {
+ if (event == SessionEvent.SessionReestablished) {
+ // Re-create the lease before notifying that we are reconnected
+ createLease(true)
+ .thenRun(() -> {
+ super.receivedSessionEvent(event);
+ });
+
+ } else {
+ super.receivedSessionEvent(event);
+ }
+ }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java
new file mode 100644
index 0000000..248e01e
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/EtcdSessionWatcher.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl;
+
+import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
+import io.etcd.jetcd.ByteSequence;
+import io.etcd.jetcd.Client;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.metadata.api.extended.SessionEvent;
+
+/**
+ * Monitor the ETCd session state every few seconds and send notifications.
+ */
+@Slf4j
+public class EtcdSessionWatcher implements AutoCloseable {
+ private final Client client;
+
+ private SessionEvent currentStatus;
+ private final Consumer<SessionEvent> sessionListener;
+
+ // Maximum time to wait for Etcd lease to be re-connected to quorum (set to 5/6 of SessionTimeout)
+ private final long monitorTimeoutMillis;
+
+ // Interval at which we check the state of the Etcd connection (set to 1/15 of SessionTimeout)
+ private final long tickTimeMillis;
+
+ private final ScheduledExecutorService scheduler;
+ private final ScheduledFuture<?> task;
+
+ private long disconnectedAt = 0;
+
+ public EtcdSessionWatcher(Client client, long sessionTimeoutMillis,
+ Consumer<SessionEvent> sessionListener) {
+ this.client = client;
+ this.monitorTimeoutMillis = sessionTimeoutMillis * 5 / 6;
+ this.tickTimeMillis = sessionTimeoutMillis / 15;
+ this.sessionListener = sessionListener;
+
+ this.scheduler = Executors
+ .newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store-etcd-session-watcher"));
+ this.task =
+ scheduler.scheduleAtFixedRate(catchingAndLoggingThrowables(this::checkConnectionStatus), tickTimeMillis,
+ tickTimeMillis,
+ TimeUnit.MILLISECONDS);
+ this.currentStatus = SessionEvent.SessionReestablished;
+ }
+
+ @Override
+ public void close() throws Exception {
+ task.cancel(true);
+ scheduler.shutdownNow();
+ scheduler.awaitTermination(10, TimeUnit.SECONDS);
+ }
+
+ // task that runs every TICK_TIME to check Etcd connection
+ private synchronized void checkConnectionStatus() {
+ try {
+ CompletableFuture<SessionEvent> future = new CompletableFuture<>();
+ client.getKVClient().get(ByteSequence.from("/".getBytes(StandardCharsets.UTF_8)))
+ .thenRun(() -> {
+ future.complete(SessionEvent.Reconnected);
+ }).exceptionally(ex -> {
+ future.complete(SessionEvent.ConnectionLost);
+ return null;
+ });
+
+ SessionEvent ectdClientState;
+ try {
+ ectdClientState = future.get(tickTimeMillis, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ // Consider etcd disconnection if etcd operation takes more than TICK_TIME
+ ectdClientState = SessionEvent.ConnectionLost;
+ }
+
+ checkState(ectdClientState);
+ } catch (RejectedExecutionException | InterruptedException e) {
+ task.cancel(true);
+ } catch (Throwable t) {
+ log.warn("Error while checking Etcd connection status", t);
+ }
+ }
+
+ synchronized void setSessionInvalid() {
+ currentStatus = SessionEvent.SessionLost;
+ }
+
+ private void checkState(SessionEvent etcdlientState) {
+ switch (etcdlientState) {
+ case SessionLost:
+ if (currentStatus != SessionEvent.SessionLost) {
+ log.error("Etcd lease has expired");
+ currentStatus = SessionEvent.SessionLost;
+ sessionListener.accept(currentStatus);
+ }
+ break;
+
+ case ConnectionLost:
+ if (disconnectedAt == 0) {
+ // this is the first disconnect event, we should monitor the time out from now, so we record the
+ // time of disconnect
+ disconnectedAt = System.nanoTime();
+ }
+
+ long timeRemainingMillis = monitorTimeoutMillis
+ - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - disconnectedAt);
+ if (timeRemainingMillis <= 0 && currentStatus != SessionEvent.SessionLost) {
+ log.error("Etcd lease keep-alive timeout. Notifying session is lost.");
+ currentStatus = SessionEvent.SessionLost;
+ sessionListener.accept(currentStatus);
+ } else if (currentStatus != SessionEvent.SessionLost) {
+ log.warn("Etcd client is disconnected. Waiting to reconnect, time remaining = {} seconds",
+ timeRemainingMillis / 1000.0);
+ if (currentStatus == SessionEvent.SessionReestablished) {
+ currentStatus = SessionEvent.ConnectionLost;
+ sessionListener.accept(currentStatus);
+ }
+ }
+ break;
+
+ default:
+ if (currentStatus != SessionEvent.SessionReestablished) {
+ // since it reconnected to Etcd, we reset the disconnected time
+ log.info("Etcd client reconnection with server quorum. Current status: {}", currentStatus);
+ disconnectedAt = 0;
+
+ sessionListener.accept(SessionEvent.Reconnected);
+ if (currentStatus == SessionEvent.SessionLost) {
+ sessionListener.accept(SessionEvent.SessionReestablished);
+ }
+ currentStatus = SessionEvent.SessionReestablished;
+ }
+ break;
+ }
+ }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
index 199698e..57c1ff6 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImpl.java
@@ -48,9 +48,10 @@
if (metadataURL.startsWith("memory://")) {
return new LocalMemoryMetadataStore(metadataURL, metadataStoreConfig);
- }
- if (metadataURL.startsWith("rocksdb://")) {
+ } else if (metadataURL.startsWith("rocksdb://")) {
return RocksdbMetadataStore.get(metadataURL, metadataStoreConfig);
+ } else if (metadataURL.startsWith(EtcdMetadataStore.ETCD_SCHEME_IDENTIFIER)) {
+ return new EtcdMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
} else {
return new ZKMetadataStore(metadataURL, metadataStoreConfig, enableSessionWatcher);
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index 5d78282..616cac2 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -136,7 +136,7 @@
}
@Override
- protected final CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
+ protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
EnumSet<CreateOption> options) {
OpPut op = new OpPut(path, data, optExpectedVersion, options);
enqueue(writeOps, op);
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
index fcb6b77..4582750 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/BaseMetadataStoreTest.java
@@ -19,10 +19,13 @@
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertTrue;
+import io.etcd.jetcd.launcher.EtcdCluster;
+import io.etcd.jetcd.launcher.EtcdClusterFactory;
import java.io.File;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.pulsar.tests.TestRetrySupport;
import org.assertj.core.util.Files;
import org.testng.annotations.AfterClass;
@@ -31,12 +34,16 @@
public abstract class BaseMetadataStoreTest extends TestRetrySupport {
protected TestZKServer zks;
+ protected EtcdCluster etcdCluster;
@BeforeClass(alwaysRun = true)
@Override
public final void setup() throws Exception {
incrementSetupNumber();
zks = new TestZKServer();
+
+ etcdCluster = EtcdClusterFactory.buildCluster("test", 1, false);
+ etcdCluster.start();
}
@AfterClass(alwaysRun = true)
@@ -47,6 +54,10 @@
zks.close();
zks = null;
}
+
+ if (etcdCluster != null) {
+ etcdCluster.close();
+ }
}
private static String createTempFolder() {
@@ -62,10 +73,12 @@
// The Zookeeper test server gets restarted by TestRetrySupport before the retry.
// The new connection string won't be available to the test method unless a
// Supplier<String> lambda is used for providing the value.
- return new Object[][] {
+ return new Object[][]{
{ "ZooKeeper", stringSupplier(() -> zks.getConnectionString()) },
{ "Memory", stringSupplier(() -> "memory://" + UUID.randomUUID()) },
{ "RocksDB", stringSupplier(() -> "rocksdb://" + createTempFolder()) },
+ {"Etcd", stringSupplier(() -> "etcd:" + etcdCluster.getClientEndpoints().stream().map(x -> x.toString())
+ .collect(Collectors.joining(",")))},
};
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
index d9bc00d..c15a384 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/LockManagerTest.java
@@ -58,13 +58,15 @@
@Cleanup
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
- assertEquals(lockManager.listLocks("/my/path").join(), Collections.emptyList());
- assertEquals(lockManager.readLock("/my/path/1").join(), Optional.empty());
+ String key = newKey();
- ResourceLock<String> lock1 = lockManager.acquireLock("/my/path/1", "lock-1").join();
- assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
- assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
- assertEquals(lock1.getPath(), "/my/path/1");
+ assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
+ assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
+
+ ResourceLock<String> lock1 = lockManager.acquireLock(key + "/1", "lock-1").join();
+ assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
+ assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
+ assertEquals(lock1.getPath(), key + "/1");
assertEquals(lock1.getValue(), "lock-1");
CountDownLatch latchLock1 = new CountDownLatch(1);
@@ -73,14 +75,14 @@
});
assertEquals(latchLock1.getCount(), 1);
- assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
- assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
+ assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
+ assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
assertEquals(latchLock1.getCount(), 1);
lock1.release().join();
- assertEquals(lockManager.listLocks("/my/path").join(), Collections.emptyList());
- assertEquals(lockManager.readLock("/my/path/1").join(), Optional.empty());
+ assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
+ assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
// The future should have been triggered before the release is complete
latchLock1.await(0, TimeUnit.SECONDS);
@@ -88,10 +90,10 @@
// Double release shoud be a no-op
lock1.release().join();
- ResourceLock<String> lock2 = lockManager.acquireLock("/my/path/1", "lock-1").join();
- assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
- assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
- assertEquals(lock2.getPath(), "/my/path/1");
+ ResourceLock<String> lock2 = lockManager.acquireLock(key + "/1", "lock-1").join();
+ assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
+ assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
+ assertEquals(lock2.getPath(), key + "/1");
assertEquals(lock2.getValue(), "lock-1");
}
@@ -106,23 +108,24 @@
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
- assertEquals(lockManager.listLocks("/my/path").join(), Collections.emptyList());
- assertEquals(lockManager.readLock("/my/path/1").join(), Optional.empty());
+ String key = newKey();
+ assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
+ assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
- lockManager.acquireLock("/my/path/1", "lock-1").join();
- assertEquals(lockManager.listLocks("/my/path").join(), Collections.singletonList("1"));
- assertEquals(lockManager.readLock("/my/path/1").join(), Optional.of("lock-1"));
+ lockManager.acquireLock(key + "/1", "lock-1").join();
+ assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
+ assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
- lockManager.acquireLock("/my/path/2", "lock-2").join();
- assertEquals(lockManager.listLocks("/my/path").join(), new ArrayList<>(Arrays.asList("1", "2")));
- assertEquals(lockManager.readLock("/my/path/2").join(), Optional.of("lock-2"));
+ lockManager.acquireLock(key + "/2", "lock-2").join();
+ assertEquals(lockManager.listLocks(key + "").join(), new ArrayList<>(Arrays.asList("1", "2")));
+ assertEquals(lockManager.readLock(key + "/2").join(), Optional.of("lock-2"));
lockManager.close();
lockManager = coordinationService.getLockManager(String.class);
- assertEquals(lockManager.listLocks("/my/path").join(), Collections.emptyList());
- assertEquals(lockManager.readLock("/my/path/1").join(), Optional.empty());
- assertEquals(lockManager.readLock("/my/path/2").join(), Optional.empty());
+ assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
+ assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
+ assertEquals(lockManager.readLock(key + "/2").join(), Optional.empty());
}
@Test(dataProvider = "impl")
@@ -139,13 +142,14 @@
@Cleanup
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
- ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+ String key = newKey();
+ ResourceLock<String> lock = lockManager.acquireLock(key + "/1", "lock-1").join();
assertEquals(lock.getValue(), "lock-1");
- assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+ assertEquals(cache.get(key + "/1").join().get(), "lock-1");
lock.updateValue("value-2").join();
assertEquals(lock.getValue(), "value-2");
- assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+ assertEquals(cache.get(key + "/1").join().get(), "value-2");
}
@Test(dataProvider = "impl")
@@ -162,17 +166,18 @@
@Cleanup
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
- ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+ String key = newKey();
+ ResourceLock<String> lock = lockManager.acquireLock(key + "/1", "lock-1").join();
assertEquals(lock.getValue(), "lock-1");
- assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+ assertEquals(cache.get(key + "/1").join().get(), "lock-1");
- store.put("/my/path/1",
+ store.put(key + "/1",
ObjectMapperFactory.getThreadLocal().writeValueAsBytes("value-2"),
Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
lock.updateValue("value-2").join();
assertEquals(lock.getValue(), "value-2");
- assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+ assertEquals(cache.get(key + "/1").join().get(), "value-2");
}
@Test(dataProvider = "impl")
@@ -189,15 +194,16 @@
@Cleanup
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
- ResourceLock<String> lock = lockManager.acquireLock("/my/path/1", "lock-1").join();
+ String key = newKey();
+ ResourceLock<String> lock = lockManager.acquireLock(key + "/1", "lock-1").join();
assertEquals(lock.getValue(), "lock-1");
- assertEquals(cache.get("/my/path/1").join().get(), "lock-1");
+ assertEquals(cache.get(key + "/1").join().get(), "lock-1");
- store.delete("/my/path/1", Optional.empty()).join();
+ store.delete(key + "/1", Optional.empty()).join();
lock.updateValue("value-2").join();
assertEquals(lock.getValue(), "value-2");
- assertEquals(cache.get("/my/path/1").join().get(), "value-2");
+ assertEquals(cache.get(key + "/1").join().get(), "value-2");
}
@Test(dataProvider = "impl")
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
index 7c8160c..5766aa4 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataBenchmark.java
@@ -36,7 +36,7 @@
@Slf4j
public class MetadataBenchmark extends MetadataStoreTest {
- @Test(dataProvider = "impl")
+ @Test(dataProvider = "impl", enabled = false)
public void testGet(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
@@ -71,7 +71,7 @@
log.info("[{}] Get Throughput: {} Kops/s", provider, throughput / 1_000);
}
- @Test(dataProvider = "impl")
+ @Test(dataProvider = "impl", enabled = false)
public void testGetChildren(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
@@ -105,13 +105,13 @@
log.info("[{}] Get Children Throughput: {} Kops/s", provider, throughput / 1_000);
}
- @Test(dataProvider = "impl")
+ @Test(dataProvider = "impl", enabled = false)
public void testPut(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
- final int N_KEYS = 128;
- final int N_PUTS = 1_000_000;
+ final int N_KEYS = 10_000;
+ final int N_PUTS = 100_000;
String key = newKey();
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
index cd31c30..8f159e6 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreBatchingTest.java
@@ -30,6 +30,7 @@
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -42,9 +43,30 @@
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.testng.annotations.Test;
+@Slf4j
public class MetadataStoreBatchingTest extends BaseMetadataStoreTest {
@Test(dataProvider = "impl")
+ public void testBatchWrite(String provider, Supplier<String> urlSupplier) throws Exception {
+ @Cleanup
+ MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
+ .batchingEnabled(true)
+ .batchingMaxDelayMillis(1_000)
+ .build());
+
+ String key1 = newKey();
+ CompletableFuture<Stat> f1 = store.put(key1, new byte[0], Optional.empty());
+
+ String key2 = newKey();
+ CompletableFuture<Stat> f2 = store.put(key2, new byte[0], Optional.empty());
+
+ Stat s1 = f1.join();
+ Stat s2 = f2.join();
+ log.info("s1: {}", s1);
+ log.info("s2: {}", s2);
+ }
+
+ @Test(dataProvider = "impl")
public void testBatching(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder()
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
index d79bea5..14cd594 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreExtendedTest.java
@@ -36,7 +36,7 @@
@Test(dataProvider = "impl")
public void sequentialKeys(String provider, Supplier<String> urlSupplier) throws Exception {
- final String basePath = "/my/path";
+ final String basePath = newKey();
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 672322b..856c789 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -78,7 +78,8 @@
store.delete("/non-existing-key", Optional.of(1L)).join();
fail("Should have failed");
} catch (CompletionException e) {
- assertException(e, NotFoundException.class);
+ assertTrue(NotFoundException.class.isInstance(e.getCause()) || BadVersionException.class.isInstance(
+ e.getCause()));
}
}
@@ -388,18 +389,16 @@
@Test(dataProvider = "impl")
public void testPersistent(String provider, Supplier<String> urlSupplier) throws Exception {
- if (provider.equals("Memory")) {
- // Memory is not persistent.
- return;
- }
String metadataUrl = urlSupplier.get();
MetadataStore store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
byte[] data = "testPersistent".getBytes(StandardCharsets.UTF_8);
- store.put("/a/b/c", data, Optional.of(-1L)).join();
+
+ String key = newKey() + "/a/b/c";
+ store.put(key, data, Optional.of(-1L)).join();
store.close();
store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
- Optional<GetResult> result = store.get("/a/b/c").get();
+ Optional<GetResult> result = store.get(key).get();
assertTrue(result.isPresent());
assertEquals(result.get().getValue(), data);
store.close();
diff --git a/pulsar-sql/presto-distribution/LICENSE b/pulsar-sql/presto-distribution/LICENSE
index bd7b0f3..e377593 100644
--- a/pulsar-sql/presto-distribution/LICENSE
+++ b/pulsar-sql/presto-distribution/LICENSE
@@ -252,8 +252,23 @@
- netty-transport-native-epoll-4.1.72.Final-linux-x86_64.jar
- netty-transport-native-unix-common-4.1.72.Final.jar
- netty-transport-native-unix-common-4.1.72.Final-linux-x86_64.jar
+ - netty-codec-http2-4.1.72.Final.jar
+ * GRPC
+ - grpc-api-1.42.1.jar
+ - grpc-context-1.42.1.jar
+ - grpc-core-1.42.1.jar
+ - grpc-grpclb-1.42.1.jar
+ - grpc-netty-1.42.1.jar
+ - grpc-protobuf-1.42.1.jar
+ - grpc-protobuf-lite-1.42.1.jar
+ - grpc-stub-1.42.1.jar
+ * JEtcd
+ - jetcd-common-0.5.11.jar
+ - jetcd-core-0.5.11.jar
+
* Joda Time
- joda-time-2.10.5.jar
+ - failsafe-2.4.4.jar
* Jetty
- http2-client-9.4.44.v20210927.jar
- http2-common-9.4.44.v20210927.jar
@@ -457,10 +472,16 @@
- audience-annotations-0.5.0.jar
* Swagger
- swagger-annotations-1.6.2.jar
+ * Perfmark
+ - perfmark-api-0.19.0.jar
+ * Annotations
+ - auto-service-annotations-1.0.jar
Protocol Buffers License
* Protocol Buffers
- protobuf-java-3.16.1.jar
+ - protobuf-java-util-3.16.1.jar
+ - proto-google-common-protos-2.0.1.jar
BSD 3-clause "New" or "Revised" License
* RE2J TD -- re2j-td-1.4.jar
@@ -491,6 +512,9 @@
- jul-to-slf4j-1.7.32.jar
* Checker Qual
- checker-qual-3.12.0.jar
+ * Annotations
+ - animal-sniffer-annotations-1.19.jar
+ - annotations-4.1.1.4.jar
CDDL - 1.0
* OSGi Resource Locator