blob: 4417f543ae4804e4c274cc4a7e6e4dcf00dc8983 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.util.BookKeeperConstants.FEATURE_DISABLE_ENSEMBLE_CHANGE;
import static org.apache.bookkeeper.util.TestUtils.assertEventuallyTrue;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import com.google.common.util.concurrent.RateLimiter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.feature.SettableFeature;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test Case on Disabling Ensemble Change Feature.
*/
public class TestDisableEnsembleChange extends BookKeeperClusterTestCase {
private static final Logger logger = LoggerFactory.getLogger(TestDisableEnsembleChange.class);
public TestDisableEnsembleChange() {
super(4);
}
@Test
public void testDisableEnsembleChange() throws Exception {
disableEnsembleChangeTest(true);
}
@Test
public void testDisableEnsembleChangeNotEnoughBookies() throws Exception {
disableEnsembleChangeTest(false);
}
void disableEnsembleChangeTest(boolean startNewBookie) throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setMetadataServiceUri(metadataServiceUri)
.setDelayEnsembleChange(false)
.setDisableEnsembleChangeFeatureName(FEATURE_DISABLE_ENSEMBLE_CHANGE);
SettableFeatureProvider featureProvider = new SettableFeatureProvider("test", 0);
BookKeeper bkc = BookKeeper.forConfig(conf)
.featureProvider(featureProvider)
.build();
SettableFeature disableEnsembleChangeFeature = featureProvider.getFeature(FEATURE_DISABLE_ENSEMBLE_CHANGE);
disableEnsembleChangeFeature.set(true);
final byte[] password = new byte[0];
final LedgerHandle lh = bkc.createLedger(4, 3, 2, BookKeeper.DigestType.CRC32, password);
final AtomicBoolean finished = new AtomicBoolean(false);
final AtomicBoolean failTest = new AtomicBoolean(false);
final byte[] entry = "test-disable-ensemble-change".getBytes(UTF_8);
assertEquals(1, lh.getLedgerMetadata().getAllEnsembles().size());
ArrayList<BookieSocketAddress> ensembleBeforeFailure =
new ArrayList<>(lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue());
final RateLimiter rateLimiter = RateLimiter.create(10);
Thread addThread = new Thread() {
@Override
public void run() {
try {
while (!finished.get()) {
rateLimiter.acquire();
lh.addEntry(entry);
}
} catch (Exception e) {
logger.error("Exception on adding entry : ", e);
failTest.set(true);
}
}
};
addThread.start();
Thread.sleep(2000);
killBookie(0);
Thread.sleep(2000);
finished.set(true);
addThread.join();
assertFalse("Should not fail adding entries facing one bookie failure when disable ensemble change",
failTest.get());
// check the ensemble after failure
assertEquals("No new ensemble should be added when disable ensemble change.",
1, lh.getLedgerMetadata().getAllEnsembles().size());
ArrayList<BookieSocketAddress> ensembleAfterFailure =
new ArrayList<>(lh.getLedgerMetadata().getAllEnsembles().entrySet().iterator().next().getValue());
assertArrayEquals(ensembleBeforeFailure.toArray(new BookieSocketAddress[ensembleBeforeFailure.size()]),
ensembleAfterFailure.toArray(new BookieSocketAddress[ensembleAfterFailure.size()]));
// enable ensemble change
disableEnsembleChangeFeature.set(false);
if (startNewBookie) {
startNewBookie();
}
// reset add thread
finished.set(false);
final CountDownLatch failLatch = new CountDownLatch(1);
addThread = new Thread() {
@Override
public void run() {
try {
while (!finished.get()) {
lh.addEntry(entry);
}
} catch (Exception e) {
logger.error("Exception on adding entry : ", e);
failLatch.countDown();
failTest.set(true);
}
}
};
addThread.start();
failLatch.await(4000, TimeUnit.MILLISECONDS);
finished.set(true);
addThread.join();
if (startNewBookie) {
assertFalse("Should not fail adding entries when enable ensemble change again.",
failTest.get());
assertFalse("Ledger should be closed when enable ensemble change again.",
lh.getLedgerMetadata().isClosed());
assertEquals("New ensemble should be added when enable ensemble change again.",
2, lh.getLedgerMetadata().getAllEnsembles().size());
} else {
assertTrue("Should fail adding entries when enable ensemble change again.",
failTest.get());
// The ledger close occurs in the background, so assert that it happens eventually
assertEventuallyTrue("Ledger should be closed when enable ensemble change again.",
() -> lh.getLedgerMetadata().isClosed());
}
}
@Test
public void testRetryFailureBookie() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setMetadataServiceUri(metadataServiceUri)
.setDelayEnsembleChange(false)
.setDisableEnsembleChangeFeatureName(FEATURE_DISABLE_ENSEMBLE_CHANGE);
SettableFeatureProvider featureProvider = new SettableFeatureProvider("test", 0);
BookKeeper bkc = BookKeeper.forConfig(conf)
.featureProvider(featureProvider)
.build();
SettableFeature disableEnsembleChangeFeature = featureProvider.getFeature(FEATURE_DISABLE_ENSEMBLE_CHANGE);
disableEnsembleChangeFeature.set(true);
LedgerHandle lh = bkc.createLedger(4, 4, 4, BookKeeper.DigestType.CRC32, new byte[] {});
byte[] entry = "testRetryFailureBookie".getBytes();
for (int i = 0; i < 10; i++) {
lh.addEntry(entry);
}
// kill a bookie
ServerConfiguration killedConf = killBookie(0);
final AtomicInteger res = new AtomicInteger(0xdeadbeef);
final CountDownLatch addLatch = new CountDownLatch(1);
AsyncCallback.AddCallback cb = new AsyncCallback.AddCallback() {
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
logger.info("Add entry {} completed : rc {}.", entryId, rc);
res.set(rc);
addLatch.countDown();
}
};
lh.asyncAddEntry(entry, cb, null);
assertFalse("Add entry operation should not complete.",
addLatch.await(1000, TimeUnit.MILLISECONDS));
assertEquals(res.get(), 0xdeadbeef);
// start the original bookie
bsConfs.add(killedConf);
bs.add(startBookie(killedConf));
assertTrue("Add entry operation should complete at this point.",
addLatch.await(10000, TimeUnit.MILLISECONDS));
assertEquals(res.get(), BKException.Code.OK);
}
@Test
public void testRetrySlowBookie() throws Exception {
final int readTimeout = 2;
ClientConfiguration conf = new ClientConfiguration();
conf.setReadEntryTimeout(readTimeout)
.setAddEntryTimeout(readTimeout)
.setDelayEnsembleChange(false)
.setDisableEnsembleChangeFeatureName(FEATURE_DISABLE_ENSEMBLE_CHANGE)
.setMetadataServiceUri(metadataServiceUri);
SettableFeatureProvider featureProvider = new SettableFeatureProvider("test", 0);
BookKeeper bkc = BookKeeper.forConfig(conf)
.featureProvider(featureProvider)
.build();
SettableFeature disableEnsembleChangeFeature = featureProvider.getFeature(FEATURE_DISABLE_ENSEMBLE_CHANGE);
disableEnsembleChangeFeature.set(true);
LedgerHandle lh = bkc.createLedger(4, 4, 4, BookKeeper.DigestType.CRC32, new byte[] {});
byte[] entry = "testRetryFailureBookie".getBytes();
for (int i = 0; i < 10; i++) {
lh.addEntry(entry);
}
List<BookieSocketAddress> curEns = lh.getCurrentEnsemble();
final CountDownLatch wakeupLatch = new CountDownLatch(1);
final CountDownLatch suspendLatch = new CountDownLatch(1);
sleepBookie(curEns.get(2), wakeupLatch, suspendLatch);
suspendLatch.await();
final AtomicInteger res = new AtomicInteger(0xdeadbeef);
final CountDownLatch addLatch = new CountDownLatch(1);
AsyncCallback.AddCallback cb = new AsyncCallback.AddCallback() {
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
logger.info("Add entry {} completed : rc {}.", entryId, rc);
res.set(rc);
addLatch.countDown();
}
};
lh.asyncAddEntry(entry, cb, null);
assertFalse("Add entry operation should not complete.",
addLatch.await(1000, TimeUnit.MILLISECONDS));
assertEquals(res.get(), 0xdeadbeef);
// wait until read timeout
assertFalse("Add entry operation should not complete even timeout.",
addLatch.await(readTimeout, TimeUnit.SECONDS));
assertEquals(res.get(), 0xdeadbeef);
// wait one more read timeout, to ensure we resend multiple retries
// to ensure it works correctly
assertFalse("Add entry operation should not complete even timeout.",
addLatch.await(readTimeout, TimeUnit.SECONDS));
assertEquals(res.get(), 0xdeadbeef);
// wakeup the sleep bookie
wakeupLatch.countDown();
assertTrue("Add entry operation should complete at this point.",
addLatch.await(10000, TimeUnit.MILLISECONDS));
assertEquals(res.get(), BKException.Code.OK);
}
}