blob: 234a860364fa582647ccd8ddb8dfa88851f62f9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.discover;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.collect;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.bookkeeper.common.testing.MoreAsserts.assertSetEquals;
import static org.apache.bookkeeper.discover.ZKRegistrationClient.ZK_CONNECT_BACKOFF_MS;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.ZKException;
import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
import org.apache.bookkeeper.discover.RegistrationClient.RegistrationListener;
import org.apache.bookkeeper.discover.ZKRegistrationClient.WatchTask;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.MockZooKeeperTestCase;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
/**
* Unit test of {@link RegistrationClient}.
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ ZKRegistrationClient.class, ZkUtils.class })
@Slf4j
public class TestZkRegistrationClient extends MockZooKeeperTestCase {
@Rule
public final TestName runtime = new TestName();
private String ledgersPath;
private String regPath;
private String regAllPath;
private String regReadonlyPath;
private ZKRegistrationClient zkRegistrationClient;
private ScheduledExecutorService mockExecutor;
private MockExecutorController controller;
@Override
@Before
public void setup() throws Exception {
super.setup();
this.ledgersPath = "/" + runtime.getMethodName();
this.regPath = ledgersPath + "/" + AVAILABLE_NODE;
this.regAllPath = ledgersPath + "/" + COOKIE_NODE;
this.regReadonlyPath = regPath + "/" + READONLY;
this.mockExecutor = mock(ScheduledExecutorService.class);
this.controller = new MockExecutorController()
.controlExecute(mockExecutor)
.controlSubmit(mockExecutor)
.controlSchedule(mockExecutor)
.controlScheduleAtFixedRate(mockExecutor, 10);
this.zkRegistrationClient = new ZKRegistrationClient(
mockZk,
ledgersPath,
mockExecutor
);
}
@After
public void teardown() {
if (null != zkRegistrationClient) {
zkRegistrationClient.close();
}
}
private static Set<BookieSocketAddress> prepareNBookies(int num) {
Set<BookieSocketAddress> bookies = new HashSet<>();
for (int i = 0; i < num; i++) {
bookies.add(new BookieSocketAddress("127.0.0.1", 3181 + i));
}
return bookies;
}
@Test
public void testGetWritableBookies() throws Exception {
Set<BookieSocketAddress> addresses = prepareNBookies(10);
List<String> children = Lists.newArrayList();
for (BookieSocketAddress address : addresses) {
children.add(address.toString());
}
Stat stat = mock(Stat.class);
when(stat.getCversion()).thenReturn(1234);
mockGetChildren(
regPath, false,
Code.OK.intValue(), children, stat);
Versioned<Set<BookieSocketAddress>> result =
result(zkRegistrationClient.getWritableBookies());
assertEquals(new LongVersion(1234), result.getVersion());
assertSetEquals(
addresses, result.getValue());
}
@Test
public void testGetAllBookies() throws Exception {
Set<BookieSocketAddress> addresses = prepareNBookies(10);
List<String> children = Lists.newArrayList();
for (BookieSocketAddress address : addresses) {
children.add(address.toString());
}
Stat stat = mock(Stat.class);
when(stat.getCversion()).thenReturn(1234);
mockGetChildren(
regAllPath, false,
Code.OK.intValue(), children, stat);
Versioned<Set<BookieSocketAddress>> result =
result(zkRegistrationClient.getAllBookies());
assertEquals(new LongVersion(1234), result.getVersion());
assertSetEquals(
addresses, result.getValue());
}
@Test
public void testGetReadOnlyBookies() throws Exception {
Set<BookieSocketAddress> addresses = prepareNBookies(10);
List<String> children = Lists.newArrayList();
for (BookieSocketAddress address : addresses) {
children.add(address.toString());
}
Stat stat = mock(Stat.class);
when(stat.getCversion()).thenReturn(1234);
mockGetChildren(
regReadonlyPath, false,
Code.OK.intValue(), children, stat);
Versioned<Set<BookieSocketAddress>> result =
result(zkRegistrationClient.getReadOnlyBookies());
assertEquals(new LongVersion(1234), result.getVersion());
assertSetEquals(
addresses, result.getValue());
}
@Test
public void testGetWritableBookiesFailure() throws Exception {
mockGetChildren(
regPath, false,
Code.NONODE.intValue(), null, null);
try {
result(zkRegistrationClient.getWritableBookies());
fail("Should fail to get writable bookies");
} catch (ZKException zke) {
// expected to throw zookeeper exception
}
}
@Test
public void testGetAllBookiesFailure() throws Exception {
mockGetChildren(
regAllPath, false,
Code.NONODE.intValue(), null, null);
try {
result(zkRegistrationClient.getAllBookies());
fail("Should fail to get all bookies");
} catch (ZKException zke) {
// expected to throw zookeeper exception
}
}
@Test
public void testGetReadOnlyBookiesFailure() throws Exception {
mockGetChildren(
regReadonlyPath, false,
Code.NONODE.intValue(), null, null);
try {
result(zkRegistrationClient.getReadOnlyBookies());
fail("Should fail to get writable bookies");
} catch (ZKException zke) {
// expected to throw zookeeper exception
}
}
@Test
public void testWatchWritableBookiesSuccess() throws Exception {
testWatchBookiesSuccess(true);
}
@Test
public void testWatchReadonlyBookiesSuccess() throws Exception {
testWatchBookiesSuccess(false);
}
@SuppressWarnings("unchecked")
private void testWatchBookiesSuccess(boolean isWritable)
throws Exception {
//
// 1. test watch bookies with a listener
//
LinkedBlockingQueue<Versioned<Set<BookieSocketAddress>>> updates =
spy(new LinkedBlockingQueue<>());
RegistrationListener listener = bookies -> {
try {
updates.put(bookies);
} catch (InterruptedException e) {
log.warn("Interrupted on enqueue bookie updates", e);
}
};
Set<BookieSocketAddress> addresses = prepareNBookies(10);
List<String> children = Lists.newArrayList();
for (BookieSocketAddress address : addresses) {
children.add(address.toString());
}
Stat stat = mock(Stat.class);
when(stat.getCversion()).thenReturn(1234);
mockGetChildren(
isWritable ? regPath : regReadonlyPath,
true,
Code.OK.intValue(), children, stat);
if (isWritable) {
result(zkRegistrationClient.watchWritableBookies(listener));
} else {
result(zkRegistrationClient.watchReadOnlyBookies(listener));
}
Versioned<Set<BookieSocketAddress>> update = updates.take();
verify(updates, times(1)).put(any(Versioned.class));
assertEquals(new LongVersion(1234), update.getVersion());
assertSetEquals(
addresses, update.getValue());
verify(mockZk, times(1))
.getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any());
//
// 2. test watch bookies with a second listener. the second listener returns cached bookies
// without calling `getChildren` again
//
// register another listener
LinkedBlockingQueue<Versioned<Set<BookieSocketAddress>>> secondUpdates =
spy(new LinkedBlockingQueue<>());
RegistrationListener secondListener = bookies -> {
try {
secondUpdates.put(bookies);
} catch (InterruptedException e) {
log.warn("Interrupted on enqueue bookie updates", e);
}
};
if (isWritable) {
result(zkRegistrationClient.watchWritableBookies(secondListener));
} else {
result(zkRegistrationClient.watchReadOnlyBookies(secondListener));
}
Versioned<Set<BookieSocketAddress>> secondListenerUpdate = secondUpdates.take();
// first listener will not be notified with any update
verify(updates, times(1)).put(any(Versioned.class));
// second listener will receive same update as the first listener received before
verify(secondUpdates, times(1)).put(any(Versioned.class));
assertSame(update.getVersion(), secondListenerUpdate.getVersion());
assertSame(update.getValue(), secondListenerUpdate.getValue());
// the second listener will return the cached value without issuing another getChildren call
verify(mockZk, times(1))
.getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any());
//
// 3. simulate session expire, it will trigger watcher to refetch bookies again.
// but since there is no updates on bookies, the registered listeners will not be notified.
//
notifyWatchedEvent(
EventType.None,
KeeperState.Expired,
isWritable ? regPath : regReadonlyPath);
// if session expires, the watcher task will get into backoff state
controller.advance(Duration.ofMillis(ZK_CONNECT_BACKOFF_MS));
// the same updates returns, the getChildren calls increase to 2
// but since there is no updates, so no notification is sent.
verify(mockZk, times(2))
.getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any());
assertNull(updates.poll());
// both listener and secondListener will not receive any old update
verify(updates, times(1)).put(any(Versioned.class));
verify(secondUpdates, times(1)).put(any(Versioned.class));
//
// 4. notify with new bookies. both listeners will be notified with new bookies.
//
Set<BookieSocketAddress> newAddresses = prepareNBookies(20);
List<String> newChildren = Lists.newArrayList();
for (BookieSocketAddress address : newAddresses) {
newChildren.add(address.toString());
}
Stat newStat = mock(Stat.class);
when(newStat.getCversion()).thenReturn(1235);
mockGetChildren(
isWritable ? regPath : regReadonlyPath,
true,
Code.OK.intValue(), newChildren, newStat);
// trigger watcher
notifyWatchedEvent(
EventType.NodeChildrenChanged,
KeeperState.SyncConnected,
isWritable ? regPath : regReadonlyPath);
update = updates.take();
assertEquals(new LongVersion(1235), update.getVersion());
assertSetEquals(
newAddresses, update.getValue());
secondListenerUpdate = secondUpdates.take();
assertSame(update.getVersion(), secondListenerUpdate.getVersion());
assertSame(update.getValue(), secondListenerUpdate.getValue());
verify(mockZk, times(3))
.getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any());
verify(updates, times(2)).put(any(Versioned.class));
verify(secondUpdates, times(2)).put(any(Versioned.class));
//
// 5. unwatch the second listener and notify with new bookies again. only first listener will
// be notified with new bookies.
//
newAddresses = prepareNBookies(25);
newChildren.clear();
newChildren = Lists.newArrayList();
for (BookieSocketAddress address : newAddresses) {
newChildren.add(address.toString());
}
newStat = mock(Stat.class);
when(newStat.getCversion()).thenReturn(1236);
mockGetChildren(
isWritable ? regPath : regReadonlyPath,
true,
Code.OK.intValue(), newChildren, newStat);
if (isWritable) {
assertEquals(2, zkRegistrationClient.getWatchWritableBookiesTask().getNumListeners());
zkRegistrationClient.unwatchWritableBookies(secondListener);
assertEquals(1, zkRegistrationClient.getWatchWritableBookiesTask().getNumListeners());
} else {
assertEquals(2, zkRegistrationClient.getWatchReadOnlyBookiesTask().getNumListeners());
zkRegistrationClient.unwatchReadOnlyBookies(secondListener);
assertEquals(1, zkRegistrationClient.getWatchReadOnlyBookiesTask().getNumListeners());
}
// trigger watcher
notifyWatchedEvent(
EventType.NodeChildrenChanged,
KeeperState.SyncConnected,
isWritable ? regPath : regReadonlyPath);
update = updates.take();
assertEquals(new LongVersion(1236), update.getVersion());
assertSetEquals(
newAddresses, update.getValue());
secondListenerUpdate = secondUpdates.poll();
assertNull(secondListenerUpdate);
verify(mockZk, times(4))
.getChildren(anyString(), any(Watcher.class), any(Children2Callback.class), any());
verify(updates, times(3)).put(any(Versioned.class));
verify(secondUpdates, times(2)).put(any(Versioned.class));
//
// 6. unwatch the first listener. the watch task will be closed and zk watcher will be removed.
//
//
WatchTask expectedWatcher;
if (isWritable) {
expectedWatcher = zkRegistrationClient.getWatchWritableBookiesTask();
assertFalse(expectedWatcher.isClosed());
zkRegistrationClient.unwatchWritableBookies(listener);
assertNull(zkRegistrationClient.getWatchWritableBookiesTask());
} else {
expectedWatcher = zkRegistrationClient.getWatchReadOnlyBookiesTask();
assertFalse(expectedWatcher.isClosed());
zkRegistrationClient.unwatchReadOnlyBookies(listener);
assertNull(zkRegistrationClient.getWatchReadOnlyBookiesTask());
}
// the watch task will not be closed since there is still a listener
assertTrue(expectedWatcher.isClosed());
}
@Test
public void testWatchWritableBookiesTwice() throws Exception {
testWatchBookiesTwice(true);
}
@Test
public void testWatchReadonlyBookiesTwice() throws Exception {
testWatchBookiesTwice(false);
}
private void testWatchBookiesTwice(boolean isWritable)
throws Exception {
int zkCallbackDelayMs = 100;
Set<BookieSocketAddress> addresses = prepareNBookies(10);
List<String> children = Lists.newArrayList();
for (BookieSocketAddress address : addresses) {
children.add(address.toString());
}
Stat stat = mock(Stat.class);
when(stat.getCversion()).thenReturn(1234);
mockGetChildren(
isWritable ? regPath : regReadonlyPath,
true,
Code.OK.intValue(), children, stat, zkCallbackDelayMs);
CompletableFuture<Versioned<Set<BookieSocketAddress>>> firstResult = new CompletableFuture<>();
RegistrationListener firstListener = bookies -> firstResult.complete(bookies);
CompletableFuture<Versioned<Set<BookieSocketAddress>>> secondResult = new CompletableFuture<>();
RegistrationListener secondListener = bookies -> secondResult.complete(bookies);
List<CompletableFuture<Void>> watchFutures = Lists.newArrayListWithExpectedSize(2);
if (isWritable) {
watchFutures.add(zkRegistrationClient.watchWritableBookies(firstListener));
watchFutures.add(zkRegistrationClient.watchWritableBookies(secondListener));
} else {
watchFutures.add(zkRegistrationClient.watchReadOnlyBookies(firstListener));
watchFutures.add(zkRegistrationClient.watchReadOnlyBookies(secondListener));
}
// trigger zkCallbackExecutor to execute getChildren callback
zkCallbackController.advance(Duration.ofMillis(zkCallbackDelayMs));
result(collect(watchFutures));
assertEquals(firstResult.get().getVersion(), secondResult.get().getVersion());
assertSetEquals(firstResult.get().getValue(), secondResult.get().getValue());
}
@Test
public void testWatchWritableBookiesFailure() throws Exception {
testWatchBookiesFailure(true);
}
@Test
public void testWatchReadonlyBookiesFailure() throws Exception {
testWatchBookiesFailure(false);
}
private void testWatchBookiesFailure(boolean isWritable)
throws Exception {
int zkCallbackDelayMs = 100;
mockGetChildren(
isWritable ? regPath : regReadonlyPath,
true,
Code.NONODE.intValue(), null, null, zkCallbackDelayMs);
CompletableFuture<Versioned<Set<BookieSocketAddress>>> listenerResult = new CompletableFuture<>();
RegistrationListener listener = bookies -> listenerResult.complete(bookies);
CompletableFuture<Void> watchFuture;
WatchTask watchTask;
if (isWritable) {
watchFuture = zkRegistrationClient.watchWritableBookies(listener);
watchTask = zkRegistrationClient.getWatchWritableBookiesTask();
} else {
watchFuture = zkRegistrationClient.watchReadOnlyBookies(listener);
watchTask = zkRegistrationClient.getWatchReadOnlyBookiesTask();
}
assertNotNull(watchTask);
assertEquals(1, watchTask.getNumListeners());
// trigger zkCallbackExecutor to execute getChildren callback
zkCallbackController.advance(Duration.ofMillis(zkCallbackDelayMs));
try {
result(watchFuture);
fail("Should fail to watch writable bookies if reg path doesn't exist");
} catch (ZKException zke) {
// expected
}
assertEquals(0, watchTask.getNumListeners());
assertTrue(watchTask.isClosed());
if (isWritable) {
assertNull(zkRegistrationClient.getWatchWritableBookiesTask());
} else {
assertNull(zkRegistrationClient.getWatchReadOnlyBookiesTask());
}
}
}