// blob: 874a01c8c711ac893c4480eb9892fd3f8fc31d98 (git web-viewer artifact)
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncBufferMutator {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);

  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();

  private static final TableName TABLE_NAME = TableName.valueOf("async");

  private static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");

  private static final byte[] CF = Bytes.toBytes("cf");

  private static final byte[] CQ = Bytes.toBytes("cq");

  private static final int COUNT = 100;

  // Filled with random bytes in setUp; 1KB per value so a small write buffer fills up after a
  // handful of mutations.
  private static final byte[] VALUE = new byte[1024];

  // Shared connection for all tests; created in setUp and closed in tearDown, so it cannot be
  // final.
  private static AsyncConnection CONN;

  @BeforeClass
  public static void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(1);
    TEST_UTIL.createTable(TABLE_NAME, CF);
    TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
    ThreadLocalRandom.current().nextBytes(VALUE);
  }

  @AfterClass
  public static void tearDown() throws Exception {
    CONN.close();
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testWithMultiRegionTable() throws InterruptedException {
    test(MULTI_REGION_TABLE_NAME);
  }

  @Test
  public void testWithSingleRegionTable() throws InterruptedException {
    test(TABLE_NAME);
  }

  /**
   * Writes {@code COUNT} rows through a buffered mutator with a 16KB write buffer: the first half
   * in one batch (which exceeds the buffer and triggers an immediate flush), the second half one
   * by one (which should stay buffered until close), then verifies all rows are readable.
   */
  private void test(TableName tableName) throws InterruptedException {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    try (AsyncBufferedMutator mutator =
      CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
      List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
        .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
        .collect(Collectors.toList()));
      // exceeded the write buffer size, a flush will be called directly
      fs.forEach(CompletableFuture::join);
      IntStream.range(COUNT / 2, COUNT).forEach(i -> {
        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
      });
      // the first future should have been sent out.
      futures.get(0).join();
      // give any in-flight flush time to complete before asserting on the last future
      Thread.sleep(2000);
      // the last one should still be in write buffer
      assertFalse(futures.get(futures.size() - 1).isDone());
    }
    // mutator.close will call mutator.flush automatically so all tasks should have been done.
    futures.forEach(CompletableFuture::join);
    AsyncTable<?> table = CONN.getTable(tableName);
    IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
      .forEach(r -> {
        assertArrayEquals(VALUE, r.getValue(CF, CQ));
      });
  }

  /**
   * Mutating through a closed mutator must fail with an "Already closed" IOException, both for
   * the single-put and the batch overloads.
   */
  @Test
  public void testClosedMutate() throws InterruptedException {
    AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
    mutator.close();
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    try {
      mutator.mutate(put).get();
      fail("Close check failed");
    } catch (ExecutionException e) {
      assertThat(e.getCause(), instanceOf(IOException.class));
      assertTrue(e.getCause().getMessage().startsWith("Already closed"));
    }
    for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
      try {
        f.get();
        fail("Close check failed");
      } catch (ExecutionException e) {
        assertThat(e.getCause(), instanceOf(IOException.class));
        assertTrue(e.getCause().getMessage().startsWith("Already closed"));
      }
    }
  }

  /**
   * With periodic flush disabled, a buffered mutation must stay pending until an explicit
   * {@code flush()}.
   */
  @Test
  public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
    try (AsyncBufferedMutator mutator =
      CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
      Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
      CompletableFuture<?> future = mutator.mutate(put);
      Thread.sleep(2000);
      // assert that we have not flushed it out
      assertFalse(future.isDone());
      mutator.flush();
      future.get();
    }
    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
  }

  /**
   * With a 1s periodic flush, a single buffered mutation must complete without any explicit
   * flush — waiting on the future relies on the timer firing.
   */
  @Test
  public void testPeriodicFlush() throws InterruptedException, ExecutionException {
    AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
      .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build();
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    CompletableFuture<?> future = mutator.mutate(put);
    future.get();
    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
  }

  // a bit deep into the implementation: the tests below cast to AsyncBufferedMutatorImpl to
  // inspect the package-private periodicFlushTask netty Timeout.
  @Test
  public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN
      .getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS)
      .setWriteBufferSize(10 * put.heapSize()).build()) {
      List<CompletableFuture<?>> futures = new ArrayList<>();
      futures.add(mutator.mutate(put));
      Timeout task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
      // keep mutating until a size-triggered flush clears the periodic task
      for (int i = 1;; i++) {
        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
        if (mutator.periodicFlushTask == null) {
          break;
        }
      }
      // the size-triggered flush must have cancelled the pending periodic flush
      assertTrue(task.isCancelled());
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      for (int i = 0; i < futures.size(); i++) {
        assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(i))).get().getValue(CF, CQ));
      }
    }
  }

  /**
   * A manual {@code flush()} must cancel the scheduled periodic flush task.
   */
  @Test
  public void testCancelPeriodicFlushByManuallyFlush()
    throws InterruptedException, ExecutionException {
    try (AsyncBufferedMutatorImpl mutator =
      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
      CompletableFuture<?> future =
        mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
      Timeout task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
      mutator.flush();
      assertTrue(task.isCancelled());
      future.get();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
    }
  }

  /**
   * Closing the mutator must cancel the scheduled periodic flush task, and the implicit flush on
   * close must still deliver the buffered mutation.
   */
  @Test
  public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
    CompletableFuture<?> future;
    Timeout task;
    try (AsyncBufferedMutatorImpl mutator =
      (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
        .setWriteBufferPeriodicFlush(1, TimeUnit.SECONDS).build()) {
      future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
      task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
    }
    assertTrue(task.isCancelled());
    future.get();
    AsyncTable<?> table = CONN.getTable(TABLE_NAME);
    assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
  }

  /**
   * Test subclass that counts how many times {@code internalFlush()} runs, so the race test below
   * can distinguish the manual flush from a (suppressed) periodic flush.
   */
  private static final class AsyncBufferMutatorForTest extends AsyncBufferedMutatorImpl {

    private int flushCount;

    AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table,
      long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
      super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize);
    }

    @Override
    protected void internalFlush() {
      flushCount++;
      super.internalFlush();
    }
  }

  /**
   * Race a manual flush against the periodic flush timer: while holding the mutator's monitor the
   * periodic flush cannot run even after its timeout expires; the manual flush issued under the
   * lock must win, and the expired-but-unexecuted timeout must not be reported as cancelled.
   */
  @Test
  public void testRaceBetweenNormalFlushAndPeriodicFlush()
    throws InterruptedException, ExecutionException {
    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
    try (AsyncBufferMutatorForTest mutator =
      new AsyncBufferMutatorForTest(AsyncConnectionImpl.RETRY_TIMER, CONN.getTable(TABLE_NAME),
        10 * put.heapSize(), TimeUnit.MILLISECONDS.toNanos(200), 1024 * 1024)) {
      CompletableFuture<?> future = mutator.mutate(put);
      Timeout task = mutator.periodicFlushTask;
      // we should have scheduled a periodic flush task
      assertNotNull(task);
      synchronized (mutator) {
        // synchronized on mutator to prevent periodic flush to be executed
        Thread.sleep(500);
        // the timeout should be issued
        assertTrue(task.isExpired());
        // but no flush is issued as we hold the lock
        assertEquals(0, mutator.flushCount);
        assertFalse(future.isDone());
        // manually flush, then release the lock
        mutator.flush();
      }
      // this is a bit deep into the implementation in netty but anyway let's add a check here to
      // confirm that an issued timeout can not be canceled by netty framework.
      assertFalse(task.isCancelled());
      // and the mutation is done
      future.get();
      AsyncTable<?> table = CONN.getTable(TABLE_NAME);
      assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
      // only the manual flush, the periodic flush should have been canceled by us
      assertEquals(1, mutator.flushCount);
    }
  }
}