blob: 297aa50f7153ceac91dd6632b8fb7794b4d3bad4 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@Category(MediumTests.class)
public class TestRegionServerReadRequestMetrics {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf("test");
private static final byte[] CF1 = "c1".getBytes();
private static final byte[] CF2 = "c2".getBytes();
private static final byte[] ROW1 = "a".getBytes();
private static final byte[] ROW2 = "b".getBytes();
private static final byte[] ROW3 = "c".getBytes();
private static final byte[] COL1 = "q1".getBytes();
private static final byte[] COL2 = "q2".getBytes();
private static final byte[] COL3 = "q3".getBytes();
private static final byte[] VAL1 = "v1".getBytes();
private static final byte[] VAL2 = "v2".getBytes();
private static final byte[] VAL3 = Bytes.toBytes(0L);
private static final int MAX_TRY = 20;
private static final int SLEEP_MS = 100;
private static final int TTL = 1;
private static Admin admin;
private static Collection<ServerName> serverNames;
private static Table table;
private static List<HRegionInfo> tableRegions;
private static Map<Metric, Long> requestsMap = new HashMap<>();
private static Map<Metric, Long> requestsMapPrev = new HashMap<>();
@BeforeClass
public static void setUpOnce() throws Exception {
// Default starts one regionserver only.
TEST_UTIL.startMiniCluster();
admin = TEST_UTIL.getAdmin();
serverNames = admin.getClusterStatus().getServers();
table = createTable();
putData();
tableRegions = admin.getTableRegions(TABLE_NAME);
for (Metric metric : Metric.values()) {
requestsMap.put(metric, 0L);
requestsMapPrev.put(metric, 0L);
}
}
private static Table createTable() throws IOException {
HTableDescriptor td = new HTableDescriptor(TABLE_NAME);
HColumnDescriptor cd1 = new HColumnDescriptor(CF1);
td.addFamily(cd1);
HColumnDescriptor cd2 = new HColumnDescriptor(CF2);
cd2.setTimeToLive(TTL);
td.addFamily(cd2);
admin.createTable(td);
return TEST_UTIL.getConnection().getTable(TABLE_NAME);
}
private static void testReadRequests(long resultCount,
long expectedReadRequests, long expectedFilteredReadRequests)
throws IOException, InterruptedException {
updateMetricsMap();
System.out.println("requestsMapPrev = " + requestsMapPrev);
System.out.println("requestsMap = " + requestsMap);
assertEquals(expectedReadRequests,
requestsMap.get(Metric.REGION_READ) - requestsMapPrev.get(Metric.REGION_READ));
boolean tablesOnMaster = LoadBalancer.isTablesOnMaster(TEST_UTIL.getConfiguration());
if (tablesOnMaster) {
// If NO tables on master, then the single regionserver in this test carries user-space
// tables and the meta table. The first time through, the read will be inflated by meta
// lookups. We don't know which test will be first through since junit randomizes. This
// method is used by a bunch of tests. Just do this check if master is hosting (system)
// regions only.
assertEquals(expectedReadRequests,
requestsMap.get(Metric.SERVER_READ) - requestsMapPrev.get(Metric.SERVER_READ));
}
assertEquals(expectedFilteredReadRequests,
requestsMap.get(Metric.FILTERED_REGION_READ)
- requestsMapPrev.get(Metric.FILTERED_REGION_READ));
assertEquals(expectedFilteredReadRequests,
requestsMap.get(Metric.FILTERED_SERVER_READ)
- requestsMapPrev.get(Metric.FILTERED_SERVER_READ));
assertEquals(expectedReadRequests, resultCount);
}
private static void updateMetricsMap() throws IOException, InterruptedException {
for (Metric metric : Metric.values()) {
requestsMapPrev.put(metric, requestsMap.get(metric));
}
ServerLoad serverLoad = null;
RegionLoad regionLoadOuter = null;
boolean metricsUpdated = false;
for (int i = 0; i < MAX_TRY; i++) {
for (ServerName serverName : serverNames) {
serverLoad = admin.getClusterStatus().getLoad(serverName);
Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
for (HRegionInfo tableRegion : tableRegions) {
RegionLoad regionLoad = regionsLoad.get(tableRegion.getRegionName());
if (regionLoad != null) {
regionLoadOuter = regionLoad;
for (Metric metric : Metric.values()) {
if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) {
for (Metric metricInner : Metric.values()) {
requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner));
}
metricsUpdated = true;
break;
}
}
}
}
}
if (metricsUpdated) {
break;
}
Thread.sleep(SLEEP_MS);
}
if (!metricsUpdated) {
for (Metric metric : Metric.values()) {
requestsMap.put(metric, getReadRequest(serverLoad, regionLoadOuter, metric));
}
}
}
private static long getReadRequest(ServerLoad serverLoad, RegionLoad regionLoad, Metric metric) {
switch (metric) {
case REGION_READ:
return regionLoad.getReadRequestsCount();
case SERVER_READ:
return serverLoad.getReadRequestsCount();
case FILTERED_REGION_READ:
return regionLoad.getFilteredReadRequestsCount();
case FILTERED_SERVER_READ:
return serverLoad.getFilteredReadRequestsCount();
default:
throw new IllegalStateException();
}
}
private static void putData() throws IOException {
Put put;
put = new Put(ROW1);
put.addColumn(CF1, COL1, VAL1);
put.addColumn(CF1, COL2, VAL2);
put.addColumn(CF1, COL3, VAL3);
table.put(put);
put = new Put(ROW2);
put.addColumn(CF1, COL1, VAL2); // put val2 instead of val1
put.addColumn(CF1, COL2, VAL2);
table.put(put);
put = new Put(ROW3);
put.addColumn(CF1, COL1, VAL1);
put.addColumn(CF1, COL2, VAL2);
table.put(put);
}
private static void putTTLExpiredData() throws IOException, InterruptedException {
Put put;
put = new Put(ROW1);
put.addColumn(CF2, COL1, VAL1);
put.addColumn(CF2, COL2, VAL2);
table.put(put);
Thread.sleep(TTL * 1000);
put = new Put(ROW2);
put.addColumn(CF2, COL1, VAL1);
put.addColumn(CF2, COL2, VAL2);
table.put(put);
put = new Put(ROW3);
put.addColumn(CF2, COL1, VAL1);
put.addColumn(CF2, COL2, VAL2);
table.put(put);
}
@AfterClass
public static void tearDownOnce() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testReadRequestsCountNotFiltered() throws Exception {
int resultCount;
Scan scan;
Append append;
Put put;
Increment increment;
Get get;
// test for scan
scan = new Scan();
try (ResultScanner scanner = table.getScanner(scan)) {
resultCount = 0;
for (Result ignore : scanner) {
resultCount++;
}
testReadRequests(resultCount, 3, 0);
}
// test for scan
scan = new Scan(ROW2, ROW3);
try (ResultScanner scanner = table.getScanner(scan)) {
resultCount = 0;
for (Result ignore : scanner) {
resultCount++;
}
testReadRequests(resultCount, 1, 0);
}
// test for get
get = new Get(ROW2);
Result result = table.get(get);
resultCount = result.isEmpty() ? 0 : 1;
testReadRequests(resultCount, 1, 0);
// test for increment
increment = new Increment(ROW1);
increment.addColumn(CF1, COL3, 1);
result = table.increment(increment);
resultCount = result.isEmpty() ? 0 : 1;
testReadRequests(resultCount, 1, 0);
// test for checkAndPut
put = new Put(ROW1);
put.addColumn(CF1, COL2, VAL2);
boolean checkAndPut =
table.checkAndPut(ROW1, CF1, COL2, CompareFilter.CompareOp.EQUAL, VAL2, put);
resultCount = checkAndPut ? 1 : 0;
testReadRequests(resultCount, 1, 0);
// test for append
append = new Append(ROW1);
append.addColumn(CF1, COL2, VAL2);
result = table.append(append);
resultCount = result.isEmpty() ? 0 : 1;
testReadRequests(resultCount, 1, 0);
// test for checkAndMutate
put = new Put(ROW1);
put.addColumn(CF1, COL1, VAL1);
RowMutations rm = new RowMutations(ROW1);
rm.add(put);
boolean checkAndMutate =
table.checkAndMutate(ROW1, CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1, rm);
resultCount = checkAndMutate ? 1 : 0;
testReadRequests(resultCount, 1, 0);
}
@Test
public void testReadRequestsCountWithFilter() throws Exception {
int resultCount;
Scan scan;
// test for scan
scan = new Scan();
scan.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
try (ResultScanner scanner = table.getScanner(scan)) {
resultCount = 0;
for (Result ignore : scanner) {
resultCount++;
}
testReadRequests(resultCount, 2, 1);
}
// test for scan
scan = new Scan();
scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1)));
try (ResultScanner scanner = table.getScanner(scan)) {
resultCount = 0;
for (Result ignore : scanner) {
resultCount++;
}
testReadRequests(resultCount, 1, 2);
}
// test for scan
scan = new Scan(ROW2, ROW3);
scan.setFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(ROW1)));
try (ResultScanner scanner = table.getScanner(scan)) {
resultCount = 0;
for (Result ignore : scanner) {
resultCount++;
}
testReadRequests(resultCount, 0, 1);
}
// fixme filtered get should not increase readRequestsCount
// Get get = new Get(ROW2);
// get.setFilter(new SingleColumnValueFilter(CF1, COL1, CompareFilter.CompareOp.EQUAL, VAL1));
// Result result = table.get(get);
// resultCount = result.isEmpty() ? 0 : 1;
// testReadRequests(resultCount, 0, 1);
}
@Test
public void testReadRequestsCountWithDeletedRow() throws Exception {
try {
Delete delete = new Delete(ROW3);
table.delete(delete);
Scan scan = new Scan();
try (ResultScanner scanner = table.getScanner(scan)) {
int resultCount = 0;
for (Result ignore : scanner) {
resultCount++;
}
testReadRequests(resultCount, 2, 1);
}
} finally {
Put put = new Put(ROW3);
put.addColumn(CF1, COL1, VAL1);
put.addColumn(CF1, COL2, VAL2);
table.put(put);
}
}
@Test
public void testReadRequestsCountWithTTLExpiration() throws Exception {
putTTLExpiredData();
Scan scan = new Scan();
scan.addFamily(CF2);
try (ResultScanner scanner = table.getScanner(scan)) {
int resultCount = 0;
for (Result ignore : scanner) {
resultCount++;
}
testReadRequests(resultCount, 2, 1);
}
}
private enum Metric {REGION_READ, SERVER_READ, FILTERED_REGION_READ, FILTERED_SERVER_READ}
}