blob: e361294cb9971d3786e24f3375a4532db738b046 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.zookeeper.ZooKeeper;
/**
* Test BookKeeperClient which allows access to members we don't
* wish to expose in the public API.
*/
@Slf4j
public class BookKeeperTestClient extends BookKeeper {
TestStatsProvider statsProvider;
public BookKeeperTestClient(ClientConfiguration conf, TestStatsProvider statsProvider)
throws IOException, InterruptedException, BKException {
super(conf, null, null, new UnpooledByteBufAllocator(false),
statsProvider == null ? NullStatsLogger.INSTANCE : statsProvider.getStatsLogger(""),
null, null, null);
this.statsProvider = statsProvider;
}
public BookKeeperTestClient(ClientConfiguration conf)
throws InterruptedException, BKException, IOException {
this(conf, null);
}
public ZooKeeper getZkHandle() {
return super.getZkHandle();
}
public ClientConfiguration getConf() {
return super.getConf();
}
public BookieClient getBookieClient() {
return bookieClient;
}
public Future<?> waitForReadOnlyBookie(BookieId b)
throws Exception {
return waitForBookieInSet(b, false);
}
public Future<?> waitForWritableBookie(BookieId b)
throws Exception {
return waitForBookieInSet(b, true);
}
/**
* Wait for bookie to appear in either the writable set of bookies,
* or the read only set of bookies. Also ensure that it doesn't exist
* in the other set before completing.
*/
private Future<?> waitForBookieInSet(BookieId b,
boolean writable) throws Exception {
log.info("Wait for {} to become {}",
b, writable ? "writable" : "readonly");
CompletableFuture<Void> readOnlyFuture = new CompletableFuture<>();
CompletableFuture<Void> writableFuture = new CompletableFuture<>();
RegistrationListener readOnlyListener = (bookies) -> {
boolean contains = bookies.getValue().contains(b);
if ((!writable && contains) || (writable && !contains)) {
readOnlyFuture.complete(null);
}
};
RegistrationListener writableListener = (bookies) -> {
boolean contains = bookies.getValue().contains(b);
if ((writable && contains) || (!writable && !contains)) {
writableFuture.complete(null);
}
};
getMetadataClientDriver().getRegistrationClient().watchWritableBookies(writableListener);
getMetadataClientDriver().getRegistrationClient().watchReadOnlyBookies(readOnlyListener);
if (writable) {
return writableFuture
.thenCompose(ignored -> getMetadataClientDriver().getRegistrationClient().getReadOnlyBookies())
.thenCompose(readonlyBookies -> {
if (readonlyBookies.getValue().contains(b)) {
// if the bookie still shows up at readonly path, wait for it to disappear
return readOnlyFuture;
} else {
return FutureUtils.Void();
}
});
} else {
return readOnlyFuture
.thenCompose(ignored -> getMetadataClientDriver().getRegistrationClient().getWritableBookies())
.thenCompose(writableBookies -> {
if (writableBookies.getValue().contains(b)) {
// if the bookie still shows up at writable path, wait for it to disappear
return writableFuture;
} else {
return FutureUtils.Void();
}
});
}
}
public TestStatsProvider getTestStatsProvider() {
return statsProvider;
}
}