blob: b578ffe7459055da5da3fba8bd89c922c0175ee7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MetricsUserAggregateFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestClientClusterMetrics {
/** JUnit class rule created for this test class. */
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
  HBaseClassTestRule.forClass(TestClientClusterMetrics.class);

/** Mini-cluster utility; created in {@link #setUpBeforeClass()}. */
private static HBaseTestingUtility UTIL;

/** Admin handle obtained from {@link #UTIL} once the cluster is up. */
private static Admin ADMIN;

/** Number of region servers and of data nodes the mini cluster is started with. */
private static final int SLAVES = 5;

/** Number of masters the mini cluster is started with. */
private static final int MASTERS = 3;

/** The running mini cluster, as returned by {@link #UTIL}. */
private static MiniHBaseCluster CLUSTER;

/** The region server that {@link #setUpBeforeClass()} stops on purpose. */
private static HRegionServer DEAD;

/** Name of the table that several tests create and then delete. */
private static final TableName TABLE_NAME = TableName.valueOf("test");

/** Column family used for {@link #TABLE_NAME}. */
private static final byte[] CF = Bytes.toBytes("cf");
/**
 * Starts a mini cluster with {@link #MASTERS} masters and {@link #SLAVES} region servers and
 * data nodes, with {@link MyObserver} configured as a master coprocessor, and then stops the
 * last live region server so that the tests have a dead server to look at.
 */
@BeforeClass
public static void setUpBeforeClass() throws Exception {
  Configuration configuration = HBaseConfiguration.create();
  configuration.set(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, MyObserver.class.getName());
  UTIL = new HBaseTestingUtility(configuration);
  UTIL.startMiniCluster(StartMiniClusterOption.builder()
    .numMasters(MASTERS)
    .numRegionServers(SLAVES)
    .numDataNodes(SLAVES)
    .build());
  CLUSTER = UTIL.getHBaseCluster();
  CLUSTER.waitForActiveAndReadyMaster();
  ADMIN = UTIL.getAdmin();

  // Stop the last live region server and poll until its thread has exited.
  List<RegionServerThread> liveServers = CLUSTER.getLiveRegionServerThreads();
  RegionServerThread victim = liveServers.get(liveServers.size() - 1);
  DEAD = victim.getRegionServer();
  DEAD.stop("Test dead servers metrics");
  while (victim.isAlive()) {
    Thread.sleep(500);
  }
}
/**
 * Compares the metrics returned by the no-argument {@code getClusterMetrics()} call with those
 * returned when every {@link Option} is requested explicitly; the two must agree on each field
 * checked below.
 */
@Test
public void testDefaults() throws Exception {
  ClusterMetrics implicit = ADMIN.getClusterMetrics();
  ClusterMetrics explicit = ADMIN.getClusterMetrics(EnumSet.allOf(Option.class));

  Assert.assertEquals(implicit.getHBaseVersion(), explicit.getHBaseVersion());
  Assert.assertEquals(implicit.getClusterId(), explicit.getClusterId());
  Assert.assertEquals(implicit.getAverageLoad(), explicit.getAverageLoad(), 0);
  Assert.assertEquals(implicit.getBackupMasterNames().size(),
    explicit.getBackupMasterNames().size());
  Assert.assertEquals(implicit.getDeadServerNames().size(),
    explicit.getDeadServerNames().size());
  Assert.assertEquals(implicit.getRegionCount(), explicit.getRegionCount());
  Assert.assertEquals(implicit.getLiveServerMetrics().size(),
    explicit.getLiveServerMetrics().size());
  Assert.assertEquals(implicit.getMasterInfoPort(), explicit.getMasterInfoPort());
  Assert.assertEquals(implicit.getServersName().size(), explicit.getServersName().size());
  // The server-name list must also match what the admin reports as region servers.
  Assert.assertEquals(ADMIN.getRegionServers().size(), explicit.getServersName().size());
}
/**
 * Async-client counterpart of {@link #testDefaults()}: fetches cluster metrics through an
 * {@link AsyncAdmin} both without options and with every {@link Option}, and checks that the
 * two results agree, including the per-table region state counts.
 */
@Test
public void testAsyncClient() throws Exception {
  try (AsyncConnection asyncConnect =
    ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get()) {
    AsyncAdmin asyncAdmin = asyncConnect.getAdmin();
    // Both requests are issued before either result is awaited.
    CompletableFuture<ClusterMetrics> originFuture = asyncAdmin.getClusterMetrics();
    CompletableFuture<ClusterMetrics> defaultsFuture =
      asyncAdmin.getClusterMetrics(EnumSet.allOf(Option.class));
    ClusterMetrics origin = originFuture.get();
    ClusterMetrics defaults = defaultsFuture.get();

    Assert.assertEquals(origin.getHBaseVersion(), defaults.getHBaseVersion());
    Assert.assertEquals(origin.getClusterId(), defaults.getClusterId());
    Assert.assertEquals(origin.getAverageLoad(), defaults.getAverageLoad(), 0);
    Assert.assertEquals(origin.getBackupMasterNames().size(),
      defaults.getBackupMasterNames().size());
    Assert.assertEquals(origin.getDeadServerNames().size(),
      defaults.getDeadServerNames().size());
    Assert.assertEquals(origin.getRegionCount(), defaults.getRegionCount());
    Assert.assertEquals(origin.getLiveServerMetrics().size(),
      defaults.getLiveServerMetrics().size());
    Assert.assertEquals(origin.getMasterInfoPort(), defaults.getMasterInfoPort());
    Assert.assertEquals(origin.getServersName().size(), defaults.getServersName().size());
    // Every table reported by the default call must carry identical counts in the other result.
    origin.getTableRegionStatesCount().forEach((tableName, regionStatesCount) -> {
      RegionStatesCount defaultRegionStatesCount =
        defaults.getTableRegionStatesCount().get(tableName);
      Assert.assertEquals(defaultRegionStatesCount, regionStatesCount);
    });
  }
}
/**
 * Checks the live-server, dead-server and server-name sections of the cluster metrics against
 * the region server threads that are still running and the server stopped during setup.
 */
@Test
public void testLiveAndDeadServersStatus() throws Exception {
  // Tally the region server threads that are still running.
  int aliveCount = 0;
  for (RegionServerThread serverThread : CLUSTER.getLiveRegionServerThreads()) {
    if (serverThread.isAlive()) {
      aliveCount++;
    }
  }

  // Depending on the (random) order of unit execution we may run this unit before the
  // minicluster is fully up and recovered from the RS shutdown done during test init.
  Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      ClusterMetrics liveOnly = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
      Assert.assertNotNull(liveOnly);
      return liveOnly.getRegionCount() > 0;
    }
  });

  // Fetch only the live-server, dead-server and server-name sections.
  EnumSet<Option> requested =
    EnumSet.of(Option.LIVE_SERVERS, Option.DEAD_SERVERS, Option.SERVERS_NAME);
  ClusterMetrics metrics = ADMIN.getClusterMetrics(requested);
  Assert.assertNotNull(metrics);

  // One region server was stopped during setup, so one fewer than SLAVES should be running.
  Assert.assertEquals(SLAVES -1, aliveCount);

  // live servers = nums of regionservers
  // By default, HMaster don't carry any regions so it won't report its load.
  // Hence, it won't be in the server list.
  Assert.assertEquals(aliveCount, metrics.getLiveServerMetrics().size());
  Assert.assertTrue(metrics.getRegionCount() > 0);

  // Exactly one dead server is expected, and it must be the one stopped during setup.
  Assert.assertNotNull(metrics.getDeadServerNames());
  Assert.assertEquals(1, metrics.getDeadServerNames().size());
  ServerName reportedDead = metrics.getDeadServerNames().iterator().next();
  Assert.assertEquals(DEAD.getServerName(), reportedDead);

  Assert.assertNotNull(metrics.getServersName());
  Assert.assertEquals(aliveCount, metrics.getServersName().size());
}
/**
 * Creates {@link #TABLE_NAME}, writes three rows and checks the per-table region state counts
 * reported for the meta table and for the new table.
 *
 * <p>The table handle is closed and the table is deleted even when an assertion fails, so a
 * failure here does not leave the table behind for the other tests that create the same table.
 */
