| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.client; |
| |
| import static org.apache.hadoop.hbase.client.AsyncConnectionConfiguration.START_LOG_ERRORS_AFTER_COUNT_KEY; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.IOException; |
| import java.nio.file.FileSystems; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.StandardCopyOption; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Map; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.ClusterMetrics; |
| import org.apache.hadoop.hbase.ClusterMetrics.Option; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.RegionMetrics; |
| import org.apache.hadoop.hbase.ServerMetrics; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.regionserver.HRegion; |
| import org.apache.hadoop.hbase.regionserver.HRegionServer; |
| import org.apache.hadoop.hbase.testclassification.ClientTests; |
| import org.apache.hadoop.hbase.testclassification.LargeTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; |
| import org.junit.BeforeClass; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.Parameterized; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.Lists; |
| import org.apache.hbase.thirdparty.com.google.common.collect.Maps; |
| |
| @RunWith(Parameterized.class) |
| @Category({ ClientTests.class, LargeTests.class }) |
| public class TestAsyncClusterAdminApi extends TestAsyncAdminBase { |
| |
| @ClassRule |
| public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestAsyncClusterAdminApi.class); |
| |
| private final Path cnfPath = FileSystems.getDefault().getPath("target/test-classes/hbase-site.xml"); |
| private final Path cnf2Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site2.xml"); |
| private final Path cnf3Path = FileSystems.getDefault().getPath("target/test-classes/hbase-site3.xml"); |
| |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| TEST_UTIL.getConfiguration().setInt(HConstants.MASTER_INFO_PORT, 0); |
| TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); |
| TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); |
| TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); |
| TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); |
| TEST_UTIL.startMiniCluster(2); |
| ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); |
| } |
| |
| @Test |
| public void testGetMasterInfoPort() throws Exception { |
| assertEquals(TEST_UTIL.getHBaseCluster().getMaster().getInfoServer().getPort(), (int) admin |
| .getMasterInfoPort().get()); |
| } |
| |
| @Test |
| public void testRegionServerOnlineConfigChange() throws Exception { |
| replaceHBaseSiteXML(); |
| admin.getRegionServers().get().forEach(server -> admin.updateConfiguration(server).join()); |
| |
| // Check the configuration of the RegionServers |
| TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { |
| Configuration conf = thread.getRegionServer().getConfiguration(); |
| assertEquals(1000, conf.getInt("hbase.custom.config", 0)); |
| }); |
| |
| restoreHBaseSiteXML(); |
| } |
| |
| @Test |
| public void testMasterOnlineConfigChange() throws Exception { |
| replaceHBaseSiteXML(); |
| ServerName master = admin.getMaster().get(); |
| admin.updateConfiguration(master).join(); |
| admin.getBackupMasters().get() |
| .forEach(backupMaster -> admin.updateConfiguration(backupMaster).join()); |
| |
| // Check the configuration of the Masters |
| TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { |
| Configuration conf = thread.getMaster().getConfiguration(); |
| assertEquals(1000, conf.getInt("hbase.custom.config", 0)); |
| }); |
| |
| restoreHBaseSiteXML(); |
| } |
| |
| @Test |
| public void testAllClusterOnlineConfigChange() throws IOException { |
| replaceHBaseSiteXML(); |
| admin.updateConfiguration().join(); |
| |
| // Check the configuration of the Masters |
| TEST_UTIL.getMiniHBaseCluster().getMasterThreads().forEach(thread -> { |
| Configuration conf = thread.getMaster().getConfiguration(); |
| assertEquals(1000, conf.getInt("hbase.custom.config", 0)); |
| }); |
| |
| // Check the configuration of the RegionServers |
| TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads().forEach(thread -> { |
| Configuration conf = thread.getRegionServer().getConfiguration(); |
| assertEquals(1000, conf.getInt("hbase.custom.config", 0)); |
| }); |
| |
| restoreHBaseSiteXML(); |
| } |
| |
| private void replaceHBaseSiteXML() throws IOException { |
| // make a backup of hbase-site.xml |
| Files.copy(cnfPath, cnf3Path, StandardCopyOption.REPLACE_EXISTING); |
| // update hbase-site.xml by overwriting it |
| Files.copy(cnf2Path, cnfPath, StandardCopyOption.REPLACE_EXISTING); |
| } |
| |
| private void restoreHBaseSiteXML() throws IOException { |
| // restore hbase-site.xml |
| Files.copy(cnf3Path, cnfPath, StandardCopyOption.REPLACE_EXISTING); |
| } |
| |
| @Test |
| public void testRollWALWALWriter() throws Exception { |
| setUpforLogRolling(); |
| String className = this.getClass().getName(); |
| StringBuilder v = new StringBuilder(className); |
| while (v.length() < 1000) { |
| v.append(className); |
| } |
| byte[] value = Bytes.toBytes(v.toString()); |
| HRegionServer regionServer = startAndWriteData(tableName, value); |
| LOG.info("after writing there are " |
| + AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)) + " log files"); |
| |
| // flush all regions |
| for (HRegion r : regionServer.getOnlineRegionsLocalContext()) { |
| r.flush(true); |
| } |
| admin.rollWALWriter(regionServer.getServerName()).join(); |
| int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null)); |
| LOG.info("after flushing all regions and rolling logs there are " + |
| count + " log files"); |
| assertTrue(("actual count: " + count), count <= 2); |
| } |
| |
| private void setUpforLogRolling() { |
| // Force a region split after every 768KB |
| TEST_UTIL.getConfiguration().setLong(HConstants.HREGION_MAX_FILESIZE, |
| 768L * 1024L); |
| |
| // We roll the log after every 32 writes |
| TEST_UTIL.getConfiguration().setInt("hbase.regionserver.maxlogentries", 32); |
| |
| TEST_UTIL.getConfiguration().setInt( |
| "hbase.regionserver.logroll.errors.tolerated", 2); |
| TEST_UTIL.getConfiguration().setInt("hbase.rpc.timeout", 10 * 1000); |
| |
| // For less frequently updated regions flush after every 2 flushes |
| TEST_UTIL.getConfiguration().setInt( |
| "hbase.hregion.memstore.optionalflushcount", 2); |
| |
| // We flush the cache after every 8192 bytes |
| TEST_UTIL.getConfiguration().setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, |
| 8192); |
| |
| // Increase the amount of time between client retries |
| TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 10 * 1000); |
| |
| // Reduce thread wake frequency so that other threads can get |
| // a chance to run. |
| TEST_UTIL.getConfiguration().setInt(HConstants.THREAD_WAKE_FREQUENCY, |
| 2 * 1000); |
| |
| /**** configuration for testLogRollOnDatanodeDeath ****/ |
| // lower the namenode & datanode heartbeat so the namenode |
| // quickly detects datanode failures |
| TEST_UTIL.getConfiguration().setInt("dfs.namenode.heartbeat.recheck-interval", 5000); |
| TEST_UTIL.getConfiguration().setInt("dfs.heartbeat.interval", 1); |
| // the namenode might still try to choose the recently-dead datanode |
| // for a pipeline, so try to a new pipeline multiple times |
| TEST_UTIL.getConfiguration().setInt("dfs.client.block.write.retries", 30); |
| TEST_UTIL.getConfiguration().setInt( |
| "hbase.regionserver.hlog.tolerable.lowreplication", 2); |
| TEST_UTIL.getConfiguration().setInt( |
| "hbase.regionserver.hlog.lowreplication.rolllimit", 3); |
| } |
| |
| private HRegionServer startAndWriteData(TableName tableName, byte[] value) throws Exception { |
| createTableWithDefaultConf(tableName); |
| AsyncTable<?> table = ASYNC_CONN.getTable(tableName); |
| HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(tableName); |
| for (int i = 1; i <= 256; i++) { // 256 writes should cause 8 log rolls |
| Put put = new Put(Bytes.toBytes("row" + String.format("%1$04d", i))); |
| put.addColumn(FAMILY, null, value); |
| table.put(put).join(); |
| if (i % 32 == 0) { |
| // After every 32 writes sleep to let the log roller run |
| try { |
| Thread.sleep(2000); |
| } catch (InterruptedException e) { |
| // continue |
| } |
| } |
| } |
| return regionServer; |
| } |
| |
| @Test |
| public void testGetRegionLoads() throws Exception { |
| // Turn off the balancer |
| admin.balancerSwitch(false).join(); |
| TableName[] tables = |
| new TableName[] { TableName.valueOf(tableName.getNameAsString() + "1"), |
| TableName.valueOf(tableName.getNameAsString() + "2"), |
| TableName.valueOf(tableName.getNameAsString() + "3") }; |
| createAndLoadTable(tables); |
| // Sleep to wait region server report |
| Thread.sleep(TEST_UTIL.getConfiguration().getInt("hbase.regionserver.msginterval", 3 * 1000) * 2); |
| // Check if regions match with the regionLoad from the server |
| Collection<ServerName> servers = admin.getRegionServers().get(); |
| for (ServerName serverName : servers) { |
| List<RegionInfo> regions = admin.getRegions(serverName).get(); |
| checkRegionsAndRegionLoads(regions, admin.getRegionMetrics(serverName).get()); |
| } |
| |
| // Check if regionLoad matches the table's regions and nothing is missed |
| for (TableName table : tables) { |
| List<RegionInfo> tableRegions = admin.getRegions(table).get(); |
| List<RegionMetrics> regionLoads = Lists.newArrayList(); |
| for (ServerName serverName : servers) { |
| regionLoads.addAll(admin.getRegionMetrics(serverName, table).get()); |
| } |
| checkRegionsAndRegionLoads(tableRegions, regionLoads); |
| } |
| |
| // Check RegionLoad matches the regionLoad from ClusterStatus |
| ClusterMetrics clusterStatus = admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).get(); |
| assertEquals(servers.size(), clusterStatus.getLiveServerMetrics().size()); |
| for (Map.Entry<ServerName, ServerMetrics> entry : |
| clusterStatus.getLiveServerMetrics().entrySet()) { |
| ServerName sn = entry.getKey(); |
| ServerMetrics sm = entry.getValue(); |
| compareRegionLoads(sm.getRegionMetrics().values(), admin.getRegionMetrics(sn).get()); |
| } |
| for (ServerName serverName : clusterStatus.getLiveServerMetrics().keySet()) { |
| ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(serverName); |
| |
| } |
| } |
| |
| private void compareRegionLoads(Collection<RegionMetrics> regionLoadCluster, |
| Collection<RegionMetrics> regionLoads) { |
| |
| assertEquals("No of regionLoads from clusterStatus and regionloads from RS doesn't match", |
| regionLoadCluster.size(), regionLoads.size()); |
| |
| for (RegionMetrics loadCluster : regionLoadCluster) { |
| boolean matched = false; |
| for (RegionMetrics load : regionLoads) { |
| if (Bytes.equals(loadCluster.getRegionName(), load.getRegionName())) { |
| matched = true; |
| continue; |
| } |
| } |
| assertTrue("The contents of region load from cluster and server should match", matched); |
| } |
| } |
| |
| private void checkRegionsAndRegionLoads(Collection<RegionInfo> regions, |
| Collection<RegionMetrics> regionLoads) { |
| |
| assertEquals("No of regions and regionloads doesn't match", regions.size(), regionLoads.size()); |
| |
| Map<byte[], RegionMetrics> regionLoadMap = Maps.newTreeMap(Bytes.BYTES_COMPARATOR); |
| for (RegionMetrics regionLoad : regionLoads) { |
| regionLoadMap.put(regionLoad.getRegionName(), regionLoad); |
| } |
| for (RegionInfo info : regions) { |
| assertTrue("Region not in regionLoadMap region:" + info.getRegionNameAsString() |
| + " regionMap: " + regionLoadMap, regionLoadMap.containsKey(info.getRegionName())); |
| } |
| } |
| |
| private void createAndLoadTable(TableName[] tables) { |
| for (TableName table : tables) { |
| TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(table); |
| builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); |
| admin.createTable(builder.build(), Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), 16).join(); |
| AsyncTable<?> asyncTable = ASYNC_CONN.getTable(table); |
| List<Put> puts = new ArrayList<>(); |
| for (byte[] row : HBaseTestingUtility.ROWS) { |
| puts.add(new Put(row).addColumn(FAMILY, Bytes.toBytes("q"), Bytes.toBytes("v"))); |
| } |
| asyncTable.putAll(puts).join(); |
| } |
| } |
| } |