blob: fdc1e5414d00bb9c4b2645b9232187dd731af2e3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat;
import org.apache.hadoop.hbase.replication.ReplicationBarrierFamilyFormat.ReplicationBarrierResult;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
/**
* <p>
* Helper class to determine whether we can push a given WAL entry without breaking the replication
* order. This class is designed to be used per {@link ReplicationSourceWALReader}, so it is not
* thread safe.
* </p>
* <p>
* We record all the open sequence number for a region in a special family in meta, which is called
* 'rep_barrier', so there will be a sequence of open sequence number (b1, b2, b3, ...). We call
* [bn, bn+1) a range, and it is obvious that a region will always be on the same RS within a
* range.
* </p>
* <p>
* When a region is split or merged, we will also record the parent for the generated region(s) in
* the special family in meta. And also, we will write an extra 'open sequence number' for the
* parent region(s), which is the max sequence id of the region plus one.
* </p>
* <p>
* For each peer, we record the last pushed sequence id for each region. It is managed by the
* replication storage.
* </p>
* <p>
* The algorithm works like this:
* <ol>
* <li>Locate the sequence id we want to push in the barriers</li>
* <li>If it is before the first barrier, we are safe to push. This is usually because we enabled
* serial replication for this table after we created the table and wrote data into it.</li>
* <li>In general, if the previous range is finished, then we are safe to push. The way to determine
* whether a range is finished is straightforward: check whether the last pushed sequence id has
* reached the end barrier of the range minus 1. There are several exceptions:
* <ul>
* <li>If it is in the first range, we need to check whether there are parent regions. If so, we
* need to make sure that the data for parent regions have all been pushed.</li>
* <li>If it is in the last range, we need to check the region state. If the state is OPENING, then
* we are not safe to push. This is because, before we call reportRIT to the master (which updates
* the open sequence number in the meta table), we write an open region event marker to the WAL
* first, and its sequence id is greater than the newest open sequence number (which has not been
* written to the meta table yet, so we do not know it). In this scenario, the WAL entry for the
* open region event marker actually belongs to the range after the 'last' range, so we are not
* safe to push it. Otherwise the last pushed sequence id would be updated to this value and we
* would then think the previous range has already been finished, which is not true.</li>
* <li>Notice that the above two exceptions do not conflict, since the first range can also be the
* last range if we only have one range.</li>
* </ul>
* </li>
* </ol>
* </p>
* <p>
* And for performance reason, we do not want to check meta for every WAL entry, so we introduce two
* in memory maps. The idea is simple:
* <ul>
* <li>If a range can be pushed, then put its end barrier into the {@code canPushUnder} map.</li>
* <li>Before accessing meta, first check the sequence id stored in the {@code canPushUnder} map. If
* the sequence id of the WAL entry is less than the one stored in the {@code canPushUnder} map,
* then we are safe to push.</li>
* </ul>
* And for the last range, we do not have an end barrier, so we use the continuity of sequence id to
* determine whether we can push. The rule is:
* <ul>
* <li>When an entry is able to push, then put its sequence id into the {@code pushed} map.</li>
* <li>Check if the sequence id of the WAL entry equals the one stored in the {@code pushed} map
* plus one. If so, we are safe to push, and also update the {@code pushed} map with the sequence
* id of the WAL entry.</li>
* </ul>
* </p>
*/
@InterfaceAudience.Private
class SerialReplicationChecker {

  private static final Logger LOG = LoggerFactory.getLogger(SerialReplicationChecker.class);

  /** Configuration key for the sleep time, in milliseconds, between two push attempts. */
  public static final String REPLICATION_SERIALLY_WAITING_KEY =
    "hbase.serial.replication.waiting.ms";

  /** Sleep time in milliseconds used when the configuration key above is not set. */
  public static final long REPLICATION_SERIALLY_WAITING_DEFAULT = 10000;

  private final String peerId;

  private final ReplicationQueueStorage storage;

  private final Connection conn;

  private final long waitTimeMs;

  // Encoded region name -> the sequence id most recently recorded as pushable. A missing key is
  // loaded as NO_SEQNUM, and a key expires one day after it was last accessed.
  private final LoadingCache<String, MutableLong> pushed = CacheBuilder.newBuilder()
    .expireAfterAccess(1, TimeUnit.DAYS).build(new CacheLoader<String, MutableLong>() {

      @Override
      public MutableLong load(String key) throws Exception {
        return new MutableLong(HConstants.NO_SEQNUM);
      }
    });