@Test
public void testRegionStatesCount() throws Exception {
  try (Table table = UTIL.createTable(TABLE_NAME, CF)) {
    try {
      table.put(new Put(Bytes.toBytes("k1"))
        .addColumn(CF, Bytes.toBytes("q1"), Bytes.toBytes("v1")));
      table.put(new Put(Bytes.toBytes("k2"))
        .addColumn(CF, Bytes.toBytes("q2"), Bytes.toBytes("v2")));
      table.put(new Put(Bytes.toBytes("k3"))
        .addColumn(CF, Bytes.toBytes("q3"), Bytes.toBytes("v3")));

      ClusterMetrics metrics = ADMIN.getClusterMetrics();
      // JUnit's assertEquals takes (expected, actual); keeping that order makes failure
      // messages read correctly.
      Assert.assertEquals(2, metrics.getTableRegionStatesCount().size());

      RegionStatesCount metaCounts =
        metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME);
      Assert.assertEquals(0, metaCounts.getRegionsInTransition());
      Assert.assertEquals(1, metaCounts.getOpenRegions());
      Assert.assertEquals(1, metaCounts.getTotalRegions());
      Assert.assertEquals(0, metaCounts.getClosedRegions());
      Assert.assertEquals(0, metaCounts.getSplitRegions());

      RegionStatesCount tableCounts = metrics.getTableRegionStatesCount().get(TABLE_NAME);
      Assert.assertEquals(0, tableCounts.getRegionsInTransition());
      Assert.assertEquals(1, tableCounts.getOpenRegions());
      Assert.assertEquals(1, tableCounts.getTotalRegions());
    } finally {
      UTIL.deleteTable(TABLE_NAME);
    }
  }
}
/**
 * Creates {@link #TABLE_NAME}, loads it with rows, and checks the per-table region state counts
 * before and after splitting the table at the middle inserted row key.
 *
 * <p>The table handle is closed and the table is deleted even when an assertion fails, so a
 * failure here does not leave the table behind for the other tests that create the same table.
 */
