| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.bookkeeper.metadata.etcd; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.bookkeeper.metadata.etcd.EtcdConstants.EMPTY_BS; |
| |
| import io.etcd.jetcd.ByteSequence; |
| import io.etcd.jetcd.KV; |
| import io.etcd.jetcd.KeyValue; |
| import io.etcd.jetcd.Txn; |
| import io.etcd.jetcd.kv.GetResponse; |
| import io.etcd.jetcd.op.Cmp; |
| import io.etcd.jetcd.op.Cmp.Op; |
| import io.etcd.jetcd.op.CmpTarget; |
| import io.etcd.jetcd.options.GetOption; |
| import io.etcd.jetcd.options.PutOption; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.client.BKException.Code; |
| import org.apache.bookkeeper.meta.LedgerIdGenerator; |
| import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; |
| |
| /** |
| * Generate 64-bit ledger ids from a bucket. |
| * |
| * <p>The most significant 8 bits is used as bucket id. The remaining 56 bits are |
| * used as the id generated per bucket. |
| */ |
| @Slf4j |
| class Etcd64bitIdGenerator implements LedgerIdGenerator { |
| |
| static final long MAX_ID_PER_BUCKET = 0x00ffffffffffffffL; |
| static final long BUCKET_ID_MASK = 0xff00000000000000L; |
| static final int BUCKET_ID_SHIFT = 56; |
| static final int NUM_BUCKETS = 0x80; |
| |
| static int getBucketId(long lid) { |
| return (int) ((lid & BUCKET_ID_MASK) >>> BUCKET_ID_SHIFT); |
| } |
| |
| static long getIdInBucket(long lid) { |
| return lid & MAX_ID_PER_BUCKET; |
| } |
| |
| private static final AtomicIntegerFieldUpdater<Etcd64bitIdGenerator> nextBucketIdUpdater = |
| AtomicIntegerFieldUpdater.newUpdater(Etcd64bitIdGenerator.class, "nextBucketId"); |
| |
| private final String scope; |
| private final KV kvClient; |
| private volatile int nextBucketId; |
| |
| Etcd64bitIdGenerator(KV kvClient, String scope) { |
| this.kvClient = kvClient; |
| this.scope = scope; |
| this.nextBucketId = ThreadLocalRandom.current().nextInt(NUM_BUCKETS); |
| } |
| |
| int nextBucketId() { |
| while (true) { |
| int bucketId = nextBucketIdUpdater.incrementAndGet(this); |
| if (bucketId >= NUM_BUCKETS) { |
| if (nextBucketIdUpdater.compareAndSet(this, bucketId, 0)) { |
| bucketId = 0; |
| } else { |
| // someone has been updated bucketId, try it again. |
| continue; |
| } |
| } |
| return bucketId; |
| } |
| } |
| |
| @Override |
| public void generateLedgerId(GenericCallback<Long> cb) { |
| int bucketId = nextBucketId(); |
| checkArgument(bucketId >= 0 && bucketId < NUM_BUCKETS, |
| "Invalid bucket id : " + bucketId); |
| |
| ByteSequence bucketKey = ByteSequence.from(EtcdUtils.getBucketPath(scope, bucketId), StandardCharsets.UTF_8); |
| Txn txn = kvClient.txn() |
| .If(new Cmp(bucketKey, Op.GREATER, CmpTarget.createRevision(0))) |
| .Then( |
| io.etcd.jetcd.op.Op.put(bucketKey, EMPTY_BS, PutOption.DEFAULT), |
| io.etcd.jetcd.op.Op.get(bucketKey, GetOption.DEFAULT) |
| ) |
| .Else( |
| io.etcd.jetcd.op.Op.put(bucketKey, EMPTY_BS, PutOption.DEFAULT), |
| io.etcd.jetcd.op.Op.get(bucketKey, GetOption.DEFAULT) |
| ); |
| txn.commit() |
| .thenAccept(txnResponse -> { |
| if (txnResponse.getGetResponses().size() <= 0) { |
| cb.operationComplete(Code.UnexpectedConditionException, null); |
| } else { |
| GetResponse resp = txnResponse.getGetResponses().get(0); |
| if (resp.getCount() > 0) { |
| KeyValue kv = resp.getKvs().get(0); |
| if (kv.getVersion() > MAX_ID_PER_BUCKET) { |
| log.warn("Etcd bucket '{}' is overflowed", bucketKey.toString(StandardCharsets.UTF_8)); |
| // the bucket is overflowed, moved to next bucket. |
| generateLedgerId(cb); |
| } else { |
| long version = kv.getVersion(); |
| long lid = ((((long) bucketId) << BUCKET_ID_SHIFT) & BUCKET_ID_MASK) |
| | (version & MAX_ID_PER_BUCKET); |
| cb.operationComplete(Code.OK, lid); |
| } |
| } else { |
| cb.operationComplete(Code.UnexpectedConditionException, null); |
| } |
| } |
| }) |
| .exceptionally(cause -> { |
| cb.operationComplete(Code.MetaStoreException, null); |
| return null; |
| }); |
| } |
| |
| @Override |
| public void close() { |
| // no-op |
| } |
| } |