blob: b08c99098c5a6f57024fc1885813c01c93c39401 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.AuthFailedException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* A {@link BaseReplicationEndpoint} for replication endpoints whose
* target cluster is an HBase cluster.
public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
implements Abortable {
private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
// ZooKeeper watcher for the peer cluster. Assigned in reloadZkWatcher(); disconnect(),
// getPeerUUID(), reloadZkWatcher() and fetchSlavesAddresses() touch it inside
// synchronized (zkwLock).
private ZKWatcher zkw = null;
// Monitor object used to guard zkw.
private final Object zkwLock = new Object();
// Configuration built in init() from the replication context's configuration and handed to
// createConnection() by connectPeerCluster().
protected Configuration conf;
// Connection to the peer cluster. Assigned in connectPeerCluster(), reset to null in
// disconnect(), and used by getReplicationSink() to obtain region server admins.
private AsyncClusterConnection conn;
* Default maximum number of times a replication sink can be reported as bad before
* it will no longer be provided as a sink for replication without the pool of
* replication sinks being refreshed.
// Fallback used by init() when "replication.bad.sink.threshold" is not configured.
public static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
* Default ratio of the total number of peer cluster region servers to consider
* replicating to.
// Fallback used by init() when "replication.source.ratio" is not configured.
public static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
// Ratio of total number of potential peer region servers to be used
private float ratio;
// Maximum number of times a sink can be reported as bad before the pool of
// replication sinks is refreshed
private int badSinkThreshold;
// Count of "bad replication sink" reports per peer sink
// (created empty in init(), incremented in reportBadSink())
private Map<ServerName, Integer> badReportCounts;
// Sinks currently selected for replication; starts empty and is replaced by chooseSinks().
private List<ServerName> sinkServers = new ArrayList<>(0);
* Some implementations of HBaseInterClusterReplicationEndpoint may need to instantiate a different
* Connection implementation, or initialize it in a different way, so createConnection is defined
* as protected to allow overriding.
protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
return ClusterConnectionFactory.createAsyncClusterConnection(conf,
null, User.getCurrent());
// Initializes this endpoint's replication settings: builds this.conf from the context's
// configuration, reads the sink ratio and bad-sink threshold from that same configuration
// (falling back to the DEFAULT_* constants), and creates an empty bad-report map.
// NOTE(review): the body reads `ctx` while the parameter is named `context`; presumably a
// superclass init call that populates `ctx` belongs at the top — confirm against the full file.
public void init(Context context) throws IOException {
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
this.ratio =
ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
this.badSinkThreshold =
ctx.getConfiguration().getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
// No sink has been reported as bad yet.
this.badReportCounts = Maps.newHashMap();
// Releases this endpoint's peer-cluster state: checks zkw while holding zkwLock and clears
// the conn reference.
// NOTE(review): no close() calls are visible in this view even though IOException is caught;
// the block appears truncated — confirm against the full file.
protected void disconnect() {
synchronized (zkwLock) {
if (zkw != null) {
if (this.conn != null) {
try {
this.conn = null;
} catch (IOException e) {
// NOTE(review): the caught exception is not passed to the logger, so its cause is lost.
LOG.warn("{} Failed to close the connection", ctx.getPeerId());
* A private method used to re-establish a zookeeper session with a peer cluster.
* @param ke the ZooKeeper exception that triggered the reconnect attempt
// Reacts to a ZooKeeper failure on the peer connection. This branch is taken only for
// connection loss, session expiry and authentication failure.
private void reconnect(KeeperException ke) {
if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
|| ke instanceof AuthFailedException) {
// The cluster key is used to identify the peer in the log messages below.
String clusterKey = ctx.getPeerConfig().getClusterKey();
LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke);
try {
// NOTE(review): the try body is not visible in this view; the catch message below suggests
// a ZKWatcher is recreated here — confirm against the full file.
} catch (IOException io) {
LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io);
// Public entry point for starting this endpoint.
// NOTE(review): the method body is not visible in this view — confirm against the full file.
public void start() {
// Public entry point for stopping this endpoint.
// NOTE(review): the method body is not visible in this view — confirm against the full file.
public void stop() {
// Start hook; the work it performs may fail with an IOException, which is caught here.
// NOTE(review): neither the try body nor the exception handler is visible in this view —
// confirm against the full file.
protected void doStart() {
try {
} catch (IOException e) {
// Stop hook.
// NOTE(review): the method body is not visible in this view — confirm against the full file.
protected void doStop() {
// Synchronize peer cluster connection attempts to avoid races and rate
// limit connections when multiple replication sources try to connect to
// the peer cluster. If the peer cluster is down we can get out of control
// over time.
// Returns the peer cluster's UUID as looked up through zkw. The result starts as null and is
// only assigned by the ZKClusterId lookup, so null is returned when that lookup did not
// produce a value.
public UUID getPeerUUID() {
UUID peerUUID = null;
try {
// zkw can be replaced by reloadZkWatcher(), so it is read while holding zkwLock.
synchronized (zkwLock) {
peerUUID = ZKClusterId.getUUIDForCluster(zkw);
} catch (KeeperException ke) {
// NOTE(review): the handler body is not visible in this view — confirm against the full file.
return peerUUID;
* Closes the current ZKW (if not null) and creates a new one
* @throws IOException If anything goes wrong connecting
// Replaces zkw with a newly constructed ZKWatcher while holding zkwLock, then registers a
// PeerRegionServerListener for this endpoint on the new watcher.
private void reloadZkWatcher() throws IOException {
synchronized (zkwLock) {
if (zkw != null) {
// NOTE(review): what happens to the existing watcher is not visible in this view — confirm
// against the full file.
// This endpoint is passed as the watcher's third constructor argument.
zkw = new ZKWatcher(ctx.getConfiguration(),
"connection to cluster: " + ctx.getPeerId(), this);
zkw.registerListener(new PeerRegionServerListener(this));
private void connectPeerCluster() throws IOException {
try {
conn = createConnection(this.conf);
} catch (IOException ioe) {
LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe);
throw ioe;
public void abort(String why, Throwable e) {
LOG.error("The HBaseReplicationEndpoint corresponding to peer " + ctx.getPeerId()
+ " was aborted for the following reason(s):" + why, e);
public boolean isAborted() {
// Currently this is never "Aborted", we just log when the abort method is called.
return false;
* Get the list of all the region servers from the specified peer
* @return list of region server addresses or an empty list if the slave is unavailable
// Lists the children of the peer's region-server znode through zkw and returns them as
// ServerName entries. Returns an empty list when no child list was obtained.
protected List<ServerName> fetchSlavesAddresses() {
List<String> children = null;
try {
// zkw can be replaced by reloadZkWatcher(), so it is read while holding zkwLock.
synchronized (zkwLock) {
children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
} catch (KeeperException ke) {
if (LOG.isDebugEnabled()) {
LOG.debug("Fetch slaves addresses failed", ke);
// children is still null here if the listing above did not yield a result.
if (children == null) {
return Collections.emptyList();
List<ServerName> addresses = new ArrayList<>(children.size());
for (String child : children) {
// NOTE(review): the loop body is not visible in this view; presumably each child name is
// converted to a ServerName and added to addresses — confirm against the full file.
return addresses;
protected synchronized void chooseSinks() {
List<ServerName> slaveAddresses = fetchSlavesAddresses();
if (slaveAddresses.isEmpty()) {
LOG.warn("No sinks available at peer. Will not be able to replicate");
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
this.sinkServers = slaveAddresses.subList(0, numSinks);
protected synchronized int getNumSinks() {
return sinkServers.size();
* Get a randomly-chosen replication sink to replicate to.
* @return a replication sink to replicate to
// Returns a SinkPeer wrapping a server from sinkServers together with the region server admin
// obtained for it from conn. Throws IOException("No replication sinks are available") when
// sinkServers is empty at the second check.
protected synchronized SinkPeer getReplicationSink() throws IOException {
// NOTE(review): the next line is garbled in this view (the statement that owns the string
// literal is missing, as is whatever refreshes the sink list) — restore it from the full file.
if (sinkServers.isEmpty()) {"Current list of sinks is out of date or empty, updating");
if (sinkServers.isEmpty()) {
throw new IOException("No replication sinks are available");
// NOTE(review): the initializer for serverName is not visible in this view; per the method
// description a random element of sinkServers is chosen — confirm against the full file.
ServerName serverName =
return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
* Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
* failed). If a single SinkPeer is reported as bad more than
* replication.bad.sink.threshold times, it will be removed
* from the pool of potential replication targets.
* @param sinkPeer The SinkPeer that had a failed replication attempt on it
// Records one more failed replication attempt against the given sink and acts once the
// sink's count exceeds badSinkThreshold.
protected synchronized void reportBadSink(SinkPeer sinkPeer) {
ServerName serverName = sinkPeer.getServerName();
// Increment this server's failure count, starting at 1 for the first report.
int badReportCount = badReportCounts.compute(serverName, (k, v) -> v == null ? 1 : v + 1);
// Strictly greater than: the branch is taken on report number badSinkThreshold + 1.
if (badReportCount > badSinkThreshold) {
// NOTE(review): the statements inside these two branches are not visible in this view;
// per the method description the sink is removed from the pool — confirm against the full file.
if (sinkServers.isEmpty()) {
* Report that a {@code SinkPeer} successfully replicated a chunk of data.
* @param sinkPeer
* The SinkPeer that had a successful replication attempt on it
// Called when replication to the given sink succeeded.
// NOTE(review): the method body is not visible in this view — confirm against the full file.
protected synchronized void reportSinkSuccess(SinkPeer sinkPeer) {
List<ServerName> getSinkServers() {
return sinkServers;
* Tracks changes to the list of region servers in a peer's cluster.
// ZooKeeper listener tied to one endpoint. It remembers the peer's region-server znode path
// so that child-change events for that path can be recognized.
public static class PeerRegionServerListener extends ZKListener {
private final HBaseReplicationEndpoint replicationEndpoint;
// Path that nodeChildrenChanged() compares incoming event paths against.
private final String regionServerListNode;
public PeerRegionServerListener(HBaseReplicationEndpoint endpoint) {
this.replicationEndpoint = endpoint;
// Captured once, from the endpoint's zkw as it is at construction time.
this.regionServerListNode = endpoint.zkw.getZNodePaths().rsZNode;
public synchronized void nodeChildrenChanged(String path) {
// NOTE(review): the next line is garbled in this view (the statement that owns the string
// literal, and whatever follows it, is missing) — restore it from the full file.
if (path.equals(regionServerListNode)) {"Detected change to peer region servers, fetching updated list");
* Wraps a replication region server sink to provide the ability to identify it.
public static class SinkPeer {
private ServerName serverName;
private AsyncRegionServerAdmin regionServer;
public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
this.serverName = serverName;
this.regionServer = regionServer;
ServerName getServerName() {
return serverName;
public AsyncRegionServerAdmin getRegionServer() {
return regionServer;