| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.master.replication; |
| |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.Semaphore; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.DoNotRetryIOException; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; |
| import org.apache.hadoop.hbase.replication.ReplicationEndpoint; |
| import org.apache.hadoop.hbase.replication.ReplicationException; |
| import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; |
| import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; |
| import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; |
| import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; |
| import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; |
| import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; |
| import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; |
| import org.apache.hadoop.hbase.replication.ReplicationUtils; |
| import org.apache.hadoop.hbase.replication.SyncReplicationState; |
| import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; |
| import org.apache.hadoop.hbase.zookeeper.ZKConfig; |
| import org.apache.hadoop.hbase.zookeeper.ZKWatcher; |
| import org.apache.yetus.audience.InterfaceAudience; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; |
| import org.apache.hbase.thirdparty.com.google.common.collect.Maps; |
| |
| /** |
| * Manages and performs all replication admin operations. |
| * <p> |
| * Used to add/remove a replication peer. |
| */ |
@InterfaceAudience.Private
public class ReplicationPeerManager {

  // Persistent storage (ZK-backed, see create()) for peer configs / enabled flags / sync states.
  private final ReplicationPeerStorage peerStorage;

  // Persistent storage for replication queues and hfile-refs; used to verify queue cleanup.
  private final ReplicationQueueStorage queueStorage;

  // In-memory view of all known peers, keyed by peer id. Kept in sync with peerStorage by the
  // mutating methods of this class.
  private final ConcurrentMap<String, ReplicationPeerDescription> peers;

  // Allowed sync replication state transitions: from-state -> set of permitted to-states.
  // Note ACTIVE <-> STANDBY is only reachable through DOWNGRADE_ACTIVE except ACTIVE -> STANDBY.
  private final ImmutableMap<SyncReplicationState, EnumSet<SyncReplicationState>>
    allowedTransition = Maps.immutableEnumMap(ImmutableMap.of(SyncReplicationState.ACTIVE,
      EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE, SyncReplicationState.STANDBY),
      SyncReplicationState.STANDBY, EnumSet.of(SyncReplicationState.DOWNGRADE_ACTIVE),
      SyncReplicationState.DOWNGRADE_ACTIVE,
      EnumSet.of(SyncReplicationState.STANDBY, SyncReplicationState.ACTIVE)));

  // Only allow to add one sync replication peer concurrently
  private final Semaphore syncReplicationPeerLock = new Semaphore(1);

  ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
      ConcurrentMap<String, ReplicationPeerDescription> peers) {
    this.peerStorage = peerStorage;
    this.queueStorage = queueStorage;
    this.peers = peers;
  }
| |
| private void checkQueuesDeleted(String peerId) |
| throws ReplicationException, DoNotRetryIOException { |
| for (ServerName replicator : queueStorage.getListOfReplicators()) { |
| List<String> queueIds = queueStorage.getAllQueues(replicator); |
| for (String queueId : queueIds) { |
| ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); |
| if (queueInfo.getPeerId().equals(peerId)) { |
| throw new DoNotRetryIOException("undeleted queue for peerId: " + peerId + |
| ", replicator: " + replicator + ", queueId: " + queueId); |
| } |
| } |
| } |
| if (queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) { |
| throw new DoNotRetryIOException("Undeleted queue for peer " + peerId + " in hfile-refs"); |
| } |
| } |
| |
  /**
   * Pre-checks run on the master before an add-peer operation proceeds.
   * <p>
   * Validates the peer id and config, rejects a sync replication peer whose tables are already
   * covered by another sync peer, rejects duplicate peer ids, and finally makes sure no stale
   * replication queues remain from an earlier deleted peer with the same id.
   * @throws DoNotRetryIOException if validation fails (caller should not retry)
   * @throws ReplicationException on storage errors while scanning queues
   */
  void preAddPeer(String peerId, ReplicationPeerConfig peerConfig)
      throws DoNotRetryIOException, ReplicationException {
    // '-' is rejected in peer ids; NOTE(review): presumably because queue ids for recovered
    // queues use '-' as a separator (see ReplicationQueueInfo) — confirm before relying on this
    if (peerId.contains("-")) {
      throw new DoNotRetryIOException("Found invalid peer name: " + peerId);
    }
    checkPeerConfig(peerConfig);
    if (peerConfig.isSyncReplication()) {
      checkSyncReplicationPeerConfigConflict(peerConfig);
    }
    if (peers.containsKey(peerId)) {
      throw new DoNotRetryIOException("Replication peer " + peerId + " already exists");
    }
    // make sure that there is no queues with the same peer id. This may happen when we create a
    // peer with the same id with a old deleted peer. If the replication queues for the old peer
    // have not been cleaned up yet then we should not create the new peer, otherwise the old wal
    // file may also be replicated.
    checkQueuesDeleted(peerId);
  }