@Test
public void testRegionStatesWithSplit() throws Exception {
  int startRowNum = 20;
  int rowCount = 80;
  try (Table table = UTIL.createTable(TABLE_NAME, CF)) {
    try {
      table.put(new Put(Bytes.toBytes("k1"))
        .addColumn(CF, Bytes.toBytes("q1"), Bytes.toBytes("v1")));
      table.put(new Put(Bytes.toBytes("k2"))
        .addColumn(CF, Bytes.toBytes("q2"), Bytes.toBytes("v2")));
      insertData(TABLE_NAME, startRowNum, rowCount);

      // Before the split: one open region for the table, one for meta.
      ClusterMetrics metrics = ADMIN.getClusterMetrics();
      // JUnit's assertEquals takes (expected, actual); keeping that order makes failure
      // messages read correctly.
      Assert.assertEquals(2, metrics.getTableRegionStatesCount().size());

      RegionStatesCount metaCounts =
        metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME);
      Assert.assertEquals(0, metaCounts.getRegionsInTransition());
      Assert.assertEquals(1, metaCounts.getOpenRegions());
      Assert.assertEquals(1, metaCounts.getTotalRegions());

      RegionStatesCount tableCounts = metrics.getTableRegionStatesCount().get(TABLE_NAME);
      Assert.assertEquals(0, tableCounts.getRegionsInTransition());
      Assert.assertEquals(1, tableCounts.getOpenRegions());
      Assert.assertEquals(1, tableCounts.getTotalRegions());

      int splitRowNum = startRowNum + rowCount / 2;
      byte[] splitKey = Bytes.toBytes("" + splitRowNum);
      // Split region of the table
      // NOTE(review): the assertions below assume the split has finished by the time the
      // metrics are fetched; confirm Admin#split blocks until completion in this version.
      ADMIN.split(TABLE_NAME, splitKey);

      // After the split: meta is unchanged; the table reports three regions in total, two
      // of them open and one counted as split.
      metrics = ADMIN.getClusterMetrics();
      Assert.assertEquals(2, metrics.getTableRegionStatesCount().size());

      metaCounts = metrics.getTableRegionStatesCount().get(TableName.META_TABLE_NAME);
      Assert.assertEquals(0, metaCounts.getRegionsInTransition());
      Assert.assertEquals(1, metaCounts.getOpenRegions());
      Assert.assertEquals(1, metaCounts.getTotalRegions());

      tableCounts = metrics.getTableRegionStatesCount().get(TABLE_NAME);
      Assert.assertEquals(0, tableCounts.getRegionsInTransition());
      Assert.assertEquals(2, tableCounts.getOpenRegions());
      Assert.assertEquals(3, tableCounts.getTotalRegions());
      Assert.assertEquals(1, tableCounts.getSplitRegions());
      Assert.assertEquals(0, tableCounts.getClosedRegions());
    } finally {
      UTIL.deleteTable(TABLE_NAME);
    }
  }
}
/**
 * Checks that exactly one of the {@link #MASTERS} master threads is active and that the
 * cluster metrics report that master plus {@code MASTERS - 1} backup masters.
 */
