blob: 2e7cf7f1d1d32f6c4a5ef9039d047f4abd6a3f5c [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.tool;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
import com.google.common.collect.Lists;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.data.Stat;
/**
* HBase Canary Tool, that that can be used to do
* "canary monitoring" of a running HBase cluster.
*
* Here are three modes
* 1. region mode - Foreach region tries to get one row per column family
* and outputs some information about failure or latency.
*
* 2. regionserver mode - Foreach regionserver tries to get one row from one table
* selected randomly and outputs some information about failure or latency.
*
* 3. zookeeper mode - for each zookeeper instance, selects a zNode and
* outputs some information about failure or latency.
*/
public final class Canary implements Tool {
// Sink interface used by the canary to outputs information
public interface Sink {
public long getReadFailureCount();
public long incReadFailureCount();
public void publishReadFailure(HRegionInfo region, Exception e);
public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
public long getWriteFailureCount();
public void publishWriteFailure(HRegionInfo region, Exception e);
public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
}
// new extended sink for output regionserver mode info
// do not change the Sink interface directly due to maintaining the API
public interface ExtendedSink extends Sink {
public void publishReadFailure(String table, String server);
public void publishReadTiming(String table, String server, long msTime);
}
// Simple implementation of canary sink that allows to plot on
// file or standard output timings or failures.
public static class StdOutSink implements Sink {
private AtomicLong readFailureCount = new AtomicLong(0),
writeFailureCount = new AtomicLong(0);
@Override
public long getReadFailureCount() {
return readFailureCount.get();
}
@Override
public long incReadFailureCount() {
return readFailureCount.incrementAndGet();
}
@Override
public void publishReadFailure(HRegionInfo region, Exception e) {
readFailureCount.incrementAndGet();
LOG.error(String.format("read from region %s failed", region.getRegionNameAsString()), e);
}
@Override
public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
readFailureCount.incrementAndGet();
LOG.error(String.format("read from region %s column family %s failed",
region.getRegionNameAsString(), column.getNameAsString()), e);
}
@Override
public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
LOG.info(String.format("read from region %s column family %s in %dms",
region.getRegionNameAsString(), column.getNameAsString(), msTime));
}
@Override
public long getWriteFailureCount() {
return writeFailureCount.get();
}
@Override
public void publishWriteFailure(HRegionInfo region, Exception e) {
writeFailureCount.incrementAndGet();
LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
}
@Override
public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
writeFailureCount.incrementAndGet();
LOG.error(String.format("write to region %s column family %s failed",
region.getRegionNameAsString(), column.getNameAsString()), e);
}
@Override
public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
LOG.info(String.format("write to region %s column family %s in %dms",
region.getRegionNameAsString(), column.getNameAsString(), msTime));
}
}
// a ExtendedSink implementation
public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
@Override
public void publishReadFailure(String table, String server) {
incReadFailureCount();
LOG.error(String.format("Read from table:%s on region server:%s", table, server));
}
@Override
public void publishReadTiming(String table, String server, long msTime) {
LOG.info(String.format("Read from table:%s on region server:%s in %dms",
table, server, msTime));
}
}
public static class ZookeeperStdOutSink extends StdOutSink implements ExtendedSink {
@Override public void publishReadFailure(String zNode, String server) {
incReadFailureCount();
LOG.error(String.format("Read from zNode:%s on zookeeper instance:%s", zNode, server));
}
@Override public void publishReadTiming(String znode, String server, long msTime) {
LOG.info(String.format("Read from zNode:%s on zookeeper instance:%s in %dms",
znode, server, msTime));
}
}
static class ZookeeperTask implements Callable<Void> {
private final Connection connection;
private final String host;
private String znode;
private final int timeout;
private ZookeeperStdOutSink sink;
public ZookeeperTask(Connection connection, String host, String znode, int timeout,
ZookeeperStdOutSink sink) {
this.connection = connection;
this.host = host;
this.znode = znode;
this.timeout = timeout;
this.sink = sink;
}
@Override public Void call() throws Exception {
ZooKeeper zooKeeper = null;
try {
zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
Stat exists = zooKeeper.exists(znode, false);
StopWatch stopwatch = new StopWatch();
stopwatch.start();
zooKeeper.getData(znode, false, exists);
stopwatch.stop();
sink.publishReadTiming(znode, host, stopwatch.getTime());
} catch (KeeperException | InterruptedException e) {
sink.publishReadFailure(znode, host);
} finally {
if (zooKeeper != null) {
zooKeeper.close();
}
}
return null;
}
}
/**
* For each column family of the region tries to get one row and outputs the latency, or the
* failure.
*/
static class RegionTask implements Callable<Void> {
public enum TaskType{
READ, WRITE
}
private Connection connection;
private HRegionInfo region;
private Sink sink;
private TaskType taskType;
private boolean rawScanEnabled;
RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType,
boolean rawScanEnabled) {
this.connection = connection;
this.region = region;
this.sink = sink;
this.taskType = taskType;
this.rawScanEnabled = rawScanEnabled;
}
@Override
public Void call() {
switch (taskType) {
case READ:
return read();
case WRITE:
return write();
default:
return read();
}
}
public Void read() {
Table table = null;
HTableDescriptor tableDesc = null;
try {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading table descriptor for table %s",
region.getTable()));
}
table = connection.getTable(region.getTable());
tableDesc = table.getTableDescriptor();
} catch (IOException e) {
LOG.debug("sniffRegion failed", e);
sink.publishReadFailure(region, e);
if (table != null) {
try {
table.close();
} catch (IOException ioe) {
LOG.error("Close table failed", e);
}
}
return null;
}
byte[] startKey = null;
Get get = null;
Scan scan = null;
ResultScanner rs = null;
StopWatch stopWatch = new StopWatch();
for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
stopWatch.reset();
startKey = region.getStartKey();
// Can't do a get on empty start row so do a Scan of first element if any instead.
if (startKey.length > 0) {
get = new Get(startKey);
get.setCacheBlocks(false);
get.setFilter(new FirstKeyOnlyFilter());
get.addFamily(column.getName());
} else {
scan = new Scan();
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("rawScan : %s for table: %s", rawScanEnabled,
tableDesc.getTableName()));
}
scan.setRaw(rawScanEnabled);
scan.setCaching(1);
scan.setCacheBlocks(false);
scan.setFilter(new FirstKeyOnlyFilter());
scan.addFamily(column.getName());
scan.setMaxResultSize(1L);
scan.setSmall(true);
}
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading from table %s region %s column family %s and key %s",
tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
Bytes.toStringBinary(startKey)));
}
try {
stopWatch.start();
if (startKey.length > 0) {
table.get(get);
} else {
rs = table.getScanner(scan);
rs.next();
}
stopWatch.stop();
sink.publishReadTiming(region, column, stopWatch.getTime());
} catch (Exception e) {
sink.publishReadFailure(region, column, e);
} finally {
if (rs != null) {
rs.close();
}
scan = null;
get = null;
startKey = null;
}
}
try {
table.close();
} catch (IOException e) {
LOG.error("Close table failed", e);
}
return null;
}
/**
* Check writes for the canary table
* @return
*/
private Void write() {
Table table = null;
HTableDescriptor tableDesc = null;
try {
table = connection.getTable(region.getTable());
tableDesc = table.getTableDescriptor();
byte[] rowToCheck = region.getStartKey();
if (rowToCheck.length == 0) {
rowToCheck = new byte[]{0x0};
}
int writeValueSize =
connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
Put put = new Put(rowToCheck);
byte[] value = new byte[writeValueSize];
Bytes.random(value);
put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("writing to table %s region %s column family %s and key %s",
tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(),
Bytes.toStringBinary(rowToCheck)));
}
try {
long startTime = System.currentTimeMillis();
table.put(put);
long time = System.currentTimeMillis() - startTime;
sink.publishWriteTiming(region, column, time);
} catch (Exception e) {
sink.publishWriteFailure(region, column, e);
}
}
table.close();
} catch (IOException e) {
sink.publishWriteFailure(region, e);
}
return null;
}
}
/**
* Get one row from a region on the regionserver and outputs the latency, or the failure.
*/
static class RegionServerTask implements Callable<Void> {
private Connection connection;
private String serverName;
private HRegionInfo region;
private ExtendedSink sink;
private AtomicLong successes;
RegionServerTask(Connection connection, String serverName, HRegionInfo region,
ExtendedSink sink, AtomicLong successes) {
this.connection = connection;
this.serverName = serverName;
this.region = region;
this.sink = sink;
this.successes = successes;
}
@Override
public Void call() {
TableName tableName = null;
Table table = null;
Get get = null;
byte[] startKey = null;
Scan scan = null;
StopWatch stopWatch = new StopWatch();
// monitor one region on every region server
stopWatch.reset();
try {
tableName = region.getTable();
table = connection.getTable(tableName);
startKey = region.getStartKey();
// Can't do a get on empty start row so do a Scan of first element if any instead.
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading from region server %s table %s region %s and key %s",
serverName, region.getTable(), region.getRegionNameAsString(),
Bytes.toStringBinary(startKey)));
}
if (startKey.length > 0) {
get = new Get(startKey);
get.setCacheBlocks(false);
get.setFilter(new FirstKeyOnlyFilter());
stopWatch.start();
table.get(get);
stopWatch.stop();
} else {
scan = new Scan();
scan.setCacheBlocks(false);
scan.setFilter(new FirstKeyOnlyFilter());
scan.setCaching(1);
scan.setMaxResultSize(1L);
scan.setSmall(true);
stopWatch.start();
ResultScanner s = table.getScanner(scan);
s.next();
s.close();
stopWatch.stop();
}
successes.incrementAndGet();
sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
} catch (TableNotFoundException tnfe) {
LOG.error("Table may be deleted", tnfe);
// This is ignored because it doesn't imply that the regionserver is dead
} catch (TableNotEnabledException tnee) {
// This is considered a success since we got a response.
successes.incrementAndGet();
LOG.debug("The targeted table was disabled. Assuming success.");
} catch (DoNotRetryIOException dnrioe) {
sink.publishReadFailure(tableName.getNameAsString(), serverName);
LOG.error(dnrioe);
} catch (IOException e) {
sink.publishReadFailure(tableName.getNameAsString(), serverName);
LOG.error(e);
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {/* DO NOTHING */
LOG.error("Close table failed", e);
}
}
scan = null;
get = null;
startKey = null;
}
return null;
}
}
private static final int USAGE_EXIT_CODE = 1;
private static final int INIT_ERROR_EXIT_CODE = 2;
private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
private static final int ERROR_EXIT_CODE = 4;
private static final int FAILURE_EXIT_CODE = 5;
private static final long DEFAULT_INTERVAL = 6000;
private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
private static final Log LOG = LogFactory.getLog(Canary.class);
public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
private static final String CANARY_TABLE_FAMILY_NAME = "Test";
private Configuration conf = null;
private long interval = 0;
private Sink sink = null;
private boolean useRegExp;
private long timeout = DEFAULT_TIMEOUT;
private boolean failOnError = true;
private boolean regionServerMode = false;
private boolean zookeeperMode = false;
private boolean regionServerAllRegions = false;
private boolean writeSniffing = false;
private boolean treatFailureAsError = false;
private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
private ExecutorService executor; // threads to retrieve data from regionservers
public Canary() {
this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
}
public Canary(ExecutorService executor, Sink sink) {
this.executor = executor;
this.sink = sink;
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
private int parseArgs(String[] args) {
int index = -1;
// Process command line args
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
if (cmd.startsWith("-")) {
if (index >= 0) {
// command line args must be in the form: [opts] [table 1 [table 2 ...]]
System.err.println("Invalid command line options");
printUsageAndExit();
}
if (cmd.equals("-help")) {
// user asked for help, print the help and quit.
printUsageAndExit();
} else if (cmd.equals("-daemon") && interval == 0) {
// user asked for daemon mode, set a default interval between checks
interval = DEFAULT_INTERVAL;
} else if (cmd.equals("-interval")) {
// user has specified an interval for canary breaths (-interval N)
i++;
if (i == args.length) {
System.err.println("-interval needs a numeric value argument.");
printUsageAndExit();
}
try {
interval = Long.parseLong(args[i]) * 1000;
} catch (NumberFormatException e) {
System.err.println("-interval needs a numeric value argument.");
printUsageAndExit();
}
} else if (cmd.equals("-zookeeper")) {
this.zookeeperMode = true;
} else if(cmd.equals("-regionserver")) {
this.regionServerMode = true;
} else if(cmd.equals("-allRegions")) {
this.regionServerAllRegions = true;
} else if(cmd.equals("-writeSniffing")) {
this.writeSniffing = true;
} else if(cmd.equals("-treatFailureAsError")) {
this.treatFailureAsError = true;
} else if (cmd.equals("-e")) {
this.useRegExp = true;
} else if (cmd.equals("-t")) {
i++;
if (i == args.length) {
System.err.println("-t needs a numeric value argument.");
printUsageAndExit();
}
try {
this.timeout = Long.parseLong(args[i]);
} catch (NumberFormatException e) {
System.err.println("-t needs a numeric value argument.");
printUsageAndExit();
}
} else if (cmd.equals("-writeTable")) {
i++;
if (i == args.length) {
System.err.println("-writeTable needs a string value argument.");
printUsageAndExit();
}
this.writeTableName = TableName.valueOf(args[i]);
} else if (cmd.equals("-f")) {
i++;
if (i == args.length) {
System.err
.println("-f needs a boolean value argument (true|false).");
printUsageAndExit();
}
this.failOnError = Boolean.parseBoolean(args[i]);
} else {
// no options match
System.err.println(cmd + " options is invalid.");
printUsageAndExit();
}
} else if (index < 0) {
// keep track of first table name specified by the user
index = i;
}
}
if (this.regionServerAllRegions && !this.regionServerMode) {
System.err.println("-allRegions can only be specified in regionserver mode.");
printUsageAndExit();
}
if (this.zookeeperMode) {
if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) {
System.err.println("-zookeeper is exclusive and cannot be combined with "
+ "other modes.");
printUsageAndExit();
}
}
return index;
}
@Override
public int run(String[] args) throws Exception {
int index = parseArgs(args);
ChoreService choreService = null;
// Launches chore for refreshing kerberos credentials if security is enabled.
// Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
// for more details.
final ScheduledChore authChore = AuthUtil.getAuthChore(conf);
if (authChore != null) {
choreService = new ChoreService("CANARY_TOOL");
choreService.scheduleChore(authChore);
}
// Start to prepare the stuffs
Monitor monitor = null;
Thread monitorThread = null;
long startTime = 0;
long currentTimeLength = 0;
// Get a connection to use in below.
try (Connection connection = ConnectionFactory.createConnection(this.conf)) {
do {
// Do monitor !!
try {
monitor = this.newMonitor(connection, index, args);
monitorThread = new Thread(monitor);
startTime = System.currentTimeMillis();
monitorThread.start();
while (!monitor.isDone()) {
// wait for 1 sec
Thread.sleep(1000);
// exit if any error occurs
if (this.failOnError && monitor.hasError()) {
monitorThread.interrupt();
if (monitor.initialized) {
return monitor.errorCode;
} else {
return INIT_ERROR_EXIT_CODE;
}
}
currentTimeLength = System.currentTimeMillis() - startTime;
if (currentTimeLength > this.timeout) {
LOG.error("The monitor is running too long (" + currentTimeLength
+ ") after timeout limit:" + this.timeout
+ " will be killed itself !!");
if (monitor.initialized) {
return TIMEOUT_ERROR_EXIT_CODE;
} else {
return INIT_ERROR_EXIT_CODE;
}
}
}
if (this.failOnError && monitor.finalCheckForErrors()) {
monitorThread.interrupt();
return monitor.errorCode;
}
} finally {
if (monitor != null) monitor.close();
}
Thread.sleep(interval);
} while (interval > 0);
} // try-with-resources close
if (choreService != null) {
choreService.shutdown();
}
return monitor.errorCode;
}
private void printUsageAndExit() {
System.err.printf(
"Usage: bin/hbase %s [opts] [table1 [table2]...] | [regionserver1 [regionserver2]..]%n",
getClass().getName());
System.err.println(" where [opts] are:");
System.err.println(" -help Show this help and exit.");
System.err.println(" -regionserver replace the table argument to regionserver,");
System.err.println(" which means to enable regionserver mode");
System.err.println(" -allRegions Tries all regions on a regionserver,");
System.err.println(" only works in regionserver mode.");
System.err.println(" -zookeeper Tries to grab zookeeper.znode.parent ");
System.err.println(" on each zookeeper instance");
System.err.println(" -daemon Continuous check at defined intervals.");
System.err.println(" -interval <N> Interval between checks (sec)");
System.err.println(" -e Use table/regionserver as regular expression");
System.err.println(" which means the table/regionserver is regular expression pattern");
System.err.println(" -f <B> stop whole program if first error occurs," +
" default is true");
System.err.println(" -t <N> timeout for a check, default is 600000 (milisecs)");
System.err.println(" -writeSniffing enable the write sniffing in canary");
System.err.println(" -treatFailureAsError treats read / write failure as error");
System.err.println(" -writeTable The table used for write sniffing."
+ " Default is hbase:canary");
System.err.println(" -Dhbase.canary.read.raw.enabled=<true/false> Use this flag to enable or disable raw scan during read canary test"
+ " Default is false and raw is not enabled during scan");
System.err
.println(" -D<configProperty>=<value> assigning or override the configuration params");
System.exit(USAGE_EXIT_CODE);
}
/**
* A Factory method for {@link Monitor}.
* Can be overridden by user.
* @param index a start index for monitor target
* @param args args passed from user
* @return a Monitor instance
*/
public Monitor newMonitor(final Connection connection, int index, String[] args) {
Monitor monitor = null;
String[] monitorTargets = null;
if(index >= 0) {
int length = args.length - index;
monitorTargets = new String[length];
System.arraycopy(args, index, monitorTargets, 0, length);
}
if (this.regionServerMode) {
monitor =
new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
(ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
this.treatFailureAsError);
} else if (this.zookeeperMode) {
monitor =
new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
(ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
} else {
monitor =
new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
this.writeSniffing, this.writeTableName, this.treatFailureAsError);
}
return monitor;
}
// a Monitor super-class can be extended by users
public static abstract class Monitor implements Runnable, Closeable {
protected Connection connection;
protected Admin admin;
protected String[] targets;
protected boolean useRegExp;
protected boolean treatFailureAsError;
protected boolean initialized = false;
protected boolean done = false;
protected int errorCode = 0;
protected Sink sink;
protected ExecutorService executor;
public boolean isDone() {
return done;
}
public boolean hasError() {
return errorCode != 0;
}
public boolean finalCheckForErrors() {
if (errorCode != 0) {
return true;
}
if (treatFailureAsError &&
(sink.getReadFailureCount() > 0 || sink.getWriteFailureCount() > 0)) {
errorCode = FAILURE_EXIT_CODE;
return true;
}
return false;
}
@Override
public void close() throws IOException {
if (this.admin != null) this.admin.close();
}
protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
ExecutorService executor, boolean treatFailureAsError) {
if (null == connection) throw new IllegalArgumentException("connection shall not be null");
this.connection = connection;
this.targets = monitorTargets;
this.useRegExp = useRegExp;
this.treatFailureAsError = treatFailureAsError;
this.sink = sink;
this.executor = executor;
}
@Override
public abstract void run();
protected boolean initAdmin() {
if (null == this.admin) {
try {
this.admin = this.connection.getAdmin();
} catch (Exception e) {
LOG.error("Initial HBaseAdmin failed...", e);
this.errorCode = INIT_ERROR_EXIT_CODE;
}
} else if (admin.isAborted()) {
LOG.error("HBaseAdmin aborted");
this.errorCode = INIT_ERROR_EXIT_CODE;
}
return !this.hasError();
}
}
// a monitor for region mode
private static class RegionMonitor extends Monitor {
// 10 minutes
private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
// 1 days
private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
private long lastCheckTime = -1;
private boolean writeSniffing;
private TableName writeTableName;
private int writeDataTTL;
private float regionsLowerLimit;
private float regionsUpperLimit;
private int checkPeriod;
private boolean rawScanEnabled;
public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
boolean treatFailureAsError) {
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
Configuration conf = connection.getConfiguration();
this.writeSniffing = writeSniffing;
this.writeTableName = writeTableName;
this.writeDataTTL =
conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
this.regionsLowerLimit =
conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
this.regionsUpperLimit =
conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
this.checkPeriod =
conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
DEFAULT_WRITE_TABLE_CHECK_PERIOD);
this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
}
@Override
public void run() {
if (this.initAdmin()) {
try {
List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
if (this.targets != null && this.targets.length > 0) {
String[] tables = generateMonitorTables(this.targets);
this.initialized = true;
for (String table : tables) {
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ,
this.rawScanEnabled));
}
} else {
taskFutures.addAll(sniff(TaskType.READ));
}
if (writeSniffing) {
if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
try {
checkWriteTableDistribution();
} catch (IOException e) {
LOG.error("Check canary table distribution failed!", e);
}
lastCheckTime = EnvironmentEdgeManager.currentTime();
}
// sniff canary table with write operation
taskFutures.addAll(Canary.sniff(admin, sink, admin.getTableDescriptor(writeTableName),
executor, TaskType.WRITE, this.rawScanEnabled));
}
for (Future<Void> future : taskFutures) {
try {
future.get();
} catch (ExecutionException e) {
LOG.error("Sniff region failed!", e);
}
}
} catch (Exception e) {
LOG.error("Run regionMonitor failed", e);
this.errorCode = ERROR_EXIT_CODE;
}
}
this.done = true;
}
private String[] generateMonitorTables(String[] monitorTargets) throws IOException {
String[] returnTables = null;
if (this.useRegExp) {
Pattern pattern = null;
HTableDescriptor[] tds = null;
Set<String> tmpTables = new TreeSet<String>();
try {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading list of tables"));
}
tds = this.admin.listTables(pattern);
if (tds == null) {
tds = new HTableDescriptor[0];
}
for (String monitorTarget : monitorTargets) {
pattern = Pattern.compile(monitorTarget);
for (HTableDescriptor td : tds) {
if (pattern.matcher(td.getNameAsString()).matches()) {
tmpTables.add(td.getNameAsString());
}
}
}
} catch (IOException e) {
LOG.error("Communicate with admin failed", e);
throw e;
}
if (tmpTables.size() > 0) {
returnTables = tmpTables.toArray(new String[tmpTables.size()]);
} else {
String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
LOG.error(msg);
this.errorCode = INIT_ERROR_EXIT_CODE;
throw new TableNotFoundException(msg);
}
} else {
returnTables = monitorTargets;
}
return returnTables;
}
/*
* canary entry point to monitor all the tables.
*/
private List<Future<Void>> sniff(TaskType taskType) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading list of tables"));
}
List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
for (HTableDescriptor table : admin.listTables()) {
if (admin.isTableEnabled(table.getTableName())
&& (!table.getTableName().equals(writeTableName))) {
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled));
}
}
return taskFutures;
}
private void checkWriteTableDistribution() throws IOException {
if (!admin.tableExists(writeTableName)) {
int numberOfServers = admin.getClusterStatus().getServers().size();
if (numberOfServers == 0) {
throw new IllegalStateException("No live regionservers");
}
createWriteTable(numberOfServers);
}
if (!admin.isTableEnabled(writeTableName)) {
admin.enableTable(writeTableName);
}
ClusterStatus status = admin.getClusterStatus();
int numberOfServers = status.getServersSize();
if (status.getServers().contains(status.getMaster())) {
numberOfServers -= 1;
}
List<Pair<HRegionInfo, ServerName>> pairs =
MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
int numberOfRegions = pairs.size();
if (numberOfRegions < numberOfServers * regionsLowerLimit
|| numberOfRegions > numberOfServers * regionsUpperLimit) {
admin.disableTable(writeTableName);
admin.deleteTable(writeTableName);
createWriteTable(numberOfServers);
}
HashSet<ServerName> serverSet = new HashSet<ServerName>();
for (Pair<HRegionInfo, ServerName> pair : pairs) {
serverSet.add(pair.getSecond());
}
int numberOfCoveredServers = serverSet.size();
if (numberOfCoveredServers < numberOfServers) {
admin.balancer();
}
}
private void createWriteTable(int numberOfServers) throws IOException {
int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
LOG.info("Number of live regionservers: " + numberOfServers + ", "
+ "pre-splitting the canary table into " + numberOfRegions + " regions "
+ "(current lower limit of regions per server is " + regionsLowerLimit
+ " and you can change it by config: "
+ HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
HTableDescriptor desc = new HTableDescriptor(writeTableName);
HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
family.setMaxVersions(1);
family.setTimeToLive(writeDataTTL);
desc.addFamily(family);
byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
admin.createTable(desc, splits);
}
}
/**
* Canary entry point for specified table.
* @throws Exception
*/
public static void sniff(final Admin admin, TableName tableName, boolean rawScanEnabled)
throws Exception {
sniff(admin, tableName, TaskType.READ, rawScanEnabled);
}
/**
* Canary entry point for specified table.
* Keeping this method backward compatibility
* @throws Exception
*/
public static void sniff(final Admin admin, TableName tableName)
throws Exception {
sniff(admin, tableName, TaskType.READ, false);
}
/**
* Canary entry point for specified table with task type(read/write)
* @throws Exception
*/
public static void sniff(final Admin admin, TableName tableName, TaskType taskType,
boolean rawScanEnabled) throws Exception {
List<Future<Void>> taskFutures =
Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
new ScheduledThreadPoolExecutor(1), taskType, rawScanEnabled);
for (Future<Void> future : taskFutures) {
future.get();
}
}
/**
* Canary entry point for specified table with task type(read/write)
* Keeping this method backward compatible
* @throws Exception
*/
public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
throws Exception {
Canary.sniff(admin, tableName, taskType, false);
}
/**
* Canary entry point for specified table.
* @throws Exception
*/
private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
ExecutorService executor, TaskType taskType, boolean rawScanEnabled) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
tableName));
}
if (admin.isTableEnabled(TableName.valueOf(tableName))) {
return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
executor, taskType, rawScanEnabled);
} else {
LOG.warn(String.format("Table %s is not enabled", tableName));
}
return new LinkedList<Future<Void>>();
}
/*
* Loops over regions that owns this table, and output some information abouts the state.
*/
private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
boolean rawScanEnabled) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));
}
Table table = null;
try {
table = admin.getConnection().getTable(tableDesc.getTableName());
} catch (TableNotFoundException e) {
return new ArrayList<Future<Void>>();
}
List<RegionTask> tasks = new ArrayList<RegionTask>();
try {
for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType, rawScanEnabled));
}
} finally {
table.close();
}
return executor.invokeAll(tasks);
}
// monitor for zookeeper mode
private static class ZookeeperMonitor extends Monitor {
private List<String> hosts;
private final String znode;
private final int timeout;
protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
ExtendedSink sink, ExecutorService executor, boolean treatFailureAsError) {
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
Configuration configuration = connection.getConfiguration();
znode =
configuration.get(ZOOKEEPER_ZNODE_PARENT,
DEFAULT_ZOOKEEPER_ZNODE_PARENT);
timeout = configuration
.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
ConnectStringParser parser =
new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
hosts = Lists.newArrayList();
for (InetSocketAddress server : parser.getServerAddresses()) {
hosts.add(server.toString());
}
}
@Override public void run() {
List<ZookeeperTask> tasks = Lists.newArrayList();
for (final String host : hosts) {
tasks.add(new ZookeeperTask(connection, host, znode, timeout, getSink()));
}
try {
for (Future<Void> future : this.executor.invokeAll(tasks)) {
try {
future.get();
} catch (ExecutionException e) {
LOG.error("Sniff zookeeper failed!", e);
this.errorCode = ERROR_EXIT_CODE;
}
}
} catch (InterruptedException e) {
this.errorCode = ERROR_EXIT_CODE;
Thread.currentThread().interrupt();
LOG.error("Sniff zookeeper interrupted!", e);
}
this.done = true;
}
private ZookeeperStdOutSink getSink() {
if (!(sink instanceof ZookeeperStdOutSink)) {
throw new RuntimeException("Can only write to zookeeper sink");
}
return ((ZookeeperStdOutSink) sink);
}
}
// a monitor for regionserver mode
private static class RegionServerMonitor extends Monitor {
private boolean allRegions;
public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
ExtendedSink sink, ExecutorService executor, boolean allRegions,
boolean treatFailureAsError) {
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
this.allRegions = allRegions;
}
private ExtendedSink getSink() {
return (ExtendedSink) this.sink;
}
@Override
public void run() {
if (this.initAdmin() && this.checkNoTableNames()) {
Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
this.initialized = true;
this.monitorRegionServers(rsAndRMap);
}
this.done = true;
}
private boolean checkNoTableNames() {
List<String> foundTableNames = new ArrayList<String>();
TableName[] tableNames = null;
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading list of tables"));
}
try {
tableNames = this.admin.listTableNames();
} catch (IOException e) {
LOG.error("Get listTableNames failed", e);
this.errorCode = INIT_ERROR_EXIT_CODE;
return false;
}
if (this.targets == null || this.targets.length == 0) return true;
for (String target : this.targets) {
for (TableName tableName : tableNames) {
if (target.equals(tableName.getNameAsString())) {
foundTableNames.add(target);
}
}
}
if (foundTableNames.size() > 0) {
System.err.println("Cannot pass a tablename when using the -regionserver " +
"option, tablenames:" + foundTableNames.toString());
this.errorCode = USAGE_EXIT_CODE;
}
return foundTableNames.size() == 0;
}
private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
Random rand = new Random();
for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
String serverName = entry.getKey();
AtomicLong successes = new AtomicLong(0);
successMap.put(serverName, successes);
if (entry.getValue().isEmpty()) {
LOG.error(String.format("Regionserver not serving any regions - %s", serverName));
} else if (this.allRegions) {
for (HRegionInfo region : entry.getValue()) {
tasks.add(new RegionServerTask(this.connection,
serverName,
region,
getSink(),
successes));
}
} else {
// random select a region if flag not set
HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
tasks.add(new RegionServerTask(this.connection,
serverName,
region,
getSink(),
successes));
}
}
try {
for (Future<Void> future : this.executor.invokeAll(tasks)) {
try {
future.get();
} catch (ExecutionException e) {
LOG.error("Sniff regionserver failed!", e);
this.errorCode = ERROR_EXIT_CODE;
}
}
if (this.allRegions) {
for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
String serverName = entry.getKey();
LOG.info("Successfully read " + successMap.get(serverName) + " regions out of "
+ entry.getValue().size() + " on regionserver:" + serverName);
}
}
} catch (InterruptedException e) {
this.errorCode = ERROR_EXIT_CODE;
LOG.error("Sniff regionserver interrupted!", e);
}
}
private Map<String, List<HRegionInfo>> filterRegionServerByName() {
Map<String, List<HRegionInfo>> regionServerAndRegionsMap = this.getAllRegionServerByName();
regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap);
return regionServerAndRegionsMap;
}
private Map<String, List<HRegionInfo>> getAllRegionServerByName() {
Map<String, List<HRegionInfo>> rsAndRMap = new HashMap<String, List<HRegionInfo>>();
Table table = null;
RegionLocator regionLocator = null;
try {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading list of tables and locations"));
}
HTableDescriptor[] tableDescs = this.admin.listTables();
List<HRegionInfo> regions = null;
for (HTableDescriptor tableDesc : tableDescs) {
table = this.admin.getConnection().getTable(tableDesc.getTableName());
regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName());
for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
ServerName rs = location.getServerName();
String rsName = rs.getHostname();
HRegionInfo r = location.getRegionInfo();
if (rsAndRMap.containsKey(rsName)) {
regions = rsAndRMap.get(rsName);
} else {
regions = new ArrayList<HRegionInfo>();
rsAndRMap.put(rsName, regions);
}
regions.add(r);
}
table.close();
}
//get any live regionservers not serving any regions
for (ServerName rs : this.admin.getClusterStatus().getServers()) {
String rsName = rs.getHostname();
if (!rsAndRMap.containsKey(rsName)) {
rsAndRMap.put(rsName, Collections.<HRegionInfo>emptyList());
}
}
} catch (IOException e) {
String msg = "Get HTables info failed";
LOG.error(msg, e);
this.errorCode = INIT_ERROR_EXIT_CODE;
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {
LOG.warn("Close table failed", e);
}
}
}
return rsAndRMap;
}
private Map<String, List<HRegionInfo>> doFilterRegionServerByName(
Map<String, List<HRegionInfo>> fullRsAndRMap) {
Map<String, List<HRegionInfo>> filteredRsAndRMap = null;
if (this.targets != null && this.targets.length > 0) {
filteredRsAndRMap = new HashMap<String, List<HRegionInfo>>();
Pattern pattern = null;
Matcher matcher = null;
boolean regExpFound = false;
for (String rsName : this.targets) {
if (this.useRegExp) {
regExpFound = false;
pattern = Pattern.compile(rsName);
for (Map.Entry<String, List<HRegionInfo>> entry : fullRsAndRMap.entrySet()) {
matcher = pattern.matcher(entry.getKey());
if (matcher.matches()) {
filteredRsAndRMap.put(entry.getKey(), entry.getValue());
regExpFound = true;
}
}
if (!regExpFound) {
LOG.info("No RegionServerInfo found, regionServerPattern:" + rsName);
}
} else {
if (fullRsAndRMap.containsKey(rsName)) {
filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName));
} else {
LOG.info("No RegionServerInfo found, regionServerName:" + rsName);
}
}
}
} else {
filteredRsAndRMap = fullRsAndRMap;
}
return filteredRsAndRMap;
}
}
public static void main(String[] args) throws Exception {
final Configuration conf = HBaseConfiguration.create();
// loading the generic options to conf
new GenericOptionsParser(conf, args);
int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
LOG.info("Number of execution threads " + numThreads);
ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
Class<? extends Sink> sinkClass =
conf.getClass("hbase.canary.sink.class", RegionServerStdOutSink.class, Sink.class);
Sink sink = ReflectionUtils.newInstance(sinkClass);
int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
executor.shutdown();
System.exit(exitCode);
}
}