| |
| private ReplicationPeerDescription checkPeerExists(String peerId) throws DoNotRetryIOException { |
| ReplicationPeerDescription desc = peers.get(peerId); |
| if (desc == null) { |
| throw new DoNotRetryIOException("Replication peer " + peerId + " does not exist"); |
| } |
| return desc; |
| } |
| |
| private void checkPeerInDAStateIfSyncReplication(String peerId) throws DoNotRetryIOException { |
| ReplicationPeerDescription desc = peers.get(peerId); |
| if (desc != null && desc.getPeerConfig().isSyncReplication() |
| && !SyncReplicationState.DOWNGRADE_ACTIVE.equals(desc.getSyncReplicationState())) { |
| throw new DoNotRetryIOException("Couldn't remove synchronous replication peer with state=" |
| + desc.getSyncReplicationState() |
| + ", Transit the synchronous replication state to be DOWNGRADE_ACTIVE firstly."); |
| } |
| } |
| |
| ReplicationPeerConfig preRemovePeer(String peerId) throws DoNotRetryIOException { |
| ReplicationPeerDescription pd = checkPeerExists(peerId); |
| checkPeerInDAStateIfSyncReplication(peerId); |
| return pd.getPeerConfig(); |
| } |
| |
| void preEnablePeer(String peerId) throws DoNotRetryIOException { |
| ReplicationPeerDescription desc = checkPeerExists(peerId); |
| if (desc.isEnabled()) { |
| throw new DoNotRetryIOException("Replication peer " + peerId + " has already been enabled"); |
| } |
| } |
| |
| void preDisablePeer(String peerId) throws DoNotRetryIOException { |
| ReplicationPeerDescription desc = checkPeerExists(peerId); |
| if (!desc.isEnabled()) { |
| throw new DoNotRetryIOException("Replication peer " + peerId + " has already been disabled"); |
| } |
| } |
| |
| /** |
| * Return the old peer description. Can never be null. |
| */ |
| ReplicationPeerDescription preUpdatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) |
| throws DoNotRetryIOException { |
| checkPeerConfig(peerConfig); |
| ReplicationPeerDescription desc = checkPeerExists(peerId); |
| ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); |
| if (!isStringEquals(peerConfig.getClusterKey(), oldPeerConfig.getClusterKey())) { |
| throw new DoNotRetryIOException( |
| "Changing the cluster key on an existing peer is not allowed. Existing key '" + |
| oldPeerConfig.getClusterKey() + "' for peer " + peerId + " does not match new key '" + |
| peerConfig.getClusterKey() + "'"); |
| } |
| |
| if (!isStringEquals(peerConfig.getReplicationEndpointImpl(), |
| oldPeerConfig.getReplicationEndpointImpl())) { |
| throw new DoNotRetryIOException("Changing the replication endpoint implementation class " + |
| "on an existing peer is not allowed. Existing class '" + |
| oldPeerConfig.getReplicationEndpointImpl() + "' for peer " + peerId + |
| " does not match new class '" + peerConfig.getReplicationEndpointImpl() + "'"); |
| } |
| |
| if (!isStringEquals(peerConfig.getRemoteWALDir(), oldPeerConfig.getRemoteWALDir())) { |
| throw new DoNotRetryIOException( |
| "Changing the remote wal dir on an existing peer is not allowed. Existing remote wal " + |
| "dir '" + oldPeerConfig.getRemoteWALDir() + "' for peer " + peerId + |
| " does not match new remote wal dir '" + peerConfig.getRemoteWALDir() + "'"); |
| } |
| |
| if (oldPeerConfig.isSyncReplication()) { |
| if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldPeerConfig, peerConfig)) { |
| throw new DoNotRetryIOException( |
| "Changing the replicated namespace/table config on a synchronous replication " + |
| "peer(peerId: " + peerId + ") is not allowed."); |
| } |
| } |
| return desc; |
| } |
| |
| /** |
| * @return the old desciption of the peer |
| */ |
| ReplicationPeerDescription preTransitPeerSyncReplicationState(String peerId, |
| SyncReplicationState state) throws DoNotRetryIOException { |
| ReplicationPeerDescription desc = checkPeerExists(peerId); |
| SyncReplicationState fromState = desc.getSyncReplicationState(); |
| EnumSet<SyncReplicationState> allowedToStates = allowedTransition.get(fromState); |
| if (allowedToStates == null || !allowedToStates.contains(state)) { |
| throw new DoNotRetryIOException("Can not transit current cluster state from " + fromState + |
| " to " + state + " for peer id=" + peerId); |
| } |
| return desc; |
| } |
| |
| public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled) |
| throws ReplicationException { |
| if (peers.containsKey(peerId)) { |
| // this should be a retry, just return |
| return; |
| } |
| ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build(); |
| SyncReplicationState syncReplicationState = |
| copiedPeerConfig.isSyncReplication() ? SyncReplicationState.DOWNGRADE_ACTIVE |
| : SyncReplicationState.NONE; |
| peerStorage.addPeer(peerId, copiedPeerConfig, enabled, syncReplicationState); |
| peers.put(peerId, |
| new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig, syncReplicationState)); |
| } |
| |
| public void removePeer(String peerId) throws ReplicationException { |
| if (!peers.containsKey(peerId)) { |
| // this should be a retry, just return |
| return; |
| } |
| peerStorage.removePeer(peerId); |
| peers.remove(peerId); |
| } |
| |
| private void setPeerState(String peerId, boolean enabled) throws ReplicationException { |
| ReplicationPeerDescription desc = peers.get(peerId); |
| if (desc.isEnabled() == enabled) { |
| // this should be a retry, just return |
| return; |
| } |
| peerStorage.setPeerState(peerId, enabled); |
| peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, desc.getPeerConfig(), |
| desc.getSyncReplicationState())); |
| } |
| |
  /** Enables the given peer. Idempotent (see {@code setPeerState}). */
  public void enablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, true);
  }
