blob: 65d254112d1571f8c520cf3f2af40a9eaec34c96 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.StampedLock;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
/**
* Contains all the cursors for a ManagedLedger.
*
* <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
*
* <p/>This data structure maintains a list and a map of cursors. The map is used to relate a cursor name with an entry
* in the linked-list. The list is a sorted double linked-list of cursors.
*
* <p/>When a cursor is markDeleted, this list is updated and the cursor is moved in its new position.
*
* <p/>To minimize the moving around, the order is maintained using the ledgerId, but not the entryId, since we only
* care about ledgers to be deleted.
*
*/
public class ManagedCursorContainer implements Iterable<ManagedCursor> {
private static class Item {
final ManagedCursor cursor;
PositionImpl position;
int idx;
Item(ManagedCursor cursor, int idx) {
this.cursor = cursor;
this.position = (PositionImpl) cursor.getMarkDeletedPosition();
this.idx = idx;
}
}
public enum CursorType {
DurableCursor,
NonDurableCursor,
ALL
}
public ManagedCursorContainer() {
cursorType = CursorType.DurableCursor;
}
public ManagedCursorContainer(CursorType cursorType) {
this.cursorType = cursorType;
}
private final CursorType cursorType;
// Used to keep track of slowest cursor. Contains all of all active cursors.
private final ArrayList<Item> heap = Lists.newArrayList();
// Maps a cursor to its position in the heap
private final ConcurrentMap<String, Item> cursors = new ConcurrentSkipListMap<>();
private final StampedLock rwLock = new StampedLock();
public void add(ManagedCursor cursor) {
long stamp = rwLock.writeLock();
try {
// Append a new entry at the end of the list
Item item = new Item(cursor, heap.size());
cursors.put(cursor.getName(), item);
if (shouldTrackInHeap(cursor)) {
heap.add(item);
siftUp(item);
}
} finally {
rwLock.unlockWrite(stamp);
}
}
private boolean shouldTrackInHeap(ManagedCursor cursor) {
return CursorType.ALL.equals(cursorType)
|| (cursor.isDurable() && CursorType.DurableCursor.equals(cursorType))
|| (!cursor.isDurable() && CursorType.NonDurableCursor.equals(cursorType));
}
public PositionImpl getSlowestReadPositionForActiveCursors() {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
}
public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
Item item = cursors.get(name);
return item != null ? item.cursor : null;
} finally {
rwLock.unlockRead(stamp);
}
}
public void removeCursor(String name) {
long stamp = rwLock.writeLock();
try {
Item item = cursors.remove(name);
if (item != null && shouldTrackInHeap(item.cursor)) {
// Move the item to the right end of the heap to be removed
Item lastItem = heap.get(heap.size() - 1);
swap(item, lastItem);
heap.remove(item.idx);
// Update the heap
siftDown(lastItem);
}
} finally {
rwLock.unlockWrite(stamp);
}
}
/**
* Signal that a cursor position has been updated and that the container must re-order the cursor list.
*
* @param cursor
* @return a pair of positions, representing the previous slowest consumer and the new slowest consumer (after the
* update).
*/
public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Position newPosition) {
checkNotNull(cursor);
long stamp = rwLock.writeLock();
try {
Item item = cursors.get(cursor.getName());
if (item == null) {
return null;
}
if (shouldTrackInHeap(item.cursor)) {
PositionImpl previousSlowestConsumer = heap.get(0).position;
// When the cursor moves forward, we need to push it toward the
// bottom of the tree and push it up if a reset was done
item.position = (PositionImpl) newPosition;
if (item.idx == 0 || getParent(item).position.compareTo(item.position) <= 0) {
siftDown(item);
} else {
siftUp(item);
}
PositionImpl newSlowestConsumer = heap.get(0).position;
return Pair.of(previousSlowestConsumer, newSlowestConsumer);
}
return null;
} finally {
rwLock.unlockWrite(stamp);
}
}
/**
* Get the slowest reader position, meaning older acknowledged position between all the cursors.
*
* @return the slowest reader position
*/
public PositionImpl getSlowestReaderPosition() {
long stamp = rwLock.readLock();
try {
return heap.isEmpty() ? null : heap.get(0).position;
} finally {
rwLock.unlockRead(stamp);
}
}
public ManagedCursor getSlowestReader() {
long stamp = rwLock.readLock();
try {
return heap.isEmpty() ? null : heap.get(0).cursor;
} finally {
rwLock.unlockRead(stamp);
}
}
/**
* Check whether there are any cursors.
* @return true is there are no cursors and false if there are
*/
public boolean isEmpty() {
long stamp = rwLock.tryOptimisticRead();
boolean isEmpty = cursors.isEmpty();
if (!rwLock.validate(stamp)) {
// Fallback to read lock
stamp = rwLock.readLock();
try {
isEmpty = cursors.isEmpty();
} finally {
rwLock.unlockRead(stamp);
}
}
return isEmpty;
}
/**
* Check whether that are any durable cursors.
* @return true if there are durable cursors and false if there are not
*/
public boolean hasDurableCursors() {
long stamp = rwLock.tryOptimisticRead();
boolean isEmpty = heap.isEmpty();
if (!rwLock.validate(stamp)) {
// Fallback to read lock
stamp = rwLock.readLock();
try {
isEmpty = heap.isEmpty();
} finally {
rwLock.unlockRead(stamp);
}
}
return !isEmpty;
}
@Override
public String toString() {
long stamp = rwLock.readLock();
try {
StringBuilder sb = new StringBuilder();
sb.append('[');
boolean first = true;
for (Item item : cursors.values()) {
if (!first) {
sb.append(", ");
}
first = false;
sb.append(item.cursor);
}
sb.append(']');
return sb.toString();
} finally {
rwLock.unlockRead(stamp);
}
}
@Override
public Iterator<ManagedCursor> iterator() {
final Iterator<Map.Entry<String, Item>> it = cursors.entrySet().iterator();
return new Iterator<ManagedCursor>() {
@Override
public boolean hasNext() {
return it.hasNext();
}
@Override
public ManagedCursor next() {
return it.next().getValue().cursor;
}
@Override
public void remove() {
throw new IllegalArgumentException("Cannot remove ManagedCursor form container");
}
};
}
// //////////////////////
/**
* Push the item up towards the the root of the tree (lowest reading position).
*/
private void siftUp(Item item) {
Item parent = getParent(item);
while (item.idx > 0 && parent.position.compareTo(item.position) > 0) {
swap(item, parent);
parent = getParent(item);
}
}
/**
* Push the item down towards the bottom of the tree (highest reading position).
*/
private void siftDown(final Item item) {
while (true) {
Item j = null;
Item right = getRight(item);
if (right != null && right.position.compareTo(item.position) < 0) {
Item left = getLeft(item);
if (left != null && left.position.compareTo(right.position) < 0) {
j = left;
} else {
j = right;
}
} else {
Item left = getLeft(item);
if (left != null && left.position.compareTo(item.position) < 0) {
j = left;
}
}
if (j != null) {
swap(item, j);
} else {
break;
}
}
}
/**
* Swap two items in the heap.
*/
private void swap(Item item1, Item item2) {
int idx1 = item1.idx;
int idx2 = item2.idx;
heap.set(idx2, item1);
heap.set(idx1, item2);
// Update the indexes too
item1.idx = idx2;
item2.idx = idx1;
}
private Item getParent(Item item) {
return heap.get((item.idx - 1) / 2);
}
private Item getLeft(Item item) {
int i = item.idx * 2 + 1;
return i < heap.size() ? heap.get(i) : null;
}
private Item getRight(Item item) {
int i = item.idx * 2 + 2;
return i < heap.size() ? heap.get(i) : null;
}
}