blob: 112cd4821e146732539cd8bfb9b88f6b622a0f28 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
import org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A utility class to read {@link BookieInfo} from bookies.
*
* <p>NOTE: This class is tended to be used by this project only. External users should not rely on it directly.
*/
public class BookieInfoReader {
private static final Logger LOG = LoggerFactory.getLogger(BookieInfoReader.class);
private static final long GET_BOOKIE_INFO_REQUEST_FLAGS =
BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
private final ScheduledExecutorService scheduler;
private final BookKeeper bk;
private final ClientConfiguration conf;
/**
* A class represents the information (e.g. disk usage, load) of a bookie.
*
* <p>NOTE: This class is tended to be used by this project only. External users should not rely on it directly.
*/
public static class BookieInfo implements WeightedObject {
private final long freeDiskSpace;
private final long totalDiskSpace;
public BookieInfo() {
this(0L, 0L);
}
public BookieInfo(long totalDiskSpace, long freeDiskSpace) {
this.totalDiskSpace = totalDiskSpace;
this.freeDiskSpace = freeDiskSpace;
}
public long getFreeDiskSpace() {
return freeDiskSpace;
}
public long getTotalDiskSpace() {
return totalDiskSpace;
}
@Override
public long getWeight() {
return freeDiskSpace;
}
public String toString() {
return "FreeDiskSpace: " + this.freeDiskSpace + " TotalDiskCapacity: " + this.totalDiskSpace;
}
}
/**
* Tracks the most recently reported set of bookies from BookieWatcher as well
* as current BookieInfo for bookies we've successfully queried.
*/
private static class BookieInfoMap {
/**
* Contains the most recently obtained information on the contained bookies.
* When an error happens querying a bookie, the entry is removed.
*/
private final Map<BookieSocketAddress, BookieInfo> infoMap = new HashMap<>();
/**
* Contains the most recently reported set of bookies from BookieWatcher
* A partial query consists of every member of mostRecentlyReportedBookies
* minus the entries in bookieInfoMap.
*/
private Collection<BookieSocketAddress> mostRecentlyReportedBookies = new ArrayList<>();
public void updateBookies(Collection<BookieSocketAddress> updatedBookieSet) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"updateBookies: current: {}, new: {}",
mostRecentlyReportedBookies, updatedBookieSet);
}
infoMap.keySet().retainAll(updatedBookieSet);
mostRecentlyReportedBookies = updatedBookieSet;
}
@SuppressWarnings("unchecked")
public Collection<BookieSocketAddress> getPartialScanTargets() {
return CollectionUtils.subtract(mostRecentlyReportedBookies, infoMap.keySet());
}
public Collection<BookieSocketAddress> getFullScanTargets() {
return mostRecentlyReportedBookies;
}
/**
* Returns info for bookie, null if not known.
*
* @param bookie bookie for which to get info
* @return Info for bookie, null otherwise
*/
public BookieInfo getInfo(BookieSocketAddress bookie) {
return infoMap.get(bookie);
}
/**
* Removes bookie from bookieInfoMap.
*
* @param bookie bookie on which we observed an error
*/
public void clearInfo(BookieSocketAddress bookie) {
infoMap.remove(bookie);
}
/**
* Report new info on bookie.
*
* @param bookie bookie for which we obtained new info
* @param info the new info
*/
public void gotInfo(BookieSocketAddress bookie, BookieInfo info) {
infoMap.put(bookie, info);
}
/**
* Get bookie info map.
*/
public Map<BookieSocketAddress, BookieInfo> getBookieMap() {
return infoMap;
}
}
private final BookieInfoMap bookieInfoMap = new BookieInfoMap();
/**
* Tracks whether there is an execution in progress as well as whether
* another is pending.
*/
public enum State { UNQUEUED, PARTIAL, FULL }
private static class InstanceState {
private boolean running = false;
private State queuedType = State.UNQUEUED;
private boolean shouldStart() {
if (running) {
return false;
} else {
running = true;
return true;
}
}
/**
* Mark pending operation FULL and return true if there is no in-progress operation.
*
* @return True if we should execute a scan, False if there is already one running
*/
public boolean tryStartFull() {
queuedType = State.FULL;
return shouldStart();
}
/**
* Mark pending operation PARTIAL if not full and return true if there is no in-progress operation.
*
* @return True if we should execute a scan, False if there is already one running
*/
public boolean tryStartPartial() {
if (queuedType == State.UNQUEUED) {
queuedType = State.PARTIAL;
}
return shouldStart();
}
/**
* Gets and clears queuedType.
*/
public State getAndClearQueuedType() {
State ret = queuedType;
queuedType = State.UNQUEUED;
return ret;
}
/**
* If queuedType != UNQUEUED, returns true, leaves running equal to true
* Otherwise, returns false and sets running to false.
*/
public boolean completeUnlessQueued() {
if (queuedType == State.UNQUEUED) {
running = false;
return false;
} else {
return true;
}
}
}
private final InstanceState instanceState = new InstanceState();
BookieInfoReader(BookKeeper bk,
ClientConfiguration conf,
ScheduledExecutorService scheduler) {
this.bk = bk;
this.conf = conf;
this.scheduler = scheduler;
}
public void start() {
this.bk
.getMetadataClientDriver()
.getRegistrationClient()
.watchWritableBookies(bookies -> availableBookiesChanged(bookies.getValue()));
scheduler.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
synchronized (BookieInfoReader.this) {
if (LOG.isDebugEnabled()) {
LOG.debug("Running periodic BookieInfo scan");
}
try {
Collection<BookieSocketAddress> updatedBookies = bk.bookieWatcher.getBookies();
bookieInfoMap.updateBookies(updatedBookies);
} catch (BKException e) {
LOG.info("Got exception while querying bookies from watcher, rerunning after {}s",
conf.getGetBookieInfoRetryIntervalSeconds(), e);
scheduler.schedule(this, conf.getGetBookieInfoRetryIntervalSeconds(), TimeUnit.SECONDS);
return;
}
if (instanceState.tryStartFull()) {
getReadWriteBookieInfo();
}
}
}
}, 0, conf.getGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
}
private void submitTask() {
scheduler.submit(() -> getReadWriteBookieInfo());
}
private void submitTaskWithDelay(int delaySeconds) {
scheduler.schedule(() -> getReadWriteBookieInfo(), delaySeconds, TimeUnit.SECONDS);
}
synchronized void availableBookiesChanged(Set<BookieSocketAddress> updatedBookiesList) {
if (LOG.isInfoEnabled()) {
LOG.info("Scheduling bookie info read due to changes in available bookies.");
}
bookieInfoMap.updateBookies(updatedBookiesList);
if (instanceState.tryStartPartial()) {
submitTask();
}
}
/**
* Method to allow tests to block until bookie info is available.
*
* @param bookie to lookup
* @return None if absent, free disk space if present
*/
synchronized Optional<Long> getFreeDiskSpace(BookieSocketAddress bookie) {
BookieInfo bookieInfo = bookieInfoMap.getInfo(bookie);
if (bookieInfo != null) {
return Optional.of(bookieInfo.getFreeDiskSpace());
} else {
return Optional.empty();
}
}
/* State to track scan execution progress as callbacks come in */
private int totalSent = 0;
private int completedCnt = 0;
private int errorCnt = 0;
/**
* Performs scan described by instanceState using the cached bookie information
* in bookieInfoMap.
*/
synchronized void getReadWriteBookieInfo() {
State queuedType = instanceState.getAndClearQueuedType();
Collection<BookieSocketAddress> toScan;
if (queuedType == State.FULL) {
if (LOG.isDebugEnabled()) {
LOG.debug("Doing full scan");
}
toScan = bookieInfoMap.getFullScanTargets();
} else if (queuedType == State.PARTIAL) {
if (LOG.isDebugEnabled()) {
LOG.debug("Doing partial scan");
}
toScan = bookieInfoMap.getPartialScanTargets();
} else {
if (LOG.isErrorEnabled()) {
LOG.error("Invalid state, queuedType cannot be UNQUEUED in getReadWriteBookieInfo");
}
assert(queuedType != State.UNQUEUED);
return;
}
BookieClient bkc = bk.getBookieClient();
final long requested = BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
totalSent = 0;
completedCnt = 0;
errorCnt = 0;
if (LOG.isDebugEnabled()) {
LOG.debug("Getting bookie info for: {}", toScan);
}
for (BookieSocketAddress b : toScan) {
bkc.getBookieInfo(b, requested,
new GetBookieInfoCallback() {
void processReadInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
synchronized (BookieInfoReader.this) {
BookieSocketAddress b = (BookieSocketAddress) ctx;
if (rc != BKException.Code.OK) {
if (LOG.isErrorEnabled()) {
LOG.error("Reading bookie info from bookie {} failed due to {}",
b, BKException.codeLogger(rc));
}
// We reread bookies missing from the map each time, so remove to ensure
// we get to it on the next scan
bookieInfoMap.clearInfo(b);
errorCnt++;
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Bookie Info for bookie {} is {}", b, bInfo);
}
bookieInfoMap.gotInfo(b, bInfo);
}
completedCnt++;
if (totalSent == completedCnt) {
onExit();
}
}
}
@Override
public void getBookieInfoComplete(final int rc, final BookieInfo bInfo, final Object ctx) {
scheduler.submit(
new Runnable() {
@Override
public void run() {
processReadInfoComplete(rc, bInfo, ctx);
}
});
}
}, b);
totalSent++;
}
if (totalSent == 0) {
onExit();
}
}
void onExit() {
bk.placementPolicy.updateBookieInfo(bookieInfoMap.getBookieMap());
if (errorCnt > 0) {
if (LOG.isInfoEnabled()) {
LOG.info("Rescheduling in {}s due to errors", conf.getGetBookieInfoIntervalSeconds());
}
instanceState.tryStartPartial();
submitTaskWithDelay(conf.getGetBookieInfoRetryIntervalSeconds());
} else if (instanceState.completeUnlessQueued()) {
if (LOG.isInfoEnabled()) {
LOG.info("Rescheduling, another scan is pending");
}
submitTask();
}
}
Map<BookieSocketAddress, BookieInfo> getBookieInfo() throws BKException, InterruptedException {
BookieClient bkc = bk.getBookieClient();
final AtomicInteger totalSent = new AtomicInteger();
final AtomicInteger totalCompleted = new AtomicInteger();
final ConcurrentMap<BookieSocketAddress, BookieInfo> map =
new ConcurrentHashMap<BookieSocketAddress, BookieInfo>();
final CountDownLatch latch = new CountDownLatch(1);
long requested = BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE
| BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
Collection<BookieSocketAddress> bookies;
bookies = bk.bookieWatcher.getBookies();
bookies.addAll(bk.bookieWatcher.getReadOnlyBookies());
totalSent.set(bookies.size());
for (BookieSocketAddress b : bookies) {
bkc.getBookieInfo(b, requested, new GetBookieInfoCallback() {
@Override
public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
BookieSocketAddress b = (BookieSocketAddress) ctx;
if (rc != BKException.Code.OK) {
if (LOG.isErrorEnabled()) {
LOG.error("Reading bookie info from bookie {} failed due to {}",
b, BKException.codeLogger(rc));
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Free disk space on bookie {} is {}.", b, bInfo.getFreeDiskSpace());
}
map.put(b, bInfo);
}
if (totalCompleted.incrementAndGet() == totalSent.get()) {
latch.countDown();
}
}
}, b);
}
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Received InterruptedException ", e);
throw e;
}
return map;
}
}