| |
  /** Disables the given peer. Idempotent (see {@code setPeerState}). */
  public void disablePeer(String peerId) throws ReplicationException {
    setPeerState(peerId, false);
  }
| |
| public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig) |
| throws ReplicationException { |
| // the checking rules are too complicated here so we give up checking whether this is a retry. |
| ReplicationPeerDescription desc = peers.get(peerId); |
| ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig(); |
| ReplicationPeerConfigBuilder newPeerConfigBuilder = |
| ReplicationPeerConfig.newBuilder(peerConfig); |
| // we need to use the new conf to overwrite the old one. |
| newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); |
| newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); |
| newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration()); |
| newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration()); |
| ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build(); |
| peerStorage.updatePeerConfig(peerId, newPeerConfig); |
| peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig, |
| desc.getSyncReplicationState())); |
| } |
| |
| public List<ReplicationPeerDescription> listPeers(Pattern pattern) { |
| if (pattern == null) { |
| return new ArrayList<>(peers.values()); |
| } |
| return peers.values().stream().filter(r -> pattern.matcher(r.getPeerId()).matches()) |
| .collect(Collectors.toList()); |
| } |
| |
| public Optional<ReplicationPeerConfig> getPeerConfig(String peerId) { |
| ReplicationPeerDescription desc = peers.get(peerId); |
| return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty(); |
| } |
| |
  /** Clears all recorded last-pushed sequence ids for the peer from queue storage. */
  void removeAllLastPushedSeqIds(String peerId) throws ReplicationException {
    queueStorage.removeLastSequenceIds(peerId);
  }
