blob: 301077906770876b06c1665723747ed8ce509ea6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.snapshot;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.master.SnapshotSentinel;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclCleaner;
import org.apache.hadoop.hbase.security.access.SnapshotScannerHDFSAclHelper;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.SnapshotCreationException;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotDoesNotExistException;
import org.apache.hadoop.hbase.snapshot.SnapshotExistsException;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
import org.apache.hadoop.hbase.snapshot.SnapshotReferenceUtil;
import org.apache.hadoop.hbase.snapshot.TablePartiallyOpenException;
import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription.Type;
/**
* This class manages the procedure of taking and restoring snapshots. There is only one
* SnapshotManager for the master.
* <p>
* The class provides methods for monitoring in-progress snapshot actions.
* <p>
* Note: Currently there can only be one snapshot being taken at a time over the cluster. This is a
* simplification in the current implementation.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@InterfaceStability.Unstable
public class SnapshotManager extends MasterProcedureManager implements Stoppable {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotManager.class);
/** By default, check to see if the snapshot is complete every WAKE MILLIS (ms) */
private static final int SNAPSHOT_WAKE_MILLIS_DEFAULT = 500;
/**
* Wait time before removing a finished sentinel from the in-progress map
*
* NOTE: This is used as a safety auto cleanup.
* The snapshot and restore handlers map entries are removed when a user asks if a snapshot or
* restore is completed. This operation is part of the HBaseAdmin snapshot/restore API flow.
* In case something fails on the client side and the snapshot/restore state is not reclaimed
* after a default timeout, the entry is removed from the in-progress map.
* At this point, if the user asks for the snapshot/restore status, the result will be
* snapshot done if exists or failed if it doesn't exists.
*/
public static final String HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS =
"hbase.snapshot.sentinels.cleanup.timeoutMillis";
public static final long SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT = 60 * 1000L;
/** Enable or disable snapshot support */
public static final String HBASE_SNAPSHOT_ENABLED = "hbase.snapshot.enabled";
/**
* Conf key for # of ms elapsed between checks for snapshot errors while waiting for
* completion.
*/
private static final String SNAPSHOT_WAKE_MILLIS_KEY = "hbase.snapshot.master.wakeMillis";
/** Name of the operation to use in the controller */
public static final String ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION = "online-snapshot";
/** Conf key for # of threads used by the SnapshotManager thread pool */
public static final String SNAPSHOT_POOL_THREADS_KEY = "hbase.snapshot.master.threads";
/** number of current operations running on the master */
public static final int SNAPSHOT_POOL_THREADS_DEFAULT = 1;
private boolean stopped;
private MasterServices master; // Needed by TableEventHandlers
private ProcedureCoordinator coordinator;
// Is snapshot feature enabled?
private boolean isSnapshotSupported = false;
// Snapshot handlers map, with table name as key.
// The map is always accessed and modified under the object lock using synchronized.
// snapshotTable() will insert an Handler in the table.
// isSnapshotDone() will remove the handler requested if the operation is finished.
private final Map<TableName, SnapshotSentinel> snapshotHandlers = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduleThreadPool =
Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("SnapshotHandlerChoreCleaner").setDaemon(true).build());
private ScheduledFuture<?> snapshotHandlerChoreCleanerTask;
// Restore map, with table name as key, procedure ID as value.
// The map is always accessed and modified under the object lock using synchronized.
// restoreSnapshot()/cloneSnapshot() will insert a procedure ID in the map.
//
// TODO: just as the Apache HBase 1.x implementation, this map would not survive master
// restart/failover. This is just a stopgap implementation until implementation of taking
// snapshot using Procedure-V2.
private Map<TableName, Long> restoreTableToProcIdMap = new HashMap<>();
private Path rootDir;
private ExecutorService executorService;
/**
* Read write lock between taking snapshot and snapshot HFile cleaner. The cleaner should skip to
* check the HFiles if any snapshot is in progress, otherwise it may clean a HFile which would
* belongs to the newly creating snapshot. So we should grab the write lock first when cleaner
* start to work. (See HBASE-21387)
*/
private ReentrantReadWriteLock takingSnapshotLock = new ReentrantReadWriteLock(true);
public SnapshotManager() {}
/**
* Fully specify all necessary components of a snapshot manager. Exposed for testing.
* @param master services for the master where the manager is running
* @param coordinator procedure coordinator instance. exposed for testing.
* @param pool HBase ExecutorServcie instance, exposed for testing.
*/
@InterfaceAudience.Private
SnapshotManager(final MasterServices master, ProcedureCoordinator coordinator,
ExecutorService pool, int sentinelCleanInterval)
throws IOException, UnsupportedOperationException {
this.master = master;
this.rootDir = master.getMasterFileSystem().getRootDir();
Configuration conf = master.getConfiguration();
checkSnapshotSupport(conf, master.getMasterFileSystem());
this.coordinator = coordinator;
this.executorService = pool;
resetTempDir();
snapshotHandlerChoreCleanerTask = this.scheduleThreadPool.scheduleAtFixedRate(
this::cleanupSentinels, sentinelCleanInterval, sentinelCleanInterval, TimeUnit.SECONDS);
}
/**
* Gets the list of all completed snapshots.
* @return list of SnapshotDescriptions
* @throws IOException File system exception
*/
public List<SnapshotDescription> getCompletedSnapshots() throws IOException {
return getCompletedSnapshots(SnapshotDescriptionUtils.getSnapshotsDir(rootDir), true);
}
/**
* Gets the list of all completed snapshots.
* @param snapshotDir snapshot directory
* @param withCpCall Whether to call CP hooks
* @return list of SnapshotDescriptions
* @throws IOException File system exception
*/
private List<SnapshotDescription> getCompletedSnapshots(Path snapshotDir, boolean withCpCall)
throws IOException {
List<SnapshotDescription> snapshotDescs = new ArrayList<>();
// first create the snapshot root path and check to see if it exists
FileSystem fs = master.getMasterFileSystem().getFileSystem();
if (snapshotDir == null) snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(rootDir);
// if there are no snapshots, return an empty list
if (!fs.exists(snapshotDir)) {
return snapshotDescs;
}
// ignore all the snapshots in progress
FileStatus[] snapshots = fs.listStatus(snapshotDir,
new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
withCpCall = withCpCall && cpHost != null;
// loop through all the completed snapshots
for (FileStatus snapshot : snapshots) {
Path info = new Path(snapshot.getPath(), SnapshotDescriptionUtils.SNAPSHOTINFO_FILE);
// if the snapshot is bad
if (!fs.exists(info)) {
LOG.error("Snapshot information for " + snapshot.getPath() + " doesn't exist");
continue;
}
FSDataInputStream in = null;
try {
in = fs.open(info);
SnapshotDescription desc = SnapshotDescription.parseFrom(in);
org.apache.hadoop.hbase.client.SnapshotDescription descPOJO = (withCpCall)
? ProtobufUtil.createSnapshotDesc(desc) : null;
if (withCpCall) {
try {
cpHost.preListSnapshot(descPOJO);
} catch (AccessDeniedException e) {
LOG.warn("Current user does not have access to " + desc.getName() + " snapshot. "
+ "Either you should be owner of this snapshot or admin user.");
// Skip this and try for next snapshot
continue;
}
}
snapshotDescs.add(desc);
// call coproc post hook
if (withCpCall) {
cpHost.postListSnapshot(descPOJO);
}
} catch (IOException e) {
LOG.warn("Found a corrupted snapshot " + snapshot.getPath(), e);
} finally {
if (in != null) {
in.close();
}
}
}
return snapshotDescs;
}
/**
* Cleans up any snapshots in the snapshot/.tmp directory that were left from failed
* snapshot attempts.
*
* @throws IOException if we can't reach the filesystem
*/
private void resetTempDir() throws IOException {
// cleanup any existing snapshots.
Path tmpdir = SnapshotDescriptionUtils.getWorkingSnapshotDir(rootDir,
master.getConfiguration());
FileSystem tmpFs = tmpdir.getFileSystem(master.getConfiguration());
if (!tmpFs.delete(tmpdir, true)) {
LOG.warn("Couldn't delete working snapshot directory: " + tmpdir);
}
}
/**
* Delete the specified snapshot
* @param snapshot
* @throws SnapshotDoesNotExistException If the specified snapshot does not exist.
* @throws IOException For filesystem IOExceptions
*/
public void deleteSnapshot(SnapshotDescription snapshot) throws IOException {
// check to see if it is completed
if (!isSnapshotCompleted(snapshot)) {
throw new SnapshotDoesNotExistException(ProtobufUtil.createSnapshotDesc(snapshot));
}
String snapshotName = snapshot.getName();
// first create the snapshot description and check to see if it exists
FileSystem fs = master.getMasterFileSystem().getFileSystem();
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootDir);
// Get snapshot info from file system. The one passed as parameter is a "fake" snapshotInfo with
// just the "name" and it does not contains the "real" snapshot information
snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
// call coproc pre hook
MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
if (cpHost != null) {
snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
cpHost.preDeleteSnapshot(snapshotPOJO);
}
LOG.debug("Deleting snapshot: " + snapshotName);
// delete the existing snapshot
if (!fs.delete(snapshotDir, true)) {
throw new HBaseSnapshotException("Failed to delete snapshot directory: " + snapshotDir);
}
// call coproc post hook
if (cpHost != null) {
cpHost.postDeleteSnapshot(snapshotPOJO);
}
}
/**
* Check if the specified snapshot is done
*
* @param expected
* @return true if snapshot is ready to be restored, false if it is still being taken.
* @throws IOException IOException if error from HDFS or RPC
* @throws UnknownSnapshotException if snapshot is invalid or does not exist.
*/
public boolean isSnapshotDone(SnapshotDescription expected) throws IOException {
// check the request to make sure it has a snapshot
if (expected == null) {
throw new UnknownSnapshotException(
"No snapshot name passed in request, can't figure out which snapshot you want to check.");
}
String ssString = ClientSnapshotDescriptionUtils.toString(expected);
// check to see if the sentinel exists,
// and if the task is complete removes it from the in-progress snapshots map.
SnapshotSentinel handler = removeSentinelIfFinished(this.snapshotHandlers, expected);
// stop tracking "abandoned" handlers
cleanupSentinels();
if (handler == null) {
// If there's no handler in the in-progress map, it means one of the following:
// - someone has already requested the snapshot state
// - the requested snapshot was completed long time ago (cleanupSentinels() timeout)
// - the snapshot was never requested
// In those cases returns to the user the "done state" if the snapshots exists on disk,
// otherwise raise an exception saying that the snapshot is not running and doesn't exist.
if (!isSnapshotCompleted(expected)) {
throw new UnknownSnapshotException("Snapshot " + ssString
+ " is not currently running or one of the known completed snapshots.");
}
// was done, return true;
return true;
}
// pass on any failure we find in the sentinel
try {
handler.rethrowExceptionIfFailed();
} catch (ForeignException e) {
// Give some procedure info on an exception.
String status;
Procedure p = coordinator.getProcedure(expected.getName());
if (p != null) {
status = p.getStatus();
} else {
status = expected.getName() + " not found in proclist " + coordinator.getProcedureNames();
}
throw new HBaseSnapshotException("Snapshot " + ssString + " had an error. " + status, e,
ProtobufUtil.createSnapshotDesc(expected));
}
// check to see if we are done
if (handler.isFinished()) {
LOG.debug("Snapshot '" + ssString + "' has completed, notifying client.");
return true;
} else if (LOG.isDebugEnabled()) {
LOG.debug("Snapshoting '" + ssString + "' is still in progress!");
}
return false;
}
/**
* Check to see if there is a snapshot in progress with the same name or on the same table.
* Currently we have a limitation only allowing a single snapshot per table at a time. Also we
* don't allow snapshot with the same name.
* @param snapshot description of the snapshot being checked.
* @return <tt>true</tt> if there is a snapshot in progress with the same name or on the same
* table.
*/
synchronized boolean isTakingSnapshot(final SnapshotDescription snapshot) {
TableName snapshotTable = TableName.valueOf(snapshot.getTable());
if (isTakingSnapshot(snapshotTable)) {
return true;
}
Iterator<Map.Entry<TableName, SnapshotSentinel>> it = this.snapshotHandlers.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TableName, SnapshotSentinel> entry = it.next();
SnapshotSentinel sentinel = entry.getValue();
if (snapshot.getName().equals(sentinel.getSnapshot().getName()) && !sentinel.isFinished()) {
return true;
}
}
return false;
}
/**
* Check to see if the specified table has a snapshot in progress. Currently we have a
* limitation only allowing a single snapshot per table at a time.
* @param tableName name of the table being snapshotted.
* @return <tt>true</tt> if there is a snapshot in progress on the specified table.
*/
public boolean isTakingSnapshot(final TableName tableName) {
SnapshotSentinel handler = this.snapshotHandlers.get(tableName);
return handler != null && !handler.isFinished();
}
/**
* Check to make sure that we are OK to run the passed snapshot. Checks to make sure that we
* aren't already running a snapshot or restore on the requested table.
* @param snapshot description of the snapshot we want to start
* @throws HBaseSnapshotException if the filesystem could not be prepared to start the snapshot
*/
private synchronized void prepareToTakeSnapshot(SnapshotDescription snapshot)
throws HBaseSnapshotException {
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
master.getConfiguration());
TableName snapshotTable =
TableName.valueOf(snapshot.getTable());
// make sure we aren't already running a snapshot
if (isTakingSnapshot(snapshot)) {
SnapshotSentinel handler = this.snapshotHandlers.get(snapshotTable);
throw new SnapshotCreationException("Rejected taking "
+ ClientSnapshotDescriptionUtils.toString(snapshot)
+ " because we are already running another snapshot "
+ (handler != null ? ("on the same table " +
ClientSnapshotDescriptionUtils.toString(handler.getSnapshot()))
: "with the same name"), ProtobufUtil.createSnapshotDesc(snapshot));
}
// make sure we aren't running a restore on the same table
if (isRestoringTable(snapshotTable)) {
throw new SnapshotCreationException("Rejected taking "
+ ClientSnapshotDescriptionUtils.toString(snapshot)
+ " because we are already have a restore in progress on the same snapshot.");
}
try {
FileSystem workingDirFS = workingDir.getFileSystem(master.getConfiguration());
// delete the working directory, since we aren't running the snapshot. Likely leftovers
// from a failed attempt.
workingDirFS.delete(workingDir, true);
// recreate the working directory for the snapshot
if (!workingDirFS.mkdirs(workingDir)) {
throw new SnapshotCreationException("Couldn't create working directory (" + workingDir
+ ") for snapshot" , ProtobufUtil.createSnapshotDesc(snapshot));
}
} catch (HBaseSnapshotException e) {
throw e;
} catch (IOException e) {
throw new SnapshotCreationException(
"Exception while checking to see if snapshot could be started.", e,
ProtobufUtil.createSnapshotDesc(snapshot));
}
}
/**
* Take a snapshot of a disabled table.
* @param snapshot description of the snapshot to take. Modified to be {@link Type#DISABLED}.
* @throws IOException if the snapshot could not be started or filesystem for snapshot
* temporary directory could not be determined
*/
private synchronized void snapshotDisabledTable(SnapshotDescription snapshot)
throws IOException {
// setup the snapshot
prepareToTakeSnapshot(snapshot);
// set the snapshot to be a disabled snapshot, since the client doesn't know about that
snapshot = snapshot.toBuilder().setType(Type.DISABLED).build();
// Take the snapshot of the disabled table
DisabledTableSnapshotHandler handler =
new DisabledTableSnapshotHandler(snapshot, master, this);
snapshotTable(snapshot, handler);
}
/**
* Take a snapshot of an enabled table.
* @param snapshot description of the snapshot to take.
* @throws IOException if the snapshot could not be started or filesystem for snapshot
* temporary directory could not be determined
*/
private synchronized void snapshotEnabledTable(SnapshotDescription snapshot)
throws IOException {
// setup the snapshot
prepareToTakeSnapshot(snapshot);
// Take the snapshot of the enabled table
EnabledTableSnapshotHandler handler =
new EnabledTableSnapshotHandler(snapshot, master, this);
snapshotTable(snapshot, handler);
}
/**
* Take a snapshot using the specified handler.
* On failure the snapshot temporary working directory is removed.
* NOTE: prepareToTakeSnapshot() called before this one takes care of the rejecting the
* snapshot request if the table is busy with another snapshot/restore operation.
* @param snapshot the snapshot description
* @param handler the snapshot handler
*/
private synchronized void snapshotTable(SnapshotDescription snapshot,
final TakeSnapshotHandler handler) throws IOException {
try {
handler.prepare();
this.executorService.submit(handler);
this.snapshotHandlers.put(TableName.valueOf(snapshot.getTable()), handler);
} catch (Exception e) {
// cleanup the working directory by trying to delete it from the fs.
Path workingDir = SnapshotDescriptionUtils.getWorkingSnapshotDir(snapshot, rootDir,
master.getConfiguration());
FileSystem workingDirFs = workingDir.getFileSystem(master.getConfiguration());
try {
if (!workingDirFs.delete(workingDir, true)) {
LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
ClientSnapshotDescriptionUtils.toString(snapshot));
}
} catch (IOException e1) {
LOG.error("Couldn't delete working directory (" + workingDir + " for snapshot:" +
ClientSnapshotDescriptionUtils.toString(snapshot));
}
// fail the snapshot
throw new SnapshotCreationException("Could not build snapshot handler", e,
ProtobufUtil.createSnapshotDesc(snapshot));
}
}
public ReadWriteLock getTakingSnapshotLock() {
return this.takingSnapshotLock;
}
/**
* The snapshot operation processing as following: <br>
* 1. Create a Snapshot Handler, and do some initialization; <br>
* 2. Put the handler into snapshotHandlers <br>
* So when we consider if any snapshot is taking, we should consider both the takingSnapshotLock
* and snapshotHandlers;
* @return true to indicate that there're some running snapshots.
*/
public synchronized boolean isTakingAnySnapshot() {
return this.takingSnapshotLock.getReadHoldCount() > 0 || this.snapshotHandlers.size() > 0;
}
/**
* Take a snapshot based on the enabled/disabled state of the table.
* @param snapshot
* @throws HBaseSnapshotException when a snapshot specific exception occurs.
* @throws IOException when some sort of generic IO exception occurs.
*/
public void takeSnapshot(SnapshotDescription snapshot) throws IOException {
this.takingSnapshotLock.readLock().lock();
try {
takeSnapshotInternal(snapshot);
} finally {
this.takingSnapshotLock.readLock().unlock();
}
}
private void takeSnapshotInternal(SnapshotDescription snapshot) throws IOException {
// check to see if we already completed the snapshot
if (isSnapshotCompleted(snapshot)) {
throw new SnapshotExistsException(
"Snapshot '" + snapshot.getName() + "' already stored on the filesystem.",
ProtobufUtil.createSnapshotDesc(snapshot));
}
LOG.debug("No existing snapshot, attempting snapshot...");
// stop tracking "abandoned" handlers
cleanupSentinels();
// check to see if the table exists
TableDescriptor desc = null;
try {
desc = master.getTableDescriptors().get(
TableName.valueOf(snapshot.getTable()));
} catch (FileNotFoundException e) {
String msg = "Table:" + snapshot.getTable() + " info doesn't exist!";
LOG.error(msg);
throw new SnapshotCreationException(msg, e, ProtobufUtil.createSnapshotDesc(snapshot));
} catch (IOException e) {
throw new SnapshotCreationException(
"Error while geting table description for table " + snapshot.getTable(), e,
ProtobufUtil.createSnapshotDesc(snapshot));
}
if (desc == null) {
throw new SnapshotCreationException(
"Table '" + snapshot.getTable() + "' doesn't exist, can't take snapshot.",
ProtobufUtil.createSnapshotDesc(snapshot));
}
SnapshotDescription.Builder builder = snapshot.toBuilder();
// if not specified, set the snapshot format
if (!snapshot.hasVersion()) {
builder.setVersion(SnapshotDescriptionUtils.SNAPSHOT_LAYOUT_VERSION);
}
RpcServer.getRequestUser().ifPresent(user -> {
if (AccessChecker.isAuthorizationSupported(master.getConfiguration())) {
builder.setOwner(user.getShortName());
}
});
snapshot = builder.build();
// call pre coproc hook
MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
if (cpHost != null) {
snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
cpHost.preSnapshot(snapshotPOJO, desc);
}
// if the table is enabled, then have the RS run actually the snapshot work
TableName snapshotTable = TableName.valueOf(snapshot.getTable());
if (master.getTableStateManager().isTableState(snapshotTable,
TableState.State.ENABLED)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Table enabled, starting distributed snapshots for {}",
ClientSnapshotDescriptionUtils.toString(snapshot));
}
snapshotEnabledTable(snapshot);
if (LOG.isDebugEnabled()) {
LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
}
}
// For disabled table, snapshot is created by the master
else if (master.getTableStateManager().isTableState(snapshotTable,
TableState.State.DISABLED)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Table is disabled, running snapshot entirely on master for {}",
ClientSnapshotDescriptionUtils.toString(snapshot));
}
snapshotDisabledTable(snapshot);
if (LOG.isDebugEnabled()) {
LOG.debug("Started snapshot: {}", ClientSnapshotDescriptionUtils.toString(snapshot));
}
} else {
LOG.error("Can't snapshot table '" + snapshot.getTable()
+ "', isn't open or closed, we don't know what to do!");
TablePartiallyOpenException tpoe = new TablePartiallyOpenException(snapshot.getTable()
+ " isn't fully open.");
throw new SnapshotCreationException("Table is not entirely open or closed", tpoe,
ProtobufUtil.createSnapshotDesc(snapshot));
}
// call post coproc hook
if (cpHost != null) {
cpHost.postSnapshot(snapshotPOJO, desc);
}
}
/**
* Set the handler for the current snapshot
* <p>
* Exposed for TESTING
* @param tableName
* @param handler handler the master should use
*
* TODO get rid of this if possible, repackaging, modify tests.
*/
public synchronized void setSnapshotHandlerForTesting(
final TableName tableName,
final SnapshotSentinel handler) {
if (handler != null) {
this.snapshotHandlers.put(tableName, handler);
} else {
this.snapshotHandlers.remove(tableName);
}
}
/**
* @return distributed commit coordinator for all running snapshots
*/
ProcedureCoordinator getCoordinator() {
return coordinator;
}
/**
* Check to see if the snapshot is one of the currently completed snapshots
* Returns true if the snapshot exists in the "completed snapshots folder".
*
* @param snapshot expected snapshot to check
* @return <tt>true</tt> if the snapshot is stored on the {@link FileSystem}, <tt>false</tt> if is
* not stored
* @throws IOException if the filesystem throws an unexpected exception,
* @throws IllegalArgumentException if snapshot name is invalid.
*/
private boolean isSnapshotCompleted(SnapshotDescription snapshot) throws IOException {
try {
final Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshot, rootDir);
FileSystem fs = master.getMasterFileSystem().getFileSystem();
// check to see if the snapshot already exists
return fs.exists(snapshotDir);
} catch (IllegalArgumentException iae) {
throw new UnknownSnapshotException("Unexpected exception thrown", iae);
}
}
/**
* Clone the specified snapshot.
* The clone will fail if the destination table has a snapshot or restore in progress.
*
* @param reqSnapshot Snapshot Descriptor from request
* @param tableName table to clone
* @param snapshot Snapshot Descriptor
* @param snapshotTableDesc Table Descriptor
* @param nonceKey unique identifier to prevent duplicated RPC
* @return procId the ID of the clone snapshot procedure
* @throws IOException
*/
private long cloneSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
TableDescriptor htd = TableDescriptorBuilder.copy(tableName, snapshotTableDesc);
org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
if (cpHost != null) {
snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
cpHost.preCloneSnapshot(snapshotPOJO, htd);
}
long procId;
try {
procId = cloneSnapshot(snapshot, htd, nonceKey, restoreAcl);
} catch (IOException e) {
LOG.error("Exception occurred while cloning the snapshot " + snapshot.getName()
+ " as table " + tableName.getNameAsString(), e);
throw e;
}
LOG.info("Clone snapshot=" + snapshot.getName() + " as table=" + tableName);
if (cpHost != null) {
cpHost.postCloneSnapshot(snapshotPOJO, htd);
}
return procId;
}
/**
* Clone the specified snapshot into a new table.
* The operation will fail if the destination table has a snapshot or restore in progress.
*
* @param snapshot Snapshot Descriptor
* @param tableDescriptor Table Descriptor of the table to create
* @param nonceKey unique identifier to prevent duplicated RPC
* @return procId the ID of the clone snapshot procedure
*/
synchronized long cloneSnapshot(final SnapshotDescription snapshot,
final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
throws HBaseSnapshotException {
TableName tableName = tableDescriptor.getTableName();
// make sure we aren't running a snapshot on the same table
if (isTakingSnapshot(tableName)) {
throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
}
// make sure we aren't running a restore on the same table
if (isRestoringTable(tableName)) {
throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
}
try {
long procId = master.getMasterProcedureExecutor().submitProcedure(
new CloneSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
tableDescriptor, snapshot, restoreAcl),
nonceKey);
this.restoreTableToProcIdMap.put(tableName, procId);
return procId;
} catch (Exception e) {
String msg = "Couldn't clone the snapshot="
+ ClientSnapshotDescriptionUtils.toString(snapshot) + " on table=" + tableName;
LOG.error(msg, e);
throw new RestoreSnapshotException(msg, e);
}
}
/**
* Restore or Clone the specified snapshot
* @param reqSnapshot
* @param nonceKey unique identifier to prevent duplicated RPC
* @throws IOException
*/
public long restoreOrCloneSnapshot(final SnapshotDescription reqSnapshot, final NonceKey nonceKey,
final boolean restoreAcl) throws IOException {
FileSystem fs = master.getMasterFileSystem().getFileSystem();
Path snapshotDir = SnapshotDescriptionUtils.getCompletedSnapshotDir(reqSnapshot, rootDir);
// check if the snapshot exists
if (!fs.exists(snapshotDir)) {
LOG.error("A Snapshot named '" + reqSnapshot.getName() + "' does not exist.");
throw new SnapshotDoesNotExistException(
ProtobufUtil.createSnapshotDesc(reqSnapshot));
}
// Get snapshot info from file system. The reqSnapshot is a "fake" snapshotInfo with
// just the snapshot "name" and table name to restore. It does not contains the "real" snapshot
// information.
SnapshotDescription snapshot = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotDir);
SnapshotManifest manifest = SnapshotManifest.open(master.getConfiguration(), fs,
snapshotDir, snapshot);
TableDescriptor snapshotTableDesc = manifest.getTableDescriptor();
TableName tableName = TableName.valueOf(reqSnapshot.getTable());
// sanity check the new table descriptor
TableDescriptorChecker.sanityCheck(master.getConfiguration(), snapshotTableDesc);
// stop tracking "abandoned" handlers
cleanupSentinels();
// Verify snapshot validity
SnapshotReferenceUtil.verifySnapshot(master.getConfiguration(), fs, manifest);
// Execute the restore/clone operation
long procId;
if (master.getTableDescriptors().exists(tableName)) {
procId = restoreSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey,
restoreAcl);
} else {
procId =
cloneSnapshot(reqSnapshot, tableName, snapshot, snapshotTableDesc, nonceKey, restoreAcl);
}
return procId;
}
/**
* Restore the specified snapshot. The restore will fail if the destination table has a snapshot
* or restore in progress.
* @param reqSnapshot Snapshot Descriptor from request
* @param tableName table to restore
* @param snapshot Snapshot Descriptor
* @param snapshotTableDesc Table Descriptor
* @param nonceKey unique identifier to prevent duplicated RPC
* @param restoreAcl true to restore acl of snapshot
* @return procId the ID of the restore snapshot procedure
* @throws IOException
*/
private long restoreSnapshot(final SnapshotDescription reqSnapshot, final TableName tableName,
final SnapshotDescription snapshot, final TableDescriptor snapshotTableDesc,
final NonceKey nonceKey, final boolean restoreAcl) throws IOException {
MasterCoprocessorHost cpHost = master.getMasterCoprocessorHost();
if (master.getTableStateManager().isTableState(
TableName.valueOf(snapshot.getTable()), TableState.State.ENABLED)) {
throw new UnsupportedOperationException("Table '" +
TableName.valueOf(snapshot.getTable()) + "' must be disabled in order to " +
"perform a restore operation.");
}
// call Coprocessor pre hook
org.apache.hadoop.hbase.client.SnapshotDescription snapshotPOJO = null;
if (cpHost != null) {
snapshotPOJO = ProtobufUtil.createSnapshotDesc(snapshot);
cpHost.preRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
}
long procId;
try {
procId = restoreSnapshot(snapshot, snapshotTableDesc, nonceKey, restoreAcl);
} catch (IOException e) {
LOG.error("Exception occurred while restoring the snapshot " + snapshot.getName()
+ " as table " + tableName.getNameAsString(), e);
throw e;
}
LOG.info("Restore snapshot=" + snapshot.getName() + " as table=" + tableName);
if (cpHost != null) {
cpHost.postRestoreSnapshot(snapshotPOJO, snapshotTableDesc);
}
return procId;
}
/**
* Restore the specified snapshot. The restore will fail if the destination table has a snapshot
* or restore in progress.
* @param snapshot Snapshot Descriptor
* @param tableDescriptor Table Descriptor
* @param nonceKey unique identifier to prevent duplicated RPC
* @param restoreAcl true to restore acl of snapshot
* @return procId the ID of the restore snapshot procedure
*/
private synchronized long restoreSnapshot(final SnapshotDescription snapshot,
final TableDescriptor tableDescriptor, final NonceKey nonceKey, final boolean restoreAcl)
throws HBaseSnapshotException {
final TableName tableName = tableDescriptor.getTableName();
// make sure we aren't running a snapshot on the same table
if (isTakingSnapshot(tableName)) {
throw new RestoreSnapshotException("Snapshot in progress on the restore table=" + tableName);
}
// make sure we aren't running a restore on the same table
if (isRestoringTable(tableName)) {
throw new RestoreSnapshotException("Restore already in progress on the table=" + tableName);
}
try {
long procId = master.getMasterProcedureExecutor().submitProcedure(
new RestoreSnapshotProcedure(master.getMasterProcedureExecutor().getEnvironment(),
tableDescriptor, snapshot, restoreAcl),
nonceKey);
this.restoreTableToProcIdMap.put(tableName, procId);
return procId;
} catch (Exception e) {
String msg = "Couldn't restore the snapshot=" + ClientSnapshotDescriptionUtils.toString(
snapshot) +
" on table=" + tableName;
LOG.error(msg, e);
throw new RestoreSnapshotException(msg, e);
}
}
/**
* Verify if the restore of the specified table is in progress.
*
* @param tableName table under restore
* @return <tt>true</tt> if there is a restore in progress of the specified table.
*/
private synchronized boolean isRestoringTable(final TableName tableName) {
Long procId = this.restoreTableToProcIdMap.get(tableName);
if (procId == null) {
return false;
}
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
if (procExec.isRunning() && !procExec.isFinished(procId)) {
return true;
} else {
this.restoreTableToProcIdMap.remove(tableName);
return false;
}
}
/**
* Return the handler if it is currently live and has the same snapshot target name.
* The handler is removed from the sentinels map if completed.
* @param sentinels live handlers
* @param snapshot snapshot description
* @return null if doesn't match, else a live handler.
*/
private synchronized SnapshotSentinel removeSentinelIfFinished(
final Map<TableName, SnapshotSentinel> sentinels,
final SnapshotDescription snapshot) {
if (!snapshot.hasTable()) {
return null;
}
TableName snapshotTable = TableName.valueOf(snapshot.getTable());
SnapshotSentinel h = sentinels.get(snapshotTable);
if (h == null) {
return null;
}
if (!h.getSnapshot().getName().equals(snapshot.getName())) {
// specified snapshot is to the one currently running
return null;
}
// Remove from the "in-progress" list once completed
if (h.isFinished()) {
sentinels.remove(snapshotTable);
}
return h;
}
/**
* Removes "abandoned" snapshot/restore requests.
* As part of the HBaseAdmin snapshot/restore API the operation status is checked until completed,
* and the in-progress maps are cleaned up when the status of a completed task is requested.
* To avoid having sentinels staying around for long time if something client side is failed,
* each operation tries to clean up the in-progress maps sentinels finished from a long time.
*/
private void cleanupSentinels() {
cleanupSentinels(this.snapshotHandlers);
cleanupCompletedRestoreInMap();
}
/**
* Remove the sentinels that are marked as finished and the completion time
* has exceeded the removal timeout.
* @param sentinels map of sentinels to clean
*/
private synchronized void cleanupSentinels(final Map<TableName, SnapshotSentinel> sentinels) {
long currentTime = EnvironmentEdgeManager.currentTime();
long sentinelsCleanupTimeoutMillis =
master.getConfiguration().getLong(HBASE_SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLIS,
SNAPSHOT_SENTINELS_CLEANUP_TIMEOUT_MILLS_DEFAULT);
Iterator<Map.Entry<TableName, SnapshotSentinel>> it = sentinels.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TableName, SnapshotSentinel> entry = it.next();
SnapshotSentinel sentinel = entry.getValue();
if (sentinel.isFinished()
&& (currentTime - sentinel.getCompletionTimestamp()) > sentinelsCleanupTimeoutMillis) {
it.remove();
}
}
}
/**
* Remove the procedures that are marked as finished
*/
private synchronized void cleanupCompletedRestoreInMap() {
ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
Iterator<Map.Entry<TableName, Long>> it = restoreTableToProcIdMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<TableName, Long> entry = it.next();
Long procId = entry.getValue();
if (procExec.isRunning() && procExec.isFinished(procId)) {
it.remove();
}
}
}
//
// Implementing Stoppable interface
//
@Override
public void stop(String why) {
// short circuit
if (this.stopped) return;
// make sure we get stop
this.stopped = true;
// pass the stop onto take snapshot handlers
for (SnapshotSentinel snapshotHandler: this.snapshotHandlers.values()) {
snapshotHandler.cancel(why);
}
if (snapshotHandlerChoreCleanerTask != null) {
snapshotHandlerChoreCleanerTask.cancel(true);
}
try {
if (coordinator != null) {
coordinator.close();
}
} catch (IOException e) {
LOG.error("stop ProcedureCoordinator error", e);
}
}
@Override
public boolean isStopped() {
return this.stopped;
}
/**
* Throws an exception if snapshot operations (take a snapshot, restore, clone) are not supported.
* Called at the beginning of snapshot() and restoreSnapshot() methods.
* @throws UnsupportedOperationException if snapshot are not supported
*/
public void checkSnapshotSupport() throws UnsupportedOperationException {
if (!this.isSnapshotSupported) {
throw new UnsupportedOperationException(
"To use snapshots, You must add to the hbase-site.xml of the HBase Master: '" +
HBASE_SNAPSHOT_ENABLED + "' property with value 'true'.");
}
}
/**
* Called at startup, to verify if snapshot operation is supported, and to avoid
* starting the master if there're snapshots present but the cleaners needed are missing.
* Otherwise we can end up with snapshot data loss.
* @param conf The {@link Configuration} object to use
* @param mfs The MasterFileSystem to use
* @throws IOException in case of file-system operation failure
* @throws UnsupportedOperationException in case cleaners are missing and
* there're snapshot in the system
*/
private void checkSnapshotSupport(final Configuration conf, final MasterFileSystem mfs)
throws IOException, UnsupportedOperationException {
// Verify if snapshot is disabled by the user
String enabled = conf.get(HBASE_SNAPSHOT_ENABLED);
boolean snapshotEnabled = conf.getBoolean(HBASE_SNAPSHOT_ENABLED, false);
boolean userDisabled = (enabled != null && enabled.trim().length() > 0 && !snapshotEnabled);
// Extract cleaners from conf
Set<String> hfileCleaners = new HashSet<>();
String[] cleaners = conf.getStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
if (cleaners != null) Collections.addAll(hfileCleaners, cleaners);
Set<String> logCleaners = new HashSet<>();
cleaners = conf.getStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS);
if (cleaners != null) Collections.addAll(logCleaners, cleaners);
// check if an older version of snapshot directory was present
Path oldSnapshotDir = new Path(mfs.getRootDir(), HConstants.OLD_SNAPSHOT_DIR_NAME);
FileSystem fs = mfs.getFileSystem();
List<SnapshotDescription> ss = getCompletedSnapshots(new Path(rootDir, oldSnapshotDir), false);
if (ss != null && !ss.isEmpty()) {
LOG.error("Snapshots from an earlier release were found under: " + oldSnapshotDir);
LOG.error("Please rename the directory as " + HConstants.SNAPSHOT_DIR_NAME);
}
// If the user has enabled the snapshot, we force the cleaners to be present
// otherwise we still need to check if cleaners are enabled or not and verify
// that there're no snapshot in the .snapshot folder.
if (snapshotEnabled) {
// Inject snapshot cleaners, if snapshot.enable is true
hfileCleaners.add(SnapshotHFileCleaner.class.getName());
hfileCleaners.add(HFileLinkCleaner.class.getName());
// If sync acl to HDFS feature is enabled, then inject the cleaner
if (SnapshotScannerHDFSAclHelper.isAclSyncToHdfsEnabled(conf)) {
hfileCleaners.add(SnapshotScannerHDFSAclCleaner.class.getName());
}
// Set cleaners conf
conf.setStrings(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS,
hfileCleaners.toArray(new String[hfileCleaners.size()]));
conf.setStrings(HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS,
logCleaners.toArray(new String[logCleaners.size()]));
} else {
// Verify if cleaners are present
snapshotEnabled =
hfileCleaners.contains(SnapshotHFileCleaner.class.getName()) &&
hfileCleaners.contains(HFileLinkCleaner.class.getName());
// Warn if the cleaners are enabled but the snapshot.enabled property is false/not set.
if (snapshotEnabled) {
LOG.warn("Snapshot log and hfile cleaners are present in the configuration, " +
"but the '" + HBASE_SNAPSHOT_ENABLED + "' property " +
(userDisabled ? "is set to 'false'." : "is not set."));
}
}
// Mark snapshot feature as enabled if cleaners are present and user has not disabled it.
this.isSnapshotSupported = snapshotEnabled && !userDisabled;
// If cleaners are not enabled, verify that there're no snapshot in the .snapshot folder
// otherwise we end up with snapshot data loss.
if (!snapshotEnabled) {
LOG.info("Snapshot feature is not enabled, missing log and hfile cleaners.");
Path snapshotDir = SnapshotDescriptionUtils.getSnapshotsDir(mfs.getRootDir());
if (fs.exists(snapshotDir)) {
FileStatus[] snapshots = CommonFSUtils.listStatus(fs, snapshotDir,
new SnapshotDescriptionUtils.CompletedSnaphotDirectoriesFilter(fs));
if (snapshots != null) {
LOG.error("Snapshots are present, but cleaners are not enabled.");
checkSnapshotSupport();
}
}
}
}
@Override
public void initialize(MasterServices master, MetricsMaster metricsMaster) throws KeeperException,
IOException, UnsupportedOperationException {
this.master = master;
this.rootDir = master.getMasterFileSystem().getRootDir();
checkSnapshotSupport(master.getConfiguration(), master.getMasterFileSystem());
// get the configuration for the coordinator
Configuration conf = master.getConfiguration();
long wakeFrequency = conf.getInt(SNAPSHOT_WAKE_MILLIS_KEY, SNAPSHOT_WAKE_MILLIS_DEFAULT);
long timeoutMillis = Math.max(
conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME),
conf.getLong(SnapshotDescriptionUtils.MASTER_SNAPSHOT_TIMEOUT_MILLIS,
SnapshotDescriptionUtils.DEFAULT_MAX_WAIT_TIME));
int opThreads = conf.getInt(SNAPSHOT_POOL_THREADS_KEY, SNAPSHOT_POOL_THREADS_DEFAULT);
// setup the default procedure coordinator
String name = master.getServerName().toString();
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
ProcedureCoordinatorRpcs comms = new ZKProcedureCoordinator(
master.getZooKeeper(), SnapshotManager.ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION, name);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
this.executorService = master.getExecutorService();
resetTempDir();
snapshotHandlerChoreCleanerTask =
scheduleThreadPool.scheduleAtFixedRate(this::cleanupSentinels, 10, 10, TimeUnit.SECONDS);
}
@Override
public String getProcedureSignature() {
return ONLINE_SNAPSHOT_CONTROLLER_DESCRIPTION;
}
@Override
public void execProcedure(ProcedureDescription desc) throws IOException {
takeSnapshot(toSnapshotDescription(desc));
}
@Override
public void checkPermissions(ProcedureDescription desc, AccessChecker accessChecker, User user)
throws IOException {
// Done by AccessController as part of preSnapshot coprocessor hook (legacy code path).
// In future, when we AC is removed for good, that check should be moved here.
}
@Override
public boolean isProcedureDone(ProcedureDescription desc) throws IOException {
return isSnapshotDone(toSnapshotDescription(desc));
}
private SnapshotDescription toSnapshotDescription(ProcedureDescription desc)
throws IOException {
SnapshotDescription.Builder builder = SnapshotDescription.newBuilder();
if (!desc.hasInstance()) {
throw new IOException("Snapshot name is not defined: " + desc.toString());
}
String snapshotName = desc.getInstance();
List<NameStringPair> props = desc.getConfigurationList();
String table = null;
for (NameStringPair prop : props) {
if ("table".equalsIgnoreCase(prop.getName())) {
table = prop.getValue();
}
}
if (table == null) {
throw new IOException("Snapshot table is not defined: " + desc.toString());
}
TableName tableName = TableName.valueOf(table);
builder.setTable(tableName.getNameAsString());
builder.setName(snapshotName);
builder.setType(SnapshotDescription.Type.FLUSH);
return builder.build();
}
}