@Test
public void testMasterAndBackupMastersStatus() throws Exception {
  // get all the master threads
  List<MasterThread> masterThreads = CLUSTER.getMasterThreads();
  int numActive = 0;
  ServerName activeName = null;
  HMaster active = null;
  for (MasterThread masterThread : masterThreads) {
    HMaster master = masterThread.getMaster();
    if (master.isActiveMaster()) {
      numActive++;
      active = master;
      activeName = master.getServerName();
    }
  }
  Assert.assertNotNull(active);
  Assert.assertEquals(1, numActive);
  Assert.assertEquals(MASTERS, masterThreads.size());

  // Retrieve master and backup masters infos only.
  EnumSet<Option> options = EnumSet.of(Option.MASTER, Option.BACKUP_MASTERS);
  ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
  // assertEquals (rather than assertTrue on equals) reports both names when they differ.
  Assert.assertEquals(activeName, metrics.getMasterName());
  Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
}
/**
 * Runs one put, one get and one fully-filtered scan as three different test users and checks
 * the per-user request counters reported in the cluster metrics, after subtracting the
 * meta-region request deltas observed around each operation.
 *
 * <p>Returns without asserting anything when per-user metrics are disabled in the
 * configuration. Client resources opened here are closed, and the table is deleted even when
 * an assertion fails.
 */
@Test
public void testUserMetrics() throws Exception {
  Configuration conf = UTIL.getConfiguration();
  // If metrics for users is not enabled, this test doesn't make sense.
  if (!conf.getBoolean(MetricsUserAggregateFactory.METRIC_USER_ENABLED_CONF,
    MetricsUserAggregateFactory.DEFAULT_METRIC_USER_ENABLED_CONF)) {
    return;
  }
  User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]);
  User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]);
  User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]);
  // Only the table itself is needed here; release the returned handle straight away.
  UTIL.createTable(TABLE_NAME, CF).close();
  try {
    waitForUsersMetrics(0);

    // One write as FOO; remember how many meta-region writes happened alongside it.
    long writeMetaMetricBeforeNextuser = getMetaMetrics().getWriteRequestCount();
    userFoo.runAs(new PrivilegedAction<Void>() {
      @Override
      public Void run() {
        try {
          doPut();
        } catch (IOException e) {
          // Keep the IOException as the cause so the stack trace is not lost.
          throw new AssertionError("Exception:" + e.getMessage(), e);
        }
        return null;
      }
    });
    waitForUsersMetrics(1);
    long writeMetaMetricForUserFoo =
      getMetaMetrics().getWriteRequestCount() - writeMetaMetricBeforeNextuser;

    // One read as BAR; remember how many meta-region reads happened alongside it.
    long readMetaMetricBeforeNextuser = getMetaMetrics().getReadRequestCount();
    userBar.runAs(new PrivilegedAction<Void>() {
      @Override
      public Void run() {
        try {
          doGet();
        } catch (IOException e) {
          throw new AssertionError("Exception:" + e.getMessage(), e);
        }
        return null;
      }
    });
    waitForUsersMetrics(2);
    long readMetaMetricForUserBar =
      getMetaMetrics().getReadRequestCount() - readMetaMetricBeforeNextuser;

    // One scan as TEST with a filter that rejects every row.
    long filteredMetaRequest = getMetaMetrics().getFilteredReadRequestCount();
    userTest.runAs(new PrivilegedAction<Void>() {
      @Override
      public Void run() {
        try (Connection connection = createConnection(UTIL.getConfiguration());
          Table table = connection.getTable(TABLE_NAME);
          ResultScanner scanner =
            table.getScanner(new Scan().setFilter(new FilterAllFilter()))) {
          for (Result result : scanner) {
            Assert.fail("Should have filtered all rows");
          }
        } catch (IOException e) {
          throw new AssertionError("Exception:" + e.getMessage(), e);
        }
        return null;
      }
    });
    waitForUsersMetrics(3);
    long filteredMetaRequestForTestUser =
      getMetaMetrics().getFilteredReadRequestCount() - filteredMetaRequest;

    // User metrics are read from the first live server returned by the cluster metrics.
    Map<byte[], UserMetrics> userMap =
      ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
        .iterator().next().getUserMetrics();
    for (Map.Entry<byte[], UserMetrics> entry : userMap.entrySet()) {
      String userName = Bytes.toString(entry.getKey());
      UserMetrics userMetrics = entry.getValue();
      switch (userName) {
        case "FOO_USER_METRIC_TEST":
          Assert.assertEquals(1,
            userMetrics.getWriteRequestCount() - writeMetaMetricForUserFoo);
          break;
        case "BAR_USER_METRIC_TEST":
          Assert.assertEquals(1,
            userMetrics.getReadRequestCount() - readMetaMetricForUserBar);
          Assert.assertEquals(0, userMetrics.getWriteRequestCount());
          break;
        case "TEST_USER_METRIC_TEST":
          Assert.assertEquals(1,
            userMetrics.getFilteredReadRequests() - filteredMetaRequestForTestUser);
          Assert.assertEquals(0, userMetrics.getWriteRequestCount());
          break;
        default:
          //current user
          Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(), userName);
          //Read/write count because of Meta operations
          Assert.assertTrue(userMetrics.getReadRequestCount() > 1);
          break;
      }
    }
  } finally {
    UTIL.deleteTable(TABLE_NAME);
  }
}
/**
 * Looks through the live servers for the one whose region metrics contain the first meta
 * region and returns that region's metrics.
 *
 * @return metrics of the first meta region; the test is failed if no live server reports it
 * @throws IOException if the cluster metrics cannot be fetched
 */
