blob: ebe99da3541b01a60fae054d0f60db51cc119927 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* This provides an class for maintaining a set of peer clusters. These peers are remote slave
* clusters that data is replicated to.
*/
@InterfaceAudience.Private
public class ReplicationPeers {
private final Configuration conf;
// Map of peer clusters keyed by their id
private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
private final ReplicationPeerStorage peerStorage;
ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
this.conf = conf;
this.peerCache = new ConcurrentHashMap<>();
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
}
public Configuration getConf() {
return conf;
}
public void init() throws ReplicationException {
// Loading all existing peerIds into peer cache.
for (String peerId : this.peerStorage.listPeerIds()) {
addPeer(peerId);
}
}
public ReplicationPeerStorage getPeerStorage() {
return this.peerStorage;
}
/**
* Method called after a peer has been connected. It will create a ReplicationPeer to track the
* newly connected cluster.
* @param peerId a short that identifies the cluster
* @return whether a ReplicationPeer was successfully created
* @throws ReplicationException if connecting to the peer fails
*/
public boolean addPeer(String peerId) throws ReplicationException {
if (this.peerCache.containsKey(peerId)) {
return false;
}
peerCache.put(peerId, createPeer(peerId));
return true;
}
public ReplicationPeerImpl removePeer(String peerId) {
return peerCache.remove(peerId);
}
/**
* Returns the ReplicationPeerImpl for the specified cached peer. This ReplicationPeer will
* continue to track changes to the Peer's state and config. This method returns null if no peer
* has been cached with the given peerId.
* @param peerId id for the peer
* @return ReplicationPeer object
*/
public ReplicationPeerImpl getPeer(String peerId) {
return peerCache.get(peerId);
}
/**
* Returns the set of peerIds of the clusters that have been connected and have an underlying
* ReplicationPeer.
* @return a Set of Strings for peerIds
*/
public Set<String> getAllPeerIds() {
return Collections.unmodifiableSet(peerCache.keySet());
}
public Map<String, ReplicationPeerImpl> getPeerCache() {
return Collections.unmodifiableMap(peerCache);
}
public PeerState refreshPeerState(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
peer.setPeerState(peerStorage.isPeerEnabled(peerId));
return peer.getPeerState();
}
public ReplicationPeerConfig refreshPeerConfig(String peerId) throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
peer.setPeerConfig(peerStorage.getPeerConfig(peerId));
return peer.getPeerConfig();
}
public SyncReplicationState refreshPeerNewSyncReplicationState(String peerId)
throws ReplicationException {
ReplicationPeerImpl peer = peerCache.get(peerId);
SyncReplicationState newState = peerStorage.getPeerNewSyncReplicationState(peerId);
peer.setNewSyncReplicationState(newState);
return newState;
}
public void transitPeerSyncReplicationState(String peerId) {
ReplicationPeerImpl peer = peerCache.get(peerId);
peer.transitSyncReplicationState();
}
/**
* Helper method to connect to a peer
* @param peerId peer's identifier
* @return object representing the peer
*/
private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
boolean enabled = peerStorage.isPeerEnabled(peerId);
SyncReplicationState syncReplicationState = peerStorage.getPeerSyncReplicationState(peerId);
SyncReplicationState newSyncReplicationState =
peerStorage.getPeerNewSyncReplicationState(peerId);
return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
peerId, peerConfig, enabled, syncReplicationState, newSyncReplicationState);
}
}