  // Encoded region name -> the barrier below which entries may be pushed without looking at meta.
  // A guava cache is used so that each key gets a ttl (one day after the last access).
  private final Cache<String, Long> canPushUnder =
    CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.DAYS).build();

  /**
   * Creates a checker bound to the peer, queue storage and connection of the given source.
   * @param conf   used to read the wait time between push attempts
   * @param source supplies the peer id, the replication queue storage and the server connection
   */
  public SerialReplicationChecker(Configuration conf, ReplicationSource source) {
    peerId = source.getPeerId();
    storage = source.getReplicationQueueStorage();
    conn = source.getServer().getConnection();
    waitTimeMs =
      conf.getLong(REPLICATION_SERIALLY_WAITING_KEY, REPLICATION_SERIALLY_WAITING_DEFAULT);
  }

  /**
   * Tells whether everything below {@code endBarrier} has been pushed for the given region.
   * @param endBarrier        the barrier that closes the range
   * @param encodedRegionName the encoded name of the region to look up
   * @return true if the last pushed sequence id stored for this peer is at least
   *         {@code endBarrier - 1}
   * @throws IOException if the last pushed sequence id can not be read from the storage
   */
  private boolean isRangeFinished(long endBarrier, String encodedRegionName) throws IOException {
    try {
      long lastPushed = storage.getLastSequenceId(encodedRegionName, peerId);
      // The end barrier is an open sequence number, i.e. the old max sequence id plus one, so the
      // last sequence id which belongs to the range is one below it.
      long lastSeqIdInRange = endBarrier - 1;
      return lastPushed >= lastSeqIdInRange;
    } catch (ReplicationException e) {
      throw new IOException(
        "Failed to get pushed sequence id for " + encodedRegionName + ", peer " + peerId, e);
    }
  }

  /**
   * Tells whether a parent region has been completely pushed.
   * @param regionName the full name of the parent region
   * @return true if the parent has no barriers at all, otherwise whether the range ending at its
   *         last barrier is finished
   * @throws IOException if reading the barriers or the last pushed sequence id fails
   */
  private boolean isParentFinished(byte[] regionName) throws IOException {
    long[] parentBarriers = ReplicationBarrierFamilyFormat.getReplicationBarriers(conn, regionName);
    int count = parentBarriers.length;
    if (count > 0) {
      long lastBarrier = parentBarriers[count - 1];
      return isRangeFinished(lastBarrier, RegionInfo.encodeRegionName(regionName));
    }
    return true;
  }

  /**
   * Tells whether {@code index} denotes the last range while the region is in OPENING state.
   * <p>
   * An open region marker may be written to the WAL before the open sequence number is written to
   * meta, so in this situation it is not safe to push even if the previous range is finished.
   * </p>
   * @param barrierResult the barriers and region state read from meta
   * @param index         the one based index of the range being checked
   * @return true if the range is the last one and the region state is OPENING
   */
  private boolean isLastRangeAndOpening(ReplicationBarrierResult barrierResult, int index) {
    if (index != barrierResult.getBarriers().length) {
      return false;
    }
    return barrierResult.getState() == RegionState.State.OPENING;
  }

  /**
   * Remembers in the two in memory caches that the given entry was allowed to be pushed.
   * @param encodedNameAsString the encoded region name used as the cache key
   * @param seqId               the sequence id which has just been allowed
   * @param barriers            all barriers of the region
   * @param index               the index into {@code barriers} of the end barrier to cache, if
   *                            such a barrier exists
   */
  private void recordCanPush(String encodedNameAsString, long seqId, long[] barriers, int index) {
    if (index < barriers.length) {
      // the range has an end barrier, everything below it can skip the meta lookup next time
      canPushUnder.put(encodedNameAsString, barriers[index]);
    }
    pushed.getUnchecked(encodedNameAsString).setValue(seqId);
  }

  /**
   * The slow path check, which always reads the barriers of the region from meta.
   * @param entry the WAL entry we want to push
   * @param row   the row used to look up the barriers
   * @return true if the entry can be pushed now
   * @throws IOException if reading the barriers or the last pushed sequence ids fails
   */
  private boolean canPush(Entry entry, byte[] row) throws IOException {
    String regionKey = Bytes.toString(entry.getKey().getEncodedRegionName());
    long seqId = entry.getKey().getSequenceId();
    ReplicationBarrierResult result = ReplicationBarrierFamilyFormat.getReplicationBarrierResult(
      conn, entry.getKey().getTableName(), row, entry.getKey().getEncodedRegionName());
    LOG.debug("Replication barrier for {}: {}", entry, result);
    long[] barriers = result.getBarriers();
    int searchResult = Arrays.binarySearch(barriers, seqId);
    if (searchResult == -1) {
      LOG.debug("{} is before the first barrier, pass", entry);
      // The sequence id is smaller than the first recorded openSeqNum. Usually the wal was written
      // before serial replication was enabled for the table, so the order can not be guaranteed
      // anyway and we just let it pass.
      pushed.getUnchecked(regionKey).setValue(seqId);
      return true;
    }
    // Ranges are left closed and right open. Map both a hit and a missed insertion point to a one
    // based range index, i.e. a hit on barriers[i] and a miss between barriers[i] and
    // barriers[i + 1] both give i + 1.
    int rangeIndex = searchResult >= 0 ? searchResult + 1 : -searchResult - 1;
    boolean inFirstRange = rangeIndex == 1;
    if (inFirstRange) {
      // there is no previous range, but all the parents, if any, must have been pushed
      for (byte[] parent : result.getParentRegionNames()) {
        if (!isParentFinished(parent)) {
          LOG.debug("Parent {} has not been finished yet for entry {}, give up",
            Bytes.toStringBinary(parent), entry);
          return false;
        }
      }
    } else if (!isRangeFinished(barriers[rangeIndex - 1], regionKey)) {
      // the previous range must have been pushed completely
      LOG.debug("Previous range for {} has not been finished yet, give up", entry);
      return false;
    }
    if (isLastRangeAndOpening(result, rangeIndex)) {
      LOG.debug("{} is in the last range and the region is opening, give up", entry);
      return false;
    }
    if (inFirstRange) {
      LOG.debug("{} is in the first range, pass", entry);
    } else {
      LOG.debug("The previous range for {} has been finished, pass", entry);
    }
    recordCanPush(regionKey, seqId, barriers, rangeIndex);
    return true;
  }

  /**
   * Checks whether the given entry can be pushed, trying the two in memory caches first and
   * falling back to reading the barriers from meta.
   * @param entry           the WAL entry we want to push
   * @param firstCellInEdit the first cell of the edit, its row is used for the barrier lookup
   * @return true if the entry can be pushed now
   * @throws IOException if the fallback barrier check fails
   */
  public boolean canPush(Entry entry, Cell firstCellInEdit) throws IOException {
    String regionKey = Bytes.toString(entry.getKey().getEncodedRegionName());
    long seqId = entry.getKey().getSequenceId();
    Long cachedEndBarrier = canPushUnder.getIfPresent(regionKey);
    if (cachedEndBarrier != null) {
      boolean belowBarrier = seqId < cachedEndBarrier.longValue();
      if (belowBarrier) {
        LOG.trace("{} is before the end barrier {}, pass", entry, cachedEndBarrier);
        return true;
      }
      LOG.debug("{} is beyond the previous end barrier {}, remove from cache", entry,
        cachedEndBarrier);
      // the cached safe point is behind us now, so it is useless
      canPushUnder.invalidate(regionKey);
    }
    // For a region which is currently opened on us, a continuous sequence id is safe to replicate.
    // A gap may mean that the region has been moved to another RS and then back, and in that case
    // the barriers have to be checked.
    MutableLong lastPushed = pushed.getUnchecked(regionKey);
    if (lastPushed.longValue() + 1 != seqId) {
      return canPush(entry, CellUtil.cloneRow(firstCellInEdit));
    }
    LOG.trace("The sequence id for {} is continuous, pass", entry);
    lastPushed.increment();
    return true;
  }

  /**
   * Blocks until the barrier based check allows the given entry to be pushed, sleeping for the
   * configured wait time after each failed attempt. The in memory caches are not consulted here,
   * every attempt goes through the meta based check.
   * @param entry           the WAL entry we want to push
   * @param firstCellInEdit the first cell of the edit, its row is used for the barrier lookup
   * @throws IOException          if a check fails
   * @throws InterruptedException if the thread is interrupted while sleeping
   */
  public void waitUntilCanPush(Entry entry, Cell firstCellInEdit)
    throws IOException, InterruptedException {
    byte[] row = CellUtil.cloneRow(firstCellInEdit);
    for (;;) {
      if (canPush(entry, row)) {
        return;
      }
      LOG.debug("Can not push {}, wait", entry);
      Thread.sleep(waitTimeMs);
    }
  }
}