blob: 7401c4b8d543a0413c1cc5a6307b068db3ab3ea7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
/**
* The base class for all replication peer related procedure except sync replication state
* transition.
*/
@InterfaceAudience.Private
public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModificationState> {
private static final Logger LOG = LoggerFactory.getLogger(ModifyPeerProcedure.class);
protected ModifyPeerProcedure() {
}
protected ModifyPeerProcedure(String peerId) {
super(peerId);
}
/**
* Called before we start the actual processing. The implementation should call the pre CP hook,
* and also the pre-check for the peer modification.
* <p>
* If an IOException is thrown then we will give up and mark the procedure as failed directly. If
* all checks passes then the procedure can not be rolled back any more.
*/
protected abstract void prePeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException, InterruptedException;
protected abstract void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException;
/**
* Called before we finish the procedure. The implementation can do some logging work, and also
* call the coprocessor hook if any.
* <p>
* Notice that, since we have already done the actual work, throwing {@code IOException} here will
* not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
* {@code ReplicationException} is thrown we will retry since this usually means we fails to
* update the peer storage.
*/
protected abstract void postPeerModification(MasterProcedureEnv env)
throws IOException, ReplicationException;
protected void releaseLatch(MasterProcedureEnv env) {
ProcedurePrepareLatch.releaseLatch(latch, this);
}
/**
* Implementation class can override this method. By default we will jump to
* POST_PEER_MODIFICATION and finish the procedure.
*/
protected PeerModificationState nextStateAfterRefresh() {
return PeerModificationState.POST_PEER_MODIFICATION;
}
/**
* The implementation class should override this method if the procedure may enter the serial
* related states.
*/
protected boolean enablePeerBeforeFinish() {
throw new UnsupportedOperationException();
}
protected ReplicationPeerConfig getOldPeerConfig() {
return null;
}
protected ReplicationPeerConfig getNewPeerConfig() {
throw new UnsupportedOperationException();
}
protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
throws IOException, ReplicationException {
throw new UnsupportedOperationException();
}
// If the table is in enabling state, we need to wait until it is enabled and then reopen all its
// regions.
private boolean needReopen(TableStateManager tsm, TableName tn) throws IOException {
for (;;) {
try {
TableState state = tsm.getTableState(tn);
if (state.isEnabled()) {
return true;
}
if (!state.isEnabling()) {
return false;
}
Thread.sleep(SLEEP_INTERVAL_MS);
} catch (TableNotFoundException e) {
return false;
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
}
}
}
// will be override in test to simulate error
protected void reopenRegions(MasterProcedureEnv env) throws IOException {
ReplicationPeerConfig peerConfig = getNewPeerConfig();
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
TableStateManager tsm = env.getMasterServices().getTableStateManager();
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
if (!peerConfig.needToReplicate(tn)) {
continue;
}
if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
oldPeerConfig.needToReplicate(tn)) {
continue;
}
if (needReopen(tsm, tn)) {
addChildProcedure(new ReopenTableRegionsProcedure(tn));
}
}
}
@Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
throws ProcedureSuspendedException, InterruptedException {
switch (state) {
case PRE_PEER_MODIFICATION:
try {
prePeerModification(env);
} catch (IOException e) {
LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
"mark the procedure as failure and give up", getClass().getName(), peerId, e);
setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
releaseLatch(env);
return Flow.NO_MORE_STATE;
} catch (ReplicationException e) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e));
}
resetRetry();
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE:
try {
updatePeerStorage(env);
} catch (ReplicationException e) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("{} update peer storage for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e));
}
resetRetry();
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
refreshPeer(env, getPeerOperationType());
setNextState(nextStateAfterRefresh());
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_REOPEN_REGIONS:
try {
reopenRegions(env);
} catch (Exception e) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("{} reopen regions for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e));
}
resetRetry();
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
try {
updateLastPushedSequenceIdForSerialPeer(env);
} catch (Exception e) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e));
}
resetRetry();
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
: PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_SET_PEER_ENABLED:
try {
enablePeer(env);
} catch (ReplicationException e) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e));
}
resetRetry();
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
refreshPeer(env, PeerOperationType.ENABLE);
setNextState(PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case POST_PEER_MODIFICATION:
try {
postPeerModification(env);
} catch (ReplicationException e) {
throw suspend(env.getMasterConfiguration(),
backoff -> LOG.warn(
"{} failed to call postPeerModification for peer {}, sleep {} secs",
getClass().getName(), peerId, backoff / 1000, e));
} catch (IOException e) {
LOG.warn("{} failed to call post CP hook for peer {}, " +
"ignore since the procedure has already done", getClass().getName(), peerId, e);
}
releaseLatch(env);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
}
}
@Override
protected PeerModificationState getState(int stateId) {
return PeerModificationState.forNumber(stateId);
}
@Override
protected int getStateId(PeerModificationState state) {
return state.getNumber();
}
@Override
protected PeerModificationState getInitialState() {
return PeerModificationState.PRE_PEER_MODIFICATION;
}
}