/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| package org.apache.hadoop.hbase.master; |
| import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; |
| import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; |
| import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; |
| import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER; |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.stream.Collectors; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; |
| import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; |
| import org.apache.hadoop.hbase.procedure2.Procedure; |
| import org.apache.hadoop.hbase.procedure2.ProcedureEvent; |
| import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; |
| import org.apache.hadoop.hbase.wal.WALSplitUtil; |
| import org.apache.hbase.thirdparty.com.google.common.collect.Lists; |
| import org.apache.yetus.audience.InterfaceAudience; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each |
| * {@link SplitWALProcedure}. |
| * Total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER). |
| * Helps assign and release workers for split tasks. |
| * Provide helper method to delete split WAL file and directory. |
| * |
| * The user can get the SplitWALProcedures via splitWALs(crashedServer, splitMeta) |
| * can get the files that need to split via getWALsToSplit(crashedServer, splitMeta) |
| * can delete the splitting WAL and directory via deleteSplitWAL(wal) |
| * and deleteSplitWAL(crashedServer) |
| * can check if splitting WALs of a crashed server is success via isSplitWALFinished(walPath) |
| * can acquire and release a worker for splitting WAL via acquireSplitWALWorker(procedure) |
| * and releaseSplitWALWorker(worker, scheduler) |
| * |
| * This class is to replace the zk-based WAL splitting related code, {@link MasterWalManager}, |
| * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and |
| * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed |
| * after we switch to procedure-based WAL splitting. |
| * @see SplitLogManager for the original distributed split WAL manager. |
| */ |
| @InterfaceAudience.Private |
| public class SplitWALManager { |
| private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); |
| |
| private final MasterServices master; |
| private final SplitWorkerAssigner splitWorkerAssigner; |
| private final Path rootDir; |
| private final FileSystem fs; |
| private final Configuration conf; |
| private final Path walArchiveDir; |
| |
| public SplitWALManager(MasterServices master) throws IOException { |
| this.master = master; |
| this.conf = master.getConfiguration(); |
| this.splitWorkerAssigner = new SplitWorkerAssigner(this.master, |
| conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); |
| this.rootDir = master.getMasterFileSystem().getWALRootDir(); |
| // TODO: This should be the WAL FS, not the Master FS? |
| this.fs = master.getMasterFileSystem().getFileSystem(); |
| this.walArchiveDir = new Path(this.rootDir, HConstants.HREGION_OLDLOGDIR_NAME); |
| } |
| |
| public List<Procedure> splitWALs(ServerName crashedServer, boolean splitMeta) |
| throws IOException { |
| try { |
| // 1. list all splitting files |
| List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta); |
| // 2. create corresponding procedures |
| return createSplitWALProcedures(splittingFiles, crashedServer); |
| } catch (IOException e) { |
| LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e); |
| throw e; |
| } |
| } |
| |
| public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta) |
| throws IOException { |
| List<Path> logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName)); |
| FileStatus[] fileStatuses = |
| SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER); |
| LOG.info("{} WAL count={}, meta={}", serverName, fileStatuses.length, splitMeta); |
| return Lists.newArrayList(fileStatuses); |
| } |
| |
| private Path getWALSplitDir(ServerName serverName) { |
| Path logDir = |
| new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); |
| return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); |
| } |
| |
| /** |
| * Archive processed WAL |
| */ |
| public void archive(String wal) throws IOException { |
| WALSplitUtil.moveWAL(this.fs, new Path(wal), this.walArchiveDir); |
| } |
| |
| public void deleteWALDir(ServerName serverName) throws IOException { |
| Path splitDir = getWALSplitDir(serverName); |
| try { |
| if (!fs.delete(splitDir, false)) { |
| LOG.warn("Failed delete {}, contains {}", splitDir, fs.listFiles(splitDir, true)); |
| } |
| } catch (PathIsNotEmptyDirectoryException e) { |
| FileStatus [] files = CommonFSUtils.listStatus(fs, splitDir); |
| LOG.warn("PathIsNotEmptyDirectoryException {}", |
| Arrays.stream(files).map(f -> f.getPath()).collect(Collectors.toList())); |
| throw e; |
| } |
| } |
| |
| public boolean isSplitWALFinished(String walPath) throws IOException { |
| return !fs.exists(new Path(rootDir, walPath)); |
| } |
| |
| List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs, |
| ServerName crashedServer) { |
| return splittingWALs.stream() |
| .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer)) |
| .collect(Collectors.toList()); |
| } |
| |
| /** |
| * Acquire a split WAL worker |
| * @param procedure split WAL task |
| * @return an available region server which could execute this task |
| * @throws ProcedureSuspendedException if there is no available worker, |
| * it will throw this exception to WAIT the procedure. |
| */ |
| public ServerName acquireSplitWALWorker(Procedure<?> procedure) |
| throws ProcedureSuspendedException { |
| Optional<ServerName> worker = splitWorkerAssigner.acquire(); |
| if (worker.isPresent()) { |
| LOG.debug("Acquired split WAL worker={}", worker.get()); |
| return worker.get(); |
| } |
| splitWorkerAssigner.suspend(procedure); |
| throw new ProcedureSuspendedException(); |
| } |
| |
| /** |
| * After the worker finished the split WAL task, it will release the worker, and wake up all the |
| * suspend procedures in the ProcedureEvent |
| * @param worker worker which is about to release |
| * @param scheduler scheduler which is to wake up the procedure event |
| */ |
| public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { |
| LOG.debug("Release split WAL worker={}", worker); |
| splitWorkerAssigner.release(worker); |
| splitWorkerAssigner.wake(scheduler); |
| } |
| |
| /** |
| * When master restart, there will be a new splitWorkerAssigner. But if there are splitting WAL |
| * tasks running on the region server side, they will not be count by the new splitWorkerAssigner. |
| * Thus we should add the workers of running tasks to the assigner when we load the procedures |
| * from MasterProcWALs. |
| * @param worker region server which is executing a split WAL task |
| */ |
| public void addUsedSplitWALWorker(ServerName worker){ |
| splitWorkerAssigner.addUsedWorker(worker); |
| } |
| |
| /** |
| * help assign and release a worker for each WAL splitting task |
| * For each worker, concurrent running splitting task should be no more than maxSplitTasks |
| * If a task failed to acquire a worker, it will suspend and wait for workers available |
| * |
| */ |
| private static final class SplitWorkerAssigner implements ServerListener { |
| private int maxSplitTasks; |
| private final ProcedureEvent<?> event; |
| private Map<ServerName, Integer> currentWorkers = new HashMap<>(); |
| private MasterServices master; |
| |
| public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { |
| this.maxSplitTasks = maxSplitTasks; |
| this.master = master; |
| this.event = new ProcedureEvent<>("split-WAL-worker-assigning"); |
| // ServerManager might be null in a test context where we are mocking; allow for this |
| ServerManager sm = this.master.getServerManager(); |
| if (sm != null) { |
| sm.registerListener(this); |
| } |
| } |
| |
| public synchronized Optional<ServerName> acquire() { |
| List<ServerName> serverList = master.getServerManager().getOnlineServersList(); |
| Collections.shuffle(serverList); |
| Optional<ServerName> worker = serverList.stream().filter( |
| serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) |
| .findAny(); |
| if (worker.isPresent()) { |
| currentWorkers.compute(worker.get(), (serverName, |
| availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); |
| } |
| return worker; |
| } |
| |
| public synchronized void release(ServerName serverName) { |
| currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); |
| } |
| |
| public void suspend(Procedure<?> proc) { |
| event.suspend(); |
| event.suspendIfNotReady(proc); |
| } |
| |
| public void wake(MasterProcedureScheduler scheduler) { |
| if (!event.isReady()) { |
| event.wake(scheduler); |
| } |
| } |
| |
| @Override |
| public void serverAdded(ServerName worker) { |
| this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); |
| } |
| |
| public synchronized void addUsedWorker(ServerName worker) { |
| // load used worker when master restart |
| currentWorkers.compute(worker, (serverName, |
| availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); |
| } |
| } |
| } |