| |
  /**
   * Records the target sync replication state in storage ahead of the actual transition
   * (see {@code transitPeerSyncReplicationState}).
   */
  public void setPeerNewSyncReplicationState(String peerId, SyncReplicationState state)
      throws ReplicationException {
    peerStorage.setPeerNewSyncReplicationState(peerId, state);
  }
| |
  /**
   * Performs the sync replication state transition for the peer, first in storage and then in
   * the in-memory cache. Each step is individually guarded so the whole method is retry-safe.
   */
  public void transitPeerSyncReplicationState(String peerId, SyncReplicationState newState)
      throws ReplicationException {
    if (peerStorage.getPeerNewSyncReplicationState(peerId) != SyncReplicationState.NONE) {
      // Only transit if this is not a retry
      peerStorage.transitPeerSyncReplicationState(peerId);
    }
    ReplicationPeerDescription desc = peers.get(peerId);
    if (desc.getSyncReplicationState() != newState) {
      // Only recreate the desc if this is not a retry
      peers.put(peerId,
        new ReplicationPeerDescription(peerId, desc.isEnabled(), desc.getPeerConfig(), newState));
    }
  }
| |
| public void removeAllQueues(String peerId) throws ReplicationException { |
| // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still |
| // on-going when the refresh peer config procedure is done, if a RS which has already been |
| // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in |
| // the scan here, and if the RS who has claimed the queue crashed before creating recovered |
| // source, then the queue will leave there until the another RS detects the crash and helps |
| // removing the queue. |
| // A two pass scan can solve the problem. Anyway, the queue will not disappear during the |
| // claiming, it will either under the old RS or under the new RS, and a queue can only be |
| // claimed once after the refresh peer procedure done(as the next claim queue will just delete |
| // it), so we can make sure that a two pass scan will finally find the queue and remove it, |
| // unless it has already been removed by others. |
| ReplicationUtils.removeAllQueues(queueStorage, peerId); |
| ReplicationUtils.removeAllQueues(queueStorage, peerId); |
| } |
| |
  /** Removes all replication queues for the peer, then its hfile-refs entry. */
  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
    removeAllQueues(peerId);
    queueStorage.removePeerFromHFileRefs(peerId);
  }
