blob: 7b0f3479200d6acae8426498277d6ee13b7b73ef [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.metron.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.hbase.ColumnList;
import org.apache.metron.hbase.HBaseProjectionCriteria;
import org.apache.metron.hbase.TableProvider;
import org.junit.jupiter.api.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
/**
* Tests the HBaseClient
*/
public class HBaseClientTest {
private static final String tableName = "table";
private static HBaseTestingUtility util;
private static HBaseClient client;
private static Table table;
private static Admin admin;
private static byte[] cf = Bytes.toBytes("cf");
private static byte[] column = Bytes.toBytes("column");
byte[] rowKey1;
byte[] rowKey2;
byte[] value1 = Bytes.toBytes("value1");
byte[] value2 = Bytes.toBytes("value2");
ColumnList cols1;
ColumnList cols2;
@BeforeAll
public static void startHBase() throws Exception {
Configuration config = HBaseConfiguration.create();
config.set("hbase.master.hostname", "localhost");
config.set("hbase.regionserver.hostname", "localhost");
util = new HBaseTestingUtility(config);
util.startMiniCluster();
admin = util.getHBaseAdmin();
// create the table
table = util.createTable(Bytes.toBytes(tableName), cf);
util.waitTableEnabled(table.getName());
// setup the client
client = new HBaseClient((c,t) -> table, table.getConfiguration(), tableName);
}
@AfterAll
public static void stopHBase() throws Exception {
util.deleteTable(tableName);
util.shutdownMiniCluster();
util.cleanupTestDir();
}
@AfterEach
public void clearTable() throws Exception {
List<Delete> deletions = new ArrayList<>();
for(Result r : table.getScanner(new Scan())) {
deletions.add(new Delete(r.getRow()));
}
table.delete(deletions);
}
@BeforeEach
public void setupTuples() {
rowKey1 = Bytes.toBytes("rowKey1");
cols1 = new ColumnList();
cols1.addColumn(cf, column, value1);
rowKey2 = Bytes.toBytes("rowKey2");
cols2 = new ColumnList();
cols2.addColumn(cf, column, value2);
}
/**
* Should be able to read/write a single row.
*/
@Test
public void testWrite() throws Exception {
// add a tuple to the batch
client.addMutation(rowKey1, cols1, Durability.SYNC_WAL);
client.mutate();
HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
criteria.addColumnFamily(Bytes.toString(cf));
// read back the tuple
client.addGet(rowKey1, criteria);
Result[] results = client.getAll();
assertEquals(1, results.length);
// validate
assertEquals(1, results.length);
assertArrayEquals(rowKey1, results[0].getRow());
assertArrayEquals(value1, results[0].getValue(cf, column));
}
/**
* Should be able to read/write multiple rows in a batch.
*/
@Test
public void testBatchWrite() {
// add two mutations to the queue
client.addMutation(rowKey1, cols1, Durability.SYNC_WAL);
client.addMutation(rowKey2, cols2, Durability.SYNC_WAL);
int count = client.mutate();
// there were two mutations
assertEquals(2, count);
HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
criteria.addColumnFamily(Bytes.toString(cf));
// read back both tuples
client.addGet(rowKey1, criteria);
client.addGet(rowKey2, criteria);
Result[] results = client.getAll();
// validate
assertEquals(2, results.length);
assertArrayEquals(rowKey1, results[0].getRow());
assertArrayEquals(value1, results[0].getValue(cf, column));
assertArrayEquals(rowKey1, results[0].getRow());
assertArrayEquals(value2, results[1].getValue(cf, column));
}
/**
* What happens when there is nothing in the batch to write?
*/
@Test
public void testEmptyBatch() {
// do not add any mutations before attempting to write
int count = client.mutate();
assertEquals(0, count);
HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
criteria.addColumnFamily(Bytes.toString(cf));
// read back both
client.addGet(rowKey1, criteria);
client.addGet(rowKey2, criteria);
Result[] results = client.getAll();
// validate - there should be nothing to find
assertEquals(2, results.length);
for(Result result : results) {
assertTrue(result.isEmpty());
}
}
/**
* Should be able to read back rows that were written with a TTL 30 days out.
*/
@Test
public void testWriteWithTimeToLive() {
long timeToLive = TimeUnit.DAYS.toMillis(30);
// add two mutations to the queue
client.addMutation(rowKey1, cols1, Durability.SYNC_WAL, timeToLive);
client.addMutation(rowKey2, cols2, Durability.SYNC_WAL, timeToLive);
client.mutate();
HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
criteria.addColumnFamily(Bytes.toString(cf));
// read back both tuples
client.addGet(rowKey1, criteria);
client.addGet(rowKey2, criteria);
Result[] results = client.getAll();
// validate
assertEquals(2, results.length);
assertArrayEquals(rowKey1, results[0].getRow());
assertArrayEquals(value1, results[0].getValue(cf, column));
assertArrayEquals(rowKey1, results[0].getRow());
assertArrayEquals(value2, results[1].getValue(cf, column));
}
/**
* Should NOT be able to read rows that are expired due to the TTL.
*/
@Test
public void testExpiredRows() throws Exception {
long timeToLive = TimeUnit.MILLISECONDS.toMillis(1);
// add two mutations to the queue
client.addMutation(rowKey1, cols1, Durability.SYNC_WAL, timeToLive);
client.addMutation(rowKey2, cols2, Durability.SYNC_WAL, timeToLive);
client.mutate();
HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
criteria.addColumnFamily(Bytes.toString(cf));
// wait for a second to ensure the TTL has expired
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
// read back both rows
client.addGet(rowKey1, criteria);
client.addGet(rowKey2, criteria);
Result[] results = client.getAll();
// validate - the TTL should have expired all rows
assertEquals(2, results.length);
assertTrue(results[0].isEmpty());
assertTrue(results[1].isEmpty());
}
@Test
public void testUnableToOpenConnection() throws IOException {
// used to trigger a failure condition
TableProvider tableProvider = mock(TableProvider.class);
when(tableProvider.getTable(any(), any())).thenThrow(new IllegalArgumentException("test exception"));
assertThrows(
RuntimeException.class,
() -> client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName));
}
@Test
public void testFailureToMutate() throws IOException, InterruptedException {
// used to trigger a failure condition in `HbaseClient.mutate`
Table table = mock(Table.class);
doThrow(new IOException("exception!")).when(table).batch(any(), any());
TableProvider tableProvider = mock(TableProvider.class);
when(tableProvider.getTable(any(), any())).thenReturn(table);
client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
client.addMutation(rowKey1, cols1, Durability.SYNC_WAL);
assertThrows(RuntimeException.class, () -> client.mutate());
}
@Test
public void testFailureToGetAll() throws IOException {
// used to trigger a failure condition in `HbaseClient.getAll`
Table table = mock(Table.class);
when(table.get(anyList())).thenThrow(new IOException("exception!"));
TableProvider tableProvider = mock(TableProvider.class);
when(tableProvider.getTable(any(), any())).thenReturn(table);
HBaseProjectionCriteria criteria = new HBaseProjectionCriteria();
criteria.addColumnFamily(Bytes.toString(cf));
client = new HBaseClient(tableProvider, HBaseConfiguration.create(), tableName);
client.addGet(rowKey1, criteria);
client.addGet(rowKey2, criteria);
assertThrows(RuntimeException.class, () -> client.getAll());
}
}