blob: e1c26bead096a6029b5584ab611b3e6d988ffb0e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.impl;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Txn;
import io.etcd.jetcd.kv.DeleteResponse;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.PutResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.support.CloseableClient;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore;
import org.apache.pulsar.metadata.impl.batching.MetadataOp;
import org.apache.pulsar.metadata.impl.batching.OpDelete;
import org.apache.pulsar.metadata.impl.batching.OpGet;
import org.apache.pulsar.metadata.impl.batching.OpGetChildren;
import org.apache.pulsar.metadata.impl.batching.OpPut;
/**
 * Metadata store backed by etcd, accessed through the jetcd client.
 *
 * <p>Operations are batched by {@link AbstractBatchedMetadataStore} and each batch is executed as a
 * single etcd transaction: all version checks go into the transaction's compare section and all
 * requests into its success section. If a multi-op transaction fails, its ops are retried one by one.
 *
 * <p>Versions exposed through {@link Stat} are the etcd key version minus one, so a freshly created
 * key (etcd version 1) is reported as version 0.
 *
 * <p>Ephemeral keys are attached to a lease that is kept alive for the lifetime of the store and
 * revoked on {@link #close()}.
 */
@Slf4j
public class EtcdMetadataStore extends AbstractBatchedMetadataStore {

    static final String ETCD_SCHEME_IDENTIFIER = "etcd:";

    /** Upper bound on how long {@link #close()} waits for the session lease revocation to complete. */
    private static final long LEASE_REVOKE_TIMEOUT_SECONDS = 5;

    private static final GetOption EXISTS_GET_OPTION = GetOption.newBuilder().withCountOnly(true).build();
    private static final GetOption SINGLE_GET_OPTION = GetOption.newBuilder().withLimit(1).build();

    /** Lease TTL derived from the configured session timeout (integer division, whole seconds). */
    private final int leaseTTLSeconds;
    private final Client client;
    private final KV kv;
    /** Id of the current session lease; 0 until the first lease has been granted. */
    private volatile long leaseId;
    /** Keep-alive stream for {@link #leaseId}; replaced every time a new lease is installed. */
    private volatile CloseableClient leaseClient;
    private final EtcdSessionWatcher sessionWatcher;

    /**
     * Connects to etcd and starts watching all keys under "/".
     *
     * @param metadataURL          etcd endpoint(s), optionally prefixed with {@value #ETCD_SCHEME_IDENTIFIER}
     * @param conf                 store configuration; its session timeout is used as the lease TTL
     * @param enableSessionWatcher when true, a session watcher is started and a lease is created eagerly
     * @throws MetadataStoreException if the client cannot be created or the initial lease cannot be granted
     */
    public EtcdMetadataStore(String metadataURL, MetadataStoreConfig conf, boolean enableSessionWatcher)
            throws MetadataStoreException {
        super(conf);

        this.leaseTTLSeconds = conf.getSessionTimeoutMillis() / 1000;
        String etcdUrl = metadataURL.replaceFirst(ETCD_SCHEME_IDENTIFIER, "");

        Client etcdClient = null;
        try {
            etcdClient = Client.builder().endpoints(etcdUrl).build();
            this.client = etcdClient;
            this.kv = client.getKVClient();
            this.client.getWatchClient().watch(ByteSequence.from("\0", StandardCharsets.UTF_8),
                    WatchOption.newBuilder()
                            .withPrefix(ByteSequence.from("/", StandardCharsets.UTF_8))
                            .build(), this::handleWatchResponse);
            if (enableSessionWatcher) {
                this.sessionWatcher =
                        new EtcdSessionWatcher(client, conf.getSessionTimeoutMillis(), this::receivedSessionEvent);

                // Ensure the lease is created when we start
                this.createLease(false).join();
            } else {
                sessionWatcher = null;
            }
        } catch (Exception e) {
            // Do not leak the client (and its connections) when initialization fails half-way
            if (etcdClient != null) {
                try {
                    etcdClient.close();
                } catch (Exception closeEx) {
                    e.addSuppressed(closeEx);
                }
            }
            throw new MetadataStoreException(e);
        }
    }

