blob: 51e7ea6e7ced5baea90b311216d9c8cd125c5702 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.client;
import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.shuffleWithMask;
import static org.apache.bookkeeper.client.RoundRobinDistributionSchedule.writeSetFromValues;
import static org.apache.bookkeeper.feature.SettableFeatureProvider.DISABLE_ALL;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.util.HashedWheelTimer;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy.PlacementPolicyAdherence;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy.Ensemble;
import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.EnsembleForReplacementWithNoConstraints;
import org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy.TruePredicate;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.Node;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.apache.commons.collections4.CollectionUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the rackaware ensemble placement policy.
*/
public class TestRackawareEnsemblePlacementPolicy extends TestCase {
static final Logger LOG = LoggerFactory.getLogger(TestRackawareEnsemblePlacementPolicy.class);
RackawareEnsemblePlacementPolicy repp;
final List<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
DistributionSchedule.WriteSet writeSet = DistributionSchedule.NULL_WRITE_SET;
ClientConfiguration conf = new ClientConfiguration();
BookieSocketAddress addr1, addr2, addr3, addr4;
io.netty.util.HashedWheelTimer timer;
final int minNumRacksPerWriteQuorumConfValue = 2;
@Override
protected void setUp() throws Exception {
super.setUp();
StaticDNSResolver.reset();
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(),
NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack("127.0.0.1", NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack("localhost", NetworkTopology.DEFAULT_REGION_AND_RACK);
LOG.info("Set up static DNS Resolver.");
conf.setProperty(REPP_DNS_RESOLVER_CLASS, StaticDNSResolver.class.getName());
conf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
addr1 = new BookieSocketAddress("127.0.0.2", 3181);
addr2 = new BookieSocketAddress("127.0.0.3", 3181);
addr3 = new BookieSocketAddress("127.0.0.4", 3181);
addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION + "/rack1");
StaticDNSResolver.addNodeToRack(addr2.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr3.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr4.getHostName(), NetworkTopology.DEFAULT_REGION + "/rack2");
ensemble.add(addr1);
ensemble.add(addr2);
ensemble.add(addr3);
ensemble.add(addr4);
writeSet = writeSetFromValues(0, 1, 2, 3);
timer = new HashedWheelTimer(
new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
conf.getTimeoutTimerNumTicks());
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
}
@Override
protected void tearDown() throws Exception {
repp.uninitalize();
super.tearDown();
}
static BookiesHealthInfo getBookiesHealthInfo() {
return getBookiesHealthInfo(new HashMap<>(), new HashMap<>());
}
static BookiesHealthInfo getBookiesHealthInfo(Map<BookieSocketAddress, Long> bookieFailureHistory,
Map<BookieSocketAddress, Long> bookiePendingRequests) {
return new BookiesHealthInfo() {
@Override
public long getBookieFailureHistory(BookieSocketAddress bookieSocketAddress) {
return bookieFailureHistory.getOrDefault(bookieSocketAddress, -1L);
}
@Override
public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) {
return bookiePendingRequests.getOrDefault(bookieSocketAddress, 0L);
}
};
}
static void updateMyRack(String rack) throws Exception {
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostAddress(), rack);
StaticDNSResolver.addNodeToRack(InetAddress.getLocalHost().getHostName(), rack);
StaticDNSResolver.addNodeToRack("127.0.0.1", rack);
StaticDNSResolver.addNodeToRack("localhost", rack);
}
@Test
public void testNodeDown() throws Exception {
repp.uninitalize();
updateMyRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
addrs.remove(addr1);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(),
writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2, 3, 0);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
assertEquals(expectedSet, reorderSet);
}
@Test
public void testNodeReadOnly() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
addrs.remove(addr1);
Set<BookieSocketAddress> ro = new HashSet<BookieSocketAddress>();
ro.add(addr1);
repp.onClusterChanged(addrs, ro);
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(), writeSet);
LOG.info("reorder set : {}", reorderSet);
assertEquals(reorderSet, origWriteSet);
}
@Test
public void testNodeSlow() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
repp.registerSlowBookie(addr1, 0L);
Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1, 1L);
repp.onClusterChanged(addrs, new HashSet<>());
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(1, 2, 3, 0);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
assertEquals(expectedSet, reorderSet);
}
@Test
public void testTwoNodesSlow() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
repp.registerSlowBookie(addr1, 0L);
repp.registerSlowBookie(addr2, 0L);
Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1, 1L);
bookiePendingMap.put(addr2, 2L);
repp.onClusterChanged(addrs, new HashSet<>());
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 0, 1);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
assertEquals(expectedSet, reorderSet);
}
@Test
public void testTwoNodesDown() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
addrs.remove(addr1);
addrs.remove(addr2);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 0, 1);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
assertEquals(expectedSet, reorderSet);
}
@Test
public void testNodeDownAndReadOnly() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
addrs.remove(addr1);
addrs.remove(addr2);
Set<BookieSocketAddress> roAddrs = new HashSet<BookieSocketAddress>();
roAddrs.add(addr2);
repp.onClusterChanged(addrs, roAddrs);
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 1, 0);
assertFalse(reorderSet.equals(origWriteSet));
assertEquals(expectedSet, reorderSet);
}
@Test
public void testNodeDownAndNodeSlow() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
repp.registerSlowBookie(addr1, 0L);
Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1, 1L);
addrs.remove(addr2);
repp.onClusterChanged(addrs, new HashSet<>());
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 3, 0, 1);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
assertEquals(expectedSet, reorderSet);
}
@Test
public void testNodeDownAndReadOnlyAndNodeSlow() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
addrs.remove(addr1);
addrs.remove(addr2);
Set<BookieSocketAddress> ro = new HashSet<BookieSocketAddress>();
ro.add(addr2);
repp.registerSlowBookie(addr3, 0L);
Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr3, 1L);
addrs.remove(addr2);
repp.onClusterChanged(addrs, ro);
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(3, 1, 2, 0);
LOG.info("reorder set : {}", reorderSet);
assertFalse(reorderSet.equals(origWriteSet));
assertEquals(expectedSet, reorderSet);
}
/*
* Tests the reordering of the writeSet based on number of pending requests.
* Expect the third bookie to be placed first since its number of pending requests
* is READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally first bookie.
*/
@Test
public void testPendingRequestsReorder() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
conf.setReorderThresholdPendingRequests(10);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1, 20L);
bookiePendingMap.put(addr2, 7L);
bookiePendingMap.put(addr3, 1L); // best bookie -> this one first
bookiePendingMap.put(addr4, 5L);
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(2, 0, 1, 3);
LOG.info("reorder set : {}", reorderSet);
assertEquals("expect bookie idx 2 first", expectedSet, reorderSet);
}
/*
* Tests the reordering of the writeSet based on number of pending requests for
* an ensemble that is larger than the writeSet.
* Expect the sixth bookie to be placed first since its number of pending requests
* is READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 less than the originally first bookie.
*/
@Test
public void testPendingRequestsReorderLargeEnsemble() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
conf.setReorderThresholdPendingRequests(10);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.7", 3181);
BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.8", 3181);
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
addrs.add(addr5);
addrs.add(addr6);
addrs.add(addr7);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1, 1L); // not in write set
bookiePendingMap.put(addr2, 20L);
bookiePendingMap.put(addr3, 0L); // not in write set
bookiePendingMap.put(addr4, 12L);
bookiePendingMap.put(addr5, 9L); // not in write set
bookiePendingMap.put(addr6, 2L); // best bookie -> this one first
bookiePendingMap.put(addr7, 10L);
List<BookieSocketAddress> ensemble = new ArrayList<BookieSocketAddress>();
ensemble.add(addr1);
ensemble.add(addr2);
ensemble.add(addr3);
ensemble.add(addr4);
ensemble.add(addr5);
ensemble.add(addr6);
ensemble.add(addr7);
DistributionSchedule.WriteSet writeSet = writeSetFromValues(1, 3, 5, 6);
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
DistributionSchedule.WriteSet expectedSet = writeSetFromValues(5, 1, 3, 6);
LOG.info("reorder set : {}", reorderSet);
assertEquals("expect bookie idx 5 first", expectedSet, reorderSet);
}
/*
* Tests the reordering of the writeSet based on number of pending requests.
* Expect no reordering in this case since the currently first bookie's number of
* pending requests is less than READ_REORDER_THRESHOLD_PENDING_REQUESTS=10 lower
* than the best bookie.
*/
@Test
public void testPendingRequestsNoReorder1() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
conf.setReorderThresholdPendingRequests(10);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1, 10L); // -> this one first
bookiePendingMap.put(addr2, 7L);
bookiePendingMap.put(addr3, 1L); // best bookie, but below threshold
bookiePendingMap.put(addr4, 5L);
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
LOG.info("reorder set : {}", reorderSet);
assertEquals("writeSet should be in original order", origWriteSet, reorderSet);
}
/*
* Tests the reordering of the writeSet based on number of pending requests.
* Expect no reordering in this case since the currently first bookie's number of
* pending requests is lowest among all bookies already.
*/
@Test
public void testPendingRequestsNoReorder2() throws Exception {
repp.uninitalize();
updateMyRack("/r1/rack1");
repp = new RackawareEnsemblePlacementPolicy();
ClientConfiguration conf = (ClientConfiguration) this.conf.clone();
conf.setReorderThresholdPendingRequests(10);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
Map<BookieSocketAddress, Long> bookiePendingMap = new HashMap<>();
bookiePendingMap.put(addr1, 1L); // -> this one first
bookiePendingMap.put(addr2, 7L);
bookiePendingMap.put(addr3, 1L);
bookiePendingMap.put(addr4, 5L);
DistributionSchedule.WriteSet origWriteSet = writeSet.copy();
DistributionSchedule.WriteSet reorderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(new HashMap<>(), bookiePendingMap), writeSet);
LOG.info("reorder set : {}", reorderSet);
assertEquals("writeSet should be in original order", origWriteSet, reorderSet);
}
@Test
public void testReplaceBookieWithEnoughBookiesInSameRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2, new HashSet<>());
BookieSocketAddress replacedBookie = replaceBookieResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
assertEquals(addr3, replacedBookie);
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
}
@Test
public void testReplaceBookieWithEnoughBookiesInDifferentRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r4");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress> replaceBookieResponse =
repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2, excludedAddrs);
BookieSocketAddress replacedBookie = replaceBookieResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
assertFalse(addr1.equals(replacedBookie));
assertTrue(addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
}
@Test
public void testReplaceBookieWithNotEnoughBookies() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r4");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
Set<BookieSocketAddress> excludedAddrs = new HashSet<BookieSocketAddress>();
excludedAddrs.add(addr1);
excludedAddrs.add(addr3);
excludedAddrs.add(addr4);
try {
repp.replaceBookie(1, 1, 1, null, new ArrayList<BookieSocketAddress>(), addr2, excludedAddrs);
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies");
} catch (BKNotEnoughBookiesException bnebe) {
// should throw not enou
}
}
@Test
public void testReplaceBookieWithEnoughBookiesInSameRackAsEnsemble() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_RACK);
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/r3");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// replace node under r2
List<BookieSocketAddress> ensembleBookies = new ArrayList<BookieSocketAddress>();
ensembleBookies.add(addr2);
ensembleBookies.add(addr4);
EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress> replaceBookieResponse = repp.replaceBookie(
1, 1, 1 , null,
ensembleBookies,
addr4,
new HashSet<>());
BookieSocketAddress replacedBookie = replaceBookieResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
assertEquals(addr1, replacedBookie);
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
}
@Test
public void testNewEnsembleWithSingleRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.7", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.8", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.9", 3181);
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse;
ensembleResponse = repp.newEnsemble(3, 2, 2, null, new HashSet<>());
List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
assertEquals(0, getNumCoveredWriteQuorums(ensemble, 2, conf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy);
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse2;
ensembleResponse2 = repp.newEnsemble(4, 2, 2, null, new HashSet<>());
List<BookieSocketAddress> ensemble2 = ensembleResponse2.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.isAdheringToPolicy();
assertEquals(0, getNumCoveredWriteQuorums(ensemble2, 2, conf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is only one rack.");
}
}
@Test
public void testSingleRackWithEnforceMinNumRacks() throws Exception {
repp.uninitalize();
updateMyRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr3.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr4.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(2);
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
List<BookieSocketAddress> ensemble;
try {
ensemble = repp.newEnsemble(3, 2, 2, null, new HashSet<>()).getResult();
fail("Should get not enough bookies exception since there is only one rack.");
} catch (BKNotEnoughBookiesException bnebe) {
}
try {
ensemble = repp.newEnsemble(3, 2, 2, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE).getResult();
fail("Should get not enough bookies exception since there is only one rack.");
} catch (BKNotEnoughBookiesException bnebe) {
}
}
@Test
public void testNewEnsembleWithEnforceMinNumRacks() throws Exception {
String defaultRackForThisTest = NetworkTopology.DEFAULT_REGION_AND_RACK;
repp.uninitalize();
updateMyRack(defaultRackForThisTest);
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL, statsLogger);
repp.withDefaultRack(defaultRackForThisTest);
Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
int numOfRacks = 3;
int numOfBookiesPerRack = 5;
BookieSocketAddress[] bookieSocketAddresses = new BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
for (int i = 0; i < numOfRacks; i++) {
for (int j = 0; j < numOfBookiesPerRack; j++) {
int index = i * numOfBookiesPerRack + j;
bookieSocketAddresses[index] = new BookieSocketAddress("128.0.0." + index, 3181);
StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), "/default-region/r" + i);
}
}
int numOfBookiesInDefaultRack = 5;
BookieSocketAddress[] bookieSocketAddressesInDefaultRack = new BookieSocketAddress[numOfBookiesInDefaultRack];
for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
bookieSocketAddressesInDefaultRack[i] = new BookieSocketAddress("128.0.0." + (100 + i), 3181);
StaticDNSResolver.addNodeToRack(bookieSocketAddressesInDefaultRack[i].getHostName(),
defaultRackForThisTest);
}
List<BookieSocketAddress> nonDefaultRackBookiesList = Arrays.asList(bookieSocketAddresses);
List<BookieSocketAddress> defaultRackBookiesList = Arrays.asList(bookieSocketAddressesInDefaultRack);
Set<BookieSocketAddress> writableBookies = new HashSet<BookieSocketAddress>(nonDefaultRackBookiesList);
writableBookies.addAll(defaultRackBookiesList);
repp.onClusterChanged(writableBookies, new HashSet<BookieSocketAddress>());
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", numOfBookiesInDefaultRack,
numBookiesInDefaultRackGauge.getSample());
try {
// this newEnsemble call will exclude default rack bookies
repp.newEnsemble(8, 4, 4, null, new HashSet<>());
fail("Should get not enough bookies exception since there are only 3 non-default racks");
} catch (BKNotEnoughBookiesException bnebe) {
}
try {
repp.newEnsemble(8, 4, 4, new HashSet<>(defaultRackBookiesList),
EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
fail("Should get not enough bookies exception since there are only 3 non-default racks"
+ " and defaultrack bookies are excluded");
} catch (BKNotEnoughBookiesException bnebe) {
}
/*
* Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
* and there are enough bookies in 3 racks, this newEnsemble calls
* should succeed.
*/
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse;
List<BookieSocketAddress> ensemble;
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy;
int ensembleSize = numOfRacks * numOfBookiesPerRack;
int writeQuorumSize = numOfRacks;
int ackQuorumSize = numOfRacks;
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
ensemble = ensembleResponse.getResult();
isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
new HashSet<>(defaultRackBookiesList), EnsembleForReplacementWithNoConstraints.INSTANCE,
TruePredicate.INSTANCE);
ensemble = ensembleResponse.getResult();
isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
}
@Test
public void testNewEnsembleWithSufficientRacksAndEnforceMinNumRacks() throws Exception {
repp.uninitalize();
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
int writeQuorumSize = 3;
int ackQuorumSize = 3;
int effectiveMinNumRacksPerWriteQuorum = Math.min(minNumRacksPerWriteQuorum, writeQuorumSize);
int numOfRacks = 2 * effectiveMinNumRacksPerWriteQuorum - 1;
int numOfBookiesPerRack = 20;
BookieSocketAddress[] bookieSocketAddresses = new BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
for (int i = 0; i < numOfRacks; i++) {
for (int j = 0; j < numOfBookiesPerRack; j++) {
int index = i * numOfBookiesPerRack + j;
bookieSocketAddresses[index] = new BookieSocketAddress("128.0.0." + index, 3181);
StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), "/default-region/r" + i);
}
}
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
repp.onClusterChanged(new HashSet<BookieSocketAddress>(Arrays.asList(bookieSocketAddresses)),
new HashSet<BookieSocketAddress>());
/*
* in this scenario we have enough number of racks (2 *
* effectiveMinNumRacksPerWriteQuorum - 1) and more number of bookies in
* each rack. So we should be able to create ensemble for all
* ensembleSizes (as long as there are enough number of bookies in each
* rack).
*/
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse;
List<BookieSocketAddress> ensemble;
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy;
for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; ensembleSize < 40; ensembleSize++) {
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
ensemble = ensembleResponse.getResult();
isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, new HashSet<>(),
EnsembleForReplacementWithNoConstraints.INSTANCE, TruePredicate.INSTANCE);
ensemble = ensembleResponse.getResult();
isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
}
}
@Test
public void testReplaceBookieWithEnforceMinNumRacks() throws Exception {
String defaultRackForThisTest = NetworkTopology.DEFAULT_REGION_AND_RACK;
repp.uninitalize();
updateMyRack(defaultRackForThisTest);
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
statsLogger);
repp.withDefaultRack(defaultRackForThisTest);
Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
int numOfRacks = 3;
int numOfBookiesPerRack = 5;
Set<BookieSocketAddress> bookieSocketAddresses = new HashSet<BookieSocketAddress>();
Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
BookieSocketAddress bookieAddress;
String rack;
for (int i = 0; i < numOfRacks; i++) {
for (int j = 0; j < numOfBookiesPerRack; j++) {
int index = i * numOfBookiesPerRack + j;
bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
rack = "/default-region/r" + i;
StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rack);
bookieSocketAddresses.add(bookieAddress);
bookieRackMap.put(bookieAddress, rack);
}
}
/*
* bookies in this default rack should not be returned for replacebookie
* response.
*/
int numOfBookiesInDefaultRack = 5;
BookieSocketAddress[] bookieSocketAddressesInDefaultRack = new BookieSocketAddress[numOfBookiesInDefaultRack];
for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
bookieSocketAddressesInDefaultRack[i] = new BookieSocketAddress("127.0.0." + (i + 100), 3181);
StaticDNSResolver.addNodeToRack(bookieSocketAddressesInDefaultRack[i].getHostName(),
defaultRackForThisTest);
}
Set<BookieSocketAddress> nonDefaultRackBookiesList = bookieSocketAddresses;
List<BookieSocketAddress> defaultRackBookiesList = Arrays.asList(bookieSocketAddressesInDefaultRack);
Set<BookieSocketAddress> writableBookies = new HashSet<BookieSocketAddress>(nonDefaultRackBookiesList);
writableBookies.addAll(defaultRackBookiesList);
repp.onClusterChanged(writableBookies, new HashSet<BookieSocketAddress>());
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", numOfBookiesInDefaultRack,
numBookiesInDefaultRackGauge.getSample());
/*
* Though minNumRacksPerWriteQuorum is set to 4, since writeQuorum is 3
* and there are enough bookies in 3 racks, this newEnsemble call should
* succeed.
*/
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse;
List<BookieSocketAddress> ensemble;
int ensembleSize = numOfRacks * numOfBookiesPerRack;
int writeQuorumSize = numOfRacks;
int ackQuorumSize = numOfRacks;
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
ensemble = ensembleResponse.getResult();
BookieSocketAddress bookieInEnsembleToBeReplaced = ensemble.get(7);
// get rack of some other bookie
String rackOfOtherBookieInEnsemble = bookieRackMap.get(ensemble.get(8));
BookieSocketAddress newBookieAddress1 = new BookieSocketAddress("128.0.0.100", 3181);
/*
* add the newBookie to the rack of some other bookie in the current
* ensemble
*/
StaticDNSResolver.addNodeToRack(newBookieAddress1.getHostName(), rackOfOtherBookieInEnsemble);
bookieSocketAddresses.add(newBookieAddress1);
writableBookies.add(newBookieAddress1);
bookieRackMap.put(newBookieAddress1, rackOfOtherBookieInEnsemble);
repp.onClusterChanged(writableBookies, new HashSet<BookieSocketAddress>());
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", numOfBookiesInDefaultRack,
numBookiesInDefaultRackGauge.getSample());
try {
repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
ensemble, bookieInEnsembleToBeReplaced, new HashSet<>());
fail("Should get not enough bookies exception since there are no more bookies in rack"
+ "of 'bookieInEnsembleToReplace'"
+ "and new bookie added belongs to the rack of some other bookie in the ensemble");
} catch (BKNotEnoughBookiesException bnebe) {
// this is expected
}
String newRack = "/default-region/r100";
BookieSocketAddress newBookieAddress2 = new BookieSocketAddress("128.0.0.101", 3181);
/*
* add the newBookie to a new rack.
*/
StaticDNSResolver.addNodeToRack(newBookieAddress2.getHostName(), newRack);
bookieSocketAddresses.add(newBookieAddress2);
writableBookies.add(newBookieAddress2);
bookieRackMap.put(newBookieAddress2, newRack);
repp.onClusterChanged(writableBookies, new HashSet<BookieSocketAddress>());
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", numOfBookiesInDefaultRack,
numBookiesInDefaultRackGauge.getSample());
/*
* this replaceBookie should succeed, because a new bookie is added to a
* new rack.
*/
EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress> replaceBookieResponse;
BookieSocketAddress replacedBookieAddress;
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy;
replaceBookieResponse = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null, ensemble,
bookieInEnsembleToBeReplaced, new HashSet<>());
replacedBookieAddress = replaceBookieResponse.getResult();
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
assertEquals("It should be newBookieAddress2", newBookieAddress2, replacedBookieAddress);
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
Set<BookieSocketAddress> bookiesToExclude = new HashSet<>();
bookiesToExclude.add(newBookieAddress2);
repp.onClusterChanged(writableBookies, new HashSet<BookieSocketAddress>());
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", numOfBookiesInDefaultRack,
numBookiesInDefaultRackGauge.getSample());
try {
repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null, ensemble,
bookieInEnsembleToBeReplaced, bookiesToExclude);
fail("Should get not enough bookies exception since the only available bookie to replace"
+ "is added to excludedBookies list");
} catch (BKNotEnoughBookiesException bnebe) {
// this is expected
}
// get rack of the bookie to be replaced
String rackOfBookieToBeReplaced = bookieRackMap.get(bookieInEnsembleToBeReplaced);
BookieSocketAddress newBookieAddress3 = new BookieSocketAddress("128.0.0.102", 3181);
/*
* add the newBookie to rack of the bookie to be replaced.
*/
StaticDNSResolver.addNodeToRack(newBookieAddress3.getHostName(), rackOfBookieToBeReplaced);
bookieSocketAddresses.add(newBookieAddress3);
writableBookies.add(newBookieAddress3);
bookieRackMap.put(newBookieAddress3, rackOfBookieToBeReplaced);
repp.onClusterChanged(writableBookies, new HashSet<BookieSocketAddress>());
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", numOfBookiesInDefaultRack,
numBookiesInDefaultRackGauge.getSample());
/*
* here we have added new bookie to the rack of the bookie to be
* replaced, so we should be able to replacebookie though
* newBookieAddress2 is added to excluded bookies list.
*/
replaceBookieResponse = repp.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, null,
ensemble, bookieInEnsembleToBeReplaced, bookiesToExclude);
replacedBookieAddress = replaceBookieResponse.getResult();
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
assertEquals("It should be newBookieAddress3", newBookieAddress3, replacedBookieAddress);
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
}
@Test
public void testSelectBookieFromNetworkLoc() throws Exception {
repp.uninitalize();
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
int numOfRacks = 3;
int numOfBookiesPerRack = 5;
String[] rackLocationNames = new String[numOfRacks];
List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
BookieSocketAddress bookieAddress;
for (int i = 0; i < numOfRacks; i++) {
rackLocationNames[i] = "/default-region/r" + i;
for (int j = 0; j < numOfBookiesPerRack; j++) {
int index = i * numOfBookiesPerRack + j;
bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
bookieSocketAddresses.add(bookieAddress);
bookieRackMap.put(bookieAddress, rackLocationNames[i]);
}
}
String nonExistingRackLocation = "/default-region/r25";
repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
new HashSet<BookieSocketAddress>());
String rack = bookieRackMap.get(bookieSocketAddresses.get(0));
BookieNode bookieNode = repp.selectFromNetworkLocation(rack, new HashSet<Node>(), TruePredicate.INSTANCE,
EnsembleForReplacementWithNoConstraints.INSTANCE, false);
String recRack = bookieNode.getNetworkLocation();
assertEquals("Rack of node", rack, recRack);
try {
repp.selectFromNetworkLocation(nonExistingRackLocation, new HashSet<Node>(), TruePredicate.INSTANCE,
EnsembleForReplacementWithNoConstraints.INSTANCE, false);
fail("Should get not enough bookies exception since there are no bookies in this rack");
} catch (BKNotEnoughBookiesException bnebe) {
// this is expected
}
// it should not fail, since fallback is set to true and it should pick
// some random one
repp.selectFromNetworkLocation(nonExistingRackLocation, new HashSet<Node>(), TruePredicate.INSTANCE,
EnsembleForReplacementWithNoConstraints.INSTANCE, true);
Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
for (int i = 0; i < numOfBookiesPerRack; i++) {
excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
}
Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
try {
repp.selectFromNetworkLocation(bookieRackMap.get(bookieSocketAddresses.get(0)), excludeBookieNodesOfRackR0,
TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
fail("Should get not enough bookies exception since all the bookies in r0 are added to the exclusion list");
} catch (BKNotEnoughBookiesException bnebe) {
// this is expected
}
// not expected to get exception since fallback is set to true
bookieNode = repp.selectFromNetworkLocation(bookieRackMap.get(bookieSocketAddresses.get(0)),
excludeBookieNodesOfRackR0, TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE,
true);
assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
rackLocationNames[1].equals(bookieNode.getNetworkLocation())
|| rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
}
@Test
public void testSelectBookieFromExcludingRacks() throws Exception {
repp.uninitalize();
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
int numOfRacks = 3;
int numOfBookiesPerRack = 5;
String[] rackLocationNames = new String[numOfRacks];
List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
BookieSocketAddress bookieAddress;
for (int i = 0; i < numOfRacks; i++) {
rackLocationNames[i] = "/default-region/r" + i;
for (int j = 0; j < numOfBookiesPerRack; j++) {
int index = i * numOfBookiesPerRack + j;
bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
bookieSocketAddresses.add(bookieAddress);
bookieRackMap.put(bookieAddress, rackLocationNames[i]);
}
}
repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
new HashSet<BookieSocketAddress>());
Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
for (int i = 0; i < numOfBookiesPerRack; i++) {
excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
}
Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
Set<String> excludeRacksRackR1AndR2 = new HashSet<String>();
excludeRacksRackR1AndR2.add(rackLocationNames[1]);
excludeRacksRackR1AndR2.add(rackLocationNames[2]);
try {
repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
EnsembleForReplacementWithNoConstraints.INSTANCE, false);
fail("Should get not enough bookies exception racks R1 and R2 are"
+ "excluded and all the bookies in r0 are added to the exclusion list");
} catch (BKNotEnoughBookiesException bnebe) {
// this is expected
}
BookieNode bookieNode = repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, new HashSet<Node>(),
TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
assertTrue("BookieNode should be from Rack /r0" + bookieNode.getNetworkLocation(),
rackLocationNames[0].equals(bookieNode.getNetworkLocation()));
// not expected to get exception since fallback is set to true
bookieNode = repp.selectFromNetworkLocation(excludeRacksRackR1AndR2, excludeBookieNodesOfRackR0,
TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, true);
assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
rackLocationNames[1].equals(bookieNode.getNetworkLocation())
|| rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
}
@Test
public void testSelectBookieFromNetworkLocAndExcludingRacks() throws Exception {
repp.uninitalize();
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
int numOfRacks = 3;
int numOfBookiesPerRack = 5;
String[] rackLocationNames = new String[numOfRacks];
List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
BookieSocketAddress bookieAddress;
for (int i = 0; i < numOfRacks; i++) {
rackLocationNames[i] = "/default-region/r" + i;
for (int j = 0; j < numOfBookiesPerRack; j++) {
int index = i * numOfBookiesPerRack + j;
bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
bookieSocketAddresses.add(bookieAddress);
bookieRackMap.put(bookieAddress, rackLocationNames[i]);
}
}
String nonExistingRackLocation = "/default-region/r25";
repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
new HashSet<BookieSocketAddress>());
Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
for (int i = 0; i < numOfBookiesPerRack; i++) {
excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
}
Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
Set<String> excludeRacksRackR1AndR2 = new HashSet<String>();
excludeRacksRackR1AndR2.add(rackLocationNames[1]);
excludeRacksRackR1AndR2.add(rackLocationNames[2]);
try {
repp.selectFromNetworkLocation(nonExistingRackLocation, excludeRacksRackR1AndR2,
excludeBookieNodesOfRackR0,
TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
fail("Should get not enough bookies exception racks R1 and R2 are excluded and all the bookies in"
+ "r0 are added to the exclusion list");
} catch (BKNotEnoughBookiesException bnebe) {
// this is expected
}
BookieNode bookieNode = repp.selectFromNetworkLocation(rackLocationNames[0], excludeRacksRackR1AndR2,
new HashSet<Node>(), TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
assertTrue("BookieNode should be from Rack /r0" + bookieNode.getNetworkLocation(),
rackLocationNames[0].equals(bookieNode.getNetworkLocation()));
bookieNode = repp.selectFromNetworkLocation(rackLocationNames[0], new HashSet<String>(),
excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
EnsembleForReplacementWithNoConstraints.INSTANCE, false);
assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
rackLocationNames[1].equals(bookieNode.getNetworkLocation())
|| rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
bookieNode = repp.selectFromNetworkLocation(nonExistingRackLocation, excludeRacksRackR1AndR2,
excludeBookieNodesOfRackR0, TruePredicate.INSTANCE, EnsembleForReplacementWithNoConstraints.INSTANCE,
true);
assertTrue("BookieNode should not be from Rack /r0" + bookieNode.getNetworkLocation(),
rackLocationNames[1].equals(bookieNode.getNetworkLocation())
|| rackLocationNames[2].equals(bookieNode.getNetworkLocation()));
}
@Test
public void testSelectBookieByExcludingRacksAndBookies() throws Exception {
repp.uninitalize();
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
/*
* Durability is enforced
*
* When durability is being enforced; we must not violate the predicate
* even when selecting a random bookie; as durability guarantee is not
* best effort; correctness is implied by it
*/
repp = new RackawareEnsemblePlacementPolicy(true);
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL,
NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
int numOfRacks = 3;
int numOfBookiesPerRack = 5;
String[] rackLocationNames = new String[numOfRacks];
List<BookieSocketAddress> bookieSocketAddresses = new ArrayList<BookieSocketAddress>();
Map<BookieSocketAddress, String> bookieRackMap = new HashMap<BookieSocketAddress, String>();
BookieSocketAddress bookieAddress;
for (int i = 0; i < numOfRacks; i++) {
rackLocationNames[i] = "/default-region/r" + i;
for (int j = 0; j < numOfBookiesPerRack; j++) {
int index = i * numOfBookiesPerRack + j;
bookieAddress = new BookieSocketAddress("128.0.0." + index, 3181);
StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), rackLocationNames[i]);
bookieSocketAddresses.add(bookieAddress);
bookieRackMap.put(bookieAddress, rackLocationNames[i]);
}
}
repp.onClusterChanged(new HashSet<BookieSocketAddress>(bookieSocketAddresses),
new HashSet<BookieSocketAddress>());
Set<BookieSocketAddress> excludeBookiesOfRackR0 = new HashSet<BookieSocketAddress>();
for (int i = 0; i < numOfBookiesPerRack; i++) {
excludeBookiesOfRackR0.add(bookieSocketAddresses.get(i));
}
Set<Node> excludeBookieNodesOfRackR0 = repp.convertBookiesToNodes(excludeBookiesOfRackR0);
Set<String> excludeRackR1 = new HashSet<String>();
excludeRackR1.add(rackLocationNames[1]);
BookieNode nodeSelected;
nodeSelected = repp.selectFromNetworkLocation(excludeRackR1, excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
EnsembleForReplacementWithNoConstraints.INSTANCE, false);
assertEquals("BookieNode should be from Rack2", rackLocationNames[2], nodeSelected.getNetworkLocation());
try {
/*
* durability is enforced, so false predicate will reject all
* bookies.
*/
repp.selectFromNetworkLocation(excludeRackR1, excludeBookieNodesOfRackR0, (candidate, chosenBookies) -> {
return false;
}, EnsembleForReplacementWithNoConstraints.INSTANCE, false);
fail("Should get not enough bookies exception since we are using false predicate");
} catch (BKNotEnoughBookiesException bnebe) {
// this is expected
}
try {
/*
* Using ensemble which rejects all the nodes.
*/
repp.selectFromNetworkLocation(excludeRackR1, excludeBookieNodesOfRackR0, TruePredicate.INSTANCE,
new Ensemble<BookieNode>() {
@Override
public boolean addNode(BookieNode node) {
return false;
}
@Override
public List<BookieSocketAddress> toList() {
return null;
}
@Override
public boolean validate() {
return false;
}
}, false);
fail("Should get not enough bookies exception since ensemble rejects all the nodes");
} catch (BKNotEnoughBookiesException bnebe) {
// this is expected
}
}
@Test
public void testNewEnsembleWithMultipleRacks() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r2");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
int ensembleSize = 3;
int writeQuorumSize = 2;
int acqQuorumSize = 2;
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, null, new HashSet<>());
List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy);
ensembleSize = 4;
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse2 =
repp.newEnsemble(ensembleSize, writeQuorumSize,
acqQuorumSize, null, new HashSet<>());
List<BookieSocketAddress> ensemble2 = ensembleResponse2.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.isAdheringToPolicy();
numCovered = getNumCoveredWriteQuorums(ensemble2, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum());
assertTrue(numCovered >= 1 && numCovered < 3);
assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy2);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is only one rack.");
}
}
@Test
public void testMinNumRacksPerWriteQuorumOfRacks() throws Exception {
int numOfRacksToCreate = 6;
int numOfNodesInEachRack = 5;
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
BookieSocketAddress addr;
for (int i = 0; i < numOfRacksToCreate; i++) {
for (int j = 0; j < numOfNodesInEachRack; j++) {
addr = new BookieSocketAddress("128.0.0." + ((i * numOfNodesInEachRack) + j), 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr.getHostName(), "/default-region/r" + i);
addrs.add(addr);
}
}
try {
ClientConfiguration newConf = new ClientConfiguration(conf);
// set MinNumRacksPerWriteQuorum to 4
int minNumRacksPerWriteQuorum = 4;
int ensembleSize = 12;
int writeQuorumSize = 6;
validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, minNumRacksPerWriteQuorum, ensembleSize,
writeQuorumSize);
// set MinNumRacksPerWriteQuorum to 6
newConf = new ClientConfiguration(conf);
minNumRacksPerWriteQuorum = 6;
ensembleSize = 6;
writeQuorumSize = 6;
validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, minNumRacksPerWriteQuorum, ensembleSize,
writeQuorumSize);
// set MinNumRacksPerWriteQuorum to 6
newConf = new ClientConfiguration(conf);
minNumRacksPerWriteQuorum = 6;
ensembleSize = 10;
writeQuorumSize = ensembleSize;
validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, minNumRacksPerWriteQuorum, ensembleSize,
writeQuorumSize);
// set MinNumRacksPerWriteQuorum to 5
newConf = new ClientConfiguration(conf);
minNumRacksPerWriteQuorum = 5;
ensembleSize = 24;
writeQuorumSize = 12;
validateNumOfWriteQuorumsCoveredInEnsembleCreation(addrs, minNumRacksPerWriteQuorum, ensembleSize,
writeQuorumSize);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is only one rack.");
}
}
void validateNumOfWriteQuorumsCoveredInEnsembleCreation(Set<BookieSocketAddress> addrs,
int minNumRacksPerWriteQuorum, int ensembleSize, int writeQuorumSize) throws Exception {
ClientConfiguration newConf = new ClientConfiguration(conf);
newConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(newConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
writeQuorumSize, null, new HashSet<>());
List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
int numCovered = getNumCoveredWriteQuorums(ensemble, writeQuorumSize, minNumRacksPerWriteQuorum);
assertEquals("minimum number of racks covered for writequorum ensemble: " + ensemble, ensembleSize, numCovered);
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
}
@Test
public void testNewEnsembleWithEnoughRacks() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.7", 3181);
BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.8", 3181);
BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.9", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r3");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r4");
StaticDNSResolver.addNodeToRack(addr5.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr6.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr7.getHostName(), "/default-region/r3");
StaticDNSResolver.addNodeToRack(addr8.getHostName(), "/default-region/r4");
int availableNumOfRacks = 4;
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
addrs.add(addr5);
addrs.add(addr6);
addrs.add(addr7);
addrs.add(addr8);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
try {
int ensembleSize = 3;
int writeQuorumSize = 3;
int ackQuorumSize = 2;
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse =
repp.newEnsemble(ensembleSize, writeQuorumSize,
ackQuorumSize, null, new HashSet<>());
List<BookieSocketAddress> ensemble1 = ensembleResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy1 = ensembleResponse.isAdheringToPolicy();
assertEquals(ensembleSize,
getNumCoveredWriteQuorums(ensemble1, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy1);
ensembleSize = 4;
writeQuorumSize = 4;
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse2 =
repp.newEnsemble(ensembleSize, writeQuorumSize, 2, null, new HashSet<>());
List<BookieSocketAddress> ensemble2 = ensembleResponse2.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy2 = ensembleResponse2.isAdheringToPolicy();
assertEquals(ensembleSize,
getNumCoveredWriteQuorums(ensemble2, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy2);
} catch (BKNotEnoughBookiesException bnebe) {
fail("Should not get not enough bookies exception even there is only one rack.");
}
}
/**
* Test for BOOKKEEPER-633.
*/
@Test
public void testRemoveBookieFromCluster() {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getHostName(), NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/default-region/r3");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
addrs.remove(addr1);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
}
@Test
public void testWeightedPlacementAndReplaceBookieWithEnoughBookiesInSameRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
int multiple = 10;
conf.setDiskWeightBasedPlacementEnabled(true);
conf.setBookieMaxWeightMultipleForWeightBasedPlacement(-1); // no max cap on weight
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr3, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr4, new BookieInfo(multiple * 100L, multiple * 100L));
repp.updateBookieInfo(bookieInfoMap);
Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
selectionCounts.put(addr3, 0L);
selectionCounts.put(addr4, 0L);
int numTries = 50000;
EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress> replaceBookieResponse;
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy;
BookieSocketAddress replacedBookie;
for (int i = 0; i < numTries; i++) {
// replace node under r2
replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2, new HashSet<>());
replacedBookie = replaceBookieResponse.getResult();
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
assertTrue("replaced : " + replacedBookie, addr3.equals(replacedBookie) || addr4.equals(replacedBookie));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1);
}
double observedMultiple = ((double) selectionCounts.get(addr4) / (double) selectionCounts.get(addr3));
assertTrue("Weights not being honored " + observedMultiple, Math.abs(observedMultiple - multiple) < 1);
}
@Test
public void testWeightedPlacementAndReplaceBookieWithoutEnoughBookiesInSameRack() throws Exception {
BookieSocketAddress addr0 = new BookieSocketAddress("126.0.0.1", 3181);
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
// update dns mapping
StaticDNSResolver.reset();
StaticDNSResolver.addNodeToRack(addr0.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r0");
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r4");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr0);
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
int multiple = 10, maxMultiple = 4;
conf.setDiskWeightBasedPlacementEnabled(true);
conf.setBookieMaxWeightMultipleForWeightBasedPlacement(maxMultiple);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
bookieInfoMap.put(addr0, new BookieInfo(50L, 50L));
bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr3, new BookieInfo(200L, 200L));
bookieInfoMap.put(addr4, new BookieInfo(multiple * 50L, multiple * 50L));
repp.updateBookieInfo(bookieInfoMap);
Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
selectionCounts.put(addr0, 0L);
selectionCounts.put(addr1, 0L);
selectionCounts.put(addr2, 0L);
selectionCounts.put(addr3, 0L);
selectionCounts.put(addr4, 0L);
int numTries = 50000;
EnsemblePlacementPolicy.PlacementResult<BookieSocketAddress> replaceBookieResponse;
BookieSocketAddress replacedBookie;
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy;
for (int i = 0; i < numTries; i++) {
// addr2 is on /r2 and this is the only one on this rack. So the replacement
// will come from other racks. However, the weight should be honored in such
// selections as well
replaceBookieResponse = repp.replaceBookie(1, 1, 1, null, new ArrayList<>(), addr2, new HashSet<>());
replacedBookie = replaceBookieResponse.getResult();
isEnsembleAdheringToPlacementPolicy = replaceBookieResponse.isAdheringToPolicy();
assertTrue(addr0.equals(replacedBookie) || addr1.equals(replacedBookie) || addr3.equals(replacedBookie)
|| addr4.equals(replacedBookie));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
selectionCounts.put(replacedBookie, selectionCounts.get(replacedBookie) + 1);
}
/*
* since addr2 has to be replaced, the remaining bookies weight are - 50, 100, 200, 500 (10*50)
* So the median calculated by WeightedRandomSelection is (100 + 200) / 2 = 150
*/
double medianWeight = 150;
double medianSelectionCounts = (double) (medianWeight / bookieInfoMap.get(addr1).getWeight())
* selectionCounts.get(addr1);
double observedMultiple1 = ((double) selectionCounts.get(addr4) / (double) medianSelectionCounts);
double observedMultiple2 = ((double) selectionCounts.get(addr4) / (double) selectionCounts.get(addr3));
LOG.info("oM1 " + observedMultiple1 + " oM2 " + observedMultiple2);
assertTrue("Weights not being honored expected " + maxMultiple + " observed " + observedMultiple1,
Math.abs(observedMultiple1 - maxMultiple) < 1);
// expected multiple for addr3
double expected = (medianWeight * maxMultiple) / bookieInfoMap.get(addr3).getWeight();
assertTrue("Weights not being honored expected " + expected + " observed " + observedMultiple2,
Math.abs(observedMultiple2 - expected) < 1);
}
@Test
public void testWeightedPlacementAndNewEnsembleWithEnoughBookiesInSameRack() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
BookieSocketAddress addr6 = new BookieSocketAddress("127.0.0.6", 3181);
BookieSocketAddress addr7 = new BookieSocketAddress("127.0.0.7", 3181);
BookieSocketAddress addr8 = new BookieSocketAddress("127.0.0.8", 3181);
BookieSocketAddress addr9 = new BookieSocketAddress("127.0.0.9", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr6.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
StaticDNSResolver.addNodeToRack(addr7.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
StaticDNSResolver.addNodeToRack(addr8.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
StaticDNSResolver.addNodeToRack(addr9.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
addrs.add(addr5);
addrs.add(addr6);
addrs.add(addr7);
addrs.add(addr8);
addrs.add(addr9);
int maxMultiple = 4;
conf.setDiskWeightBasedPlacementEnabled(true);
conf.setBookieMaxWeightMultipleForWeightBasedPlacement(maxMultiple);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr3, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr4, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
bookieInfoMap.put(addr6, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr7, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr8, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr9, new BookieInfo(1000L, 1000L));
repp.updateBookieInfo(bookieInfoMap);
Map<BookieSocketAddress, Long> selectionCounts = new HashMap<BookieSocketAddress, Long>();
for (BookieSocketAddress b : addrs) {
selectionCounts.put(b, 0L);
}
int numTries = 10000;
Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>();
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse;
List<BookieSocketAddress> ensemble;
int ensembleSize = 3;
int writeQuorumSize = 2;
int acqQuorumSize = 2;
for (int i = 0; i < numTries; i++) {
// addr2 is on /r2 and this is the only one on this rack. So the replacement
// will come from other racks. However, the weight should be honored in such
// selections as well
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, acqQuorumSize, null, excludeList);
ensemble = ensembleResponse.getResult();
assertTrue(
"Rackaware selection not happening "
+ getNumCoveredWriteQuorums(ensemble, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()),
getNumCoveredWriteQuorums(ensemble, writeQuorumSize, conf.getMinNumRacksPerWriteQuorum()) >= 2);
for (BookieSocketAddress b : ensemble) {
selectionCounts.put(b, selectionCounts.get(b) + 1);
}
}
// the median weight used is 100 since addr2 and addr6 have the same weight, we use their
// selection counts as the same as median
double observedMultiple1 = ((double) selectionCounts.get(addr5) / (double) selectionCounts.get(addr2));
double observedMultiple2 = ((double) selectionCounts.get(addr9) / (double) selectionCounts.get(addr6));
assertTrue("Weights not being honored expected 2 observed " + observedMultiple1,
Math.abs(observedMultiple1 - maxMultiple) < 0.5);
assertTrue("Weights not being honored expected 4 observed " + observedMultiple2,
Math.abs(observedMultiple2 - maxMultiple) < 0.5);
}
@Test
public void testWeightedPlacementAndNewEnsembleWithoutEnoughBookies() throws Exception {
BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.1", 3181);
BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.2", 3181);
BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.3", 3181);
BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.4", 3181);
BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.5", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(addr1.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION_AND_RACK);
StaticDNSResolver.addNodeToRack(addr2.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr3.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r2");
StaticDNSResolver.addNodeToRack(addr4.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
StaticDNSResolver.addNodeToRack(addr5.getSocketAddress().getAddress().getHostAddress(),
NetworkTopology.DEFAULT_REGION + "/r3");
// Update cluster
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
addrs.add(addr5);
int maxMultiple = 4;
conf.setDiskWeightBasedPlacementEnabled(true);
conf.setBookieMaxWeightMultipleForWeightBasedPlacement(maxMultiple);
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
Map<BookieSocketAddress, BookieInfo> bookieInfoMap = new HashMap<BookieSocketAddress, BookieInfo>();
bookieInfoMap.put(addr1, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr2, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr3, new BookieInfo(1000L, 1000L));
bookieInfoMap.put(addr4, new BookieInfo(100L, 100L));
bookieInfoMap.put(addr5, new BookieInfo(1000L, 1000L));
repp.updateBookieInfo(bookieInfoMap);
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse;
List<BookieSocketAddress> ensemble;
Set<BookieSocketAddress> excludeList = new HashSet<BookieSocketAddress>();
try {
excludeList.add(addr1);
excludeList.add(addr2);
excludeList.add(addr3);
excludeList.add(addr4);
ensembleResponse = repp.newEnsemble(3, 2, 2, null, excludeList);
ensemble = ensembleResponse.getResult();
fail("Should throw BKNotEnoughBookiesException when there is not enough bookies" + ensemble);
} catch (BKNotEnoughBookiesException e) {
// this is expected
}
try {
ensembleResponse = repp.newEnsemble(1, 1, 1, null, excludeList);
ensemble = ensembleResponse.getResult();
} catch (BKNotEnoughBookiesException e) {
fail("Should not throw BKNotEnoughBookiesException when there are enough bookies for the ensemble");
}
}
static int getNumCoveredWriteQuorums(List<BookieSocketAddress> ensemble, int writeQuorumSize,
int minNumRacksPerWriteQuorumConfValue) throws Exception {
int ensembleSize = ensemble.size();
int numCoveredWriteQuorums = 0;
for (int i = 0; i < ensembleSize; i++) {
Set<String> racks = new HashSet<String>();
for (int j = 0; j < writeQuorumSize; j++) {
int bookieIdx = (i + j) % ensembleSize;
BookieSocketAddress addr = ensemble.get(bookieIdx);
racks.add(StaticDNSResolver.getRack(addr.getHostName()));
}
int numOfRacksToCoverTo = Math.max(Math.min(writeQuorumSize, minNumRacksPerWriteQuorumConfValue), 2);
numCoveredWriteQuorums += (racks.size() >= numOfRacksToCoverTo ? 1 : 0);
}
return numCoveredWriteQuorums;
}
@Test
public void testNodeWithFailures() throws Exception {
repp.uninitalize();
updateMyRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
HashMap<BookieSocketAddress, Long> bookieFailures = new HashMap<BookieSocketAddress, Long>();
bookieFailures.put(addr1, 20L);
bookieFailures.put(addr2, 22L);
// remove failure bookies: addr1 and addr2
addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
DistributionSchedule.WriteSet reoderSet = repp.reorderReadSequence(
ensemble, getBookiesHealthInfo(bookieFailures, new HashMap<>()), writeSet);
LOG.info("reorder set : {}", reoderSet);
assertEquals(ensemble.get(reoderSet.get(2)), addr1);
assertEquals(ensemble.get(reoderSet.get(3)), addr2);
assertEquals(ensemble.get(reoderSet.get(0)), addr3);
assertEquals(ensemble.get(reoderSet.get(1)), addr4);
}
@Test
public void testPlacementOnStabilizeNetworkTopology() throws Exception {
repp.uninitalize();
updateMyRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
repp = new RackawareEnsemblePlacementPolicy();
ClientConfiguration confLocal = new ClientConfiguration();
confLocal.addConfiguration(conf);
confLocal.setNetworkTopologyStabilizePeriodSeconds(99999);
repp.initialize(confLocal, Optional.<DNSToSwitchMapping>empty(), timer, DISABLE_ALL, NullStatsLogger.INSTANCE);
repp.withDefaultRack(NetworkTopology.DEFAULT_REGION_AND_RACK);
Set<BookieSocketAddress> addrs = new HashSet<BookieSocketAddress>();
addrs.add(addr1);
addrs.add(addr2);
addrs.add(addr3);
addrs.add(addr4);
repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
// addr4 left
addrs.remove(addr4);
Set<BookieSocketAddress> deadBookies = repp.onClusterChanged(addrs, new HashSet<BookieSocketAddress>());
assertTrue(deadBookies.isEmpty());
// we will never use addr4 even it is in the stabilized network topology
for (int i = 0; i < 5; i++) {
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse =
repp.newEnsemble(3, 2, 2, null, new HashSet<BookieSocketAddress>());
List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
assertFalse(ensemble.contains(addr4));
assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy);
}
// we could still use addr4 for urgent allocation if it is just bookie flapping
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse =
repp.newEnsemble(4, 2, 2, null, new HashSet<BookieSocketAddress>());
List<BookieSocketAddress> ensemble = ensembleResponse.getResult();
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
assertEquals(PlacementPolicyAdherence.FAIL, isEnsembleAdheringToPlacementPolicy);
assertTrue(ensemble.contains(addr4));
}
@Test
public void testShuffleWithMask() {
int mask = 0xE1 << 16;
int maskBits = 0xFF << 16;
boolean shuffleOccurred = false;
for (int i = 0; i < 100; i++) {
DistributionSchedule.WriteSet w = writeSetFromValues(
1, 2, 3 & mask, 4 & mask, 5 & mask, 6);
shuffleWithMask(w, mask, maskBits);
assertEquals(w.get(0), 1);
assertEquals(w.get(1), 2);
assertEquals(w.get(5), 6);
if (w.get(3) == (3 & mask)
|| w.get(4) == (3 & mask)) {
shuffleOccurred = true;
} else if (w.get(2) != (3 & mask)) {
fail("3 not found");
}
if (w.get(2) == (4 & mask)
|| w.get(4) == (4 & mask)) {
shuffleOccurred = true;
} else if (w.get(3) != (4 & mask)) {
fail("4 not found");
}
if (w.get(2) == (5 & mask)
|| w.get(3) == (5 & mask)) {
shuffleOccurred = true;
} else if (w.get(4) != (5 & mask)) {
fail("5 not found");
}
}
assertTrue(shuffleOccurred);
// at start of array
shuffleOccurred = false;
for (int i = 0; i < 100; i++) {
DistributionSchedule.WriteSet w = writeSetFromValues(
1 & mask, 2 & mask, 3 & mask, 4, 5, 6);
shuffleWithMask(w, mask, maskBits);
assertEquals(w.get(3), 4);
assertEquals(w.get(4), 5);
assertEquals(w.get(5), 6);
if (w.get(1) == (1 & mask)
|| w.get(2) == (1 & mask)) {
shuffleOccurred = true;
} else if (w.get(0) != (1 & mask)) {
fail("1 not found");
}
if (w.get(0) == (2 & mask)
|| w.get(2) == (2 & mask)) {
shuffleOccurred = true;
} else if (w.get(1) != (2 & mask)) {
fail("2 not found");
}
if (w.get(0) == (3 & mask)
|| w.get(1) == (3 & mask)) {
shuffleOccurred = true;
} else if (w.get(2) != (3 & mask)) {
fail("3 not found");
}
}
assertTrue(shuffleOccurred);
// at end of array
shuffleOccurred = false;
for (int i = 0; i < 100; i++) {
DistributionSchedule.WriteSet w = writeSetFromValues(
1, 2, 3, 4 & mask, 5 & mask, 6 & mask);
shuffleWithMask(w, mask, maskBits);
assertEquals(w.get(0), 1);
assertEquals(w.get(1), 2);
assertEquals(w.get(2), 3);
if (w.get(4) == (4 & mask)
|| w.get(5) == (4 & mask)) {
shuffleOccurred = true;
} else if (w.get(3) != (4 & mask)) {
fail("4 not found");
}
if (w.get(3) == (5 & mask)
|| w.get(5) == (5 & mask)) {
shuffleOccurred = true;
} else if (w.get(4) != (5 & mask)) {
fail("5 not found");
}
if (w.get(3) == (6 & mask)
|| w.get(4) == (6 & mask)) {
shuffleOccurred = true;
} else if (w.get(5) != (6 & mask)) {
fail("6 not found");
}
}
assertTrue(shuffleOccurred);
}
@Test
public void testNumBookiesInDefaultRackGauge() throws Exception {
String defaultRackForThisTest = NetworkTopology.DEFAULT_REGION_AND_RACK;
repp.uninitalize();
updateMyRack(defaultRackForThisTest);
// Update cluster
BookieSocketAddress newAddr1 = new BookieSocketAddress("127.0.0.100", 3181);
BookieSocketAddress newAddr2 = new BookieSocketAddress("127.0.0.101", 3181);
BookieSocketAddress newAddr3 = new BookieSocketAddress("127.0.0.102", 3181);
BookieSocketAddress newAddr4 = new BookieSocketAddress("127.0.0.103", 3181);
// update dns mapping
StaticDNSResolver.addNodeToRack(newAddr1.getHostName(), defaultRackForThisTest);
StaticDNSResolver.addNodeToRack(newAddr2.getHostName(), "/default-region/r2");
StaticDNSResolver.addNodeToRack(newAddr3.getHostName(), "/default-region/r3");
StaticDNSResolver.addNodeToRack(newAddr4.getHostName(), defaultRackForThisTest);
TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL, statsLogger);
repp.withDefaultRack(defaultRackForThisTest);
Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
Set<BookieSocketAddress> writeableBookies = new HashSet<BookieSocketAddress>();
writeableBookies.add(newAddr1);
writeableBookies.add(newAddr2);
Set<BookieSocketAddress> readOnlyBookies = new HashSet<BookieSocketAddress>();
readOnlyBookies.add(newAddr3);
readOnlyBookies.add(newAddr4);
repp.onClusterChanged(writeableBookies, readOnlyBookies);
// only writable bookie - newAddr1 in default rack
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 1, numBookiesInDefaultRackGauge.getSample());
readOnlyBookies.remove(newAddr4);
writeableBookies.add(newAddr4);
repp.onClusterChanged(writeableBookies, readOnlyBookies);
// newAddr4 is also added to writable bookie so 2 writable bookies -
// newAddr1 and newAddr4
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 2, numBookiesInDefaultRackGauge.getSample());
// newAddr4 rack is changed and it is not in default anymore
StaticDNSResolver
.changeRack(Collections.singletonList(newAddr4), Collections.singletonList("/default-region/r4"));
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 1, numBookiesInDefaultRackGauge.getSample());
writeableBookies.clear();
// writeableBookies is empty so 0 writable bookies in default rack
repp.onClusterChanged(writeableBookies, readOnlyBookies);
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 0, numBookiesInDefaultRackGauge.getSample());
StaticDNSResolver
.changeRack(Collections.singletonList(newAddr1), Collections.singletonList("/default-region/r2"));
readOnlyBookies.clear();
writeableBookies.add(newAddr1);
writeableBookies.add(newAddr2);
writeableBookies.add(newAddr3);
writeableBookies.add(newAddr4);
repp.onClusterChanged(writeableBookies, readOnlyBookies);
// newAddr1 rack is changed and it is not in default anymore. So no
// bookies in default rack anymore
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", 0, numBookiesInDefaultRackGauge.getSample());
}
@Test
public void testNewEnsembleExcludesDefaultRackBookiesEnforceMinNumRacks() throws Exception {
String defaultRackForThisTest = NetworkTopology.DEFAULT_REGION_AND_RACK;
repp.uninitalize();
updateMyRack(defaultRackForThisTest);
int minNumRacksPerWriteQuorum = 4;
ClientConfiguration clientConf = new ClientConfiguration(conf);
clientConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorum);
// set enforceMinNumRacksPerWriteQuorum
clientConf.setEnforceMinNumRacksPerWriteQuorum(true);
TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(clientConf, Optional.<DNSToSwitchMapping> empty(), timer, DISABLE_ALL, statsLogger);
repp.withDefaultRack(defaultRackForThisTest);
Gauge<? extends Number> numBookiesInDefaultRackGauge = statsLogger
.getGauge(BookKeeperClientStats.NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK);
int writeQuorumSize = 3;
int ackQuorumSize = 3;
int effectiveMinNumRacksPerWriteQuorum = Math.min(minNumRacksPerWriteQuorum, writeQuorumSize);
int numOfRacks = 2 * effectiveMinNumRacksPerWriteQuorum - 1;
int numOfBookiesPerRack = 20;
BookieSocketAddress[] bookieSocketAddresses = new BookieSocketAddress[numOfRacks * numOfBookiesPerRack];
for (int i = 0; i < numOfRacks; i++) {
for (int j = 0; j < numOfBookiesPerRack; j++) {
int index = i * numOfBookiesPerRack + j;
bookieSocketAddresses[index] = new BookieSocketAddress("128.0.0." + index, 3181);
StaticDNSResolver.addNodeToRack(bookieSocketAddresses[index].getHostName(), "/default-region/r" + i);
}
}
int numOfBookiesInDefaultRack = 10;
BookieSocketAddress[] bookieSocketAddressesInDefaultRack = new BookieSocketAddress[numOfBookiesInDefaultRack];
for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
bookieSocketAddressesInDefaultRack[i] = new BookieSocketAddress("127.0.0." + (i + 100), 3181);
StaticDNSResolver.addNodeToRack(bookieSocketAddressesInDefaultRack[i].getHostName(),
defaultRackForThisTest);
}
Set<BookieSocketAddress> writableBookies = new HashSet<BookieSocketAddress>(
Arrays.asList(bookieSocketAddresses));
writableBookies.addAll(Arrays.asList(bookieSocketAddressesInDefaultRack));
repp.onClusterChanged(writableBookies, new HashSet<BookieSocketAddress>());
assertEquals("NUM_WRITABLE_BOOKIES_IN_DEFAULT_RACK guage value", numOfBookiesInDefaultRack,
numBookiesInDefaultRackGauge.getSample());
/*
* in this scenario we have enough number of racks (2 *
* effectiveMinNumRacksPerWriteQuorum - 1) and more number of bookies in
* each rack. So we should be able to create ensemble for all
* ensembleSizes (as long as there are enough number of bookies in each
* rack).
*
* Since minNumRacksPerWriteQuorum is enforced, it shouldn't select node
* from default rack.
*/
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>> ensembleResponse;
List<BookieSocketAddress> ensemble;
PlacementPolicyAdherence isEnsembleAdheringToPlacementPolicy;
for (int ensembleSize = effectiveMinNumRacksPerWriteQuorum; ensembleSize < 40; ensembleSize++) {
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
ensemble = ensembleResponse.getResult();
isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
ensembleResponse = repp.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, null, new HashSet<>());
ensemble = ensembleResponse.getResult();
isEnsembleAdheringToPlacementPolicy = ensembleResponse.isAdheringToPolicy();
assertEquals("Number of writeQuorum sets covered", ensembleSize,
getNumCoveredWriteQuorums(ensemble, writeQuorumSize, clientConf.getMinNumRacksPerWriteQuorum()));
assertEquals(PlacementPolicyAdherence.MEETS_STRICT, isEnsembleAdheringToPlacementPolicy);
Collection<BookieSocketAddress> bookiesOfDefaultRackInEnsemble = CollectionUtils
.intersection(Arrays.asList(bookieSocketAddressesInDefaultRack), ensemble);
assertTrue("Ensemble is not supposed to contain bookies from default rack, but ensemble contains - "
+ bookiesOfDefaultRackInEnsemble, bookiesOfDefaultRackInEnsemble.isEmpty());
}
}
private void testAreAckedBookiesAdheringToPlacementPolicyHelper(int minNumRacksPerWriteQuorumConfValue,
int ensembleSize,
int writeQuorumSize,
int ackQuorumSize,
int numOfBookiesInDefaultRack,
int numOfRacks,
int numOfBookiesPerRack) throws Exception {
String defaultRackForThisTest = NetworkTopology.DEFAULT_REGION_AND_RACK;
repp.uninitalize();
updateMyRack(defaultRackForThisTest);
ClientConfiguration conf = new ClientConfiguration(this.conf);
conf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsLogger statsLogger = statsProvider.getStatsLogger("");
repp = new RackawareEnsemblePlacementPolicy();
repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL, statsLogger);
repp.withDefaultRack(defaultRackForThisTest);
List<BookieSocketAddress> bookieSocketAddressesDefaultRack = new ArrayList<>();
List<BookieSocketAddress> bookieSocketAddressesNonDefaultRack = new ArrayList<>();
Set<BookieSocketAddress> writableBookies;
Set<BookieSocketAddress> bookiesForEntry = new HashSet<>();
for (int i = 0; i < numOfRacks; i++) {
for (int j = 0; j < numOfBookiesPerRack; j++) {
int index = i * numOfBookiesPerRack + j;
bookieSocketAddressesNonDefaultRack.add(new BookieSocketAddress("128.0.0." + index, 3181));
StaticDNSResolver.addNodeToRack(bookieSocketAddressesNonDefaultRack.get(index).getHostName(),
"/default-region/r" + i);
}
}
for (int i = 0; i < numOfBookiesInDefaultRack; i++) {
bookieSocketAddressesDefaultRack.add(new BookieSocketAddress("127.0.0." + (i + 100), 3181));
StaticDNSResolver.addNodeToRack(bookieSocketAddressesDefaultRack.get(i).getHostName(),
defaultRackForThisTest);
}
writableBookies = new HashSet<>(bookieSocketAddressesNonDefaultRack);
writableBookies.addAll(bookieSocketAddressesDefaultRack);
repp.onClusterChanged(writableBookies, new HashSet<>());
// Case 1 : Bookies in the ensemble from the same rack.
// Manually crafting the ensemble here to create the error case when the check should return false
List<BookieSocketAddress> ensemble = new ArrayList<>(bookieSocketAddressesDefaultRack);
for (int entryId = 0; entryId < 10; entryId++) {
DistributionSchedule ds = new RoundRobinDistributionSchedule(writeQuorumSize, ackQuorumSize, ensembleSize);
DistributionSchedule.WriteSet ws = ds.getWriteSet(entryId);
for (int i = 0; i < ws.size(); i++) {
bookiesForEntry.add(ensemble.get(ws.get(i)));
}
assertFalse(repp.areAckedBookiesAdheringToPlacementPolicy(bookiesForEntry, writeQuorumSize, ackQuorumSize));
}
// Case 2 : Bookies in the ensemble from the different racks
EnsemblePlacementPolicy.PlacementResult<List<BookieSocketAddress>>
ensembleResponse = repp.newEnsemble(ensembleSize,
writeQuorumSize,
ackQuorumSize,
null,
new HashSet<>());
ensemble = ensembleResponse.getResult();
for (int entryId = 0; entryId < 10; entryId++) {
DistributionSchedule ds = new RoundRobinDistributionSchedule(writeQuorumSize, ackQuorumSize, ensembleSize);
DistributionSchedule.WriteSet ws = ds.getWriteSet(entryId);
for (int i = 0; i < ws.size(); i++) {
bookiesForEntry.add(ensemble.get(ws.get(i)));
}
assertTrue(repp.areAckedBookiesAdheringToPlacementPolicy(bookiesForEntry, writeQuorumSize, ackQuorumSize));
}
}
/**
* This tests areAckedBookiesAdheringToPlacementPolicy function in RackawareEnsemblePlacementPolicy.
*/
@Test
public void testAreAckedBookiesAdheringToPlacementPolicy() throws Exception {
testAreAckedBookiesAdheringToPlacementPolicyHelper(2, 7, 3, 2, 7, 3, 3);
testAreAckedBookiesAdheringToPlacementPolicyHelper(4, 6, 3, 2, 6, 3, 3);
testAreAckedBookiesAdheringToPlacementPolicyHelper(5, 7, 5, 3, 7, 5, 2);
}
}