| |
  /**
   * Validates a peer config before it is added or updated.
   * <p>
   * Checks, in order: the replication endpoint class (must be instantiable; cluster key is only
   * validated for HBaseInterClusterReplicationEndpoint-derived endpoints), mutual exclusion of
   * replicate-all vs. namespace/table-cfs settings, sync replication constraints, and that all
   * configured WAL entry filter classes can be instantiated.
   * @throws DoNotRetryIOException if any part of the config is invalid
   */
  private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
    String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
    boolean checkClusterKey = true;
    if (!StringUtils.isBlank(replicationEndpointImpl)) {
      // try creating a instance
      ReplicationEndpoint endpoint;
      try {
        endpoint = Class.forName(replicationEndpointImpl)
          .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
      } catch (Throwable e) {
        throw new DoNotRetryIOException(
          "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
          e);
      }
      // do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
      if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
        checkClusterKey = false;
      }
    }
    if (checkClusterKey) {
      checkClusterKey(peerConfig.getClusterKey());
    }

    if (peerConfig.replicateAllUserTables()) {
      // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
      // Then allow config exclude namespaces or exclude table-cfs which can't be replicated to peer
      // cluster.
      if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) ||
        (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
        throw new DoNotRetryIOException("Need clean namespaces or table-cfs config firstly " +
          "when you want replicate all cluster");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getExcludeNamespaces(),
        peerConfig.getExcludeTableCFsMap());
    } else {
      // If replicate_all flag is false, it means all user tables can't be replicated to peer
      // cluster. Then allow to config namespaces or table-cfs which will be replicated to peer
      // cluster.
      if ((peerConfig.getExcludeNamespaces() != null &&
        !peerConfig.getExcludeNamespaces().isEmpty()) ||
        (peerConfig.getExcludeTableCFsMap() != null &&
          !peerConfig.getExcludeTableCFsMap().isEmpty())) {
        throw new DoNotRetryIOException(
          "Need clean exclude-namespaces or exclude-table-cfs config firstly" +
            " when replicate_all flag is false");
      }
      checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
        peerConfig.getTableCFsMap());
    }

    // sync replication peers carry extra restrictions (explicit table list, remote WAL dir)
    if (peerConfig.isSyncReplication()) {
      checkPeerConfigForSyncReplication(peerConfig);
    }

    checkConfiguredWALEntryFilters(peerConfig);
  }
| |
| private void checkPeerConfigForSyncReplication(ReplicationPeerConfig peerConfig) |
| throws DoNotRetryIOException { |
| // This is used to reduce the difficulty for implementing the sync replication state transition |
| // as we need to reopen all the related regions. |
| // TODO: Add namespace, replicat_all flag back |
| if (peerConfig.replicateAllUserTables()) { |
| throw new DoNotRetryIOException( |
| "Only support replicated table config for sync replication peer"); |
| } |
| if (peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) { |
| throw new DoNotRetryIOException( |
| "Only support replicated table config for sync replication peer"); |
| } |
| if (peerConfig.getTableCFsMap() == null || peerConfig.getTableCFsMap().isEmpty()) { |
| throw new DoNotRetryIOException("Need config replicated tables for sync replication peer"); |
| } |
| for (List<String> cfs : peerConfig.getTableCFsMap().values()) { |
| if (cfs != null && !cfs.isEmpty()) { |
| throw new DoNotRetryIOException( |
| "Only support replicated table config for sync replication peer"); |
| } |
| } |
| |
| Path remoteWALDir = new Path(peerConfig.getRemoteWALDir()); |
| if (!remoteWALDir.isAbsolute()) { |
| throw new DoNotRetryIOException( |
| "The remote WAL directory " + peerConfig.getRemoteWALDir() + " is not absolute"); |
| } |
| URI remoteWALDirUri = remoteWALDir.toUri(); |
| if (remoteWALDirUri.getScheme() == null || remoteWALDirUri.getAuthority() == null) { |
| throw new DoNotRetryIOException("The remote WAL directory " + peerConfig.getRemoteWALDir() + |
| " is not qualified, you must provide scheme and authority"); |
| } |
| } |
| |
| private void checkSyncReplicationPeerConfigConflict(ReplicationPeerConfig peerConfig) |
| throws DoNotRetryIOException { |
| for (TableName tableName : peerConfig.getTableCFsMap().keySet()) { |
| for (Map.Entry<String, ReplicationPeerDescription> entry : peers.entrySet()) { |
| ReplicationPeerConfig rpc = entry.getValue().getPeerConfig(); |
| if (rpc.isSyncReplication() && rpc.getTableCFsMap().containsKey(tableName)) { |
| throw new DoNotRetryIOException( |
| "Table " + tableName + " has been replicated by peer " + entry.getKey()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Set a namespace in the peer config means that all tables in this namespace will be replicated |
| * to the peer cluster. |
| * <ol> |
| * <li>If peer config already has a namespace, then not allow set any table of this namespace to |
| * the peer config.</li> |
| * <li>If peer config already has a table, then not allow set this table's namespace to the peer |
| * config.</li> |
| * </ol> |
| * <p> |
| * Set a exclude namespace in the peer config means that all tables in this namespace can't be |
| * replicated to the peer cluster. |
| * <ol> |
| * <li>If peer config already has a exclude namespace, then not allow set any exclude table of |
| * this namespace to the peer config.</li> |
| * <li>If peer config already has a exclude table, then not allow set this table's namespace as a |
| * exclude namespace.</li> |
| * </ol> |
| */ |
| private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces, |
| Map<TableName, ? extends Collection<String>> tableCfs) throws DoNotRetryIOException { |
| if (namespaces == null || namespaces.isEmpty()) { |
| return; |
| } |
| if (tableCfs == null || tableCfs.isEmpty()) { |
| return; |
| } |
| for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) { |
| TableName table = entry.getKey(); |
| if (namespaces.contains(table.getNamespaceAsString())) { |
| throw new DoNotRetryIOException("Table-cfs " + table + " is conflict with namespaces " + |
| table.getNamespaceAsString() + " in peer config"); |
| } |
| } |
| } |
| |
| private void checkConfiguredWALEntryFilters(ReplicationPeerConfig peerConfig) |
| throws DoNotRetryIOException { |
| String filterCSV = peerConfig.getConfiguration() |
| .get(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY); |
| if (filterCSV != null && !filterCSV.isEmpty()) { |
| String[] filters = filterCSV.split(","); |
| for (String filter : filters) { |
| try { |
| Class.forName(filter).getDeclaredConstructor().newInstance(); |
| } catch (Exception e) { |
| throw new DoNotRetryIOException("Configured WALEntryFilter " + filter + |
| " could not be created. Failing add/update peer operation.", e); |
| } |
| } |
| } |
| } |
| |
| private void checkClusterKey(String clusterKey) throws DoNotRetryIOException { |
| try { |
| ZKConfig.validateClusterKey(clusterKey); |
| } catch (IOException e) { |
| throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey, e); |
| } |
| } |
| |
| public List<String> getSerialPeerIdsBelongsTo(TableName tableName) { |
| return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) |
| .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) |
| .collect(Collectors.toList()); |
| } |
| |
  /** @return the replication queue storage backing this manager */
  public ReplicationQueueStorage getQueueStorage() {
    return queueStorage;
  }
