blob: f707bf4c90388a56a53fe0fc6c57dfab89ba7144 [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.StampedLock;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.util.Pair;
import com.google.common.collect.Lists;
/**
 * Contains all the cursors for a ManagedLedger.
 *
 * <p>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
 *
 * <p>This data structure maintains a map and a heap of cursors. The map relates a cursor name with its entry in the
 * heap. The heap is an array-backed binary min-heap ordered by the position recorded for each cursor (compared with
 * {@code PositionImpl.compareTo}), so the entry at index 0 always belongs to the slowest cursor.
 *
 * <p>The position recorded for a cursor is read from {@code getMarkDeletedPosition()} when the cursor is added and is
 * afterwards only changed through {@link #cursorUpdated(ManagedCursor, Position)}, which also moves the entry to its
 * new place in the heap.
 *
 * <p>Mutating methods hold the write lock of a {@link StampedLock}; query methods hold the read lock (or use an
 * optimistic read). {@link #iterator()} does not take the lock and iterates the concurrent map directly.
 */
class ManagedCursorContainer implements Iterable<ManagedCursor> {

    /** Heap entry: a cursor, the position it is ordered by, and its current index in {@link #heap}. */
    private static class Item {
        final ManagedCursor cursor;
        PositionImpl position;
        int idx;

        Item(ManagedCursor cursor, int idx) {
            this.cursor = cursor;
            this.position = (PositionImpl) cursor.getMarkDeletedPosition();
            this.idx = idx;
        }
    }

    // Binary min-heap of items; children of index i live at 2i+1 and 2i+2
    private final ArrayList<Item> heap = Lists.newArrayList();

    // Maps a cursor name to its item (and therefore to its index in the heap)
    private final ConcurrentMap<String, Item> cursors = new ConcurrentSkipListMap<String, Item>();

    private final StampedLock rwLock = new StampedLock();

    /**
     * Add a cursor to the container, ordered by its current mark-deleted position.
     *
     * <p>Note: if a cursor with the same name is already present, its map entry is replaced but its previous heap
     * entry is not removed. Callers are expected to use unique names.
     *
     * @param cursor
     *            the cursor to add
     */
    public void add(ManagedCursor cursor) {
        long stamp = rwLock.writeLock();
        try {
            // Append a new entry at the end of the heap and let it bubble up to its place
            Item item = new Item(cursor, heap.size());
            cursors.put(cursor.getName(), item);
            heap.add(item);
            siftUp(item);
        } finally {
            rwLock.unlockWrite(stamp);
        }
    }

    /**
     * Look up a cursor by name.
     *
     * @param name
     *            the cursor name
     * @return the cursor, or {@code null} if no cursor with that name is in the container
     */
    public ManagedCursor get(String name) {
        long stamp = rwLock.readLock();
        try {
            Item item = cursors.get(name);
            return item != null ? item.cursor : null;
        } finally {
            rwLock.unlockRead(stamp);
        }
    }

    /**
     * Remove a cursor from the container. Does nothing if no cursor with that name is present.
     *
     * @param name
     *            the name of the cursor to remove
     */
    public void removeCursor(String name) {
        long stamp = rwLock.writeLock();
        try {
            Item item = cursors.remove(name);
            if (item == null) {
                // Unknown cursor: nothing to remove from the heap
                return;
            }

            int lastIdx = heap.size() - 1;
            Item lastItem = heap.get(lastIdx);
            if (item == lastItem) {
                // Removing the right-most leaf cannot break the heap order
                heap.remove(lastIdx);
                return;
            }

            // Move the item to the right end of the heap to be removed
            swap(item, lastItem);
            heap.remove(lastIdx);

            // The former last item may come from a different subtree, so it can be either smaller than its new
            // parent or bigger than its new children: it has to be allowed to move in both directions
            restoreHeapOrder(lastItem);
        } finally {
            rwLock.unlockWrite(stamp);
        }
    }

    /**
     * Signal that a cursor position has been updated and that the container must re-order the cursor list.
     *
     * @param cursor
     *            the cursor whose position changed
     * @param newPosition
     *            the new position to record for the cursor
     * @return a pair of positions, representing the previous slowest consumer and the new slowest consumer (after the
     *         update), or {@code null} if the cursor is not in the container
     */
    public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
        checkNotNull(cursor);

        long stamp = rwLock.writeLock();
        try {
            Item item = cursors.get(cursor.getName());
            if (item == null) {
                return null;
            }

            PositionImpl previousSlowestConsumer = heap.get(0).position;

            // When the cursor moves forward, we need to push it toward the
            // bottom of the tree and push it up if a reset was done
            item.position = (PositionImpl) newPosition;
            restoreHeapOrder(item);

            PositionImpl newSlowestConsumer = heap.get(0).position;
            return Pair.create(previousSlowestConsumer, newSlowestConsumer);
        } finally {
            rwLock.unlockWrite(stamp);
        }
    }

    /**
     * Get the slowest reader position, meaning older acknowledged position between all the cursors.
     *
     * @return the slowest reader position, or {@code null} if the container is empty
     */
    public PositionImpl getSlowestReaderPosition() {
        long stamp = rwLock.readLock();
        try {
            return heap.isEmpty() ? null : heap.get(0).position;
        } finally {
            rwLock.unlockRead(stamp);
        }
    }

    /**
     * Get the cursor at the root of the heap, i.e. the one with the lowest recorded position.
     *
     * @return the slowest cursor, or {@code null} if the container is empty
     */
    public ManagedCursor getSlowestReader() {
        long stamp = rwLock.readLock();
        try {
            return heap.isEmpty() ? null : heap.get(0).cursor;
        } finally {
            rwLock.unlockRead(stamp);
        }
    }

    /**
     * @return true if the container holds no cursor
     */
    public boolean isEmpty() {
        // Try without locking first; only take the read lock if a writer interfered
        long stamp = rwLock.tryOptimisticRead();
        boolean isEmpty = heap.isEmpty();
        if (!rwLock.validate(stamp)) {
            // Fallback to read lock
            stamp = rwLock.readLock();
            try {
                isEmpty = heap.isEmpty();
            } finally {
                rwLock.unlockRead(stamp);
            }
        }

        return isEmpty;
    }

    /**
     * @return the string form of every cursor, in the iteration order of the name map, as {@code [a, b, ...]}
     */
    @Override
    public String toString() {
        long stamp = rwLock.readLock();
        try {
            StringBuilder sb = new StringBuilder();
            sb.append('[');

            boolean first = true;
            for (Item item : cursors.values()) {
                if (!first) {
                    sb.append(", ");
                }

                first = false;
                sb.append(item.cursor);
            }

            sb.append(']');
            return sb.toString();
        } finally {
            rwLock.unlockRead(stamp);
        }
    }

    /**
     * Iterate over the cursors in the iteration order of the name map. The iterator is read-only and is obtained
     * without taking the container lock.
     */
    @Override
    public Iterator<ManagedCursor> iterator() {
        final Iterator<Map.Entry<String, Item>> it = cursors.entrySet().iterator();
        return new Iterator<ManagedCursor>() {
            @Override
            public boolean hasNext() {
                return it.hasNext();
            }

            @Override
            public ManagedCursor next() {
                return it.next().getValue().cursor;
            }

            @Override
            public void remove() {
                throw new IllegalArgumentException("Cannot remove ManagedCursor form container");
            }
        };
    }

    // //////////////////////

    /**
     * Move an item whose position (or slot) changed to a place where the heap order holds again: up if it is now
     * smaller than its parent, down otherwise. The item must currently be stored in the heap.
     */
    private void restoreHeapOrder(Item item) {
        if (item.idx > 0 && getParent(item).position.compareTo(item.position) > 0) {
            siftUp(item);
        } else {
            siftDown(item);
        }
    }

    /**
     * Push the item up towards the root of the tree (lowest reading position)
     */
    private void siftUp(Item item) {
        while (item.idx > 0) {
            Item parent = getParent(item);
            if (parent.position.compareTo(item.position) <= 0) {
                break;
            }

            swap(item, parent);
        }
    }

    /**
     * Push the item down towards the bottom of the tree (highest reading position)
     */
    private void siftDown(final Item item) {
        while (true) {
            Item left = getLeft(item);
            if (left == null) {
                // No left child means no children at all: the item is a leaf
                break;
            }

            // Pick the smaller of the two children (the right one on ties)
            Item right = getRight(item);
            Item smallest = (right != null && right.position.compareTo(left.position) <= 0) ? right : left;

            if (smallest.position.compareTo(item.position) < 0) {
                swap(item, smallest);
            } else {
                break;
            }
        }
    }

    /**
     * Swap two items in the heap
     */
    private void swap(Item item1, Item item2) {
        int idx1 = item1.idx;
        int idx2 = item2.idx;

        heap.set(idx2, item1);
        heap.set(idx1, item2);

        // Update the indexes too
        item1.idx = idx2;
        item2.idx = idx1;
    }

    /** @return the parent of the item; for the root (index 0) this is the root itself */
    private Item getParent(Item item) {
        return heap.get((item.idx - 1) / 2);
    }

    /** @return the left child of the item, or null if there is none */
    private Item getLeft(Item item) {
        int i = item.idx * 2 + 1;
        return i < heap.size() ? heap.get(i) : null;
    }

    /** @return the right child of the item, or null if there is none */
    private Item getRight(Item item) {
        int i = item.idx * 2 + 2;
        return i < heap.size() ? heap.get(i) : null;
    }
}