blob: ba6da7af3befde2c881a8a1192038b9f27010c49 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.replication;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
* This provides a class for maintaining a set of peer clusters. These peers are remote slave
* clusters that data is replicated to.
// Keeps a Configuration, an in-memory cache of peers keyed by peer id, and the
// ReplicationPeerStorage that peer data is read from.
public class ReplicationPeers {
// Configuration supplied to the constructor and returned by getConf().
private final Configuration conf;
// Map of peer clusters keyed by their id
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
// Storage handle obtained from ReplicationStorageFactory in the constructor.
private final ReplicationPeerStorage peerStorage;
/**
 * Creates an instance with an empty peer cache and a peer storage obtained from
 * ReplicationStorageFactory.
 * @param zookeeper watcher passed to ReplicationStorageFactory.getReplicationPeerStorage
 * @param conf configuration kept by this instance and also passed to the storage factory
 */
ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
this.conf = conf;
// The cache starts empty; nothing is loaded from storage in the constructor.
this.peerCache = new ConcurrentHashMap<>();
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
/**
 * Returns the Configuration that was passed to the constructor.
 * @return the configuration held by this instance
 */
public Configuration getConf() {
return conf;
/**
 * Iterates over the peer ids reported by the peer storage.
 * @throws ReplicationException declared by the signature; presumably raised by the storage
 *         calls - confirm
 */
public void init() throws ReplicationException {
// Loading all existing peerIds into peer cache.
for (String peerId : this.peerStorage.listPeerIds()) {
// NOTE(review): the loop body is not visible in this view. Per the comment above it
// presumably caches a peer for each id - confirm against the full file.
/**
 * Returns the peer storage created in the constructor.
 * @return the ReplicationPeerStorage held by this instance
 */
public ReplicationPeerStorage getPeerStorage() {
return this.peerStorage;
* Method called after a peer has been connected. It will create a ReplicationPeer to track the
* newly connected cluster.
* @param peerId a short name that identifies the cluster
* @return whether a ReplicationPeer was successfully created
* @throws ReplicationException if connecting to the peer fails
public boolean addPeer(String peerId) throws ReplicationException {
// Already cached: nothing is created and the existing entry is left untouched.
if (this.peerCache.containsKey(peerId)) {
return false;
// Not cached yet: build the peer via createPeer and store it under its id.
// NOTE(review): containsKey and put are two separate map calls, so the check and the
// insert are not a single atomic operation - confirm callers serialize addPeer.
peerCache.put(peerId, createPeer(peerId));
return true;
/**
 * Removes the peer with the given id from the cache.
 * @param peerId id of the peer to remove
 * @return the peer that was cached under that id, or null if there was none
 */
public ReplicationPeerImpl removePeer(String peerId) {
return peerCache.remove(peerId);
* Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
* continue to track changes to the Peer's state and config. This method returns null if no peer
* has been cached with the given peerId.
* @param peerId id for the peer
* @return the cached ReplicationPeerImpl, or null if no peer is cached for the given peerId
public ReplicationPeerImpl getPeer(String peerId) {
// Plain cache lookup; the map returns null when the id has no entry.
return peerCache.get(peerId);
* Returns the set of peerIds of the clusters that have been connected and have an underlying
* ReplicationPeer.
* @return a Set of Strings for peerIds
public Set<String> getAllPeerIds() {
// Read-only wrapper around the cache's key set; it is a view, not a copy.
return Collections.unmodifiableSet(peerCache.keySet());
/**
 * Returns a read-only view of the peer cache.
 * @return an unmodifiable Map wrapper over the cache, keyed by peer id
 */
public Map<String, ReplicationPeerImpl> getPeerCache() {
// Wrapper only: it is a view over the cache, not a copy.
return Collections.unmodifiableMap(peerCache);
/**
 * Returns the state of the cached peer with the given id.
 * @param peerId id of the peer to look up in the cache
 * @return the result of getPeerState() on the cached peer
 * @throws ReplicationException declared by the signature; no visible statement throws it
 */
public PeerState refreshPeerState(String peerId) throws ReplicationException {
// No null check on the lookup: an id that is not cached makes the call below fail.
ReplicationPeerImpl peer = peerCache.get(peerId);
// NOTE(review): no refresh step is visible between the lookup and the return, although
// the method name suggests one - confirm against the full file.
return peer.getPeerState();
/**
 * Returns the config of the cached peer with the given id.
 * @param peerId id of the peer to look up in the cache
 * @return the result of getPeerConfig() on the cached peer
 * @throws ReplicationException declared by the signature; no visible statement throws it
 */
public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
// No null check on the lookup: an id that is not cached makes the call below fail.
ReplicationPeerImpl peer = peerCache.get(peerId);
// NOTE(review): no refresh step is visible between the lookup and the return, although
// the method name suggests one - confirm against the full file.
return peer.getPeerConfig();
/**
 * Reads the new sync replication state of the given peer from the peer storage and
 * returns it.
 * @param peerId id of the peer
 * @return the value returned by peerStorage.getPeerNewSyncReplicationState(peerId)
 * @throws ReplicationException declared by the signature; presumably raised by the storage
 *         read - confirm
 */
public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId)
throws ReplicationException {
// NOTE(review): the cached peer is looked up but not used in the visible lines;
// presumably it is updated with newState in a line missing here - confirm.
ReplicationPeerImpl peer = peerCache.get(peerId);
SyncReplicationState newState = peerStorage.getPeerNewSyncReplicationState(peerId);
return newState;
/**
 * Looks up the cached peer with the given id.
 * @param peerId id of the peer to look up in the cache
 */
public void transitPeerSyncReplicationState(String peerId) {
ReplicationPeerImpl peer = peerCache.get(peerId);
// NOTE(review): only the cache lookup is visible; whatever is done with the peer
// afterwards is not shown in this view - confirm against the full file.
* Helper method to connect to a peer
* @param peerId peer's identifier
* @return object representing the peer
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
// Read the peer's stored config, enabled flag and sync replication state.
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
// NOTE(review): the right-hand side of this assignment is not visible in this view;
// presumably it reads the new sync replication state from peerStorage - confirm.
SyncReplicationState newSyncReplicationState =
// The peer gets a Configuration derived from peerConfig and this instance's conf via
// ReplicationUtils.getPeerClusterConfiguration, plus the values read above.
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);