blob: 2952fa37c85c49bd559fe922b41a2e46a59b85f6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.FamilyFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.TimestampsFilter;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
public class TestAsyncTable {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTable.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("async");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUALIFIER = Bytes.toBytes("cq");
private static byte[] VALUE = Bytes.toBytes("value");
private static int MAX_KEY_VALUE_SIZE = 64 * 1024;
private static AsyncConnection ASYNC_CONN;
@Rule
public TestName testName = new TestName();
private byte[] row;
@Parameter
public Supplier<AsyncTable<?>> getTable;
private static AsyncTable<?> getRawTable() {
return ASYNC_CONN.getTable(TABLE_NAME);
}
private static AsyncTable<?> getTable() {
return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
}
@Parameters
public static List<Object[]> params() {
return Arrays.asList(new Supplier<?>[] { TestAsyncTable::getRawTable },
new Supplier<?>[] { TestAsyncTable::getTable });
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(ConnectionConfiguration.MAX_KEYVALUE_SIZE_KEY,
MAX_KEY_VALUE_SIZE);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
assertFalse(ASYNC_CONN.isClosed());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
Closeables.close(ASYNC_CONN, true);
assertTrue(ASYNC_CONN.isClosed());
TEST_UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws IOException, InterruptedException, ExecutionException {
row = Bytes.toBytes(testName.getMethodName().replaceAll("[^0-9A-Za-z]", "_"));
if (ASYNC_CONN.getAdmin().isTableDisabled(TABLE_NAME).get()) {
ASYNC_CONN.getAdmin().enableTable(TABLE_NAME).get();
}
}
@Test
public void testSimple() throws Exception {
AsyncTable<?> table = getTable.get();
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).get();
assertTrue(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
Result result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
table.delete(new Delete(row)).get();
result = table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get();
assertTrue(result.isEmpty());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, QUALIFIER)).get());
}
private byte[] concat(byte[] base, int index) {
return Bytes.toBytes(Bytes.toString(base) + "-" + index);
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testSimpleMultiple() throws Exception {
AsyncTable<?> table = getTable.get();
int count = 100;
CountDownLatch putLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(
i -> table.put(new Put(concat(row, i)).addColumn(FAMILY, QUALIFIER, concat(VALUE, i)))
.thenAccept(x -> putLatch.countDown()));
putLatch.await();
BlockingQueue<Boolean> existsResp = new ArrayBlockingQueue<>(count);
IntStream.range(0, count)
.forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> existsResp.add(x)));
for (int i = 0; i < count; i++) {
assertTrue(existsResp.take());
}
BlockingQueue<Pair<Integer, Result>> getResp = new ArrayBlockingQueue<>(count);
IntStream.range(0, count)
.forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
for (int i = 0; i < count; i++) {
Pair<Integer, Result> pair = getResp.take();
assertArrayEquals(concat(VALUE, pair.getFirst()),
pair.getSecond().getValue(FAMILY, QUALIFIER));
}
CountDownLatch deleteLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(
i -> table.delete(new Delete(concat(row, i))).thenAccept(x -> deleteLatch.countDown()));
deleteLatch.await();
IntStream.range(0, count)
.forEach(i -> table.exists(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> existsResp.add(x)));
for (int i = 0; i < count; i++) {
assertFalse(existsResp.take());
}
IntStream.range(0, count)
.forEach(i -> table.get(new Get(concat(row, i)).addColumn(FAMILY, QUALIFIER))
.thenAccept(x -> getResp.add(Pair.newPair(i, x))));
for (int i = 0; i < count; i++) {
Pair<Integer, Result> pair = getResp.take();
assertTrue(pair.getSecond().isEmpty());
}
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testIncrement() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 100;
CountDownLatch latch = new CountDownLatch(count);
AtomicLong sum = new AtomicLong(0L);
IntStream.range(0, count)
.forEach(i -> table.incrementColumnValue(row, FAMILY, QUALIFIER, 1).thenAccept(x -> {
sum.addAndGet(x);
latch.countDown();
}));
latch.await();
assertEquals(count, Bytes.toLong(
table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER)));
assertEquals((1 + count) * count / 2, sum.get());
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testAppend() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch latch = new CountDownLatch(count);
char suffix = ':';
AtomicLong suffixCount = new AtomicLong(0L);
IntStream.range(0, count)
.forEachOrdered(i -> table
.append(new Append(row).addColumn(FAMILY, QUALIFIER, Bytes.toBytes("" + i + suffix)))
.thenAccept(r -> {
suffixCount.addAndGet(
Bytes.toString(r.getValue(FAMILY, QUALIFIER)).chars().filter(x -> x == suffix).count());
latch.countDown();
}));
latch.await();
assertEquals((1 + count) * count / 2, suffixCount.get());
String value = Bytes.toString(
table.get(new Get(row).addColumn(FAMILY, QUALIFIER)).get().getValue(FAMILY, QUALIFIER));
int[] actual = Arrays.asList(value.split("" + suffix)).stream().mapToInt(Integer::parseInt)
.sorted().toArray();
assertArrayEquals(IntStream.range(0, count).toArray(), actual);
}
@Test
public void testMutateRow() throws InterruptedException, ExecutionException, IOException {
AsyncTable<?> table = getTable.get();
RowMutations mutation = new RowMutations(row);
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 1), VALUE));
Result result = table.mutateRow(mutation).get();
assertTrue(result.getExists());
assertTrue(result.isEmpty());
result = table.get(new Get(row)).get();
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 1)));
mutation = new RowMutations(row);
mutation.add(new Delete(row).addColumn(FAMILY, concat(QUALIFIER, 1)));
mutation.add(new Put(row).addColumn(FAMILY, concat(QUALIFIER, 2), VALUE));
mutation.add(new Increment(row).addColumn(FAMILY, concat(QUALIFIER, 3), 2L));
mutation.add(new Append(row).addColumn(FAMILY, concat(QUALIFIER, 4), Bytes.toBytes("abc")));
result = table.mutateRow(mutation).get();
assertTrue(result.getExists());
assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, concat(QUALIFIER, 1)));
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, 2)));
assertEquals(2L, Bytes.toLong(result.getValue(FAMILY, concat(QUALIFIER, 3))));
assertEquals("abc", Bytes.toString(result.getValue(FAMILY, concat(QUALIFIER, 4))));
}
// Tests for old checkAndMutate API
@SuppressWarnings("FutureReturnValueIgnored")
@Test
@Deprecated
public void testCheckAndPutForOldApi() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
int count = 10;
CountDownLatch latch = new CountDownLatch(count);
IntStream.range(0, count)
.forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists()
.thenPut(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))).thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
latch.countDown();
}));
latch.await();
assertEquals(1, successCount.get());
String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
@Deprecated
public void testCheckAndDeleteForOldApi() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch deleteLatch = new CountDownLatch(count);
IntStream.range(0, count)
.forEach(i -> table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE)
.thenDelete(
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i)))
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
deleteLatch.countDown();
}));
deleteLatch.await();
assertEquals(1, successCount.get());
Result result = table.get(new Get(row)).get();
IntStream.range(0, count).forEach(i -> {
if (i == successIndex.get()) {
assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
} else {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
}
});
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
@Deprecated
public void testCheckAndMutateForOldApi() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch mutateLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(i -> {
RowMutations mutation = new RowMutations(row);
try {
mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
mutation
.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifEquals(VALUE).thenMutate(mutation)
.thenAccept(x -> {
if (x) {
successCount.incrementAndGet();
successIndex.set(i);
}
mutateLatch.countDown();
});
});
mutateLatch.await();
assertEquals(1, successCount.get());
Result result = table.get(new Get(row)).get();
IntStream.range(0, count).forEach(i -> {
if (i == successIndex.get()) {
assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
} else {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
}
});
}
@Test
@Deprecated
public void testCheckAndMutateWithTimeRangeForOldApi() throws Exception {
AsyncTable<?> table = getTable.get();
final long ts = System.currentTimeMillis() / 2;
Put put = new Put(row);
put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
boolean ok =
table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).ifNotExists().thenPut(put).get();
assertTrue(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenPut(put).get();
assertFalse(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
.ifEquals(VALUE).thenPut(put).get();
assertTrue(ok);
RowMutations rm = new RowMutations(row).add((Mutation) put);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenMutate(rm).get();
assertFalse(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
.ifEquals(VALUE).thenMutate(rm).get();
assertTrue(ok);
Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts + 10000))
.ifEquals(VALUE).thenDelete(delete).get();
assertFalse(ok);
ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER).timeRange(TimeRange.at(ts))
.ifEquals(VALUE).thenDelete(delete).get();
assertTrue(ok);
}
@Test
@Deprecated
public void testCheckAndMutateWithSingleFilterForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
table.put(put).get();
// Put with success
boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
.get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
// Delete with success
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
.get();
assertTrue(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
// Mutate with success
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.thenMutate(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
.get();
assertTrue(ok);
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
}
@Test
@Deprecated
public void testCheckAndMutateWithMultipleFiltersForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
table.put(put).get();
// Put with success
boolean ok = table.checkAndMutate(row, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
ok = table.checkAndMutate(row, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("c"))
))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))
.get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
// Delete with success
ok = table.checkAndMutate(row, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
))
.thenDelete(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))
.get();
assertTrue(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
// Mutate with success
ok = table.checkAndMutate(row, new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))
))
.thenMutate(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))
.get();
assertTrue(ok);
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
}
@Test
@Deprecated
public void testCheckAndMutateWithTimestampFilterForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
// Put with success
boolean ok = table.checkAndMutate(row, new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(100L))
))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
.get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(row, new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(101L))
))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
.get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@Test
@Deprecated
public void testCheckAndMutateWithFilterAndTimeRangeForOldApi() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
.get();
// Put with success
boolean ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 101))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))
.get();
assertTrue(ok);
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
ok = table.checkAndMutate(row, new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 100))
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
.get();
assertFalse(ok);
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@Test(expected = NullPointerException.class)
@Deprecated
public void testCheckAndMutateWithoutConditionForOldApi() {
getTable.get().checkAndMutate(row, FAMILY)
.thenPut(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
}
// Tests for new CheckAndMutate API
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndPut() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
int count = 10;
CountDownLatch latch = new CountDownLatch(count);
IntStream.range(0, count)
.forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(new Put(row).addColumn(FAMILY, QUALIFIER, concat(VALUE, i))))
.thenAccept(x -> {
if (x.isSuccess()) {
successCount.incrementAndGet();
successIndex.set(i);
}
assertNull(x.getResult());
latch.countDown();
}));
latch.await();
assertEquals(1, successCount.get());
String actual = Bytes.toString(table.get(new Get(row)).get().getValue(FAMILY, QUALIFIER));
assertTrue(actual.endsWith(Integer.toString(successIndex.get())));
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndDelete() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch deleteLatch = new CountDownLatch(count);
IntStream.range(0, count)
.forEach(i -> table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.build(
new Delete(row).addColumn(FAMILY, QUALIFIER).addColumn(FAMILY, concat(QUALIFIER, i))))
.thenAccept(x -> {
if (x.isSuccess()) {
successCount.incrementAndGet();
successIndex.set(i);
}
assertNull(x.getResult());
deleteLatch.countDown();
}));
deleteLatch.await();
assertEquals(1, successCount.get());
Result result = table.get(new Get(row)).get();
IntStream.range(0, count).forEach(i -> {
if (i == successIndex.get()) {
assertFalse(result.containsColumn(FAMILY, concat(QUALIFIER, i)));
} else {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
}
});
}
@SuppressWarnings("FutureReturnValueIgnored")
@Test
public void testCheckAndMutate() throws InterruptedException, ExecutionException {
AsyncTable<?> table = getTable.get();
int count = 10;
CountDownLatch putLatch = new CountDownLatch(count + 1);
table.put(new Put(row).addColumn(FAMILY, QUALIFIER, VALUE)).thenRun(() -> putLatch.countDown());
IntStream.range(0, count)
.forEach(i -> table.put(new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), VALUE))
.thenRun(() -> putLatch.countDown()));
putLatch.await();
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger successIndex = new AtomicInteger(-1);
CountDownLatch mutateLatch = new CountDownLatch(count);
IntStream.range(0, count).forEach(i -> {
RowMutations mutation = new RowMutations(row);
try {
mutation.add((Mutation) new Delete(row).addColumn(FAMILY, QUALIFIER));
mutation
.add((Mutation) new Put(row).addColumn(FAMILY, concat(QUALIFIER, i), concat(VALUE, i)));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.build(mutation))
.thenAccept(x -> {
if (x.isSuccess()) {
successCount.incrementAndGet();
successIndex.set(i);
}
assertNull(x.getResult());
mutateLatch.countDown();
});
});
mutateLatch.await();
assertEquals(1, successCount.get());
Result result = table.get(new Get(row)).get();
IntStream.range(0, count).forEach(i -> {
if (i == successIndex.get()) {
assertArrayEquals(concat(VALUE, i), result.getValue(FAMILY, concat(QUALIFIER, i)));
} else {
assertArrayEquals(VALUE, result.getValue(FAMILY, concat(QUALIFIER, i)));
}
});
}
@Test
public void testCheckAndMutateWithTimeRange() throws Exception {
AsyncTable<?> table = getTable.get();
final long ts = System.currentTimeMillis() / 2;
Put put = new Put(row);
put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(put)).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts + 10000))
.build(put)).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(put)).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
RowMutations rm = new RowMutations(row).add((Mutation) put);
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts + 10000))
.build(rm)).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(rm)).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Delete delete = new Delete(row).addColumn(FAMILY, QUALIFIER);
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts + 10000))
.build(delete)).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, QUALIFIER, VALUE)
.timeRange(TimeRange.at(ts))
.build(delete)).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
}
@Test
public void testCheckAndMutateWithSingleFilter() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
table.put(put).get();
// Put with success
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
// Delete with success
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
// Mutate with success
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"),
CompareOperator.EQUAL, Bytes.toBytes("b")))
.build(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
}
@Test
public void testCheckAndMutateWithMultipleFilters() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put one row
Put put = new Put(row);
put.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"));
table.put(put).get();
// Put with success
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
// Put with failure
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")))).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("E"))).get());
// Delete with success
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("D")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get());
// Mutate with success
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
}
@Test
public void testCheckAndMutateWithTimestampFilter() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))).get();
// Put with success
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(100L))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(FAMILY)),
new QualifierFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("A"))),
new TimestampsFilter(Collections.singletonList(101L))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@Test
public void testCheckAndMutateWithFilterAndTimeRange() throws Throwable {
AsyncTable<?> table = getTable.get();
// Put with specifying the timestamp
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a")))
.get();
// Put with success
CheckAndMutateResult result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 101))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(result.isSuccess());
assertNull(result.getResult());
Result r = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(r.getValue(FAMILY, Bytes.toBytes("B"))));
// Put with failure
result = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"),
CompareOperator.EQUAL, Bytes.toBytes("a")))
.timeRange(TimeRange.between(0, 100))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))))
.get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
}
@Test
public void testCheckAndIncrement() throws Throwable {
AsyncTable<?> table = getTable.get();
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
// CheckAndIncrement with correct value
CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
assertTrue(res.isSuccess());
assertEquals(1, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndIncrement with wrong value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1))).get();
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
// CheckAndIncrement with a filter and correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
assertTrue(res.isSuccess());
assertEquals(3, Bytes.toLong(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndIncrement with a filter and correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("b")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("d"))))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 2))).get();
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(3, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
@Test
public void testCheckAndAppend() throws Throwable {
AsyncTable<?> table = getTable.get();
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))).get();
// CheckAndAppend with correct value
CheckAndMutateResult res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertTrue(res.isSuccess());
assertEquals("b", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndAppend with correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("b"))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")))).get();
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
table.put(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")));
// CheckAndAppend with a filter and correct value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("c"))))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
assertTrue(res.isSuccess());
assertEquals("bbb", Bytes.toString(res.getResult().getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// CheckAndAppend with a filter and wrong value
res = table.checkAndMutate(CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("b")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("C"), CompareOperator.EQUAL,
Bytes.toBytes("d"))))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("bb")))).get();
assertFalse(res.isSuccess());
assertNull(res.getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("bbb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
}
@Test
public void testCheckAndRowMutations() throws Throwable {
final byte[] q1 = Bytes.toBytes("q1");
final byte[] q2 = Bytes.toBytes("q2");
final byte[] q3 = Bytes.toBytes("q3");
final byte[] q4 = Bytes.toBytes("q4");
final String v1 = "v1";
AsyncTable<?> table = getTable.get();
// Initial values
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, q2, Bytes.toBytes("toBeDeleted")),
new Put(row).addColumn(FAMILY, q3, Bytes.toBytes(5L)),
new Put(row).addColumn(FAMILY, q4, Bytes.toBytes("a")))).get();
// Do CheckAndRowMutations
CheckAndMutate checkAndMutate = CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, q1)
.build(new RowMutations(row).add(Arrays.asList(
new Put(row).addColumn(FAMILY, q1, Bytes.toBytes(v1)),
new Delete(row).addColumns(FAMILY, q2),
new Increment(row).addColumn(FAMILY, q3, 1),
new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
);
CheckAndMutateResult result = table.checkAndMutate(checkAndMutate).get();
assertTrue(result.isSuccess());
assertEquals(6L, Bytes.toLong(result.getResult().getValue(FAMILY, q3)));
assertEquals("ab", Bytes.toString(result.getResult().getValue(FAMILY, q4)));
// Verify the value
Result r = table.get(new Get(row)).get();
assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
assertNull(r.getValue(FAMILY, q2));
assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
// Do CheckAndRowMutations again
checkAndMutate = CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, q1)
.build(new RowMutations(row).add(Arrays.asList(
new Delete(row).addColumns(FAMILY, q1),
new Put(row).addColumn(FAMILY, q2, Bytes.toBytes(v1)),
new Increment(row).addColumn(FAMILY, q3, 1),
new Append(row).addColumn(FAMILY, q4, Bytes.toBytes("b"))))
);
result = table.checkAndMutate(checkAndMutate).get();
assertFalse(result.isSuccess());
assertNull(result.getResult());
// Verify the value
r = table.get(new Get(row)).get();
assertEquals(v1, Bytes.toString(r.getValue(FAMILY, q1)));
assertNull(r.getValue(FAMILY, q2));
assertEquals(6L, Bytes.toLong(r.getValue(FAMILY, q3)));
assertEquals("ab", Bytes.toString(r.getValue(FAMILY, q4)));
}
// Tests for batch version of checkAndMutate
@Test
public void testCheckAndMutateBatch() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
// Test for Put
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for Delete
checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e"))
.build(new Delete(row));
checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("a"))
.build(new Delete(row2));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get());
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for RowMutations
checkAndMutate1 = CheckAndMutate.newBuilder(row3)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.build(new RowMutations(row3)
.add((Mutation) new Put(row3)
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
.add((Mutation) new Delete(row3).addColumns(FAMILY, Bytes.toBytes("C"))));
checkAndMutate2 = CheckAndMutate.newBuilder(row4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f"))
.build(new RowMutations(row4)
.add((Mutation) new Put(row4)
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))
.add((Mutation) new Delete(row4).addColumns(FAMILY, Bytes.toBytes("D"))));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(row3)).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
assertNull(result.getValue(FAMILY, Bytes.toBytes("D")));
result = table.get(new Get(row4)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("F")));
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
@Test
public void testCheckAndMutateBatch2() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
byte[] row3 = Bytes.toBytes(Bytes.toString(row) + "3");
byte[] row4 = Bytes.toBytes(Bytes.toString(row) + "4");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), 100, Bytes.toBytes("c")),
new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d")))).get();
// Test for ifNotExists()
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, Bytes.toBytes("B"))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("e")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifNotExists(FAMILY, Bytes.toBytes("B"))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for ifMatches()
checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(FAMILY, Bytes.toBytes("A"), CompareOperator.NOT_EQUAL, Bytes.toBytes("a"))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")));
checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(FAMILY, Bytes.toBytes("B"), CompareOperator.GREATER, Bytes.toBytes("b"))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("f")));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("A"))).get();
assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("b", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
// Test for timeRange()
checkAndMutate1 = CheckAndMutate.newBuilder(row3)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.timeRange(TimeRange.between(0, 101))
.build(new Put(row3).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("e")));
checkAndMutate2 = CheckAndMutate.newBuilder(row4)
.ifEquals(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
.timeRange(TimeRange.between(0, 100))
.build(new Put(row4).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("f")));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(row3).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("e", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(row4).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
@Test
public void testCheckAndMutateBatchWithFilter() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row)
.addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row2)
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
.addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e"))
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
// Test for Put
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
// Test for Delete
checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(row).addColumns(FAMILY, Bytes.toBytes("C")));
checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new Delete(row2).addColumn(FAMILY, Bytes.toBytes("F")));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
assertFalse(table.exists(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get());
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
// Test for RowMutations
checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(row)
.add((Mutation) new Put(row)
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")))
.add((Mutation) new Delete(row).addColumns(FAMILY, Bytes.toBytes("A"))));
checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.build(new RowMutations(row2)
.add((Mutation) new Put(row2)
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("g")))
.add((Mutation) new Delete(row2).addColumns(FAMILY, Bytes.toBytes("D"))));
results = table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
result = table.get(new Get(row)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("A")));
assertEquals("c", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(row2)).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
}
@Test
public void testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), 100, Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), 100, Bytes.toBytes("b"))
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("D"), 100, Bytes.toBytes("d"))
.addColumn(FAMILY, Bytes.toBytes("E"), 100, Bytes.toBytes("e"))
.addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f")))).get();
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("A"), CompareOperator.EQUAL,
Bytes.toBytes("a")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("B"), CompareOperator.EQUAL,
Bytes.toBytes("b"))))
.timeRange(TimeRange.between(0, 101))
.build(new Put(row).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("g")));
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifMatches(new FilterList(
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("D"), CompareOperator.EQUAL,
Bytes.toBytes("d")),
new SingleColumnValueFilter(FAMILY, Bytes.toBytes("E"), CompareOperator.EQUAL,
Bytes.toBytes("e"))))
.timeRange(TimeRange.between(0, 100))
.build(new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("h")));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertNull(results.get(0).getResult());
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("C"))).get();
assertEquals("g", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("C"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("F"))).get();
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
}
@Test
public void testCheckAndIncrementBatch() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
// CheckAndIncrement with correct value
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
// CheckAndIncrement with wrong value
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
.build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertEquals(1, Bytes.toLong(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("B"))));
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals(0, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
@Test
public void testCheckAndAppendBatch() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")))).get();
// CheckAndAppend with correct value
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
.build(new Append(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b")));
// CheckAndAppend with wrong value
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
.build(new Append(row2).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertEquals("bb", Bytes.toString(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("B"))));
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row).addColumn(FAMILY, Bytes.toBytes("B"))).get();
assertEquals("bb", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("B"))));
result = table.get(new Get(row2).addColumn(FAMILY, Bytes.toBytes("D"))).get();
assertEquals("d", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
}
@Test
public void testCheckAndRowMutationsBatch() throws Throwable {
AsyncTable<?> table = getTable.get();
byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
table.putAll(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
.addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes(1L))
.addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d")),
new Put(row2).addColumn(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("f"))
.addColumn(FAMILY, Bytes.toBytes("G"), Bytes.toBytes(1L))
.addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h")))
).get();
// CheckAndIncrement with correct value
CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
.ifEquals(FAMILY, Bytes.toBytes("B"), Bytes.toBytes("b"))
.build(new RowMutations(row).add(Arrays.asList(
new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a")),
new Delete(row).addColumns(FAMILY, Bytes.toBytes("B")),
new Increment(row).addColumn(FAMILY, Bytes.toBytes("C"), 1L),
new Append(row).addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes("d"))
)));
// CheckAndIncrement with wrong value
CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
.ifEquals(FAMILY, Bytes.toBytes("F"), Bytes.toBytes("a"))
.build(new RowMutations(row2).add(Arrays.asList(
new Put(row2).addColumn(FAMILY, Bytes.toBytes("E"), Bytes.toBytes("e")),
new Delete(row2).addColumns(FAMILY, Bytes.toBytes("F")),
new Increment(row2).addColumn(FAMILY, Bytes.toBytes("G"), 1L),
new Append(row2).addColumn(FAMILY, Bytes.toBytes("H"), Bytes.toBytes("h"))
)));
List<CheckAndMutateResult> results =
table.checkAndMutateAll(Arrays.asList(checkAndMutate1, checkAndMutate2)).get();
assertTrue(results.get(0).isSuccess());
assertEquals(2, Bytes.toLong(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("dd", Bytes.toString(results.get(0).getResult()
.getValue(FAMILY, Bytes.toBytes("D"))));
assertFalse(results.get(1).isSuccess());
assertNull(results.get(1).getResult());
Result result = table.get(new Get(row)).get();
assertEquals("a", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("A"))));
assertNull(result.getValue(FAMILY, Bytes.toBytes("B")));
assertEquals(2, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("C"))));
assertEquals("dd", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("D"))));
result = table.get(new Get(row2)).get();
assertNull(result.getValue(FAMILY, Bytes.toBytes("E")));
assertEquals("f", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("F"))));
assertEquals(1, Bytes.toLong(result.getValue(FAMILY, Bytes.toBytes("G"))));
assertEquals("h", Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("H"))));
}
@Test
public void testDisabled() throws InterruptedException, ExecutionException {
ASYNC_CONN.getAdmin().disableTable(TABLE_NAME).get();
try {
getTable.get().get(new Get(row)).get();
fail("Should fail since table has been disabled");
} catch (ExecutionException e) {
Throwable cause = e.getCause();
assertThat(cause, instanceOf(TableNotEnabledException.class));
assertThat(cause.getMessage(), containsString(TABLE_NAME.getNameAsString()));
}
}
@Test
public void testInvalidPut() {
try {
getTable.get().put(new Put(Bytes.toBytes(0)));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
getTable.get()
.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
@Test
public void testInvalidPutInRowMutations() throws IOException {
final byte[] row = Bytes.toBytes(0);
try {
getTable.get().mutateRow(new RowMutations(row).add(new Put(row)));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
getTable.get()
.mutateRow(new RowMutations(row).add(new Put(row)
.addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE])));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
@Test
public void testInvalidPutInRowMutationsInCheckAndMutate() throws IOException {
final byte[] row = Bytes.toBytes(0);
try {
getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(new RowMutations(row).add(new Put(row))));
fail("Should fail since the put does not contain any cells");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("No columns to insert"));
}
try {
getTable.get().checkAndMutate(CheckAndMutate.newBuilder(row)
.ifNotExists(FAMILY, QUALIFIER)
.build(new RowMutations(row).add(new Put(row)
.addColumn(FAMILY, QUALIFIER, new byte[MAX_KEY_VALUE_SIZE]))));
fail("Should fail since the put exceeds the max key value size");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), containsString("KeyValue size too large"));
}
}
}