// blob: 05711fd96c1b88fd916e6312b792deb72e478419 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.bookkeeper.client;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A specific {@link DistributionSchedule} that places entries in round-robin
 * fashion. For ensemble size 3, and quorum size 2, Entry 0 goes to bookie 0 and
 * 1, entry 1 goes to bookie 1 and 2, and entry 2 goes to bookie 2 and 0, and so
 * on.
 */
public class RoundRobinDistributionSchedule implements DistributionSchedule {
private static final Logger LOG = LoggerFactory.getLogger(RoundRobinDistributionSchedule.class);
private final int writeQuorumSize;
private final int ackQuorumSize;
private final int ensembleSize;
public RoundRobinDistributionSchedule(int writeQuorumSize, int ackQuorumSize, int ensembleSize) {
this.writeQuorumSize = writeQuorumSize;
this.ackQuorumSize = ackQuorumSize;
this.ensembleSize = ensembleSize;
public WriteSet getWriteSet(long entryId) {
return WriteSetImpl.create(ensembleSize, writeQuorumSize, entryId);
public WriteSet getEnsembleSet(long entryId) {
// for long poll reads and force ledger , we are trying all the bookies in the ensemble
// so we create a `WriteSet` with `writeQuorumSize == ensembleSize`.
return WriteSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, entryId);
static WriteSet writeSetFromValues(Integer... values) {
WriteSetImpl writeSet = WriteSetImpl.create(0, 0, 0);
for (int i = 0; i < values.length; i++) {
writeSet.set(i, values[i]);
return writeSet;
private static class WriteSetImpl implements WriteSet {
int[] array = null;
int size;
private final Handle<WriteSetImpl> recyclerHandle;
private static final Recycler<WriteSetImpl> RECYCLER = new Recycler<WriteSetImpl>() {
protected WriteSetImpl newObject(
Recycler.Handle<WriteSetImpl> handle) {
return new WriteSetImpl(handle);
private WriteSetImpl(Handle<WriteSetImpl> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
static WriteSetImpl create(int ensembleSize,
int writeQuorumSize,
long entryId) {
WriteSetImpl writeSet = RECYCLER.get();
writeSet.reset(ensembleSize, writeQuorumSize, entryId);
return writeSet;
private void reset(int ensembleSize, int writeQuorumSize,
long entryId) {
for (int w = 0; w < writeQuorumSize; w++) {
set(w, (int) ((entryId + w) % ensembleSize));
private void setSize(int newSize) {
if (array == null) {
array = new int[newSize];
} else if (newSize > array.length) {
int[] newArray = new int[newSize];
System.arraycopy(array, 0,
newArray, 0, array.length);
array = newArray;
size = newSize;
public int size() {
return size;
public boolean contains(int i) {
return indexOf(i) != -1;
public int get(int i) {
return array[i];
public int set(int i, int index) {
int oldVal = array[i];
array[i] = index;
return oldVal;
public void sort() {
Arrays.sort(array, 0, size);
public int indexOf(int index) {
for (int j = 0; j < size; j++) {
if (array[j] == index) {
return j;
return -1;
public void addMissingIndices(int maxIndex) {
if (size < maxIndex) {
int oldSize = size;
for (int i = 0, j = oldSize;
i < maxIndex && j < maxIndex; i++) {
if (!contains(i)) {
set(j, i);
public void moveAndShift(int from, int to) {
if (from > to) {
int tmp = array[from];
for (int i = from; i > to; i--) {
array[i] = array[i - 1];
array[to] = tmp;
} else if (from < to) {
int tmp = array[from];
for (int i = from; i < to; i++) {
array[i] = array[i + 1];
array[to] = tmp;
public void recycle() {
public WriteSet copy() {
WriteSetImpl copy = RECYCLER.get();
for (int i = 0; i < size; i++) {
copy.set(i, array[i]);
return copy;
public int hashCode() {
int sum = 0;
for (int i = 0; i < size; i++) {
sum += sum * 31 + i;
return sum;
public boolean equals(Object other) {
if (other instanceof WriteSetImpl) {
WriteSetImpl o = (WriteSetImpl) other;
if (o.size() != size()) {
return false;
for (int i = 0; i < size(); i++) {
if (o.get(i) != get(i)) {
return false;
return true;
return false;
public String toString() {
StringBuilder b = new StringBuilder("WriteSet[");
int i = 0;
for (; i < size() - 1; i++) {
return b.toString();
private void checkBounds(int i) {
if (i < 0 || i > size) {
throw new IndexOutOfBoundsException(
"Index " + i + " out of bounds, array size = " + size);
public AckSet getAckSet() {
return AckSetImpl.create(ensembleSize, writeQuorumSize, ackQuorumSize);
public AckSet getEnsembleAckSet() {
return AckSetImpl.create(ensembleSize, ensembleSize, ensembleSize);
private static class AckSetImpl implements AckSet {
private int writeQuorumSize;
private int ackQuorumSize;
private final BitSet ackSet = new BitSet();
// grows on reset()
private BookieSocketAddress[] failureMap = new BookieSocketAddress[0];
private final Handle<AckSetImpl> recyclerHandle;
private static final Recycler<AckSetImpl> RECYCLER = new Recycler<AckSetImpl>() {
protected AckSetImpl newObject(Recycler.Handle<AckSetImpl> handle) {
return new AckSetImpl(handle);
private AckSetImpl(Handle<AckSetImpl> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
static AckSetImpl create(int ensembleSize,
int writeQuorumSize,
int ackQuorumSize) {
AckSetImpl ackSet = RECYCLER.get();
ackSet.reset(ensembleSize, writeQuorumSize, ackQuorumSize);
return ackSet;
private void reset(int ensembleSize,
int writeQuorumSize,
int ackQuorumSize) {
this.ackQuorumSize = ackQuorumSize;
this.writeQuorumSize = writeQuorumSize;
if (failureMap.length < ensembleSize) {
failureMap = new BookieSocketAddress[ensembleSize];
Arrays.fill(failureMap, null);
public boolean completeBookieAndCheck(int bookieIndexHeardFrom) {
failureMap[bookieIndexHeardFrom] = null;
return ackSet.cardinality() >= ackQuorumSize;
public boolean failBookieAndCheck(int bookieIndexHeardFrom,
BookieSocketAddress address) {
failureMap[bookieIndexHeardFrom] = address;
return failed() > (writeQuorumSize - ackQuorumSize);
public Map<Integer, BookieSocketAddress> getFailedBookies() {
ImmutableMap.Builder<Integer, BookieSocketAddress> builder = new ImmutableMap.Builder<>();
for (int i = 0; i < failureMap.length; i++) {
if (failureMap[i] != null) {
builder.put(i, failureMap[i]);
public boolean removeBookieAndCheck(int bookie) {
failureMap[bookie] = null;
return ackSet.cardinality() >= ackQuorumSize;
public void recycle() {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("ackQuorumSize", ackQuorumSize)
.add("ackSet", ackSet)
.add("failureMap", failureMap).toString();
private int failed() {
int count = 0;
for (int i = 0; i < failureMap.length; i++) {
if (failureMap[i] != null) {
return count;
private class RRQuorumCoverageSet implements QuorumCoverageSet {
private final int[] covered = new int[ensembleSize];
private RRQuorumCoverageSet() {
for (int i = 0; i < covered.length; i++) {
covered[i] = BKException.Code.UNINITIALIZED;
public synchronized void addBookie(int bookieIndexHeardFrom, int rc) {
covered[bookieIndexHeardFrom] = rc;
public synchronized boolean checkCovered() {
// now check if there are any write quorums, with |ackQuorum| nodes available
for (int i = 0; i < ensembleSize; i++) {
int nodesNotCovered = 0;
int nodesOkay = 0;
int nodesUninitialized = 0;
for (int j = 0; j < writeQuorumSize; j++) {
int nodeIndex = (i + j) % ensembleSize;
if (covered[nodeIndex] == BKException.Code.OK) {
} else if (covered[nodeIndex] != BKException.Code.NoSuchEntryException
&& covered[nodeIndex] != BKException.Code.NoSuchLedgerExistsException) {
if (covered[nodeIndex] == BKException.Code.UNINITIALIZED) {
// if we haven't seen any OK responses and there are still nodes not heard from,
// let's wait until
if (nodesNotCovered >= ackQuorumSize || (nodesOkay == 0 && nodesUninitialized > 0)) {
return false;
return true;
public QuorumCoverageSet getCoverageSet() {
return new RRQuorumCoverageSet();
public boolean hasEntry(long entryId, int bookieIndex) {
WriteSet w = getWriteSet(entryId);
try {
return w.contains(bookieIndex);
} finally {
public BitSet getEntriesStripedToTheBookie(int bookieIndex, long startEntryId, long lastEntryId) {
if ((startEntryId < 0) || (lastEntryId < 0) || (bookieIndex < 0) || (bookieIndex >= ensembleSize)
|| (lastEntryId < startEntryId)) {
"Illegal arguments for getEntriesStripedToTheBookie, bookieIndex : {},"
+ " ensembleSize : {}, startEntryId : {}, lastEntryId : {}",
bookieIndex, ensembleSize, startEntryId, lastEntryId);
throw new IllegalArgumentException("Illegal arguments for getEntriesStripedToTheBookie");
BitSet entriesStripedToTheBookie = new BitSet((int) (lastEntryId - startEntryId + 1));
for (long entryId = startEntryId; entryId <= lastEntryId; entryId++) {
int modValOfFirstReplica = (int) (entryId % ensembleSize);
int modValOfLastReplica = (int) ((entryId + writeQuorumSize - 1) % ensembleSize);
if (modValOfLastReplica >= modValOfFirstReplica) {
if ((bookieIndex >= modValOfFirstReplica) && (bookieIndex <= modValOfLastReplica)) {
entriesStripedToTheBookie.set((int) (entryId - startEntryId));
} else {
if ((bookieIndex >= modValOfFirstReplica) || (bookieIndex <= modValOfLastReplica)) {
entriesStripedToTheBookie.set((int) (entryId - startEntryId));
return entriesStripedToTheBookie;