blob: b250777827c6a0988e3590d65d7469cbde2495fa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Set;
import java.util.SortedMap;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests BKAdmin that it should be able to replicate the failed bookie fragments
* to target bookie.
*/
public class TestLedgerFragmentReplication extends BookKeeperClusterTestCase {
private static final byte[] TEST_PSSWD = "testpasswd".getBytes();
private static final DigestType TEST_DIGEST_TYPE = BookKeeper.DigestType.CRC32;
private static Logger LOG = LoggerFactory
.getLogger(TestLedgerFragmentReplication.class);
public TestLedgerFragmentReplication() {
super(3);
}
private static class CheckerCallback implements
GenericCallback<Set<LedgerFragment>> {
private Set<LedgerFragment> result = null;
private CountDownLatch latch = new CountDownLatch(1);
Set<LedgerFragment> waitAndGetResult() throws InterruptedException {
latch.await();
return result;
}
@Override
public void operationComplete(int rc, Set<LedgerFragment> result) {
this.result = result;
latch.countDown();
}
}
/**
* Tests that replicate method should replicate the failed bookie fragments
* to target bookie passed.
*/
@Test(timeout=60000)
public void testReplicateLFShouldCopyFailedBookieFragmentsToTargetBookie()
throws Exception {
byte[] data = "TestLedgerFragmentReplication".getBytes();
LedgerHandle lh = bkc.createLedger(3, 3, TEST_DIGEST_TYPE,
TEST_PSSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
}
InetSocketAddress replicaToKill = lh.getLedgerMetadata().getEnsembles()
.get(0L).get(0);
LOG.info("Killing Bookie", replicaToKill);
killBookie(replicaToKill);
int startNewBookie = startNewBookie();
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
}
InetSocketAddress newBkAddr = new InetSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
Set<LedgerFragment> result = getFragmentsToReplicate(lh);
BookKeeperAdmin admin = new BookKeeperAdmin(baseClientConf);
lh.close();
// 0-9 entries should be copy to new bookie
for (LedgerFragment lf : result) {
admin.replicateLedgerFragment(lh, lf, newBkAddr);
}
// Killing all bookies except newly replicated bookie
SortedMap<Long, ArrayList<InetSocketAddress>> allBookiesBeforeReplication = lh
.getLedgerMetadata().getEnsembles();
Set<Entry<Long, ArrayList<InetSocketAddress>>> entrySet = allBookiesBeforeReplication
.entrySet();
for (Entry<Long, ArrayList<InetSocketAddress>> entry : entrySet) {
ArrayList<InetSocketAddress> bookies = entry.getValue();
for (InetSocketAddress bookie : bookies) {
if (newBkAddr.equals(bookie)) {
continue;
}
killBookie(bookie);
}
}
// Should be able to read the entries from 0-9
verifyRecoveredLedgers(lh, 0, 9);
}
/**
* Tests that fragment re-replication fails on last unclosed ledger
* fragments.
*/
@Test(timeout=60000)
public void testReplicateLFFailsOnlyOnLastUnClosedFragments()
throws Exception {
byte[] data = "TestLedgerFragmentReplication".getBytes();
LedgerHandle lh = bkc.createLedger(3, 3, TEST_DIGEST_TYPE,
TEST_PSSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
}
InetSocketAddress replicaToKill = lh.getLedgerMetadata().getEnsembles()
.get(0L).get(0);
startNewBookie();
LOG.info("Killing Bookie", replicaToKill);
killBookie(replicaToKill);
// Lets reform ensemble
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
}
InetSocketAddress replicaToKill2 = lh.getLedgerMetadata()
.getEnsembles().get(0L).get(1);
int startNewBookie2 = startNewBookie();
LOG.info("Killing Bookie", replicaToKill2);
killBookie(replicaToKill2);
InetSocketAddress newBkAddr = new InetSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie2);
LOG.info("New Bookie addr :" + newBkAddr);
Set<LedgerFragment> result = getFragmentsToReplicate(lh);
BookKeeperAdmin admin = new BookKeeperAdmin(baseClientConf);
// 0-9 entries should be copy to new bookie
int unclosedCount = 0;
for (LedgerFragment lf : result) {
if (lf.isClosed()) {
admin.replicateLedgerFragment(lh, lf, newBkAddr);
} else {
unclosedCount++;
try {
admin.replicateLedgerFragment(lh, lf, newBkAddr);
fail("Shouldn't be able to rereplicate unclosed ledger");
} catch (BKException bke) {
// correct behaviour
}
}
}
assertEquals("Should be only one unclosed fragment", 1, unclosedCount);
}
/**
* Tests that ReplicateLedgerFragment should return false if replication
* fails
*/
@Test(timeout=60000)
public void testReplicateLFShouldReturnFalseIfTheReplicationFails()
throws Exception {
byte[] data = "TestLedgerFragmentReplication".getBytes();
LedgerHandle lh = bkc.createLedger(2, 1, TEST_DIGEST_TYPE,
TEST_PSSWD);
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
}
// Kill the first Bookie
InetSocketAddress replicaToKill = lh.getLedgerMetadata().getEnsembles()
.get(0L).get(0);
killBookie(replicaToKill);
LOG.info("Killed Bookie =" + replicaToKill);
// Write some more entries
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
}
// Kill the second Bookie
replicaToKill = lh.getLedgerMetadata().getEnsembles().get(0L).get(0);
killBookie(replicaToKill);
LOG.info("Killed Bookie =" + replicaToKill);
Set<LedgerFragment> fragments = getFragmentsToReplicate(lh);
BookKeeperAdmin admin = new BookKeeperAdmin(baseClientConf);
int startNewBookie = startNewBookie();
InetSocketAddress additionalBK = new InetSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
for (LedgerFragment lf : fragments) {
try {
admin.replicateLedgerFragment(lh, lf, additionalBK);
} catch (BKException.BKLedgerRecoveryException e) {
// expected
}
}
}
/**
* Tests that splitIntoSubFragment should be able to split the original
* passed fragment into sub fragments at correct boundaries
*/
@Test(timeout = 30000)
public void testSplitIntoSubFragmentsWithDifferentFragmentBoundaries()
throws Exception {
LedgerMetadata metadata = new LedgerMetadata(3, 3, 3, TEST_DIGEST_TYPE,
TEST_PSSWD) {
@Override
ArrayList<InetSocketAddress> getEnsemble(long entryId) {
return null;
}
@Override
public boolean isClosed() {
return true;
}
};
LedgerHandle lh = new LedgerHandle(bkc, 0, metadata, TEST_DIGEST_TYPE,
TEST_PSSWD);
testSplitIntoSubFragments(10, 21, -1, 1, lh);
testSplitIntoSubFragments(10, 21, 20, 1, lh);
testSplitIntoSubFragments(0, 0, 10, 1, lh);
testSplitIntoSubFragments(0, 1, 1, 2, lh);
testSplitIntoSubFragments(20, 24, 2, 3, lh);
testSplitIntoSubFragments(21, 32, 3, 4, lh);
testSplitIntoSubFragments(22, 103, 11, 8, lh);
testSplitIntoSubFragments(49, 51, 1, 3, lh);
testSplitIntoSubFragments(11, 101, 3, 31, lh);
}
/** assert the sub-fragment boundaries */
void testSplitIntoSubFragments(final long oriFragmentFirstEntry,
final long oriFragmentLastEntry, long entriesPerSubFragment,
long expectedSubFragments, LedgerHandle lh) {
LedgerFragment fr = new LedgerFragment(lh, oriFragmentFirstEntry,
oriFragmentLastEntry, 0) {
@Override
public long getLastStoredEntryId() {
return oriFragmentLastEntry;
}
@Override
public long getFirstStoredEntryId() {
return oriFragmentFirstEntry;
}
};
Set<LedgerFragment> subFragments = LedgerFragmentReplicator
.splitIntoSubFragments(lh, fr, entriesPerSubFragment);
assertEquals(expectedSubFragments, subFragments.size());
int fullSubFragment = 0;
int partialSubFragment = 0;
for (LedgerFragment ledgerFragment : subFragments) {
if ((ledgerFragment.getLastKnownEntryId()
- ledgerFragment.getFirstEntryId() + 1) == entriesPerSubFragment) {
fullSubFragment++;
} else {
long totalEntriesToReplicate = oriFragmentLastEntry
- oriFragmentFirstEntry + 1;
if (entriesPerSubFragment <= 0
|| totalEntriesToReplicate / entriesPerSubFragment == 0) {
assertEquals(
"FirstEntryId should be same as original fragment's firstEntryId",
fr.getFirstEntryId(), ledgerFragment
.getFirstEntryId());
assertEquals(
"LastEntryId should be same as original fragment's lastEntryId",
fr.getLastKnownEntryId(), ledgerFragment
.getLastKnownEntryId());
} else {
long partialSplitEntries = totalEntriesToReplicate
% entriesPerSubFragment;
assertEquals(
"Partial fragment with wrong entry boundaries",
ledgerFragment.getLastKnownEntryId()
- ledgerFragment.getFirstEntryId() + 1,
partialSplitEntries);
}
partialSubFragment++;
}
}
assertEquals("Unexpected number of sub fargments", fullSubFragment
+ partialSubFragment, expectedSubFragments);
assertTrue("There should be only one or zero partial sub Fragment",
partialSubFragment == 0 || partialSubFragment == 1);
}
private Set<LedgerFragment> getFragmentsToReplicate(LedgerHandle lh)
throws InterruptedException {
LedgerChecker checker = new LedgerChecker(bkc);
CheckerCallback cb = new CheckerCallback();
checker.checkLedger(lh, cb);
Set<LedgerFragment> fragments = cb.waitAndGetResult();
return fragments;
}
private void verifyRecoveredLedgers(LedgerHandle lh, long startEntryId,
long endEntryId) throws BKException, InterruptedException {
LedgerHandle lhs = bkc.openLedgerNoRecovery(lh.getId(),
TEST_DIGEST_TYPE, TEST_PSSWD);
Enumeration<LedgerEntry> entries = lhs.readEntries(startEntryId,
endEntryId);
assertTrue("Should have the elements", entries.hasMoreElements());
while (entries.hasMoreElements()) {
LedgerEntry entry = entries.nextElement();
assertEquals("TestLedgerFragmentReplication", new String(entry
.getEntry()));
}
}
}