blob: d01b1305039bc9721d7b4b200eca5a9a339c2400 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link PeerProcedureHandler} implementation that applies replication peer changes by delegating
 * to a {@link ReplicationSourceManager} and, for sync replication transitions, to a
 * {@link PeerActionListener}.
 * <p>
 * Every operation acquires the lock returned by {@code peersLock.acquireLock(peerId)} before
 * touching the peer and always releases it in an outermost {@code finally}, so a failure while
 * rolling back cached peer data can not leave the lock held.
 */
@InterfaceAudience.Private
public class PeerProcedureHandlerImpl implements PeerProcedureHandler {

  private static final Logger LOG = LoggerFactory.getLogger(PeerProcedureHandlerImpl.class);

  private final ReplicationSourceManager replicationSourceManager;
  private final PeerActionListener peerActionListener;
  private final KeyLocker<String> peersLock = new KeyLocker<>();

  /**
   * @param replicationSourceManager manager used to add/remove peers and to refresh or drain the
   *          replication sources of a peer
   * @param peerActionListener listener notified on sync replication state changes
   */
  public PeerProcedureHandlerImpl(ReplicationSourceManager replicationSourceManager,
    PeerActionListener peerActionListener) {
    this.replicationSourceManager = replicationSourceManager;
    this.peerActionListener = peerActionListener;
  }

  /**
   * Adds the peer through the {@link ReplicationSourceManager} while holding the peer lock.
   * @param peerId id of the peer to add
   */
  @Override
  public void addPeer(String peerId) throws IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      replicationSourceManager.addPeer(peerId);
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Removes the peer through the {@link ReplicationSourceManager} while holding the peer lock.
   * Does nothing if the peer is not present in the cached peers.
   * @param peerId id of the peer to remove
   */
  @Override
  public void removePeer(String peerId) throws IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      if (replicationSourceManager.getReplicationPeers().getPeer(peerId) != null) {
        replicationSourceManager.removePeer(peerId);
      }
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Looks up a peer in the given cache.
   * @param peers the peer cache to search
   * @param peerId id of the peer to look up
   * @return the cached peer, never {@code null}
   * @throws ReplicationException if the peer is not cached
   */
  private ReplicationPeerImpl getCachedPeer(ReplicationPeers peers, String peerId)
    throws ReplicationException {
    ReplicationPeerImpl peer = peers.getPeer(peerId);
    if (peer == null) {
      throw new ReplicationException("Peer with id=" + peerId + " is not cached.");
    }
    return peer;
  }