private RegionMetrics getMetaMetrics() throws IOException {
  byte[] metaRegionName = RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName();
  ClusterMetrics liveOnly = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
  for (ServerMetrics server : liveOnly.getLiveServerMetrics().values()) {
    RegionMetrics candidate = server.getRegionMetrics().get(metaRegionName);
    if (candidate == null) {
      continue;
    }
    return candidate;
  }
  Assert.fail("Should have find meta metrics");
  // Not reached: Assert.fail always throws; the return only satisfies the compiler.
  return null;
}
/**
 * Sleeps five seconds and then polls, for up to ten seconds, until the first live server in
 * the cluster metrics reports user metrics for more than {@code noOfUsers} users.
 *
 * @param noOfUsers number of test users expected in addition to the current user
 */
private void waitForUsersMetrics(int noOfUsers) throws Exception {
  //Sleep for metrics to get updated on master
  Thread.sleep(5000);
  Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
    @Override
    public boolean evaluate() throws Exception {
      ClusterMetrics liveOnly = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
      ServerMetrics firstServer = liveOnly.getLiveServerMetrics().values().iterator().next();
      Map<byte[], UserMetrics> perUser = firstServer.getUserMetrics();
      Assert.assertNotNull(perUser);
      //including current user + noOfUsers
      return perUser.size() > noOfUsers;
    }
  });
}
/**
 * Writes a single cell (row {@code a}, qualifier {@code col1}) to {@link #TABLE_NAME} over a
 * freshly created connection, closing both the table handle and the connection afterwards.
 *
 * @throws IOException if the connection cannot be created or the put fails
 */
private void doPut() throws IOException {
  try (Connection connection = createConnection(UTIL.getConfiguration());
    Table table = connection.getTable(TABLE_NAME)) {
    table.put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1")));
  }
}
/**
 * Reads a single cell (row {@code a}, qualifier {@code col1}) from {@link #TABLE_NAME} over a
 * freshly created connection, closing both the table handle and the connection afterwards.
 *
 * @throws IOException if the connection cannot be created or the get fails
 */
private void doGet() throws IOException {
  try (Connection connection = createConnection(UTIL.getConfiguration());
    Table table = connection.getTable(TABLE_NAME)) {
    table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1")));
  }
}
/**
 * Opens a new connection for whichever user is current at the time of the call.
 *
 * @param conf configuration used both to resolve the current user and to build the connection
 * @return a new connection; the caller is responsible for closing it
 * @throws IOException if the current user cannot be resolved or the connection cannot be made
 */
