/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.replication;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The base class for all replication peer related procedures.
*/
@InterfaceAudience.Private
public abstract class AbstractPeerProcedure<TState> extends AbstractPeerNoLockProcedure<TState>
implements PeerProcedureInterface {
private static final Logger LOG = LoggerFactory.getLogger(AbstractPeerProcedure.class);
protected static final int UPDATE_LAST_SEQ_ID_BATCH_SIZE = 1000;
// The sleep interval, in milliseconds, when waiting for a table to be enabled or disabled.
protected static final int SLEEP_INTERVAL_MS = 1000;
// Used to keep compatibility with old clients, where we can only return after updateStorage.
protected ProcedurePrepareLatch latch;
protected AbstractPeerProcedure() {
}
protected AbstractPeerProcedure(String peerId) {
super(peerId);
this.latch = ProcedurePrepareLatch.createLatch(2, 1);
}
public ProcedurePrepareLatch getLatch() {
return latch;
}
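// Hold the exclusive lock on the peer for the whole lifetime of the procedure, so that procedures
// operating on the same peer cannot interleave with each other.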
@Override
protected LockState acquireLock(MasterProcedureEnv env) {
if (env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId)) {
return LockState.LOCK_EVENT_WAIT;
}
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(MasterProcedureEnv env) {
env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId);
}
@Override
protected boolean holdLock(MasterProcedureEnv env) {
return true;
}
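// Schedule a RefreshPeerProcedure as a child procedure for every online region server, so that
// each of them reloads the peer's state and configuration.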
protected final void refreshPeer(MasterProcedureEnv env, PeerOperationType type) {
addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream()
.map(sn -> new RefreshPeerProcedure(peerId, type, sn)).toArray(RefreshPeerProcedure[]::new));
}
// Will be overridden in tests to simulate errors.
protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
env.getReplicationPeerManager().enablePeer(peerId);
}
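// Buffer the encodedRegionName -> lastPushedSeqId pair, flushing the batch to the queue storage
// and clearing the map once it reaches UPDATE_LAST_SEQ_ID_BATCH_SIZE entries. A negative barrier
// means there is nothing to record for the region.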
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
ReplicationQueueStorage queueStorage) throws ReplicationException {
if (barrier >= 0) {
lastSeqIds.put(encodedRegionName, barrier);
if (lastSeqIds.size() >= UPDATE_LAST_SEQ_ID_BATCH_SIZE) {
queueStorage.setLastSequenceIds(peerId, lastSeqIds);
lastSeqIds.clear();
}
}
}
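// Record the last pushed sequence id for every region of every table which has global replication
// scope and is replicated by this peer; any entries still buffered after the loop are flushed to
// the queue storage in one final call.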
protected final void setLastPushedSequenceId(MasterProcedureEnv env,
ReplicationPeerConfig peerConfig) throws IOException, ReplicationException {
Map<String, Long> lastSeqIds = new HashMap<String, Long>();
for (TableDescriptor td : env.getMasterServices().getTableDescriptors().getAll().values()) {
if (!td.hasGlobalReplicationScope()) {
continue;
}
TableName tn = td.getTableName();
if (!peerConfig.needToReplicate(tn)) {
continue;
}
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}
if (!lastSeqIds.isEmpty()) {
env.getReplicationPeerManager().getQueueStorage().setLastSequenceIds(peerId, lastSeqIds);
}
}
// If the table is currently disabling, then we need to wait until it is fully disabled, because
// we also write the replication barrier for a disabled table. Returns whether we need to update
// the last pushed sequence id: if the table has already been deleted, i.e. we hit a
// TableNotFoundException, then there is no need to update the last pushed sequence id for it.
private boolean needSetLastPushedSequenceId(TableStateManager tsm, TableName tn)
throws IOException {
for (;;) {
try {
if (!tsm.getTableState(tn).isDisabling()) {
return true;
}
Thread.sleep(SLEEP_INTERVAL_MS);
} catch (TableNotFoundException e) {
return false;
} catch (InterruptedException e) {
throw (IOException) new InterruptedIOException(e.getMessage()).initCause(e);
}
}
}
// Puts the encodedRegionName -> lastPushedSeqId pairs into the map passed in; once the map grows
// large enough we call queueStorage.setLastSequenceIds and clear it. The caller must therefore
// check whether the map is still non-empty at the end and, if so, call
// queueStorage.setLastSequenceIds to write out the remaining entries.
protected final void setLastPushedSequenceIdForTable(MasterProcedureEnv env, TableName tableName,
Map<String, Long> lastSeqIds) throws IOException, ReplicationException {
TableStateManager tsm = env.getMasterServices().getTableStateManager();
ReplicationQueueStorage queueStorage = env.getReplicationPeerManager().getQueueStorage();
Connection conn = env.getMasterServices().getConnection();
if (!needSetLastPushedSequenceId(tsm, tableName)) {
LOG.debug("Skip settting last pushed sequence id for {}", tableName);
return;
}
for (Pair<String, Long> name2Barrier : ReplicationBarrierFamilyFormat
.getTableEncodedRegionNameAndLastBarrier(conn, tableName)) {
LOG.trace("Update last pushed sequence id for {}, {}", tableName, name2Barrier);
addToMap(lastSeqIds, name2Barrier.getFirst(), name2Barrier.getSecond().longValue() - 1,
queueStorage);
}
}
}