  /**
   * Refreshes the cached state of the given peer and, if it went from ENABLED to DISABLED,
   * refreshes the replication sources of the peer. If any step fails, the previously cached
   * state is written back to the peer before the failure propagates.
   * <p>
   * The caller must already hold the lock for {@code peerId}.
   * @param peers the peer cache that owns {@code peer}
   * @param peer the cached peer, must not be {@code null}
   * @param peerId id of the peer
   */
  private void refreshPeerStateAndSources(ReplicationPeers peers, ReplicationPeerImpl peer,
    String peerId) throws ReplicationException, IOException {
    PeerState oldState = peer.getPeerState();
    boolean success = false;
    try {
      PeerState newState = peers.refreshPeerState(peerId);
      // RS need to start work with the new replication state change
      if (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED)) {
        replicationSourceManager.refreshSources(peerId);
      }
      success = true;
    } finally {
      if (!success) {
        // Reset peer state if refresh source failed
        peer.setPeerState(oldState.equals(PeerState.ENABLED));
      }
    }
  }

  /**
   * Shared implementation of {@link #enablePeer(String)} and {@link #disablePeer(String)}: takes
   * the peer lock and refreshes the cached peer state (and the sources, when needed).
   * @param peerId id of the peer
   * @throws ReplicationException if the peer is not cached
   */
  private void refreshPeerState(String peerId) throws ReplicationException, IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
      refreshPeerStateAndSources(peers, getCachedPeer(peers, peerId), peerId);
    } finally {
      peerLock.unlock();
    }
  }

  @Override
  public void enablePeer(String peerId) throws ReplicationException, IOException {
    refreshPeerState(peerId);
  }

  @Override
  public void disablePeer(String peerId) throws ReplicationException, IOException {
    refreshPeerState(peerId);
  }

  /**
   * Refreshes the cached config and state of the peer and refreshes its replication sources when
   * the namespaces/table-cfs changed, the serial flag changed, or the peer went from ENABLED to
   * DISABLED. If any step fails, the previously cached config and state are written back to the
   * peer before the failure propagates.
   * @param peerId id of the peer
   * @throws ReplicationException if the peer is not cached
   */
  @Override
  public void updatePeerConfig(String peerId) throws ReplicationException, IOException {
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      // Everything after acquiring the lock happens inside this try so the lock is always released.
      ReplicationPeers peers = replicationSourceManager.getReplicationPeers();
      ReplicationPeerImpl peer = getCachedPeer(peers, peerId);
      ReplicationPeerConfig oldConfig = peer.getPeerConfig();
      PeerState oldState = peer.getPeerState();
      boolean success = false;
      try {
        ReplicationPeerConfig newConfig = peers.refreshPeerConfig(peerId);
        // also need to refresh peer state here. When updating a serial replication peer we may
        // disable it first and then enable it.
        PeerState newState = peers.refreshPeerState(peerId);
        // RS need to start work with the new replication config change
        if (!ReplicationUtils.isNamespacesAndTableCFsEqual(oldConfig, newConfig) ||
          oldConfig.isSerial() != newConfig.isSerial() ||
          (oldState.equals(PeerState.ENABLED) && newState.equals(PeerState.DISABLED))) {
          replicationSourceManager.refreshSources(peerId);
        }
        success = true;
      } finally {
        if (!success) {
          // Reset peer config if refresh source failed
          peer.setPeerConfig(oldConfig);
          peer.setPeerState(oldState.equals(PeerState.ENABLED));
        }
      }
    } finally {
      peerLock.unlock();
    }
  }

  /**
   * Performs one stage of a sync replication state transition for the peer.
   * <ul>
   * <li>Stage {@code 0}: if the peer's new sync replication state is still NONE, refreshes the
   * peer state (and sources), refreshes the new sync replication state and notifies the
   * {@link PeerActionListener}. Otherwise logs a warning and returns.</li>
   * <li>Any other stage: if the new sync replication state is NONE, logs a warning and returns.
   * Otherwise, when the new state is STANDBY, drains the sources and rolls all WALs; then
   * notifies the {@link PeerActionListener} and calls
   * {@code peer.transitSyncReplicationState()}.</li>
   * </ul>
   * @param peerId id of the peer
   * @param stage transition stage; {@code 0} selects the first branch described above
   * @param rs region server whose WAL roller is used when transiting to STANDBY
   * @throws ReplicationException if the peer is not cached or is not a sync replication peer
   * @throws IOException on failure, including being interrupted while waiting for the WAL roll
   */
  @Override
  public void transitSyncReplicationPeerState(String peerId, int stage, HRegionServer rs)
    throws ReplicationException, IOException {
    ReplicationPeers replicationPeers = replicationSourceManager.getReplicationPeers();
    Lock peerLock = peersLock.acquireLock(peerId);
    try {
      ReplicationPeerImpl peer = getCachedPeer(replicationPeers, peerId);
      if (!peer.getPeerConfig().isSyncReplication()) {
        throw new ReplicationException("Peer with id=" + peerId + " is not synchronous.");
      }
      SyncReplicationState newSyncReplicationState = peer.getNewSyncReplicationState();
      if (stage == 0) {
        if (newSyncReplicationState != SyncReplicationState.NONE) {
          LOG.warn("The new sync replication state for peer {} has already been set to {}, " +
            "this should be a retry, give up", peerId, newSyncReplicationState);
          return;
        }
        // refresh the peer state first, as when we transit to STANDBY, we may need to disable the
        // peer before processing the sync replication state.
        refreshPeerStateAndSources(replicationPeers, peer, peerId);
        newSyncReplicationState = replicationPeers.refreshPeerNewSyncReplicationState(peerId);
        SyncReplicationState oldSyncReplicationState = peer.getSyncReplicationState();
        peerActionListener.peerSyncReplicationStateChange(peerId, oldSyncReplicationState,
          newSyncReplicationState, stage);
      } else {
        if (newSyncReplicationState == SyncReplicationState.NONE) {
          // Log the peer's actual current state; the new state is known to be NONE here and
          // would carry no information.
          LOG.warn(
            "The new sync replication state for peer {} has already been clear, and the " +
              "current state is {}, this should be a retry, give up",
            peerId, peer.getSyncReplicationState());
          return;
        }
        if (newSyncReplicationState == SyncReplicationState.STANDBY) {
          replicationSourceManager.drainSources(peerId);
          // Need to roll the wals and make the ReplicationSource for this peer track the new file.
          // If we do not do this, there will be two problems that can not be addressed at the same
          // time. First, if we just throw away the current wal file, and later when we transit the
          // peer to DA, and the wal has not been rolled yet, then the new data written to the wal
          // file will not be replicated and cause data inconsistency. But if we just track the
          // current wal file without rolling, it may contain some data from before we transit the
          // peer to S, later if we transit the peer to DA, the data will also be replicated and
          // cause data inconsistency. So here we need to roll the wal, and let the
          // ReplicationSource track the new wal file, and throw the old wal files away.
          LogRoller roller = rs.getWalRoller();
          roller.requestRollAll();
          try {
            roller.waitUntilWalRollFinished();
          } catch (InterruptedException e) {
            // reset the interrupted flag
            Thread.currentThread().interrupt();
            throw (IOException) new InterruptedIOException(
              "Interrupted while waiting for wal roll finish").initCause(e);
          }
        }
        SyncReplicationState oldState = peer.getSyncReplicationState();
        peerActionListener.peerSyncReplicationStateChange(peerId, oldState, newSyncReplicationState,
          stage);
        peer.transitSyncReplicationState();
      }
    } finally {
      peerLock.unlock();
    }
  }
}