blob: 336f9dc04f8eb631e326ff08c62c5f08038f3165 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;
/**
 * Tracks the online region servers via ZK.
 * <p/>
 * Handling of new RSs checking in is done via RPC. This class is only responsible for watching for
 * expired nodes. It handles listening for changes in the RS node list. The only exception is on
 * master restart, when we use the list fetched from zk to construct the initial set of live region
 * servers.
 * <p/>
 * If an RS node gets deleted, this automatically handles calling of
 * {@link ServerManager#expireServer(ServerName)}
 */
@InterfaceAudience.Private
public class RegionServerTracker extends ZKListener {
  private static final Logger LOG = LoggerFactory.getLogger(RegionServerTracker.class);

  // Region servers whose nodes we have seen under the rs znode. Only accessed while holding the
  // monitor of 'this': inside the synchronized (this) section of start() and in the synchronized
  // refresh().
  private final Set<ServerName> regionServers = new HashSet<>();
  private final ServerManager serverManager;
  private final MasterServices server;
  // As we need to send request to zk when processing the nodeChildrenChanged event, we'd better
  // move the operation to a single threaded thread pool in order to not block the zk event
  // processing since all the zk listener across HMaster will be called in one thread sequentially.
  private final ExecutorService executor;

  /**
   * Creates the tracker. Nothing is read from zookeeper, and this listener is not registered with
   * the watcher, until {@link #start(Set, Set, Set)} is called.
   * @param watcher the zookeeper watcher used to read the rs znode
   * @param server the master, used for abort/state checks and to reach the region states
   * @param serverManager receives the servers found on startup and the server expirations
   */
  public RegionServerTracker(ZKWatcher watcher, MasterServices server,
      ServerManager serverManager) {
    super(watcher);
    this.server = server;
    this.serverManager = serverManager;
    this.executor = Executors.newSingleThreadExecutor(
      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build());
  }