| |
  /**
   * Factory: builds a manager backed by ZK-based peer and queue storage, pre-loading the
   * in-memory cache with every peer currently present in storage.
   * @throws ReplicationException if reading peers from storage fails
   */
  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
      throws ReplicationException {
    ReplicationPeerStorage peerStorage =
      ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
    ConcurrentMap<String, ReplicationPeerDescription> peers = new ConcurrentHashMap<>();
    for (String peerId : peerStorage.listPeerIds()) {
      ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
      boolean enabled = peerStorage.isPeerEnabled(peerId);
      SyncReplicationState state = peerStorage.getPeerSyncReplicationState(peerId);
      peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
    }
    return new ReplicationPeerManager(peerStorage,
      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
  }
| |
| /** |
| * For replication peer cluster key or endpoint class, null and empty string is same. So here |
| * don't use {@link StringUtils#equals(CharSequence, CharSequence)} directly. |
| */ |
| private boolean isStringEquals(String s1, String s2) { |
| if (StringUtils.isBlank(s1)) { |
| return StringUtils.isBlank(s2); |
| } |
| return s1.equals(s2); |
| } |
| |
  /**
   * Acquires the single-permit lock that serializes sync replication peer additions.
   * Blocks until the permit is available; must be paired with
   * {@code releaseSyncReplicationPeerLock}.
   */
  public void acquireSyncReplicationPeerLock() throws InterruptedException {
    syncReplicationPeerLock.acquire();
  }
| |
  /** Releases the sync replication peer lock taken by {@code acquireSyncReplicationPeerLock}. */
  public void releaseSyncReplicationPeerLock() {
    syncReplicationPeerLock.release();
  }
| } |