private Connection createConnection(Configuration conf) throws IOException {
  return ClusterConnectionFactory
    .createAsyncClusterConnection(conf, null, UserProvider.instantiate(conf).getCurrent())
    .toConnection();
}
/**
 * Requests only the master-coprocessor, HBase-version, cluster-id and balancer-state sections
 * and checks that each is populated, while the average load (not requested) stays at zero.
 */
@Test
public void testOtherStatusInfos() throws Exception {
  EnumSet<Option> options =
    EnumSet.of(Option.MASTER_COPROCESSORS, Option.HBASE_VERSION,
      Option.CLUSTER_ID, Option.BALANCER_ON);
  ClusterMetrics metrics = ADMIN.getClusterMetrics(options);
  Assert.assertEquals(1, metrics.getMasterCoprocessorNames().size());
  Assert.assertNotNull(metrics.getHBaseVersion());
  Assert.assertNotNull(metrics.getClusterId());
  // Exact comparison (delta 0); assertEquals reports the actual value if it is not zero.
  Assert.assertEquals(0.0, metrics.getAverageLoad(), 0);
  Assert.assertNotNull(metrics.getBalancerOn());
}
/**
 * Closes the admin handle and shuts the mini cluster down. The shutdown runs even if closing
 * the admin throws, and is skipped only when setup never got as far as creating the utility.
 */
@AfterClass
public static void tearDownAfterClass() throws Exception {
  try {
    if (ADMIN != null) {
      ADMIN.close();
    }
  } finally {
    if (UTIL != null) {
      UTIL.shutdownMiniCluster();
    }
  }
}
/**
 * Checks that {@link MyObserver} is listed among the master coprocessors and that a single
 * {@code getClusterMetrics()} call bumps both of its hook counters by exactly one.
 */
@Test
public void testObserver() throws IOException {
  int preBefore = MyObserver.PRE_COUNT.get();
  int postBefore = MyObserver.POST_COUNT.get();

  boolean observerListed = ADMIN.getClusterMetrics().getMasterCoprocessorNames().stream()
    .anyMatch(name -> name.equals(MyObserver.class.getSimpleName()));
  Assert.assertTrue(observerListed);

  Assert.assertEquals(preBefore + 1, MyObserver.PRE_COUNT.get());
  Assert.assertEquals(postBefore + 1, MyObserver.POST_COUNT.get());
}
/**
 * Writes {@code rowCount} rows to the given table. Row keys are the decimal strings of
 * {@code startRow}, {@code startRow + 1}, ...; each row gets one {@code val1} cell holding
 * the zero-based row index. The table handle is closed before returning.
 *
 * @param tableName table to write to
 * @param startRow number used for the first row key
 * @param rowCount number of rows to write
 * @throws IOException if the table cannot be opened or a put fails
 */
private static void insertData(final TableName tableName, int startRow, int rowCount)
  throws IOException {
  try (Table t = UTIL.getConnection().getTable(tableName)) {
    for (int i = 0; i < rowCount; i++) {
      Put p = new Put(Bytes.toBytes("" + (startRow + i)));
      p.addColumn(CF, Bytes.toBytes("val1"), Bytes.toBytes(i));
      t.put(p);
    }
  }
}
/**
 * Master coprocessor that counts how many times the pre- and post-{@code getClusterMetrics}
 * hooks are invoked.
 */
public static class MyObserver implements MasterCoprocessor, MasterObserver {
  /** Number of {@code preGetClusterMetrics} invocations seen so far. */
  private static final AtomicInteger PRE_COUNT = new AtomicInteger(0);
  /** Number of {@code postGetClusterMetrics} invocations seen so far. */
  private static final AtomicInteger POST_COUNT = new AtomicInteger(0);

  /** Exposes this instance as the master observer. */
  @Override
  public Optional<MasterObserver> getMasterObserver() {
    return Optional.of(this);
  }

  /** Records one pre-hook invocation. */
  @Override
  public void preGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> context)
    throws IOException {
    PRE_COUNT.incrementAndGet();
  }

  /** Records one post-hook invocation. */
  @Override
  public void postGetClusterMetrics(ObserverContext<MasterCoprocessorEnvironment> context,
    ClusterMetrics clusterMetrics) throws IOException {
    POST_COUNT.incrementAndGet();
  }
}
}