  /**
   * Reads and parses the znode of a single region server.
   * @param name the child node name under the rs znode; it is parsed as a {@link ServerName}
   * @return a pair of the parsed server name and its {@link RegionServerInfo}. The info is
   *         {@code null} if the node no longer exists, or if its data is empty or lacks the
   *         protobuf magic prefix.
   * @throws KeeperException on zookeeper errors
   * @throws InterruptedIOException if interrupted while reading the node data
   * @throws IOException propagated from parsing the protobuf payload
   */
  private Pair<ServerName, RegionServerInfo> getServerInfo(String name)
      throws KeeperException, IOException {
    ServerName serverName = ServerName.parseServerName(name);
    String nodePath = ZNodePaths.joinZNode(watcher.getZNodePaths().rsZNode, name);
    byte[] data;
    try {
      data = ZKUtil.getData(watcher, nodePath);
    } catch (InterruptedException e) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
    }
    if (data == null) {
      // we should receive a children changed event later and then we will expire it, so we still
      // need to add it to the region server set.
      LOG.warn("Server node {} does not exist, already dead?", name);
      return Pair.newPair(serverName, null);
    }
    if (data.length == 0 || !ProtobufUtil.isPBMagicPrefix(data)) {
      // this should not happen actually, unless we have bugs or someone has messed zk up.
      LOG.warn("Invalid data for region server node {} on zookeeper, data length = {}", name,
        data.length);
      return Pair.newPair(serverName, null);
    }
    RegionServerInfo.Builder builder = RegionServerInfo.newBuilder();
    // Skip the magic prefix; the remainder of the node data is the serialized RegionServerInfo.
    int magicLen = ProtobufUtil.lengthOfPBMagic();
    ProtobufUtil.mergeFrom(builder, data, magicLen, data.length - magicLen);
    return Pair.newPair(serverName, builder.build());
  }

  /**
   * Starts the tracking of online RegionServers. All RSes will be tracked after this method is
   * called.
   * <p/>
   * In this method, we will also construct the region server sets in {@link ServerManager}. If a
   * region server is dead between the crash of the previous master instance and the start of the
   * current master instance, we will schedule a SCP for it. This is done in
   * {@link ServerManager#findDeadServersAndProcess(Set, Set)}, we call it here under the lock
   * protection to prevent concurrency issues with server expiration operation.
   * @param deadServersFromPE the region servers which already have SCP associated.
   * @param liveServersFromWALDir the live region servers from wal directory.
   * @param splittingServersFromWALDir Servers whose WALs are being actively 'split'.
   * @throws KeeperException on zookeeper errors while listing or reading the rs znodes
   * @throws IOException if reading or parsing a region server node fails
   */
  public void start(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir,
      Set<ServerName> splittingServersFromWALDir)
      throws KeeperException, IOException {
    LOG.info("Starting RegionServerTracker; {} have existing ServerCrashProcedures, {} " +
      "possibly 'live' servers, and {} 'splitting'.", deadServersFromPE.size(),
      liveServersFromWALDir.size(), splittingServersFromWALDir.size());
    // deadServersFromPE is made from a list of outstanding ServerCrashProcedures.
    // splittingServersFromWALDir are being actively split -- the directory in the FS ends in
    // '-SPLITTING'. Each splitting server should have a corresponding SCP. Log if not.
    splittingServersFromWALDir.stream().filter(s -> !deadServersFromPE.contains(s))
      .forEach(s -> LOG.error("{} has no matching ServerCrashProcedure", s));
    // Create a ServerNode for every possibly-live server found in the wal directory.
    liveServersFromWALDir
      .forEach(sn -> server.getAssignmentManager().getRegionStates().getOrCreateServer(sn));
    watcher.registerListener(this);
    // Same monitor as refresh(), so a children-changed refresh cannot interleave with the initial
    // population of regionServers and the dead server processing below.
    synchronized (this) {
      List<String> servers =
        ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
      if (servers != null) {
        for (String n : servers) {
          Pair<ServerName, RegionServerInfo> pair = getServerInfo(n);
          ServerName serverName = pair.getFirst();
          RegionServerInfo info = pair.getSecond();
          regionServers.add(serverName);
          // Without parsable node data we still record the server, just without version info.
          ServerMetrics serverMetrics = info != null ?
            ServerMetricsBuilder.of(serverName, VersionInfoUtil.getVersionNumber(info.getVersionInfo()),
              info.getVersionInfo().getVersion()) :
            ServerMetricsBuilder.of(serverName);
          serverManager.checkAndRecordNewServer(serverName, serverMetrics);
        }
      }
      serverManager.findDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir);
    }
  }

  /**
   * Stops processing of membership changes by calling {@link ExecutorService#shutdownNow()} on the
   * refresh executor, which discards queued refreshes and attempts to interrupt a running one.
   * <p/>
   * This does not unregister the listener from the watcher. NOTE(review): a children-changed
   * event arriving after this call, while the master is neither aborted nor stopped, would make
   * {@code executor.execute} throw {@code RejectedExecutionException}. Confirm callers only stop
   * this together with the master.
   */
  public void stop() {
    executor.shutdownNow();
  }

  /**
   * Re-lists the rs znode children, re-setting the children watch. Servers whose nodes disappeared
   * are expired through {@link ServerManager#expireServer(ServerName)}, and servers whose nodes
   * appeared are added to the tracked set. Invoked on {@link #executor} from
   * {@link #nodeChildrenChanged(String)}.
   */
  private synchronized void refresh() {
    List<String> names;
    try {
      names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode);
    } catch (KeeperException e) {
      // here we need to abort as we failed to set watcher on the rs node which means that we can
      // not track the node deleted event any more.
      server.abort("Unexpected zk exception getting RS nodes", e);
      return;
    }
    Set<ServerName> servers = CollectionUtils.isEmpty(names) ? Collections.emptySet() :
      names.stream().map(ServerName::parseServerName).collect(Collectors.toSet());
    for (Iterator<ServerName> iter = regionServers.iterator(); iter.hasNext();) {
      ServerName sn = iter.next();
      if (!servers.contains(sn)) {
        LOG.info("RegionServer ephemeral node deleted, processing expiration [{}]", sn);
        serverManager.expireServer(sn);
        iter.remove();
      }
    }
    // here we do not need to parse the region server info as it is useless now, we only need the
    // server name.
    boolean newServerAdded = false;
    for (ServerName sn : servers) {
      if (regionServers.add(sn)) {
        newServerAdded = true;
        LOG.info("RegionServer ephemeral node created, adding [{}]", sn);
      }
    }
    if (newServerAdded && server.isInitialized()) {
      // Only call the check to move servers if a RegionServer was added to the cluster; in this
      // case it could be a server with a new version so it makes sense to run the check.
      server.checkIfShouldMoveSystemRegionAsync();
    }
  }

  /**
   * Schedules a {@link #refresh()} on the tracker's own executor when the children of the rs
   * znode change. The ZK event thread is never blocked by zookeeper reads here. Events for other
   * paths, or events received after the master has been aborted or stopped, are ignored.
   * @param path the znode whose children changed
   */
  @Override
  public void nodeChildrenChanged(String path) {
    if (path.equals(watcher.getZNodePaths().rsZNode) && !server.isAborted() &&
      !server.isStopped()) {
      executor.execute(this::refresh);
    }
  }
}