blob: e4ae4f264e75d67cf393ee501cd0338869a82a10 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Map;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A specific {@link DistributionSchedule} that places entries in round-robin
* fashion. For ensemble size 3, and quorum size 2, Entry 0 goes to bookie 0 and
* 1, entry 1 goes to bookie 1 and 2, and entry 2 goes to bookie 2 and 0, and so
* on.
*
*/
public class RoundRobinDistributionSchedule implements DistributionSchedule {
private static final Logger LOG = LoggerFactory.getLogger(RoundRobinDistributionSchedule.class);
private final int writeQuorumSize;
private final int ackQuorumSize;
private final int ensembleSize;
public RoundRobinDistributionSchedule(int writeQuorumSize, int ackQuorumSize, int ensembleSize) {
this.writeQuorumSize = writeQuorumSize;
this.ackQuorumSize = ackQuorumSize;
this.ensembleSize = ensembleSize;
}
@Override
public WriteSet getWriteSet(long entryId) {
return WriteSetImpl.create(ensembleSize, writeQuorumSize, entryId);
}
@Override
public WriteSet getEnsembleSet(long entryId) {
// for long poll reads and force ledger , we are trying all the bookies in the ensemble
// so we create a `WriteSet` with `writeQuorumSize == ensembleSize`.
return WriteSetImpl.create(ensembleSize, ensembleSize /* writeQuorumSize */, entryId);
}
@VisibleForTesting
static WriteSet writeSetFromValues(Integer... values) {
WriteSetImpl writeSet = WriteSetImpl.create(0, 0, 0);
writeSet.setSize(values.length);
for (int i = 0; i < values.length; i++) {
writeSet.set(i, values[i]);
}
return writeSet;
}
private static class WriteSetImpl implements WriteSet {
int[] array = null;
int size;
private final Handle<WriteSetImpl> recyclerHandle;
private static final Recycler<WriteSetImpl> RECYCLER = new Recycler<WriteSetImpl>() {
protected WriteSetImpl newObject(
Recycler.Handle<WriteSetImpl> handle) {
return new WriteSetImpl(handle);
}
};
private WriteSetImpl(Handle<WriteSetImpl> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
static WriteSetImpl create(int ensembleSize,
int writeQuorumSize,
long entryId) {
WriteSetImpl writeSet = RECYCLER.get();
writeSet.reset(ensembleSize, writeQuorumSize, entryId);
return writeSet;
}
private void reset(int ensembleSize, int writeQuorumSize,
long entryId) {
setSize(writeQuorumSize);
for (int w = 0; w < writeQuorumSize; w++) {
set(w, (int) ((entryId + w) % ensembleSize));
}
}
private void setSize(int newSize) {
if (array == null) {
array = new int[newSize];
} else if (newSize > array.length) {
int[] newArray = new int[newSize];
System.arraycopy(array, 0,
newArray, 0, array.length);
array = newArray;
}
size = newSize;
}
@Override
public int size() {
return size;
}
@Override
public boolean contains(int i) {
return indexOf(i) != -1;
}
@Override
public int get(int i) {
checkBounds(i);
return array[i];
}
@Override
public int set(int i, int index) {
checkBounds(i);
int oldVal = array[i];
array[i] = index;
return oldVal;
}
@Override
public void sort() {
Arrays.sort(array, 0, size);
}
@Override
public int indexOf(int index) {
for (int j = 0; j < size; j++) {
if (array[j] == index) {
return j;
}
}
return -1;
}
@Override
public void addMissingIndices(int maxIndex) {
if (size < maxIndex) {
int oldSize = size;
setSize(maxIndex);
for (int i = 0, j = oldSize;
i < maxIndex && j < maxIndex; i++) {
if (!contains(i)) {
set(j, i);
j++;
}
}
}
}
@Override
public void moveAndShift(int from, int to) {
checkBounds(from);
checkBounds(to);
if (from > to) {
int tmp = array[from];
for (int i = from; i > to; i--) {
array[i] = array[i - 1];
}
array[to] = tmp;
} else if (from < to) {
int tmp = array[from];
for (int i = from; i < to; i++) {
array[i] = array[i + 1];
}
array[to] = tmp;
}
}
@Override
public void recycle() {
recyclerHandle.recycle(this);
}
@Override
public WriteSet copy() {
WriteSetImpl copy = RECYCLER.get();
copy.setSize(size);
for (int i = 0; i < size; i++) {
copy.set(i, array[i]);
}
return copy;
}
@Override
public int hashCode() {
int sum = 0;
for (int i = 0; i < size; i++) {
sum += sum * 31 + i;
}
return sum;
}
@Override
public boolean equals(Object other) {
if (other instanceof WriteSetImpl) {
WriteSetImpl o = (WriteSetImpl) other;
if (o.size() != size()) {
return false;
}
for (int i = 0; i < size(); i++) {
if (o.get(i) != get(i)) {
return false;
}
}
return true;
}
return false;
}
@Override
public String toString() {
StringBuilder b = new StringBuilder("WriteSet[");
int i = 0;
for (; i < size() - 1; i++) {
b.append(get(i)).append(",");
}
b.append(get(i)).append("]");
return b.toString();
}
private void checkBounds(int i) {
if (i < 0 || i > size) {
throw new IndexOutOfBoundsException(
"Index " + i + " out of bounds, array size = " + size);
}
}
}
@Override
public AckSet getAckSet() {
return AckSetImpl.create(ensembleSize, writeQuorumSize, ackQuorumSize);
}
@Override
public AckSet getEnsembleAckSet() {
return AckSetImpl.create(ensembleSize, ensembleSize, ensembleSize);
}
private static class AckSetImpl implements AckSet {
private int writeQuorumSize;
private int ackQuorumSize;
private final BitSet ackSet = new BitSet();
// grows on reset()
private BookieSocketAddress[] failureMap = new BookieSocketAddress[0];
private final Handle<AckSetImpl> recyclerHandle;
private static final Recycler<AckSetImpl> RECYCLER = new Recycler<AckSetImpl>() {
protected AckSetImpl newObject(Recycler.Handle<AckSetImpl> handle) {
return new AckSetImpl(handle);
}
};
private AckSetImpl(Handle<AckSetImpl> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
static AckSetImpl create(int ensembleSize,
int writeQuorumSize,
int ackQuorumSize) {
AckSetImpl ackSet = RECYCLER.get();
ackSet.reset(ensembleSize, writeQuorumSize, ackQuorumSize);
return ackSet;
}
private void reset(int ensembleSize,
int writeQuorumSize,
int ackQuorumSize) {
this.ackQuorumSize = ackQuorumSize;
this.writeQuorumSize = writeQuorumSize;
ackSet.clear();
if (failureMap.length < ensembleSize) {
failureMap = new BookieSocketAddress[ensembleSize];
}
Arrays.fill(failureMap, null);
}
@Override
public boolean completeBookieAndCheck(int bookieIndexHeardFrom) {
failureMap[bookieIndexHeardFrom] = null;
ackSet.set(bookieIndexHeardFrom);
return ackSet.cardinality() >= ackQuorumSize;
}
@Override
public boolean failBookieAndCheck(int bookieIndexHeardFrom,
BookieSocketAddress address) {
ackSet.clear(bookieIndexHeardFrom);
failureMap[bookieIndexHeardFrom] = address;
return failed() > (writeQuorumSize - ackQuorumSize);
}
@Override
public Map<Integer, BookieSocketAddress> getFailedBookies() {
ImmutableMap.Builder<Integer, BookieSocketAddress> builder = new ImmutableMap.Builder<>();
for (int i = 0; i < failureMap.length; i++) {
if (failureMap[i] != null) {
builder.put(i, failureMap[i]);
}
}
return builder.build();
}
@Override
public boolean removeBookieAndCheck(int bookie) {
ackSet.clear(bookie);
failureMap[bookie] = null;
return ackSet.cardinality() >= ackQuorumSize;
}
@Override
public void recycle() {
recyclerHandle.recycle(this);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("ackQuorumSize", ackQuorumSize)
.add("ackSet", ackSet)
.add("failureMap", failureMap).toString();
}
private int failed() {
int count = 0;
for (int i = 0; i < failureMap.length; i++) {
if (failureMap[i] != null) {
count++;
}
}
return count;
}
}
private class RRQuorumCoverageSet implements QuorumCoverageSet {
private final int[] covered = new int[ensembleSize];
private RRQuorumCoverageSet() {
for (int i = 0; i < covered.length; i++) {
covered[i] = BKException.Code.UNINITIALIZED;
}
}
@Override
public synchronized void addBookie(int bookieIndexHeardFrom, int rc) {
covered[bookieIndexHeardFrom] = rc;
}
@Override
public synchronized boolean checkCovered() {
// now check if there are any write quorums, with |ackQuorum| nodes available
for (int i = 0; i < ensembleSize; i++) {
/* Nodes which have either responded with an error other than NoSuch{Entry,Ledger},
or have not responded at all. We cannot know if these nodes ever accepted a entry. */
int nodesUnknown = 0;
for (int j = 0; j < writeQuorumSize; j++) {
int nodeIndex = (i + j) % ensembleSize;
if (covered[nodeIndex] != BKException.Code.OK
&& covered[nodeIndex] != BKException.Code.NoSuchEntryException
&& covered[nodeIndex] != BKException.Code.NoSuchLedgerExistsException) {
nodesUnknown++;
}
}
/* If nodesUnknown is greater than the ack quorum size, then
it is possible those two unknown nodes accepted an entry which
we do not know about */
if (nodesUnknown >= ackQuorumSize) {
return false;
}
}
return true;
}
@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append("QuorumCoverage(e:").append(ensembleSize)
.append(",w:").append(writeQuorumSize)
.append(",a:").append(ackQuorumSize)
.append(") = [");
int i = 0;
for (; i < covered.length - 1; i++) {
buffer.append(covered[i]).append(", ");
}
buffer.append(covered[i]).append("]");
return buffer.toString();
}
}
@Override
public QuorumCoverageSet getCoverageSet() {
return new RRQuorumCoverageSet();
}
@Override
public boolean hasEntry(long entryId, int bookieIndex) {
WriteSet w = getWriteSet(entryId);
try {
return w.contains(bookieIndex);
} finally {
w.recycle();
}
}
@Override
public BitSet getEntriesStripedToTheBookie(int bookieIndex, long startEntryId, long lastEntryId) {
if ((startEntryId < 0) || (lastEntryId < 0) || (bookieIndex < 0) || (bookieIndex >= ensembleSize)
|| (lastEntryId < startEntryId)) {
LOG.error(
"Illegal arguments for getEntriesStripedToTheBookie, bookieIndex : {},"
+ " ensembleSize : {}, startEntryId : {}, lastEntryId : {}",
bookieIndex, ensembleSize, startEntryId, lastEntryId);
throw new IllegalArgumentException("Illegal arguments for getEntriesStripedToTheBookie");
}
BitSet entriesStripedToTheBookie = new BitSet((int) (lastEntryId - startEntryId + 1));
for (long entryId = startEntryId; entryId <= lastEntryId; entryId++) {
int modValOfFirstReplica = (int) (entryId % ensembleSize);
int modValOfLastReplica = (int) ((entryId + writeQuorumSize - 1) % ensembleSize);
if (modValOfLastReplica >= modValOfFirstReplica) {
if ((bookieIndex >= modValOfFirstReplica) && (bookieIndex <= modValOfLastReplica)) {
entriesStripedToTheBookie.set((int) (entryId - startEntryId));
}
} else {
if ((bookieIndex >= modValOfFirstReplica) || (bookieIndex <= modValOfLastReplica)) {
entriesStripedToTheBookie.set((int) (entryId - startEntryId));
}
}
}
return entriesStripedToTheBookie;
}
}