blob: 5c480bacdd822a702781fc531131efd82045a515 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static java.util.stream.Collectors.toList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.BadVersionException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.KeeperException.NotEmptyException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
* ZK based replication queue storage.
* <p>
* The base znode for each regionserver is the regionserver name. For example:
*
* <pre>
* /hbase/replication/rs/hostname.example.org,6020,1234
* </pre>
*
* Within this znode, the region server maintains a set of WAL replication queues. These queues are
* represented by child znodes named using there give queue id. For example:
*
* <pre>
* /hbase/replication/rs/hostname.example.org,6020,1234/1
* /hbase/replication/rs/hostname.example.org,6020,1234/2
* </pre>
*
* Each queue has one child znode for every WAL that still needs to be replicated. The value of
* these WAL child znodes is the latest position that has been replicated. This position is updated
* every time a WAL entry is replicated. For example:
*
* <pre>
* /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
* </pre>
*/
@InterfaceAudience.Private
class ZKReplicationQueueStorage extends ZKReplicationStorageBase
implements ReplicationQueueStorage {
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
"zookeeper.znode.replication.hfile.refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY =
"zookeeper.znode.replication.regions";
public static final String ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT = "regions";
/**
* The name of the znode that contains all replication queues
*/
private final String queuesZNode;
/**
* The name of the znode that contains queues of hfile references to be replicated
*/
private final String hfileRefsZNode;
final String regionsZNode;
public ZKReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
this.regionsZNode = ZNodePaths.joinZNode(replicationZNode, conf
.get(ZOOKEEPER_ZNODE_REPLICATION_REGIONS_KEY, ZOOKEEPER_ZNODE_REPLICATION_REGIONS_DEFAULT));
}
@Override
public String getRsNode(ServerName serverName) {
return ZNodePaths.joinZNode(queuesZNode, serverName.getServerName());
}
private String getQueueNode(ServerName serverName, String queueId) {
return ZNodePaths.joinZNode(getRsNode(serverName), queueId);
}
private String getFileNode(String queueNode, String fileName) {
return ZNodePaths.joinZNode(queueNode, fileName);
}
private String getFileNode(ServerName serverName, String queueId, String fileName) {
return getFileNode(getQueueNode(serverName, queueId), fileName);
}
/**
* <p>
* Put all regions under /hbase/replication/regions znode will lead to too many children because
* of the huge number of regions in real production environment. So here we will distribute the
* znodes to multiple directories.
* </p>
* <p>
* So the final znode path will be format like this:
*
* <pre>
* /hbase/replication/regions/dd/04/e76a6966d4ffa908ed0586764767-100
* </pre>
*
* Here the full encoded region name is dd04e76a6966d4ffa908ed0586764767, and we use the first two
* characters 'dd' as the first level directory name, and use the next two characters '04' as the
* second level directory name, and the rest part as the prefix of the znode, and the suffix '100'
* is the peer id.
* </p>
* @param encodedRegionName the encoded region name.
* @param peerId peer id for replication.
* @return ZNode path to persist the max sequence id that we've pushed for the given region and
* peer.
*/
String getSerialReplicationRegionPeerNode(String encodedRegionName, String peerId) {
if (encodedRegionName == null || encodedRegionName.length() != RegionInfo.MD5_HEX_LENGTH) {
throw new IllegalArgumentException(
"Invalid encoded region name: " + encodedRegionName + ", length should be 32.");
}
return new StringBuilder(regionsZNode).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
.append(encodedRegionName, 0, 2).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
.append(encodedRegionName, 2, 4).append(ZNodePaths.ZNODE_PATH_SEPARATOR)
.append(encodedRegionName, 4, encodedRegionName.length()).append("-").append(peerId)
.toString();
}
@Override
public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
try {
ZKUtil.deleteNodeRecursively(zookeeper, getQueueNode(serverName, queueId));
} catch (KeeperException e) {
throw new ReplicationException(
"Failed to delete queue (serverName=" + serverName + ", queueId=" + queueId + ")", e);
}
}
@Override
public void addWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
try {
ZKUtil.createWithParents(zookeeper, getFileNode(serverName, queueId, fileName));
} catch (KeeperException e) {
throw new ReplicationException("Failed to add wal to queue (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
}
@Override
public void removeWAL(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
String fileNode = getFileNode(serverName, queueId, fileName);
try {
ZKUtil.deleteNode(zookeeper, fileNode);
} catch (NoNodeException e) {
LOG.warn("{} already deleted when removing log", fileNode);
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove wal from queue (serverName=" + serverName +
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
}
private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue());
if (p.getSecond() < 0) { // ZNode does not exist.
ZKUtil.createWithParents(zookeeper,
path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR)));
listOfOps.add(ZKUtilOp.createAndFailSilent(path, data));
continue;
}
// Perform CAS in a specific version v0 (HBASE-20138)
int v0 = p.getSecond();
long lastPushedSeqId = p.getFirst();
if (lastSeqEntry.getValue() <= lastPushedSeqId) {
continue;
}
listOfOps.add(ZKUtilOp.setData(path, data, v0));
}
}
@Override
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
Map<String, Long> lastSeqIds) throws ReplicationException {
try {
for (int retry = 0;; retry++) {
List<ZKUtilOp> listOfOps = new ArrayList<>();
if (position > 0) {
listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
ZKUtil.positionToByteArray(position)));
}
// Persist the max sequence id(s) of regions for serial replication atomically.
addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
if (listOfOps.isEmpty()) {
return;
}
try {
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
return;
} catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
LOG.warn(
"Bad version(or node exist) when persist the last pushed sequence id to zookeeper "
+ "storage, Retry = " + retry + ", serverName=" + serverName + ", queueId="
+ queueId + ", fileName=" + fileName);
}
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to set log position (serverName=" + serverName
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
}
}
/**
* Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means
* that the ZNode does not exist.
*/
protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
String peerId) throws KeeperException {
Stat stat = new Stat();
String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat);
if (data == null) {
// ZNode does not exist, so just return version -1 to indicate that no node exist.
return Pair.newPair(HConstants.NO_SEQNUM, -1);
}
try {
return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion());
} catch (DeserializationException de) {
LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
+ "), data=" + Bytes.toStringBinary(data));
}
return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion());
}
@Override
public long getLastSequenceId(String encodedRegionName, String peerId)
throws ReplicationException {
try {
return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst();
} catch (KeeperException e) {
throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName="
+ encodedRegionName + ", peerId=" + peerId + ")", e);
}
}
@Override
public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
throws ReplicationException {
try {
// No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers
// only, so no conflict happen.
List<ZKUtilOp> listOfOps = new ArrayList<>();
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
ZKUtil.createWithParents(zookeeper, path);
listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
}
if (!listOfOps.isEmpty()) {
ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId
+ ", size of lastSeqIds=" + lastSeqIds.size(), e);
}
}
@Override
public void removeLastSequenceIds(String peerId) throws ReplicationException {
String suffix = "-" + peerId;
try {
StringBuilder sb = new StringBuilder(regionsZNode);
int regionsZNodeLength = regionsZNode.length();
int levelOneLength = regionsZNodeLength + 3;
int levelTwoLength = levelOneLength + 3;
List<String> levelOneDirs = ZKUtil.listChildrenNoWatch(zookeeper, regionsZNode);
// it is possible that levelOneDirs is null if we haven't write any last pushed sequence ids
// yet, so we need an extra check here.
if (CollectionUtils.isEmpty(levelOneDirs)) {
return;
}
for (String levelOne : levelOneDirs) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelOne);
for (String levelTwo : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(levelTwo);
for (String znode : ZKUtil.listChildrenNoWatch(zookeeper, sb.toString())) {
if (znode.endsWith(suffix)) {
sb.append(ZNodePaths.ZNODE_PATH_SEPARATOR).append(znode);
ZKUtil.deleteNode(zookeeper, sb.toString());
sb.setLength(levelTwoLength);
}
}
sb.setLength(levelOneLength);
}
sb.setLength(regionsZNodeLength);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove all last sequence ids, peerId=" + peerId, e);
}
}
@Override
public void removeLastSequenceIds(String peerId, List<String> encodedRegionNames)
throws ReplicationException {
try {
List<ZKUtilOp> listOfOps =
encodedRegionNames.stream().map(n -> getSerialReplicationRegionPeerNode(n, peerId))
.map(ZKUtilOp::deleteNodeFailSilent).collect(Collectors.toList());
ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove last sequence ids, peerId=" + peerId +
", encodedRegionNames.size=" + encodedRegionNames.size(), e);
}
}
@Override
public long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException {
byte[] bytes;
try {
bytes = ZKUtil.getData(zookeeper, getFileNode(serverName, queueId, fileName));
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException("Failed to get log position (serverName=" + serverName +
", queueId=" + queueId + ", fileName=" + fileName + ")", e);
}
try {
return ZKUtil.parseWALPositionFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed parse log position (serverName={}, queueId={}, fileName={})",
serverName, queueId, fileName);
}
// if we can not parse the position, start at the beginning of the wal file again
return 0;
}
@Override
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
ServerName destServerName) throws ReplicationException {
LOG.info("Atomically moving {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
try {
ZKUtil.createWithParents(zookeeper, getRsNode(destServerName));
} catch (KeeperException e) {
throw new ReplicationException(
"Claim queue queueId=" + queueId + " from " + sourceServerName + " to " + destServerName +
" failed when creating the node for " + destServerName,
e);
}
String newQueueId = queueId + "-" + sourceServerName;
try {
String oldQueueNode = getQueueNode(sourceServerName, queueId);
List<String> wals = ZKUtil.listChildrenNoWatch(zookeeper, oldQueueNode);
if (CollectionUtils.isEmpty(wals)) {
ZKUtil.deleteNodeFailSilent(zookeeper, oldQueueNode);
LOG.info("Removed empty {}/{}", sourceServerName, queueId);
return new Pair<>(newQueueId, Collections.emptySortedSet());
}
String newQueueNode = getQueueNode(destServerName, newQueueId);
List<ZKUtilOp> listOfOps = new ArrayList<>();
SortedSet<String> logQueue = new TreeSet<>();
// create the new cluster znode
listOfOps.add(ZKUtilOp.createAndFailSilent(newQueueNode, HConstants.EMPTY_BYTE_ARRAY));
// get the offset of the logs and set it to new znodes
for (String wal : wals) {
String oldWalNode = getFileNode(oldQueueNode, wal);
byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalNode);
LOG.debug("Creating {} with data {}", wal, Bytes.toStringBinary(logOffset));
String newWalNode = getFileNode(newQueueNode, wal);
listOfOps.add(ZKUtilOp.createAndFailSilent(newWalNode, logOffset));
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalNode));
logQueue.add(wal);
}
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));
LOG.trace("The multi list size is {}", listOfOps.size());
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
LOG.info("Atomically moved {}/{}'s WALs to {}", sourceServerName, queueId, destServerName);
return new Pair<>(newQueueId, logQueue);
} catch (NoNodeException | NodeExistsException | NotEmptyException | BadVersionException e) {
// Multi call failed; it looks like some other regionserver took away the logs.
// These exceptions mean that zk tells us the request can not be execute. So return an empty
// queue to tell the upper layer that claim nothing. For other types of exception should be
// thrown out to notify the upper layer.
LOG.info("Claim queue queueId={} from {} to {} failed with {}, someone else took the log?",
queueId,sourceServerName, destServerName, e.toString());
return new Pair<>(newQueueId, Collections.emptySortedSet());
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException("Claim queue queueId=" + queueId + " from " +
sourceServerName + " to " + destServerName + " failed", e);
}
}
@Override
public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
try {
ZKUtil.deleteNodeFailSilent(zookeeper, getRsNode(serverName));
} catch (NotEmptyException e) {
// keep silence to avoid logging too much.
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove replicator for " + serverName, e);
}
}
private List<ServerName> getListOfReplicators0() throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, queuesZNode);
if (children == null) {
children = Collections.emptyList();
}
return children.stream().map(ServerName::parseServerName).collect(toList());
}
@Override
public List<ServerName> getListOfReplicators() throws ReplicationException {
try {
return getListOfReplicators0();
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of replicators", e);
}
}
private List<String> getWALsInQueue0(ServerName serverName, String queueId)
throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName,
queueId));
return children != null ? children : Collections.emptyList();
}
@Override
public List<String> getWALsInQueue(ServerName serverName, String queueId)
throws ReplicationException {
try {
return getWALsInQueue0(serverName, queueId);
} catch (KeeperException e) {
throw new ReplicationException(
"Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")",
e);
}
}
private List<String> getAllQueues0(ServerName serverName) throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName));
return children != null ? children : Collections.emptyList();
}
@Override
public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
try {
return getAllQueues0(serverName);
} catch (KeeperException e) {
throw new ReplicationException("Failed to get all queues (serverName=" + serverName + ")", e);
}
}
// will be overridden in UTs
protected int getQueuesZNodeCversion() throws KeeperException {
Stat stat = new Stat();
ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
return stat.getCversion();
}
@Override
public Set<String> getAllWALs() throws ReplicationException {
try {
for (int retry = 0;; retry++) {
int v0 = getQueuesZNodeCversion();
List<ServerName> rss = getListOfReplicators0();
if (rss.isEmpty()) {
LOG.debug("Didn't find a RegionServer that replicates, won't prevent deletions.");
return Collections.emptySet();
}
Set<String> wals = new HashSet<>();
for (ServerName rs : rss) {
for (String queueId : getAllQueues0(rs)) {
wals.addAll(getWALsInQueue0(rs, queueId));
}
}
int v1 = getQueuesZNodeCversion();
if (v0 == v1) {
return wals;
}
LOG.info("Replication queue node cversion changed from %d to %d, retry = %d",
v0, v1, retry);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to get all wals", e);
}
}
private String getHFileRefsPeerNode(String peerId) {
return ZNodePaths.joinZNode(hfileRefsZNode, peerId);
}
private String getHFileNode(String peerNode, String fileName) {
return ZNodePaths.joinZNode(peerNode, fileName);
}
@Override
public void addPeerToHFileRefs(String peerId) throws ReplicationException {
String peerNode = getHFileRefsPeerNode(peerId);
try {
if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
LOG.info("Adding peer {} to hfile reference queue.", peerId);
ZKUtil.createWithParents(zookeeper, peerNode);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
e);
}
}
@Override
public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
String peerNode = getHFileRefsPeerNode(peerId);
try {
if (ZKUtil.checkExists(zookeeper, peerNode) == -1) {
LOG.debug("Peer {} not found in hfile reference queue.", peerNode);
} else {
LOG.info("Removing peer {} from hfile reference queue.", peerNode);
ZKUtil.deleteNodeRecursively(zookeeper, peerNode);
}
} catch (KeeperException e) {
throw new ReplicationException(
"Failed to remove peer " + peerId + " from hfile reference queue.", e);
}
}
@Override
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerNode = getHFileRefsPeerNode(peerId);
LOG.debug("Adding hfile references {} in queue {}", pairs, peerNode);
List<ZKUtilOp> listOfOps = pairs.stream().map(p -> p.getSecond().getName())
.map(n -> getHFileNode(peerNode, n))
.map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
LOG.debug("The multi list size for adding hfile references in zk for node {} is {}",
peerNode, listOfOps.size());
try {
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
} catch (KeeperException e) {
throw new ReplicationException("Failed to add hfile reference to peer " + peerId, e);
}
}
@Override
public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
String peerNode = getHFileRefsPeerNode(peerId);
LOG.debug("Removing hfile references {} from queue {}", files, peerNode);
List<ZKUtilOp> listOfOps = files.stream().map(n -> getHFileNode(peerNode, n))
.map(ZKUtilOp::deleteNodeFailSilent).collect(toList());
LOG.debug("The multi list size for removing hfile references in zk for node {} is {}",
peerNode, listOfOps.size());
try {
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
} catch (KeeperException e) {
throw new ReplicationException("Failed to remove hfile reference from peer " + peerId, e);
}
}
private List<String> getAllPeersFromHFileRefsQueue0() throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode);
return children != null ? children : Collections.emptyList();
}
@Override
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
try {
return getAllPeersFromHFileRefsQueue0();
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of all peers in hfile references node.",
e);
}
}
private List<String> getReplicableHFiles0(String peerId) throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(this.zookeeper,
getHFileRefsPeerNode(peerId));
return children != null ? children : Collections.emptyList();
}
@Override
public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
try {
return getReplicableHFiles0(peerId);
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of hfile references for peer " + peerId,
e);
}
}
// will be overridden in UTs
protected int getHFileRefsZNodeCversion() throws ReplicationException {
Stat stat = new Stat();
try {
ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat);
} catch (KeeperException e) {
throw new ReplicationException("Failed to get stat of replication hfile references node.", e);
}
return stat.getCversion();
}
@Override
public Set<String> getAllHFileRefs() throws ReplicationException {
try {
for (int retry = 0;; retry++) {
int v0 = getHFileRefsZNodeCversion();
List<String> peers = getAllPeersFromHFileRefsQueue();
if (peers.isEmpty()) {
LOG.debug("Didn't find any peers with hfile references, won't prevent deletions.");
return Collections.emptySet();
}
Set<String> hfileRefs = new HashSet<>();
for (String peer : peers) {
hfileRefs.addAll(getReplicableHFiles0(peer));
}
int v1 = getHFileRefsZNodeCversion();
if (v0 == v1) {
return hfileRefs;
}
LOG.debug("Replication hfile references node cversion changed from %d to %d, retry = %d",
v0, v1, retry);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to get all hfile refs", e);
}
}
}