/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
/**
 * Tests cell tags: that tags added via the "visibility" mutation attribute survive flushes and
 * compactions, and that they are carried over correctly by Append and Increment operations.
 */
@Category({RegionServerTests.class, MediumTests.class})
public class TestTags {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestTags.class);
static boolean useFilter = false;
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Rule
public final TestName TEST_NAME = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt("hfile.format.version", 3);
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
TestCoprocessorForTags.class.getName());
TEST_UTIL.startMiniCluster(2);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@After
public void tearDown() {
useFilter = false;
}
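  /**
   * Writes three rows, two of them with a "visibility" attribute that the test coprocessor turns
   * into a cell tag, and verifies the rows are still readable after flushes and a compaction.
   */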
@Test
public void testTags() throws Exception {
Table table = null;
try {
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
byte[] fam = Bytes.toBytes("info");
byte[] row = Bytes.toBytes("rowa");
// column names
byte[] qual = Bytes.toBytes("qual");
byte[] row1 = Bytes.toBytes("rowb");
byte[] row2 = Bytes.toBytes("rowc");
TableDescriptor tableDescriptor =
TableDescriptorBuilder
.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam)
.setBlockCacheEnabled(true).setDataBlockEncoding(DataBlockEncoding.NONE).build())
.build();
Admin admin = TEST_UTIL.getAdmin();
admin.createTable(tableDescriptor);
byte[] value = Bytes.toBytes("value");
table = TEST_UTIL.getConnection().getTable(tableName);
Put put = new Put(row);
put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
put.setAttribute("visibility", Bytes.toBytes("myTag"));
table.put(put);
admin.flush(tableName);
      // We are lacking an API for confirming that a flush request has completed.
      // Just sleep for a short time. We won't be able to confirm flush
      // completion, but the test won't hang now or in the future if the
      // default compaction policy causes a compaction between the flush and
      // the point where we go to confirm it.
Thread.sleep(1000);
Put put1 = new Put(row1);
byte[] value1 = Bytes.toBytes("1000dfsdf");
put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
// put1.setAttribute("visibility", Bytes.toBytes("myTag3"));
table.put(put1);
admin.flush(tableName);
Thread.sleep(1000);
Put put2 = new Put(row2);
byte[] value2 = Bytes.toBytes("1000dfsdf");
put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
put2.setAttribute("visibility", Bytes.toBytes("myTag3"));
table.put(put2);
admin.flush(tableName);
Thread.sleep(1000);
result(fam, row, qual, row2, table, value, value2, row1, value1);
admin.compact(tableName);
while (admin.getCompactionState(tableName) != CompactionState.NONE) {
Thread.sleep(10);
}
result(fam, row, qual, row2, table, value, value2, row1, value1);
} finally {
if (table != null) {
table.close();
}
}
}
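  /**
   * Verifies that cells written without the "visibility" attribute carry no tags, both after
   * flushes and after a compaction.
   */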
@Test
public void testFlushAndCompactionWithoutTags() throws Exception {
Table table = null;
try {
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
byte[] fam = Bytes.toBytes("info");
byte[] row = Bytes.toBytes("rowa");
// column names
byte[] qual = Bytes.toBytes("qual");
byte[] row1 = Bytes.toBytes("rowb");
byte[] row2 = Bytes.toBytes("rowc");
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true).build())
.build();
Admin admin = TEST_UTIL.getAdmin();
admin.createTable(tableDescriptor);
table = TEST_UTIL.getConnection().getTable(tableName);
Put put = new Put(row);
byte[] value = Bytes.toBytes("value");
put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
table.put(put);
admin.flush(tableName);
      // We are lacking an API for confirming that a flush request has completed.
      // Just sleep for a short time. We won't be able to confirm flush
      // completion, but the test won't hang now or in the future if the
      // default compaction policy causes a compaction between the flush and
      // the point where we go to confirm it.
Thread.sleep(1000);
Put put1 = new Put(row1);
byte[] value1 = Bytes.toBytes("1000dfsdf");
put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
table.put(put1);
admin.flush(tableName);
Thread.sleep(1000);
Put put2 = new Put(row2);
byte[] value2 = Bytes.toBytes("1000dfsdf");
put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
table.put(put2);
admin.flush(tableName);
Thread.sleep(1000);
Scan s = new Scan().withStartRow(row);
ResultScanner scanner = table.getScanner(s);
try {
Result[] next = scanner.next(3);
for (Result result : next) {
CellScanner cellScanner = result.cellScanner();
cellScanner.advance();
Cell current = cellScanner.current();
assertEquals(0, current.getTagsLength());
}
} finally {
        if (scanner != null) {
          scanner.close();
        }
}
admin.compact(tableName);
while (admin.getCompactionState(tableName) != CompactionState.NONE) {
Thread.sleep(10);
}
s = new Scan().withStartRow(row);
scanner = table.getScanner(s);
try {
Result[] next = scanner.next(3);
for (Result result : next) {
CellScanner cellScanner = result.cellScanner();
cellScanner.advance();
Cell current = cellScanner.current();
assertEquals(0, current.getTagsLength());
}
} finally {
if (scanner != null) {
scanner.close();
}
}
} finally {
if (table != null) {
table.close();
}
}
}
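  /**
   * For each data block encoding, writes rows with and without a large "visibility" attribute
   * and verifies through the test coprocessor which cells carry tags, after flushes and again
   * once any compaction activity has finished.
   */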
@Test
public void testFlushAndCompactionwithCombinations() throws Exception {
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
byte[] fam = Bytes.toBytes("info");
byte[] row = Bytes.toBytes("rowa");
// column names
byte[] qual = Bytes.toBytes("qual");
byte[] row1 = Bytes.toBytes("rowb");
byte[] row2 = Bytes.toBytes("rowc");
byte[] rowd = Bytes.toBytes("rowd");
byte[] rowe = Bytes.toBytes("rowe");
Table table = null;
for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam).setBlockCacheEnabled(true)
.setDataBlockEncoding(encoding).build())
.build();
Admin admin = TEST_UTIL.getAdmin();
admin.createTable(tableDescriptor);
try {
table = TEST_UTIL.getConnection().getTable(tableName);
Put put = new Put(row);
byte[] value = Bytes.toBytes("value");
put.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value);
int bigTagLen = Short.MAX_VALUE - 5;
put.setAttribute("visibility", new byte[bigTagLen]);
table.put(put);
Put put1 = new Put(row1);
byte[] value1 = Bytes.toBytes("1000dfsdf");
put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
table.put(put1);
admin.flush(tableName);
        // We are lacking an API for confirming that a flush request has completed.
        // Just sleep for a short time. We won't be able to confirm flush
        // completion, but the test won't hang now or in the future if the
        // default compaction policy causes a compaction between the flush and
        // the point where we go to confirm it.
Thread.sleep(1000);
put1 = new Put(row2);
value1 = Bytes.toBytes("1000dfsdf");
put1.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value1);
table.put(put1);
admin.flush(tableName);
Thread.sleep(1000);
Put put2 = new Put(rowd);
byte[] value2 = Bytes.toBytes("1000dfsdf");
put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
table.put(put2);
put2 = new Put(rowe);
value2 = Bytes.toBytes("1000dfsddfdf");
put2.addColumn(fam, qual, HConstants.LATEST_TIMESTAMP, value2);
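        // Note: the attribute below is set on 'put' (for 'rowa', already written above), not on
        // 'put2', so the 'rowe' cell is stored without tags, as the assertions below expect.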
put.setAttribute("visibility", Bytes.toBytes("ram"));
table.put(put2);
admin.flush(tableName);
Thread.sleep(1000);
TestCoprocessorForTags.checkTagPresence = true;
Scan s = new Scan().withStartRow(row);
s.setCaching(1);
ResultScanner scanner = table.getScanner(s);
try {
Result next = null;
while ((next = scanner.next()) != null) {
CellScanner cellScanner = next.cellScanner();
cellScanner.advance();
Cell current = cellScanner.current();
if (CellUtil.matchingRows(current, row)) {
assertEquals(1, TestCoprocessorForTags.tags.size());
Tag tag = TestCoprocessorForTags.tags.get(0);
assertEquals(bigTagLen, tag.getValueLength());
} else {
assertEquals(0, TestCoprocessorForTags.tags.size());
}
}
} finally {
if (scanner != null) {
scanner.close();
}
TestCoprocessorForTags.checkTagPresence = false;
}
while (admin.getCompactionState(tableName) != CompactionState.NONE) {
Thread.sleep(10);
}
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(s);
try {
Result next = null;
while ((next = scanner.next()) != null) {
CellScanner cellScanner = next.cellScanner();
cellScanner.advance();
Cell current = cellScanner.current();
if (CellUtil.matchingRows(current, row)) {
assertEquals(1, TestCoprocessorForTags.tags.size());
Tag tag = TestCoprocessorForTags.tags.get(0);
assertEquals(bigTagLen, tag.getValueLength());
} else {
assertEquals(0, TestCoprocessorForTags.tags.size());
}
}
} finally {
if (scanner != null) {
scanner.close();
}
TestCoprocessorForTags.checkTagPresence = false;
}
} finally {
if (table != null) {
table.close();
}
// delete the table
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
}
}
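  /**
   * Verifies how tags set through the "visibility" attribute combine with the tags of existing
   * cells when Increment and Append operations are applied.
   */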
@Test
public void testTagsWithAppendAndIncrement() throws Exception {
TableName tableName = TableName.valueOf(TEST_NAME.getMethodName());
byte[] f = Bytes.toBytes("f");
byte[] q = Bytes.toBytes("q");
byte[] row1 = Bytes.toBytes("r1");
byte[] row2 = Bytes.toBytes("r2");
TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(f)).build();
TEST_UTIL.getAdmin().createTable(tableDescriptor);
Table table = null;
try {
table = TEST_UTIL.getConnection().getTable(tableName);
Put put = new Put(row1);
byte[] v = Bytes.toBytes(2L);
put.addColumn(f, q, v);
put.setAttribute("visibility", Bytes.toBytes("tag1"));
table.put(put);
Increment increment = new Increment(row1);
increment.addColumn(f, q, 1L);
table.increment(increment);
TestCoprocessorForTags.checkTagPresence = true;
ResultScanner scanner = table.getScanner(new Scan());
Result result = scanner.next();
KeyValue kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
List<Tag> tags = TestCoprocessorForTags.tags;
assertEquals(3L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(1, tags.size());
assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
increment = new Increment(row1);
increment.add(new KeyValue(row1, f, q, 1234L, v));
increment.setAttribute("visibility", Bytes.toBytes("tag2"));
table.increment(increment);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(new Scan());
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
// We cannot assume the ordering of tags
List<String> tagValues = new ArrayList<>();
for (Tag tag: tags) {
tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
}
assertTrue(tagValues.contains("tag1"));
assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
put = new Put(row2);
v = Bytes.toBytes(2L);
put.addColumn(f, q, v);
table.put(put);
increment = new Increment(row2);
increment.add(new KeyValue(row2, f, q, 1234L, v));
increment.setAttribute("visibility", Bytes.toBytes("tag2"));
table.increment(increment);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(new Scan().withStartRow(row2));
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(4L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(1, tags.size());
assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
// Test Append
byte[] row3 = Bytes.toBytes("r3");
put = new Put(row3);
put.addColumn(f, q, Bytes.toBytes("a"));
put.setAttribute("visibility", Bytes.toBytes("tag1"));
table.put(put);
Append append = new Append(row3);
append.addColumn(f, q, Bytes.toBytes("b"));
table.append(append);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(new Scan().withStartRow(row3));
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(1, tags.size());
assertEquals("tag1", Bytes.toString(Tag.cloneValue(tags.get(0))));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
append = new Append(row3);
append.add(new KeyValue(row3, f, q, 1234L, v));
append.setAttribute("visibility", Bytes.toBytes("tag2"));
table.append(append);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(new Scan().withStartRow(row3));
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
// We cannot assume the ordering of tags
tagValues.clear();
for (Tag tag: tags) {
tagValues.add(Bytes.toString(Tag.cloneValue(tag)));
}
assertTrue(tagValues.contains("tag1"));
assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
byte[] row4 = Bytes.toBytes("r4");
put = new Put(row4);
put.addColumn(f, q, Bytes.toBytes("a"));
table.put(put);
append = new Append(row4);
append.add(new KeyValue(row4, f, q, 1234L, v));
append.setAttribute("visibility", Bytes.toBytes("tag2"));
table.append(append);
TestCoprocessorForTags.checkTagPresence = true;
scanner = table.getScanner(new Scan().withStartRow(row4));
result = scanner.next();
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(1, tags.size());
assertEquals("tag2", Bytes.toString(Tag.cloneValue(tags.get(0))));
} finally {
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
if (table != null) {
table.close();
}
}
}
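  /**
   * Scans the table starting at {@code row} and asserts that the three rows written by the
   * caller come back in order with the expected values.
   */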
private void result(byte[] fam, byte[] row, byte[] qual, byte[] row2, Table table, byte[] value,
byte[] value2, byte[] row1, byte[] value1) throws IOException {
Scan s = new Scan().withStartRow(row);
    // If filters are used, this attribute can be explicitly checked for in the filterKV
    // method, and KVs can be filtered out if the tag of interest is not found in that KV.
s.setAttribute("visibility", Bytes.toBytes("myTag"));
ResultScanner scanner = null;
try {
scanner = table.getScanner(s);
Result next = scanner.next();
assertTrue(Bytes.equals(next.getRow(), row));
assertTrue(Bytes.equals(next.getValue(fam, qual), value));
Result next2 = scanner.next();
assertTrue(next2 != null);
assertTrue(Bytes.equals(next2.getRow(), row1));
assertTrue(Bytes.equals(next2.getValue(fam, qual), value1));
next2 = scanner.next();
assertTrue(next2 != null);
assertTrue(Bytes.equals(next2.getRow(), row2));
assertTrue(Bytes.equals(next2.getValue(fam, qual), value2));
} finally {
      if (scanner != null) {
        scanner.close();
      }
}
}
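  /**
   * Region coprocessor used by these tests. In prePut/preIncrement/preAppend it converts the
   * "visibility" mutation attribute into a cell tag, and in postScannerNext it captures the tags
   * of the first cell of the first Result into {@link #tags} when {@link #checkTagPresence} is
   * set.
   */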
public static class TestCoprocessorForTags implements RegionCoprocessor, RegionObserver {
public static volatile boolean checkTagPresence = false;
public static List<Tag> tags = null;
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
updateMutationAddingTags(put);
}
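    /**
     * If the mutation carries a "visibility" attribute, rewrites each of its cells as a KeyValue
     * that carries the attribute value as a tag, and puts the rewritten cells back into the
     * family cell map.
     */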
private void updateMutationAddingTags(final Mutation m) {
byte[] attribute = m.getAttribute("visibility");
byte[] cf = null;
List<Cell> updatedCells = new ArrayList<>();
if (attribute != null) {
for (List<? extends Cell> edits : m.getFamilyCellMap().values()) {
for (Cell cell : edits) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
if (cf == null) {
cf = CellUtil.cloneFamily(kv);
}
Tag tag = new ArrayBackedTag((byte) 1, attribute);
List<Tag> tagList = new ArrayList<>();
tagList.add(tag);
KeyValue newKV = new KeyValue(CellUtil.cloneRow(kv), 0, kv.getRowLength(),
CellUtil.cloneFamily(kv), 0, kv.getFamilyLength(), CellUtil.cloneQualifier(kv), 0,
kv.getQualifierLength(), kv.getTimestamp(),
KeyValue.Type.codeToType(kv.getTypeByte()), CellUtil.cloneValue(kv), 0,
kv.getValueLength(), tagList);
            updatedCells.add(newKV);
}
}
m.getFamilyCellMap().remove(cf);
// Update the family map
m.getFamilyCellMap().put(cf, updatedCells);
}
}
@Override
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> e, Increment increment)
throws IOException {
updateMutationAddingTags(increment);
return null;
}
@Override
public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> e, Append append)
throws IOException {
updateMutationAddingTags(append);
return null;
}
@Override
public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> e,
InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
if (checkTagPresence) {
if (results.size() > 0) {
// Check tag presence in the 1st cell in 1st Result
Result result = results.get(0);
CellScanner cellScanner = result.cellScanner();
if (cellScanner.advance()) {
Cell cell = cellScanner.current();
tags = PrivateCellUtil.getTags(cell);
}
}
}
return hasMore;
}
}
}