| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.bookkeeper.metadata.etcd; |
| |
| import com.google.common.collect.Sets; |
| |
| import io.etcd.jetcd.ByteSequence; |
| import io.etcd.jetcd.Client; |
| import io.etcd.jetcd.KV; |
| import io.etcd.jetcd.KeyValue; |
| import io.etcd.jetcd.Txn; |
| import io.etcd.jetcd.common.exception.ClosedClientException; |
| import io.etcd.jetcd.kv.GetResponse; |
| import io.etcd.jetcd.op.Cmp; |
| import io.etcd.jetcd.op.CmpTarget; |
| import io.etcd.jetcd.options.DeleteOption; |
| import io.etcd.jetcd.options.GetOption; |
| import io.etcd.jetcd.options.PutOption; |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.List; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.function.Consumer; |
| |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.api.LedgerMetadata; |
| import org.apache.bookkeeper.meta.LedgerManager; |
| import org.apache.bookkeeper.meta.LedgerMetadataSerDe; |
| import org.apache.bookkeeper.metadata.etcd.helpers.KeyIterator; |
| import org.apache.bookkeeper.metadata.etcd.helpers.KeyStream; |
| import org.apache.bookkeeper.metadata.etcd.helpers.ValueStream; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor; |
| import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; |
| import org.apache.bookkeeper.versioning.LongVersion; |
| import org.apache.bookkeeper.versioning.Version; |
| import org.apache.bookkeeper.versioning.Versioned; |
| import org.apache.zookeeper.AsyncCallback.VoidCallback; |
| |
| /** |
| * Etcd ledger manager. |
| */ |
| @Slf4j |
| class EtcdLedgerManager implements LedgerManager { |
| |
| private final LedgerMetadataSerDe serDe = new LedgerMetadataSerDe(); |
| |
| private final String scope; |
| private final Client client; |
| private final KV kvClient; |
| private final EtcdWatchClient watchClient; |
| private final ConcurrentLongHashMap<ValueStream<LedgerMetadata>> watchers = |
| new ConcurrentLongHashMap<>(); |
| private final ConcurrentMap<LedgerMetadataListener, LedgerMetadataConsumer> listeners = |
| new ConcurrentHashMap<>(); |
| |
| private volatile boolean closed = false; |
| |
| EtcdLedgerManager(Client client, |
| String scope) { |
| this.client = client; |
| this.kvClient = client.getKVClient(); |
| this.scope = scope; |
| this.watchClient = new EtcdWatchClient(client); |
| } |
| |
| private boolean isClosed() { |
| return closed; |
| } |
| |
| ValueStream<LedgerMetadata> getLedgerMetadataStream(long ledgerId) { |
| return watchers.get(ledgerId); |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId, |
| LedgerMetadata metadata) { |
| CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>(); |
| String ledgerKey = EtcdUtils.getLedgerKey(scope, ledgerId); |
| log.info("Create ledger metadata under key {}", ledgerKey); |
| |
| ByteSequence ledgerKeyBs = ByteSequence.from(ledgerKey, StandardCharsets.UTF_8); |
| final ByteSequence valueBs; |
| try { |
| valueBs = ByteSequence.from(serDe.serialize(metadata)); |
| } catch (IOException ioe) { |
| promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe)); |
| return promise; |
| } |
| kvClient.txn() |
| .If(new Cmp( |
| ledgerKeyBs, |
| Cmp.Op.GREATER, |
| CmpTarget.createRevision(0L))) |
| .Then(io.etcd.jetcd.op.Op.get( |
| ledgerKeyBs, |
| GetOption.newBuilder() |
| .withCountOnly(true) |
| .build())) |
| .Else(io.etcd.jetcd.op.Op.put( |
| ledgerKeyBs, |
| valueBs, |
| PutOption.DEFAULT)) |
| .commit() |
| .thenAccept(resp -> { |
| if (resp.isSucceeded()) { |
| GetResponse getResp = resp.getGetResponses().get(0); |
| if (getResp.getCount() <= 0) { |
| // key doesn't exist but we fail to put the key |
| promise.completeExceptionally(new BKException.BKUnexpectedConditionException()); |
| } else { |
| // key exists |
| promise.completeExceptionally(new BKException.BKLedgerExistException()); |
| } |
| } else { |
| promise.complete(new Versioned<>(metadata, |
| new LongVersion(resp.getHeader().getRevision()))); |
| } |
| }) |
| .exceptionally(cause -> { |
| promise.completeExceptionally(new BKException.MetaStoreException()); |
| return null; |
| }); |
| return promise; |
| } |
| |
| @Override |
| public CompletableFuture<Void> removeLedgerMetadata(long ledgerId, Version version) { |
| CompletableFuture<Void> promise = new CompletableFuture<>(); |
| long revision = -0xabcd; |
| if (Version.NEW == version) { |
| log.error("Request to delete ledger {} metadata with version set to the initial one", ledgerId); |
| promise.completeExceptionally(new BKException.BKMetadataVersionException()); |
| return promise; |
| } else if (Version.ANY != version) { |
| if (!(version instanceof LongVersion)) { |
| log.info("Not an instance of LongVersion : {}", ledgerId); |
| promise.completeExceptionally(new BKException.BKMetadataVersionException()); |
| return promise; |
| } else { |
| revision = ((LongVersion) version).getLongVersion(); |
| } |
| } |
| |
| String ledgerKey = EtcdUtils.getLedgerKey(scope, ledgerId); |
| ByteSequence ledgerKeyBs = ByteSequence.from(ledgerKey, StandardCharsets.UTF_8); |
| Txn txn = kvClient.txn(); |
| if (revision == -0xabcd) { |
| txn = txn.If(new Cmp( |
| ledgerKeyBs, |
| Cmp.Op.GREATER, |
| CmpTarget.createRevision(0L) |
| )); |
| } else { |
| txn = txn.If(new Cmp( |
| ledgerKeyBs, |
| Cmp.Op.EQUAL, |
| CmpTarget.modRevision(revision) |
| )); |
| } |
| txn |
| .Then(io.etcd.jetcd.op.Op.delete( |
| ledgerKeyBs, |
| DeleteOption.DEFAULT |
| )) |
| .Else(io.etcd.jetcd.op.Op.get( |
| ledgerKeyBs, |
| GetOption.DEFAULT |
| )) |
| .commit() |
| .thenAccept(txnResp -> { |
| if (txnResp.isSucceeded()) { |
| promise.complete(null); |
| } else { |
| GetResponse getResp = txnResp.getGetResponses().get(0); |
| if (getResp.getCount() > 0) { |
| // fail to delete the ledger |
| promise.completeExceptionally(new BKException.BKMetadataVersionException()); |
| } else { |
| log.warn("Deleting ledger {} failed due to : ledger key {} doesn't exist", ledgerId, ledgerKey); |
| promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException()); |
| } |
| } |
| }) |
| .exceptionally(cause -> { |
| promise.completeExceptionally(new BKException.MetaStoreException()); |
| return null; |
| }); |
| return promise; |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) { |
| CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>(); |
| String ledgerKey = EtcdUtils.getLedgerKey(scope, ledgerId); |
| ByteSequence ledgerKeyBs = ByteSequence.from(ledgerKey, StandardCharsets.UTF_8); |
| log.info("read ledger metadata under key {}", ledgerKey); |
| kvClient.get(ledgerKeyBs) |
| .thenAccept(getResp -> { |
| if (getResp.getCount() > 0) { |
| KeyValue kv = getResp.getKvs().get(0); |
| byte[] data = kv.getValue().getBytes(); |
| try { |
| LedgerMetadata metadata = serDe.parseConfig(data, ledgerId, Optional.empty()); |
| promise.complete(new Versioned<>(metadata, new LongVersion(kv.getModRevision()))); |
| } catch (IOException ioe) { |
| log.error("Could not parse ledger metadata for ledger : {}", ledgerId, ioe); |
| promise.completeExceptionally(new BKException.MetaStoreException()); |
| return; |
| } |
| } else { |
| promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException()); |
| } |
| }) |
| .exceptionally(cause -> { |
| promise.completeExceptionally(new BKException.MetaStoreException()); |
| return null; |
| }); |
| return promise; |
| } |
| |
| @Override |
| public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, |
| Version currentVersion) { |
| CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>(); |
| if (Version.NEW == currentVersion || !(currentVersion instanceof LongVersion)) { |
| promise.completeExceptionally(new BKException.BKMetadataVersionException()); |
| return promise; |
| } |
| final LongVersion lv = (LongVersion) currentVersion; |
| String ledgerKey = EtcdUtils.getLedgerKey(scope, ledgerId); |
| ByteSequence ledgerKeyBs = ByteSequence.from(ledgerKey, StandardCharsets.UTF_8); |
| |
| final ByteSequence valueBs; |
| try { |
| valueBs = ByteSequence.from(serDe.serialize(metadata)); |
| } catch (IOException ioe) { |
| promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe)); |
| return promise; |
| } |
| |
| kvClient.txn() |
| .If(new Cmp( |
| ledgerKeyBs, |
| Cmp.Op.EQUAL, |
| CmpTarget.modRevision(lv.getLongVersion()))) |
| .Then(io.etcd.jetcd.op.Op.put( |
| ledgerKeyBs, |
| valueBs, |
| PutOption.DEFAULT)) |
| .Else(io.etcd.jetcd.op.Op.get( |
| ledgerKeyBs, |
| GetOption.DEFAULT)) |
| .commit() |
| .thenAccept(resp -> { |
| if (resp.isSucceeded()) { |
| promise.complete(new Versioned<>(metadata, new LongVersion(resp.getHeader().getRevision()))); |
| } else { |
| GetResponse getResp = resp.getGetResponses().get(0); |
| if (getResp.getCount() > 0) { |
| log.warn("Conditional update ledger metadata failed :" |
| + " expected version = {}, actual version = {}", |
| getResp.getKvs().get(0).getModRevision(), lv); |
| promise.completeExceptionally(new BKException.BKMetadataVersionException()); |
| } else { |
| promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException()); |
| } |
| } |
| }) |
| .exceptionally(cause -> { |
| promise.completeExceptionally(new BKException.MetaStoreException()); |
| return null; |
| }); |
| return promise; |
| } |
| |
| private LedgerMetadataConsumer listenerToConsumer(long ledgerId, |
| LedgerMetadataListener listener, |
| Consumer<Long> onDeletedConsumer) { |
| return new LedgerMetadataConsumer( |
| ledgerId, |
| listener, |
| onDeletedConsumer |
| ); |
| } |
| |
| @Override |
| public void registerLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) { |
| if (listeners.containsKey(listener)) { |
| return; |
| } |
| |
| ValueStream<LedgerMetadata> lmStream = watchers.computeIfAbsent( |
| ledgerId, (lid) -> new ValueStream<>( |
| client, |
| watchClient, |
| bs -> { |
| try { |
| return serDe.parseConfig( |
| bs.getBytes(), |
| lid, |
| Optional.empty() |
| ); |
| } catch (IOException ioe) { |
| log.error("Could not parse ledger metadata : {}", |
| bs.toString(StandardCharsets.UTF_8), ioe); |
| throw new RuntimeException( |
| "Could not parse ledger metadata : " |
| + bs.toString(StandardCharsets.UTF_8), ioe); |
| } |
| }, |
| ByteSequence.from(EtcdUtils.getLedgerKey(scope, ledgerId), StandardCharsets.UTF_8)) |
| ); |
| LedgerMetadataConsumer lmConsumer = listenerToConsumer(ledgerId, listener, |
| (lid) -> { |
| if (watchers.remove(lid, lmStream)) { |
| log.info("Closed ledger metadata watcher on ledger {} deletion.", lid); |
| lmStream.closeAsync(); |
| } |
| }); |
| LedgerMetadataConsumer oldConsumer = listeners.putIfAbsent(listener, lmConsumer); |
| if (null != oldConsumer) { |
| return; |
| } else { |
| lmStream.readAndWatch(lmConsumer) |
| .whenComplete((values, cause) -> { |
| if (null != cause && !(cause instanceof ClosedClientException)) { |
| // fail to register ledger metadata listener, re-attempt it |
| registerLedgerMetadataListener(ledgerId, listener); |
| } |
| }); |
| } |
| } |
| |
| @Override |
| public void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataListener listener) { |
| LedgerMetadataConsumer lmConsumer = listeners.remove(listener); |
| unregisterLedgerMetadataListener(ledgerId, lmConsumer); |
| } |
| |
| private void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataConsumer lmConsumer) { |
| ValueStream<LedgerMetadata> lmStream = watchers.get(ledgerId); |
| if (null == lmStream) { |
| return; |
| } else { |
| lmStream.unwatch(lmConsumer).thenAccept(noConsumers -> { |
| if (noConsumers) { |
| if (watchers.remove(ledgerId, lmStream)) { |
| log.info("Closed ledger metadata watcher on ledger {} since there are no listeners any more.", |
| ledgerId); |
| lmStream.closeAsync(); |
| } |
| } |
| }).exceptionally(cause -> { |
| if (cause instanceof ClosedClientException) { |
| // fail to unwatch a consumer |
| unregisterLedgerMetadataListener(ledgerId, lmConsumer); |
| } |
| return null; |
| }); |
| } |
| } |
| |
| @Override |
| public void asyncProcessLedgers(Processor<Long> processor, |
| VoidCallback finalCb, |
| Object context, |
| int successRc, |
| int failureRc) { |
| KeyStream<Long> ks = new KeyStream<>( |
| kvClient, |
| ByteSequence.from(EtcdUtils.getLedgerKey(scope, 0L), StandardCharsets.UTF_8), |
| ByteSequence.from(EtcdUtils.getLedgerKey(scope, Long.MAX_VALUE), StandardCharsets.UTF_8), |
| bs -> { |
| UUID uuid = EtcdUtils.parseLedgerKey(bs.toString(StandardCharsets.UTF_8)); |
| return uuid.getLeastSignificantBits(); |
| } |
| ); |
| processLedgers( |
| ks, processor, finalCb, context, successRc, failureRc); |
| } |
| |
| private void processLedgers(KeyStream<Long> ks, |
| Processor<Long> processor, |
| VoidCallback finalCb, |
| Object context, |
| int successRc, |
| int failureRc) { |
| ks.readNext().whenCompleteAsync((ledgers, cause) -> { |
| if (null != cause) { |
| finalCb.processResult(failureRc, null, context); |
| } else { |
| if (ledgers.isEmpty()) { |
| finalCb.processResult(successRc, null, context); |
| } else { |
| ledgers.forEach(l -> processor.process(l, finalCb)); |
| processLedgers(ks, processor, finalCb, context, successRc, failureRc); |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public LedgerRangeIterator getLedgerRanges(long opTimeOutMs) { |
| KeyStream<Long> ks = new KeyStream<>( |
| kvClient, |
| ByteSequence.from(EtcdUtils.getLedgerKey(scope, 0L), StandardCharsets.UTF_8), |
| ByteSequence.from(EtcdUtils.getLedgerKey(scope, Long.MAX_VALUE), StandardCharsets.UTF_8), |
| bs -> { |
| UUID uuid = EtcdUtils.parseLedgerKey(bs.toString(StandardCharsets.UTF_8)); |
| return uuid.getLeastSignificantBits(); |
| } |
| ); |
| KeyIterator<Long> ki = new KeyIterator<>(ks); |
| return new LedgerRangeIterator() { |
| @Override |
| public boolean hasNext() throws IOException { |
| try { |
| return ki.hasNext(); |
| } catch (Exception e) { |
| if (e instanceof IOException) { |
| throw ((IOException) e); |
| } else { |
| throw new IOException(e); |
| } |
| } |
| } |
| |
| @Override |
| public LedgerRange next() throws IOException { |
| try { |
| final List<Long> values = ki.next(); |
| final Set<Long> ledgers = Sets.newTreeSet(); |
| ledgers.addAll(values); |
| return new LedgerRange(ledgers); |
| } catch (Exception e) { |
| if (e instanceof IOException) { |
| throw ((IOException) e); |
| } else { |
| throw new IOException(e); |
| } |
| } |
| } |
| }; |
| } |
| |
| @Override |
| public void close() { |
| synchronized (this) { |
| if (closed) { |
| return; |
| } |
| closed = true; |
| } |
| watchClient.close(); |
| } |
| } |