    /**
     * Closes the store: stops the session watcher and the lease keep-alive, revokes the session lease
     * (waiting up to {@link #LEASE_REVOKE_TIMEOUT_SECONDS}), then closes the KV and etcd clients. The
     * clients are closed even if an earlier step throws.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();

            if (sessionWatcher != null) {
                sessionWatcher.close();
            }

            if (leaseClient != null) {
                leaseClient.close();
            }

            if (leaseId != 0) {
                revokeLeaseQuietly(leaseId);
            }
        } finally {
            kv.close();
            client.close();
        }
    }

    /**
     * Revokes the given lease and waits a bounded time, so the request is actually sent before the
     * client is closed. Failures are logged, not thrown.
     */
    private void revokeLeaseQuietly(long id) {
        try {
            client.getLeaseClient().revoke(id).get(LEASE_REVOKE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("Interrupted while revoking Etcd lease {}", id);
        } catch (ExecutionException | TimeoutException e) {
            log.warn("Failed to revoke Etcd lease {}", id, e);
        }
    }

    /** Checks existence with a count-only range request on the exact key. */
    @Override
    protected CompletableFuture<Boolean> existsFromStore(String path) {
        return kv.get(etcdKey(path), EXISTS_GET_OPTION)
                .thenApplyAsync(gr -> gr.getCount() > 0, executor);
    }

    /**
     * Writes a key. For {@link CreateOption#Sequential}, the parent key is first written (with an
     * empty payload), and the parent's resulting version is appended to {@code path} to form a
     * unique key.
     *
     * <p>NOTE(review): the sequential path overwrites the parent's value with an empty byte array.
     */
    @Override
    protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion,
                                               EnumSet<CreateOption> options) {
        if (!options.contains(CreateOption.Sequential)) {
            return super.storePut(path, data, optExpectedVersion, options);
        }

        // First get the version from parent
        String parent = parent(path);
        if (parent == null) {
            parent = "/";
        }
        return super.storePut(parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class))
                // Then create the unique key with the version added in the path
                .thenComposeAsync(
                        stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options),
                        executor);
    }

    /**
     * Executes a batch of operations as one etcd transaction.
     *
     * <p>PUT ops whose future is already completed are dropped up front. They are excluded from both
     * the transaction and the result handling, so response indexes stay aligned with the requests
     * actually sent. Every op that is sent has its future completed, successfully or exceptionally.
     */
    @Override
    protected void batchOperation(List<MetadataOp> ops) {
        List<MetadataOp> pendingOps = ops.stream()
                .filter(op -> !isCompletedPut(op))
                .collect(Collectors.toList());
        if (pendingOps.isEmpty()) {
            return;
        }

        try {
            Txn txn = kv.txn();

            // First, set all the conditions
            pendingOps.forEach(op -> addTxnCondition(txn, op));

            // Then the requests
            pendingOps.forEach(op -> addTxnRequest(txn, op));

            txn.commit()
                    .thenAccept(txnResponse -> handleBatchOperationResult(txnResponse, pendingOps))
                    .exceptionally(ex -> {
                        handleBatchCommitFailure(ex, pendingOps);
                        return null;
                    });
        } catch (Throwable t) {
            log.warn("Error in committing batch: {}", t.getMessage());
            // Never leave callers waiting on futures of a batch that could not be submitted
            pendingOps.forEach(o -> o.getFuture().completeExceptionally(t));
        }
    }

    /** Returns true for a PUT op whose future has already been completed. */
    private static boolean isCompletedPut(MetadataOp op) {
        switch (op.getType()) {
            case PUT:
                return op.getFuture().isDone();
            default:
                return false;
        }
    }

    /** Adds the version check (if any) required by {@code op} to the transaction's compare section. */
    private static void addTxnCondition(Txn txn, MetadataOp op) {
        switch (op.getType()) {
            case PUT: {
                OpPut put = op.asPut();
                if (put.getOptExpectedVersion().isPresent()) {
                    ByteSequence key = etcdKey(put.getPath());
                    long expectedVersion = put.getOptExpectedVersion().get();
                    if (expectedVersion == -1L) {
                        // Check that key does not exist
                        txn.If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.createRevision(0)));
                    } else {
                        // Store version N corresponds to etcd version N + 1
                        txn.If(new Cmp(key, Cmp.Op.EQUAL, CmpTarget.version(expectedVersion + 1)));
                    }
                }
                break;
            }
            case DELETE: {
                OpDelete del = op.asDelete();
                if (del.getOptExpectedVersion().isPresent()) {
                    txn.If(new Cmp(etcdKey(del.getPath()), Cmp.Op.EQUAL,
                            CmpTarget.version(del.getOptExpectedVersion().get() + 1)));
                }
                break;
            }
            default:
                break;
        }
    }

    /** Adds the request for {@code op} to the transaction's success section. */
    private void addTxnRequest(Txn txn, MetadataOp op) {
        switch (op.getType()) {
            case GET: {
                txn.Then(Op.get(etcdKey(op.asGet().getPath()), SINGLE_GET_OPTION));
                break;
            }
            case PUT: {
                OpPut put = op.asPut();
                // prevKV is requested so the resulting Stat can report the new version
                PutOption.Builder b = PutOption.newBuilder()
                        .withPrevKV();
                if (put.isEphemeral()) {
                    b.withLeaseId(leaseId);
                }
                txn.Then(Op.put(etcdKey(put.getPath()), ByteSequence.from(put.getData()), b.build()));
                break;
            }
            case DELETE: {
                txn.Then(Op.delete(etcdKey(op.asDelete().getPath()), DeleteOption.DEFAULT));
                break;
            }
            case GET_CHILDREN: {
                ByteSequence prefix = etcdKey(childrenKeyPrefix(op.asGetChildren().getPath()));
                txn.Then(Op.get(prefix, GetOption.newBuilder()
                        .withKeysOnly(true)
                        .withSortField(GetOption.SortTarget.KEY)
                        .withSortOrder(GetOption.SortOrder.ASCEND)
                        .withPrefix(prefix)
                        .build()));
                break;
            }
            default:
                break;
        }
    }

    /**
     * Handles a failed transaction commit. Multi-op batches that failed with INVALID_ARGUMENT or
     * RESOURCE_EXHAUSTED are retried one op at a time. Any other failure completes every op
     * exceptionally.
     */
    private void handleBatchCommitFailure(Throwable ex, List<MetadataOp> ops) {
        Throwable cause = unwrapCommitException(ex);
        if (ops.size() > 1 && isRetriableIndividually(cause)) {
            ops.forEach(o -> batchOperation(Collections.singletonList(o)));
        } else {
            log.warn("Failed to commit: {}", cause.getMessage());
            executor.execute(() -> ops.forEach(o -> o.getFuture().completeExceptionally(ex)));
        }
    }

    /** Strips up to two levels of future-wrapping exceptions, never returning null. */
    private static Throwable unwrapCommitException(Throwable ex) {
        Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
        if ((cause instanceof ExecutionException || cause instanceof CompletionException)
                && cause.getCause() != null) {
            cause = cause.getCause();
        }
        return cause;
    }

    /** Whether a failed multi-op transaction should be retried op by op. */
    private static boolean isRetriableIndividually(Throwable cause) {
        if (!(cause instanceof StatusRuntimeException)) {
            return false;
        }
        Status.Code code = ((StatusRuntimeException) cause).getStatus().getCode();
        // INVALID_ARGUMENT: this could be caused by having repeated keys in the batch
        // RESOURCE_EXHAUSTED: we might have exceeded the max-frame size for the response
        return code == Status.Code.INVALID_ARGUMENT || code == Status.Code.RESOURCE_EXHAUSTED;
    }

    /**
     * Completes the futures of {@code ops} from a transaction response, on {@code executor}.
     *
     * <p>If the transaction's conditions did not hold, a multi-op batch is retried op by op, and a
     * single op fails with {@link MetadataStoreException.BadVersionException}. Otherwise the per-type
     * responses are consumed in the same order the requests were added.
     */
    private void handleBatchOperationResult(TxnResponse txnResponse,
                                            List<MetadataOp> ops) {
        executor.execute(() -> {
            if (!txnResponse.isSucceeded()) {
                if (ops.size() > 1) {
                    // Retry individually
                    ops.forEach(o -> batchOperation(Collections.singletonList(o)));
                } else {
                    ops.get(0).getFuture()
                            .completeExceptionally(new MetadataStoreException.BadVersionException("Bad version"));
                }
                return;
            }

            // GET and GET_CHILDREN both produce entries in getGetResponses(), so they share one index
            int getIdx = 0;
            int deletedIdx = 0;
            int putIdx = 0;
            for (MetadataOp op : ops) {
                switch (op.getType()) {
                    case GET: {
                        OpGet get = op.asGet();
                        GetResponse gr = txnResponse.getGetResponses().get(getIdx++);
                        if (gr.getCount() == 0) {
                            get.getFuture().complete(Optional.empty());
                        } else {
                            KeyValue keyValue = gr.getKvs().get(0);
                            boolean isEphemeral = keyValue.getLease() != 0;
                            boolean createdBySelf = keyValue.getLease() == leaseId;
                            get.getFuture().complete(Optional.of(
                                    new GetResult(
                                            keyValue.getValue().getBytes(),
                                            new Stat(get.getPath(), keyValue.getVersion() - 1, 0, 0, isEphemeral,
                                                    createdBySelf)
                                    )
                            ));
                        }
                        break;
                    }
                    case PUT: {
                        OpPut put = op.asPut();
                        PutResponse pr = txnResponse.getPutResponses().get(putIdx++);
                        KeyValue prevKv = pr.getPrevKv();
                        // The previous etcd version equals the new store version (etcd version - 1)
                        long version = prevKv == null ? 0 : prevKv.getVersion();
                        put.getFuture().complete(new Stat(put.getPath(),
                                version, 0, 0, put.isEphemeral(), true));
                        break;
                    }
                    case DELETE: {
                        OpDelete del = op.asDelete();
                        DeleteResponse dr = txnResponse.getDeleteResponses().get(deletedIdx++);
                        if (dr.getDeleted() == 0) {
                            del.getFuture().completeExceptionally(new MetadataStoreException.NotFoundException());
                        } else {
                            del.getFuture().complete(null);
                        }
                        break;
                    }
                    case GET_CHILDREN: {
                        OpGetChildren getChildren = op.asGetChildren();
                        GetResponse gr = txnResponse.getGetResponses().get(getIdx++);
                        // Must match the prefix used for the request (root "/" has no extra separator)
                        String basePath = childrenKeyPrefix(getChildren.getPath());

                        Set<String> children = gr.getKvs().stream()
                                .map(keyValue -> keyValue.getKey().toString(StandardCharsets.UTF_8))
                                .filter(p -> p.startsWith(basePath))
                                // Strip only the leading prefix, not every occurrence of it
                                .map(p -> p.substring(basePath.length()))
                                // Only return first-level children
                                .map(k -> k.split("/", 2)[0])
                                // e.g. the key equal to the prefix itself ("/" for the root)
                                .filter(k -> !k.isEmpty())
                                .collect(Collectors.toCollection(TreeSet::new));

                        getChildren.getFuture().complete(new ArrayList<>(children));
                        break;
                    }
                    default:
                        break;
                }
            }
        });
    }

    /** Encodes a store path as an etcd key (UTF-8). */
    private static ByteSequence etcdKey(String path) {
        return ByteSequence.from(path, StandardCharsets.UTF_8);
    }

    /** Key prefix under which the children of {@code path} live ("/" for the root, else path + "/"). */
    private static String childrenKeyPrefix(String path) {
        return path.equals("/") ? path : path + "/";
    }

    /**
     * Grants a new session lease and installs it with a keep-alive stream.
     *
     * @param retryOnFailure when false, the returned future reflects this single attempt. When true,
     *                       failed attempts are retried every second, and the returned future
     *                       completes once an attempt succeeds. It completes exceptionally only if a
     *                       retry cannot be scheduled on {@code executor}.
     */
    private synchronized CompletableFuture<Void> createLease(boolean retryOnFailure) {
        CompletableFuture<Void> attempt = client.getLeaseClient().grant(leaseTTLSeconds)
                .thenAccept(lease -> installLease(lease.getID()));
        if (!retryOnFailure) {
            return attempt;
        }

        CompletableFuture<Void> result = new CompletableFuture<>();
        attempt.whenComplete((ignored, ex) -> {
            if (ex == null) {
                result.complete(null);
                return;
            }
            log.warn("Failed to create Etcd lease. Retrying later", ex);
            try {
                executor.schedule(() -> {
                    createLease(true).whenComplete((ignoredRetry, retryEx) -> {
                        if (retryEx == null) {
                            result.complete(null);
                        } else {
                            result.completeExceptionally(retryEx);
                        }
                    });
                }, 1, TimeUnit.SECONDS);
            } catch (RejectedExecutionException ree) {
                // Executor no longer accepts tasks (presumably the store is closing): stop retrying
                result.completeExceptionally(ex);
            }
        });
        return result;
    }

    /**
     * Makes {@code newLeaseId} the current lease, replacing any previous keep-alive stream. A keep-alive
     * error or completion is reported as {@link SessionEvent#SessionLost}.
     */
    private synchronized void installLease(long newLeaseId) {
        this.leaseId = newLeaseId;
        if (leaseClient != null) {
            leaseClient.close();
        }
        this.leaseClient = this.client.getLeaseClient()
                .keepAlive(newLeaseId, new StreamObserver<LeaseKeepAliveResponse>() {
                    @Override
                    public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
                        if (log.isDebugEnabled()) {
                            log.debug("On next: {}", leaseKeepAliveResponse);
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        log.warn("Lease client error :", throwable);
                        receivedSessionEvent(SessionEvent.SessionLost);
                    }

                    @Override
                    public void onCompleted() {
                        log.info("Etcd lease has expired");
                        receivedSessionEvent(SessionEvent.SessionLost);
                    }
                });
    }

    /**
     * Translates etcd watch events into store notifications.
     *
     * <p>A PUT with etcd version 1 is a creation, and any other PUT is a modification. Creations and
     * deletions also notify the parent that its children changed.
     */
    private void handleWatchResponse(WatchResponse watchResponse) {
        watchResponse.getEvents().forEach(we -> {
            String path = we.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
            switch (we.getEventType()) {
                case PUT:
                    if (we.getKeyValue().getVersion() == 1) {
                        receivedNotification(new Notification(NotificationType.Created, path));
                        notifyParentChildrenChanged(path);
                    } else {
                        receivedNotification(new Notification(NotificationType.Modified, path));
                    }
                    break;
                case DELETE:
                    receivedNotification(new Notification(NotificationType.Deleted, path));
                    notifyParentChildrenChanged(path);
                    break;
                default:
                    break;
            }
        });
    }

    /**
     * On {@link SessionEvent#SessionReestablished}, re-creates the lease (retrying until it succeeds)
     * before propagating the event. Other events are propagated immediately.
     */
    @Override
    protected void receivedSessionEvent(SessionEvent event) {
        if (event == SessionEvent.SessionReestablished) {
            // Re-create the lease before notifying that we are reconnected
            createLease(true)
                    .thenRun(() -> super.receivedSessionEvent(event));
        } else {
            super.receivedSessionEvent(event);
        }
    }
}