blob: 8ec887eae7d8852e7410ffa2a234015e9667c07e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.answerVoid;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.MDC;
import org.apache.log4j.spi.LoggingEvent;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Test passing of MDC context.
 *
 * <p>Each test puts a value under {@link #MDC_REQUEST_ID} into the log4j {@link MDC} of the
 * test thread, performs a client operation, and then asserts that log messages captured from
 * the root logger carry that same MDC value.
 *
 * <p>Log events are captured by a Mockito-mocked {@link Appender} that is attached to the root
 * logger in {@link #setUp()} and detached in {@link #tearDown()}.
 */
@SuppressWarnings("deprecation")
@Slf4j
public class MdcContextTest extends BookKeeperClusterTestCase {

    /** MDC key whose value is expected to be carried along with each request. */
    public static final String MDC_REQUEST_ID = "request_id";

    /** Payload written by the tests; encoded explicitly so it does not depend on the platform charset. */
    final byte[] entry = "Test Entry".getBytes(StandardCharsets.UTF_8);

    BookKeeper bkc;
    LedgerHandle lh;

    private Appender mockAppender;
    /** Formatted log lines (see {@link #mdcFormat(Object, String)}) captured since {@link #setUp()}. */
    private Queue<String> capturedEvents;
    /** Root logger level observed before {@link #setUp()} forced it to INFO; restored on tear-down. */
    private Level previousRootLevel;
    private final Logger rootLogger = LogManager.getRootLogger();

    /**
     * Configures the base server configuration used by the test cluster.
     */
    public MdcContextTest() {
        // presumably the number of bookies to start; see BookKeeperClusterTestCase
        super(3);
        baseConf.setNumAddWorkerThreads(0);
        baseConf.setNumReadWorkerThreads(0);
        baseConf.setPreserveMdcForTaskExecution(true);
        baseConf.setReadOnlyModeEnabled(true);

        // for read-only bookie
        baseConf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
        baseConf.setEntryLogFilePreAllocationEnabled(false);
        baseConf.setMinUsableSizeForEntryLogCreation(Long.MAX_VALUE);
    }

    /**
     * Renders a log line in the form {@code [request_id:<mdc>] - <message>}.
     *
     * @param mdc the MDC value attached to the log event; {@code null} is rendered as empty
     * @param message the rendered log message
     * @return the formatted line
     */
    public static String mdcFormat(Object mdc, String message) {
        final Object shownMdc = mdc == null ? "" : mdc;
        return "[" + MDC_REQUEST_ID + ":" + shownMdc + "] - " + message;
    }

    /**
     * Asserts that at least one captured log line carries the given MDC value and contains the
     * given message fragment.
     *
     * @param mdc the expected value of {@link #MDC_REQUEST_ID}
     * @param msgSubstring a substring expected in the log message
     */
    public void assertLogWithMdc(String mdc, String msgSubstring) {
        // Reuse mdcFormat so the expected prefix can never drift from the capture format.
        assertThat(capturedEvents,
                hasItem(CoreMatchers.allOf(
                        containsString(mdcFormat(mdc, "")),
                        containsString(msgSubstring)
                )));
    }

    /**
     * Starts the cluster, creates a client and a ledger, and then starts capturing log events.
     *
     * @throws Exception if the cluster, the client or the ledger cannot be set up
     */
    @Before
    public void setUp() throws Exception {
        super.setUp();
        ClientConfiguration conf = new ClientConfiguration();
        conf.setReadTimeout(360)
                .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
                .setPreserveMdcForTaskExecution(true);

        MDC.clear();
        bkc = new BookKeeper(conf);

        MDC.put(MDC_REQUEST_ID, "ledger_create");
        log.info("creating ledger");
        lh = bkc.createLedgerAdv(3, 3, 3, BookKeeper.DigestType.CRC32, new byte[] {});
        MDC.clear();

        // The answer holds its own reference to the queue, so a log event that arrives after
        // tearDown() has cleared the field cannot hit a null queue.
        final Queue<String> events = new ConcurrentLinkedQueue<>();
        capturedEvents = events;

        // Fully stub the mock BEFORE attaching it to the root logger: other threads are already
        // logging, and Mockito does not support a mock being invoked while it is being stubbed.
        mockAppender = mock(Appender.class);
        when(mockAppender.getName()).thenReturn("MockAppender");
        doAnswer(answerVoid((LoggingEvent event) -> events.add(
                mdcFormat(event.getMDC(MDC_REQUEST_ID), event.getRenderedMessage())
        ))).when(mockAppender).doAppend(any());

        previousRootLevel = rootLogger.getLevel();
        rootLogger.addAppender(mockAppender);
        rootLogger.setLevel(Level.INFO);
    }

    /**
     * Closes the ledger and the client, stops capturing log events and shuts the cluster down.
     *
     * <p>Every step runs even if an earlier one throws (or if {@link #setUp()} failed part-way),
     * so the appender never stays attached to the root logger and the cluster is always stopped.
     *
     * @throws Exception if closing the ledger, the client or the cluster fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (lh != null) {
                lh.close();
            }
        } finally {
            try {
                if (bkc != null) {
                    bkc.close();
                }
            } finally {
                try {
                    restoreLogging();
                } finally {
                    super.tearDown();
                }
            }
        }
    }

    /** Detaches the capturing appender, restores the root logger level and clears the MDC. */
    private void restoreLogging() {
        if (mockAppender != null) {
            rootLogger.removeAppender(mockAppender);
        }
        if (previousRootLevel != null) {
            rootLogger.setLevel(previousRootLevel);
        }
        capturedEvents = null;
        MDC.clear();
    }

    /**
     * Creating a ledger with an ensemble size of 99 must fail and log with the caller's MDC.
     */
    @Test
    public void testLedgerCreateFails() throws Exception {
        MDC.put(MDC_REQUEST_ID, "ledger_create_fail");
        try {
            bkc.createLedgerAdv(99, 3, 2, BookKeeper.DigestType.CRC32, new byte[]{});
            Assert.fail("should not get here");
        } catch (BKException bke) {
            // expected
        }
        assertLogWithMdc("ledger_create_fail", "Not enough bookies to create ledger");
    }

    /**
     * A successful add must produce both client and bookie log lines carrying the caller's MDC.
     */
    @Test
    public void testSimpleAdd() throws Exception {
        MDC.put(MDC_REQUEST_ID, "ledger_add_entry");
        lh.addEntry(0, entry);

        // client msg
        assertLogWithMdc("ledger_add_entry", "Successfully connected to bookie");
        // bookie msg
        assertLogWithMdc("ledger_add_entry", "Created new entry log file");
    }

    /**
     * An add issued after a bookie was killed must log the connection and write failures with
     * the caller's MDC.
     */
    @Test
    public void testAddWithEnsembleChange() throws Exception {
        lh.addEntry(0, entry);
        startNewBookie();
        killBookie(0);

        MDC.put(MDC_REQUEST_ID, "ledger_add_entry");
        lh.addEntry(1, entry);

        assertLogWithMdc("ledger_add_entry", "Could not connect to bookie");
        assertLogWithMdc("ledger_add_entry", "Failed to write entry");
        //commented out until we figure out a way to preserve MDC through a call out
        //to another thread pool
        //assertLogWithMdc("ledger_add_entry", "New Ensemble");
    }

    /**
     * With the first ledger directory of each of the three bookies marked as filled, an add
     * must fail and every expected failure message must carry the caller's MDC.
     */
    @Test
    public void testAddFailsWithReadOnlyBookie() throws Exception {
        for (int i = 0; i < 3; ++i) {
            Bookie bookie = bs.get(i).getBookie();
            File[] ledgerDirs = bsConfs.get(i).getLedgerDirs();
            LedgerDirsManager ledgerDirsManager = ((BookieImpl) bookie).getLedgerDirsManager();
            ledgerDirsManager.addToFilledDirs(new File(ledgerDirs[0], "current"));
        }

        MDC.put(MDC_REQUEST_ID, "ledger_add_entry");
        try {
            lh.addEntry(0, entry);
            Assert.fail("should not get here");
        } catch (BKException bke) {
            // expected, pass
        }

        assertLogWithMdc("ledger_add_entry", "No writable ledger dirs below diskUsageThreshold");
        assertLogWithMdc("ledger_add_entry", "All ledger directories are non writable and no reserved space");
        assertLogWithMdc("ledger_add_entry", "Error writing entry:0 to ledger:0");
        assertLogWithMdc("ledger_add_entry", "Add for failed on bookie");
        assertLogWithMdc("ledger_add_entry", "Failed to find 1 bookies");
        assertLogWithMdc("ledger_add_entry", "Closing ledger 0 due to NotEnoughBookiesException");
    }

    /**
     * Re-adding an already written entry id must fail and log with the caller's MDC.
     */
    @Test
    public void testAddFailsDuplicateEntry() throws Exception {
        lh.addEntry(0, entry);

        MDC.put(MDC_REQUEST_ID, "ledger_add_duplicate_entry");
        try {
            lh.addEntry(0, entry);
            Assert.fail("should not get here");
        } catch (BKException bke) {
            // expected, pass
        }

        assertLogWithMdc("ledger_add_duplicate_entry", "Trying to re-add duplicate entryid:0");
        assertLogWithMdc("ledger_add_duplicate_entry", "Write of ledger entry to quorum failed");
    }

    /**
     * Reading entry 100 from a ledger that has no entries must fail and log with the caller's MDC.
     */
    @Test
    public void testReadEntryBeyondLac() throws Exception {
        MDC.put(MDC_REQUEST_ID, "ledger_read_entry");
        try {
            lh.readEntries(100, 100);
            fail("should not get here");
        } catch (BKException.BKReadException e) {
            // pass
        }
        assertLogWithMdc("ledger_read_entry", "ReadEntries exception on ledgerId:0 firstEntry:100 lastEntry:100");
    }

    /**
     * Reading through a handle whose ledger was closed and deleted must fail and log with the
     * caller's MDC.
     */
    @Test
    public void testReadFromDeletedLedger() throws Exception {
        lh.addEntry(0, entry);
        lh.close();
        bkc.deleteLedger(lh.ledgerId);

        MDC.put(MDC_REQUEST_ID, "ledger_read_entry");
        try {
            lh.readEntries(100, 100);
            fail("should not get here");
        } catch (BKException.BKReadException e) {
            // pass
        }
        assertLogWithMdc("ledger_read_entry", "ReadEntries exception on ledgerId:0 firstEntry:100 lastEntry:100");
    }
}