blob: 9e134eea7351efb031c1677ebd65fecce9f6ebbd [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.replication;
import static org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl.REPP_DNS_RESOLVER_CLASS;
import static org.apache.bookkeeper.replication.ReplicationStats.AUDITOR_SCOPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.client.LedgerMetadataBuilder;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.ZoneawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.replication.AuditorPeriodicCheckTest.TestAuditor;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.test.TestStatsProvider.TestOpStatsLogger;
import org.apache.bookkeeper.test.TestStatsProvider.TestStatsLogger;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests the logic of Auditor's PlacementPolicyCheck.
*/
public class AuditorPlacementPolicyCheckTest extends BookKeeperClusterTestCase {
private MetadataBookieDriver driver;
public AuditorPlacementPolicyCheckTest() {
super(1);
baseConf.setPageLimit(1); // to make it easy to push ledger out of cache
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
StaticDNSResolver.reset();
driver = MetadataDrivers.getBookieDriver(URI.create(bsConfs.get(0).getMetadataServiceUri()));
driver.initialize(bsConfs.get(0), () -> {
}, NullStatsLogger.INSTANCE);
}
@After
@Override
public void tearDown() throws Exception {
if (null != driver) {
driver.close();
}
super.tearDown();
}
@Test
public void testPlacementPolicyCheckWithBookiesFromDifferentRacks() throws Exception {
int numOfBookies = 5;
List<BookieId> bookieAddresses = new ArrayList<>();
BookieSocketAddress bookieAddress;
RegistrationManager regManager = driver.getRegistrationManager();
// all the numOfBookies (5) are going to be in different racks
for (int i = 0; i < numOfBookies; i++) {
bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181);
StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), "/rack" + (i));
bookieAddresses.add(bookieAddress.toBookieId());
regManager.registerBookie(bookieAddress.toBookieId(), false, BookieServiceInfo.EMPTY);
}
LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
LedgerManager lm = mFactory.newLedgerManager();
int ensembleSize = 5;
int writeQuorumSize = 4;
int ackQuorumSize = 2;
int minNumRacksPerWriteQuorumConfValue = 4;
Collections.shuffle(bookieAddresses);
// closed ledger
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
.withId(1L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L, bookieAddresses)
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(1L, initMeta).get();
Collections.shuffle(bookieAddresses);
ensembleSize = 4;
// closed ledger with multiple segments
initMeta = LedgerMetadataBuilder.create()
.withId(2L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L, bookieAddresses.subList(0, 4))
.newEnsembleEntry(20L, bookieAddresses.subList(1, 5))
.newEnsembleEntry(60L, bookieAddresses.subList(0, 4))
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(2L, initMeta).get();
Collections.shuffle(bookieAddresses);
// non-closed ledger
initMeta = LedgerMetadataBuilder.create()
.withId(3L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L, bookieAddresses.subList(0, 4))
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(3L, initMeta).get();
Collections.shuffle(bookieAddresses);
// non-closed ledger with multiple segments
initMeta = LedgerMetadataBuilder.create()
.withId(4L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L, bookieAddresses.subList(0, 4))
.newEnsembleEntry(20L, bookieAddresses.subList(1, 5))
.newEnsembleEntry(60L, bookieAddresses.subList(0, 4))
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(4L, initMeta).get();
ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
setServerConfigPropertiesForRackPlacement(servConf);
MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
try {
TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
Gauge<? extends Number> ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY);
/*
* since all of the bookies are in different racks, there shouldn't be any ledger not adhering
* to placement policy.
*/
assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage value", 0,
ledgersNotAdheringToPlacementPolicyGuage.getSample());
/*
* since all of the bookies are in different racks, there shouldn't be any ledger softly adhering
* to placement policy.
*/
assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY guage value", 0,
ledgersSoftlyAdheringToPlacementPolicyGuage.getSample());
} finally {
Auditor auditor = auditorRef.getValue();
if (auditor != null) {
auditor.close();
}
}
}
@Test
public void testPlacementPolicyCheckWithLedgersNotAdheringToPlacementPolicy() throws Exception {
int numOfBookies = 5;
int numOfLedgersNotAdheringToPlacementPolicy = 0;
List<BookieId> bookieAddresses = new ArrayList<>();
RegistrationManager regManager = driver.getRegistrationManager();
for (int i = 0; i < numOfBookies; i++) {
BookieId bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181).toBookieId();
bookieAddresses.add(bookieAddress);
regManager.registerBookie(bookieAddress, false, BookieServiceInfo.EMPTY);
}
// only three racks
StaticDNSResolver.addNodeToRack("98.98.98.0", "/rack1");
StaticDNSResolver.addNodeToRack("98.98.98.1", "/rack2");
StaticDNSResolver.addNodeToRack("98.98.98.2", "/rack3");
StaticDNSResolver.addNodeToRack("98.98.98.3", "/rack1");
StaticDNSResolver.addNodeToRack("98.98.98.4", "/rack2");
LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
LedgerManager lm = mFactory.newLedgerManager();
int ensembleSize = 5;
int writeQuorumSize = 3;
int ackQuorumSize = 2;
int minNumRacksPerWriteQuorumConfValue = 3;
/*
* this closed ledger doesn't adhere to placement policy because there are only
* 3 racks, and the ensembleSize is 5.
*/
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
.withId(1L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L, bookieAddresses)
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(1L, initMeta).get();
numOfLedgersNotAdheringToPlacementPolicy++;
/*
* this is non-closed ledger, so it shouldn't count as ledger not
* adhering to placement policy
*/
initMeta = LedgerMetadataBuilder.create()
.withId(2L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L, bookieAddresses)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(2L, initMeta).get();
ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
setServerConfigPropertiesForRackPlacement(servConf);
MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
try {
TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage value",
numOfLedgersNotAdheringToPlacementPolicy, ledgersNotAdheringToPlacementPolicyGuage.getSample());
Gauge<? extends Number> ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY);
assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY guage value",
0, ledgersSoftlyAdheringToPlacementPolicyGuage.getSample());
} finally {
Auditor auditor = auditorRef.getValue();
if (auditor != null) {
auditor.close();
}
}
}
@Test
public void testPlacementPolicyCheckForURLedgersElapsedRecoveryGracePeriod() throws Exception {
testPlacementPolicyCheckWithURLedgers(true);
}
@Test
public void testPlacementPolicyCheckForURLedgersNotElapsedRecoveryGracePeriod() throws Exception {
testPlacementPolicyCheckWithURLedgers(false);
}
public void testPlacementPolicyCheckWithURLedgers(boolean timeElapsed) throws Exception {
int numOfBookies = 4;
/*
* in timeElapsed=true scenario, set some low value, otherwise set some
* highValue.
*/
int underreplicatedLedgerRecoveryGracePeriod = timeElapsed ? 1 : 1000;
int numOfURLedgersElapsedRecoveryGracePeriod = 0;
List<BookieId> bookieAddresses = new ArrayList<BookieId>();
RegistrationManager regManager = driver.getRegistrationManager();
for (int i = 0; i < numOfBookies; i++) {
BookieId bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181).toBookieId();
bookieAddresses.add(bookieAddress);
regManager.registerBookie(bookieAddress, false, BookieServiceInfo.EMPTY);
}
LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
LedgerManager lm = mFactory.newLedgerManager();
LedgerUnderreplicationManager underreplicationManager = mFactory.newLedgerUnderreplicationManager();
int ensembleSize = 4;
int writeQuorumSize = 3;
int ackQuorumSize = 2;
long ledgerId1 = 1L;
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
.withId(ledgerId1)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L, bookieAddresses)
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(ledgerId1, initMeta).get();
underreplicationManager.markLedgerUnderreplicated(ledgerId1, bookieAddresses.get(0).toString());
if (timeElapsed) {
numOfURLedgersElapsedRecoveryGracePeriod++;
}
/*
* this is non-closed ledger, it should also be reported as
* URLedgersElapsedRecoveryGracePeriod
*/
ensembleSize = 3;
long ledgerId2 = 21234561L;
initMeta = LedgerMetadataBuilder.create()
.withId(ledgerId2)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L,
Arrays.asList(bookieAddresses.get(0), bookieAddresses.get(1), bookieAddresses.get(2)))
.newEnsembleEntry(100L,
Arrays.asList(bookieAddresses.get(3), bookieAddresses.get(1), bookieAddresses.get(2)))
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(ledgerId2, initMeta).get();
underreplicationManager.markLedgerUnderreplicated(ledgerId2, bookieAddresses.get(0).toString());
if (timeElapsed) {
numOfURLedgersElapsedRecoveryGracePeriod++;
}
/*
* this ledger is not marked underreplicated.
*/
long ledgerId3 = 31234561L;
initMeta = LedgerMetadataBuilder.create()
.withId(ledgerId3)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L,
Arrays.asList(bookieAddresses.get(1), bookieAddresses.get(2), bookieAddresses.get(3)))
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(ledgerId3, initMeta).get();
if (timeElapsed) {
/*
* in timeelapsed scenario, by waiting for
* underreplicatedLedgerRecoveryGracePeriod, recovery time must be
* elapsed.
*/
Thread.sleep((underreplicatedLedgerRecoveryGracePeriod + 1) * 1000);
} else {
/*
* in timeElapsed=false scenario, since
* underreplicatedLedgerRecoveryGracePeriod is set to some high
* value, there is no value in waiting. So just wait for some time
* and make sure urledgers are not reported as recoverytime elapsed
* urledgers.
*/
Thread.sleep(5000);
}
ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
servConf.setUnderreplicatedLedgerRecoveryGracePeriod(underreplicatedLedgerRecoveryGracePeriod);
setServerConfigPropertiesForRackPlacement(servConf);
MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
try {
TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
Gauge<? extends Number> underreplicatedLedgersElapsedRecoveryGracePeriodGuage = statsLogger
.getGauge(ReplicationStats.NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD);
assertEquals("NUM_UNDERREPLICATED_LEDGERS_ELAPSED_RECOVERY_GRACE_PERIOD guage value",
numOfURLedgersElapsedRecoveryGracePeriod,
underreplicatedLedgersElapsedRecoveryGracePeriodGuage.getSample());
} finally {
Auditor auditor = auditorRef.getValue();
if (auditor != null) {
auditor.close();
}
}
}
@Test
public void testPlacementPolicyCheckWithLedgersNotAdheringToPolicyWithMultipleSegments() throws Exception {
int numOfBookies = 7;
int numOfLedgersNotAdheringToPlacementPolicy = 0;
List<BookieId> bookieAddresses = new ArrayList<>();
RegistrationManager regManager = driver.getRegistrationManager();
for (int i = 0; i < numOfBookies; i++) {
BookieId bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181).toBookieId();
bookieAddresses.add(bookieAddress);
regManager.registerBookie(bookieAddress, false, BookieServiceInfo.EMPTY);
}
// only three racks
StaticDNSResolver.addNodeToRack("98.98.98.0", "/rack1");
StaticDNSResolver.addNodeToRack("98.98.98.1", "/rack2");
StaticDNSResolver.addNodeToRack("98.98.98.2", "/rack3");
StaticDNSResolver.addNodeToRack("98.98.98.3", "/rack4");
StaticDNSResolver.addNodeToRack("98.98.98.4", "/rack1");
StaticDNSResolver.addNodeToRack("98.98.98.5", "/rack2");
StaticDNSResolver.addNodeToRack("98.98.98.6", "/rack3");
LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
LedgerManager lm = mFactory.newLedgerManager();
int ensembleSize = 5;
int writeQuorumSize = 5;
int ackQuorumSize = 2;
int minNumRacksPerWriteQuorumConfValue = 4;
/*
* this closed ledger in each writeQuorumSize (5), there would be
* atleast minNumRacksPerWriteQuorumConfValue (4) racks. So it wont be
* counted as ledgers not adhering to placement policy.
*/
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
.withId(1L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L, bookieAddresses.subList(0, 5))
.newEnsembleEntry(20L, bookieAddresses.subList(1, 6))
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(1L, initMeta).get();
/*
* for the second segment bookies are from /rack1, /rack2 and /rack3,
* which is < minNumRacksPerWriteQuorumConfValue (4). So it is not
* adhering to placement policy.
*
* also for the third segment are from /rack1, /rack2 and /rack3, which
* is < minNumRacksPerWriteQuorumConfValue (4). So it is not adhering to
* placement policy.
*
* Though there are multiple segments are not adhering to placement
* policy, it should be counted as single ledger.
*/
initMeta = LedgerMetadataBuilder.create()
.withId(2L)
.withEnsembleSize(ensembleSize)
.withWriteQuorumSize(writeQuorumSize)
.withAckQuorumSize(ackQuorumSize)
.newEnsembleEntry(0L, bookieAddresses.subList(0, 5))
.newEnsembleEntry(20L,
Arrays.asList(bookieAddresses.get(0), bookieAddresses.get(1), bookieAddresses.get(2),
bookieAddresses.get(4), bookieAddresses.get(5)))
.newEnsembleEntry(40L,
Arrays.asList(bookieAddresses.get(0), bookieAddresses.get(1), bookieAddresses.get(2),
bookieAddresses.get(4), bookieAddresses.get(6)))
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(2L, initMeta).get();
numOfLedgersNotAdheringToPlacementPolicy++;
ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
servConf.setMinNumRacksPerWriteQuorum(minNumRacksPerWriteQuorumConfValue);
setServerConfigPropertiesForRackPlacement(servConf);
MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
try {
TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY gauge value",
numOfLedgersNotAdheringToPlacementPolicy, ledgersNotAdheringToPlacementPolicyGuage.getSample());
Gauge<? extends Number> ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY);
assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY gauge value",
0, ledgersSoftlyAdheringToPlacementPolicyGuage.getSample());
} finally {
Auditor auditor = auditorRef.getValue();
if (auditor != null) {
auditor.close();
}
}
}
@Test
public void testZoneawarePlacementPolicyCheck() throws Exception {
int numOfBookies = 6;
int numOfLedgersNotAdheringToPlacementPolicy = 0;
int numOfLedgersSoftlyAdheringToPlacementPolicy = 0;
List<BookieId> bookieAddresses = new ArrayList<BookieId>();
RegistrationManager regManager = driver.getRegistrationManager();
/*
* 6 bookies - 3 zones and 2 uds
*/
for (int i = 0; i < numOfBookies; i++) {
BookieSocketAddress bookieAddress = new BookieSocketAddress("98.98.98." + i, 2181);
bookieAddresses.add(bookieAddress.toBookieId());
regManager.registerBookie(bookieAddress.toBookieId(), false, BookieServiceInfo.EMPTY);
String zone = "/zone" + (i % 3);
String upgradeDomain = "/ud" + (i % 2);
String networkLocation = zone + upgradeDomain;
StaticDNSResolver.addNodeToRack(bookieAddress.getHostName(), networkLocation);
}
LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
LedgerManager lm = mFactory.newLedgerManager();
ServerConfiguration servConf = new ServerConfiguration(bsConfs.get(0));
servConf.setDesiredNumZonesPerWriteQuorum(3);
servConf.setMinNumZonesPerWriteQuorum(2);
setServerConfigPropertiesForZonePlacement(servConf);
/*
* this closed ledger adheres to ZoneAwarePlacementPolicy, since
* ensemble is spread across 3 zones and 2 UDs
*/
LedgerMetadata initMeta = LedgerMetadataBuilder.create()
.withId(1L)
.withEnsembleSize(6)
.withWriteQuorumSize(6)
.withAckQuorumSize(2)
.newEnsembleEntry(0L, bookieAddresses)
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(1L, initMeta).get();
/*
* this is non-closed ledger, so though ensemble is not adhering to
* placement policy (since ensemble is not multiple of writeQuorum),
* this shouldn't be reported
*/
initMeta = LedgerMetadataBuilder.create()
.withId(2L)
.withEnsembleSize(6)
.withWriteQuorumSize(5)
.withAckQuorumSize(2)
.newEnsembleEntry(0L, bookieAddresses)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(2L, initMeta).get();
/*
* this is closed ledger, since ensemble is not multiple of writeQuorum,
* this ledger is not adhering to placement policy.
*/
initMeta = LedgerMetadataBuilder.create()
.withId(3L)
.withEnsembleSize(6)
.withWriteQuorumSize(5)
.withAckQuorumSize(2)
.newEnsembleEntry(0L, bookieAddresses)
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(3L, initMeta).get();
numOfLedgersNotAdheringToPlacementPolicy++;
/*
* this closed ledger adheres softly to ZoneAwarePlacementPolicy, since
* ensemble/writeQuorum of size 4 has spread across just
* minNumZonesPerWriteQuorum (2).
*/
List<BookieId> newEnsemble = new ArrayList<BookieId>();
newEnsemble.add(bookieAddresses.get(0));
newEnsemble.add(bookieAddresses.get(1));
newEnsemble.add(bookieAddresses.get(3));
newEnsemble.add(bookieAddresses.get(4));
initMeta = LedgerMetadataBuilder.create()
.withId(4L)
.withEnsembleSize(4)
.withWriteQuorumSize(4)
.withAckQuorumSize(2)
.newEnsembleEntry(0L, newEnsemble)
.withClosedState()
.withLastEntryId(100)
.withLength(10000)
.withDigestType(DigestType.DUMMY)
.withPassword(new byte[0])
.build();
lm.createLedgerMetadata(4L, initMeta).get();
numOfLedgersSoftlyAdheringToPlacementPolicy++;
MutableObject<Auditor> auditorRef = new MutableObject<Auditor>();
try {
TestStatsLogger statsLogger = startAuditorAndWaitForPlacementPolicyCheck(servConf, auditorRef);
Gauge<? extends Number> ledgersNotAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY);
assertEquals("NUM_LEDGERS_NOT_ADHERING_TO_PLACEMENT_POLICY guage value",
numOfLedgersNotAdheringToPlacementPolicy, ledgersNotAdheringToPlacementPolicyGuage.getSample());
Gauge<? extends Number> ledgersSoftlyAdheringToPlacementPolicyGuage = statsLogger
.getGauge(ReplicationStats.NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY);
assertEquals("NUM_LEDGERS_SOFTLY_ADHERING_TO_PLACEMENT_POLICY guage value",
numOfLedgersSoftlyAdheringToPlacementPolicy,
ledgersSoftlyAdheringToPlacementPolicyGuage.getSample());
} finally {
Auditor auditor = auditorRef.getValue();
if (auditor != null) {
auditor.close();
}
}
}
private void setServerConfigPropertiesForRackPlacement(ServerConfiguration servConf) {
setServerConfigProperties(servConf, RackawareEnsemblePlacementPolicy.class.getName());
}
private void setServerConfigPropertiesForZonePlacement(ServerConfiguration servConf) {
setServerConfigProperties(servConf, ZoneawareEnsemblePlacementPolicy.class.getName());
}
private void setServerConfigProperties(ServerConfiguration servConf, String ensemblePlacementPolicyClass) {
servConf.setProperty(REPP_DNS_RESOLVER_CLASS, StaticDNSResolver.class.getName());
servConf.setProperty(ClientConfiguration.ENSEMBLE_PLACEMENT_POLICY, ensemblePlacementPolicyClass);
servConf.setAuditorPeriodicCheckInterval(0);
servConf.setAuditorPeriodicBookieCheckInterval(0);
servConf.setAuditorPeriodicReplicasCheckInterval(0);
servConf.setAuditorPeriodicPlacementPolicyCheckInterval(1000);
}
private TestStatsLogger startAuditorAndWaitForPlacementPolicyCheck(ServerConfiguration servConf,
MutableObject<Auditor> auditorRef) throws MetadataException, CompatibilityException, KeeperException,
InterruptedException, UnavailableException, UnknownHostException {
LedgerManagerFactory mFactory = driver.getLedgerManagerFactory();
LedgerUnderreplicationManager urm = mFactory.newLedgerUnderreplicationManager();
TestStatsProvider statsProvider = new TestStatsProvider();
TestStatsLogger statsLogger = statsProvider.getStatsLogger(AUDITOR_SCOPE);
TestOpStatsLogger placementPolicyCheckStatsLogger = (TestOpStatsLogger) statsLogger
.getOpStatsLogger(ReplicationStats.PLACEMENT_POLICY_CHECK_TIME);
final TestAuditor auditor = new TestAuditor(BookieImpl.getBookieId(servConf).toString(), servConf,
statsLogger);
auditorRef.setValue(auditor);
CountDownLatch latch = auditor.getLatch();
assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 0, placementPolicyCheckStatsLogger.getSuccessCount());
urm.setPlacementPolicyCheckCTime(-1);
auditor.start();
/*
* since placementPolicyCheckCTime is set to -1, placementPolicyCheck should be
* scheduled to run with no initialdelay
*/
assertTrue("placementPolicyCheck should have executed", latch.await(20, TimeUnit.SECONDS));
for (int i = 0; i < 20; i++) {
Thread.sleep(100);
if (placementPolicyCheckStatsLogger.getSuccessCount() >= 1) {
break;
}
}
assertEquals("PLACEMENT_POLICY_CHECK_TIME SuccessCount", 1, placementPolicyCheckStatsLogger.getSuccessCount());
return statsLogger;
}
}