/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
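
/**
 * Tests for {@link WALEntryStream} and the {@link ReplicationSourceWALReader} built on top of
 * it. As a minimal sketch (argument names here are illustrative, not a normative API
 * reference), the read loop these tests exercise looks like:
 *
 * <pre>
 * try (WALEntryStream stream =
 *     new WALEntryStream(queue, fs, conf, startPosition, walFileLengthProvider, metrics)) {
 *   while (stream.hasNext()) {
 *     WAL.Entry entry = stream.next();
 *     // ... ship the entry to the peer cluster ...
 *   }
 *   long resumeAt = stream.getPosition(); // persist this to resume later
 * }
 * </pre>
 */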
@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStream {
private static HBaseTestingUtility TEST_UTIL;
private static Configuration conf;
private static FileSystem fs;
private static MiniDFSCluster cluster;
private static final TableName tableName = TableName.valueOf("tablename");
private static final byte[] family = Bytes.toBytes("column");
private static final byte[] qualifier = Bytes.toBytes("qualifier");
private static final HRegionInfo info =
new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HConstants.LAST_ROW, false);
private static final NavigableMap<byte[], Integer> scopes = getScopes();
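  // A scope of 1 is HConstants.REPLICATION_SCOPE_GLOBAL, i.e. the family is replicated.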
private static NavigableMap<byte[], Integer> getScopes() {
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(family, 1);
return scopes;
}
private WAL log;
PriorityBlockingQueue<Path> walQueue;
private PathWatcher pathWatcher;
@Rule
public TestName tn = new TestName();
private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniDFSCluster(3);
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws Exception {
walQueue = new PriorityBlockingQueue<>();
    List<WALActionsListener> listeners = new ArrayList<>();
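    // the PathWatcher defined at the bottom of this class feeds every newly rolled WAL path
    // into walQueue, standing in for ReplicationSource's own log tracking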
pathWatcher = new PathWatcher();
listeners.add(pathWatcher);
final WALFactory wals = new WALFactory(conf, listeners, tn.getMethodName());
log = wals.getWAL(info.getEncodedNameAsBytes(), info.getTable().getNamespace());
}
@After
public void tearDown() throws Exception {
log.close();
}
// Try out different combinations of row count and KeyValue count
@Test
public void testDifferentCounts() throws Exception {
int[] NB_ROWS = { 1500, 60000 };
int[] NB_KVS = { 1, 100 };
// whether compression is used
Boolean[] BOOL_VALS = { false, true };
for (int nbRows : NB_ROWS) {
for (int walEditKVs : NB_KVS) {
for (boolean isCompressionEnabled : BOOL_VALS) {
TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
isCompressionEnabled);
mvcc.advanceTo(1);
for (int i = 0; i < nbRows; i++) {
appendToLogPlus(walEditKVs);
}
log.rollWriter();
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
i++;
}
assertEquals(nbRows, i);
// should've read all entries
assertFalse(entryStream.hasNext());
}
// reset everything for next loop
log.close();
setUp();
}
}
}
}
  /**
   * Tests basic reading of log appends, including continuing across a log roll
   */
@Test
public void testAppendsWithRolls() throws Exception {
appendToLog();
long oldPos;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
      // There's one edit in the log; read it, then verify that reading past it throws
      // NoSuchElementException
assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.next();
assertNotNull(entry);
assertFalse(entryStream.hasNext());
try {
entry = entryStream.next();
fail();
} catch (NoSuchElementException e) {
// expected
}
oldPos = entryStream.getPosition();
}
appendToLog();
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
log, new MetricsSource("1"))) {
// Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
oldPos = entryStream.getPosition();
}
    // Append, roll the log, then append again; we should still see the end of the first log
    // and read that entry before moving on to the new log
appendToLog();
log.rollWriter();
appendToLog();
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, oldPos,
log, new MetricsSource("1"))) {
WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
// next item should come from the new log
entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry);
// no more entries to read
assertFalse(entryStream.hasNext());
oldPos = entryStream.getPosition();
}
}
/**
* Tests that if after a stream is opened, more entries come in and then the log is rolled, we
* don't mistakenly dequeue the current log thinking we're done with it
*/
@Test
public void testLogrollWhileStreaming() throws Exception {
appendToLog("1");
appendToLog("2");// 2
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
assertEquals("1", getRow(entryStream.next()));
appendToLog("3"); // 3 - comes in after reader opened
log.rollWriter(); // log roll happening while we're reading
appendToLog("4"); // 4 - this append is in the rolled log
assertEquals("2", getRow(entryStream.next()));
      // we should not have dequeued yet since there's still an entry in the first log
      assertEquals(2, walQueue.size());
      // if implemented improperly, this would return "4" and "3" would be skipped
      assertEquals("3", getRow(entryStream.next()));
      assertEquals("4", getRow(entryStream.next())); // 4 - from the rolled-to log
      // now we've dequeued and moved on to the next log properly
      assertEquals(1, walQueue.size());
assertFalse(entryStream.hasNext());
}
}
/**
* Tests that if writes come in while we have a stream open, we shouldn't miss them
*/
@Test
public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming
appendToLog("2");
appendToLog("3");
      // we don't see them yet
      assertFalse(entryStream.hasNext());
      // but we do after a reset
entryStream.reset();
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext());
}
}
@Test
public void testResumeStreamingFromPosition() throws Exception {
long lastPosition = 0;
appendToLog("1");
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2");
appendToLog("3");
lastPosition = entryStream.getPosition();
}
    // the next stream should pick up where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next()));
assertFalse(entryStream.hasNext()); // done
assertEquals(1, walQueue.size());
}
}
/**
* Tests that if we stop before hitting the end of a stream, we can continue where we left off
* using the last position
*/
@Test
public void testPosition() throws Exception {
long lastPosition = 0;
appendEntriesToLog(3);
// read only one element
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, lastPosition,
log, new MetricsSource("1"))) {
entryStream.next();
lastPosition = entryStream.getPosition();
}
// there should still be two more entries from where we left off
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, lastPosition, log, new MetricsSource("1"))) {
assertNotNull(entryStream.next());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
}
}
@Test
public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
assertFalse(entryStream.hasNext());
}
}
@Test
public void testReplicationSourceWALReaderThread() throws Exception {
appendEntriesToLog(3);
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, log, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
position = entryStream.getPosition();
}
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
when(source.getWALFileLengthProvider()).thenReturn(log);
ReplicationSourceWALReader batcher = new ReplicationSourceWALReader(fs, conf,
walQueue, 0, getDummyFilter(), source);
Path walPath = walQueue.peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();
// should've batched up our entries
assertNotNull(entryBatch);
assertEquals(3, entryBatch.getWalEntries().size());
assertEquals(position, entryBatch.getLastWalPosition());
assertEquals(walPath, entryBatch.getLastWalPath());
assertEquals(3, entryBatch.getNbRowKeys());
appendToLog("foo");
entryBatch = batcher.take();
assertEquals(1, entryBatch.getNbEntries());
    assertEquals("foo", getRow(entryBatch.getWalEntries().get(0)));
}
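
  /**
   * Returns the row key of the first cell in the entry's edit, as a String.
   */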
private String getRow(WAL.Entry entry) {
Cell cell = entry.getEdit().getCells().get(0);
return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
}
private void appendToLog(String key) throws IOException {
final long txid = log.append(info,
new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes),
getWALEdit(key), true);
log.sync(txid);
}
private void appendEntriesToLog(int count) throws IOException {
for (int i = 0; i < count; i++) {
appendToLog();
}
}
private void appendToLog() throws IOException {
appendToLogPlus(1);
}
private void appendToLogPlus(int count) throws IOException {
final long txid = log.append(info,
new WALKey(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), mvcc, scopes),
getWALEdits(count), true);
log.sync(txid);
}
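
  /**
   * Builds an edit containing {@code count} cells; each cell's row key is the current
   * timestamp, so appended rows are effectively unique.
   */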
private WALEdit getWALEdits(int count) {
WALEdit edit = new WALEdit();
for (int i = 0; i < count; i++) {
edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
System.currentTimeMillis(), qualifier));
}
return edit;
}
private WALEdit getWALEdit(String row) {
WALEdit edit = new WALEdit();
edit.add(
new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier));
return edit;
}
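
  /**
   * A pass-through filter; the WAL reader requires a filter, but these tests want every entry
   * delivered unchanged.
   */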
private WALEntryFilter getDummyFilter() {
return new WALEntryFilter() {
@Override
public Entry filter(Entry entry) {
return entry;
}
};
}
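
  /**
   * Listener that mirrors every newly rolled WAL path into {@link #walQueue}, so the streams
   * under test see logs in the order they were created.
   */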
class PathWatcher extends WALActionsListener.Base {
Path currentPath;
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
walQueue.add(newPath);
currentPath = newPath;
}
}
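
  /**
   * Tests that the stream refuses to read past the length reported by its
   * WALFileLengthProvider: the second entry stays invisible until the reported length covers
   * it.
   */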
@Test
public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
appendToLog("1");
appendToLog("2");
long size = log.getLogFileSizeIfBeingWritten(walQueue.peek()).getAsLong();
AtomicLong fileLength = new AtomicLong(size - 1);
try (WALEntryStream entryStream = new WALEntryStream(walQueue, fs, conf, 0,
p -> OptionalLong.of(fileLength.get()), new MetricsSource("1"))) {
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
      // cannot read the second entry: the provider reports a length just short of it
      assertFalse(entryStream.hasNext());
      Thread.sleep(1000);
      entryStream.reset();
      // still cannot read the second entry; waiting and resetting does not help
      assertFalse(entryStream.hasNext());
      // once the reported length covers the second entry, it becomes readable
      fileLength.set(size);
entryStream.reset();
assertTrue(entryStream.hasNext());
assertNotNull(entryStream.next());
assertFalse(entryStream.hasNext());
}
}
}