/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.backup.mapreduce;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupCopyJob;
import org.apache.hadoop.hbase.backup.BackupInfo;
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* MapReduce implementation of {@link BackupCopyJob}. There are two types of copy operation:
* copying from a snapshot, which extends ExportSnapshot, and copying incremental log files,
* which extends DistCp.
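* <p>
* Illustrative usage sketch (the {@code BackupInfo}, {@code BackupManager}, configuration and the
* source/target paths are placeholders supplied by the calling backup client):
*
* <pre>{@code
* MapReduceBackupCopyJob copyJob = new MapReduceBackupCopyJob();
* copyJob.setConf(conf);
* // Incremental copy uses DistCp-style arguments: "src1, [src2, [...]] dst"
* String[] args = new String[] { walDirToCopy, backupTargetDir };
* int result = copyJob.copy(backupInfo, backupManager, conf, BackupType.INCREMENTAL, args);
* }</pre>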
*/
@InterfaceAudience.Private
public class MapReduceBackupCopyJob implements BackupCopyJob {
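/**
 * Configuration key: the number of trailing path components of each source file to preserve
 * under the DistCp target directory (see createInputFileListing and getKey in BackupDistCp below).
 */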
public static final String NUMBER_OF_LEVELS_TO_PRESERVE_KEY = "num.levels.preserve";
private static final Logger LOG = LoggerFactory.getLogger(MapReduceBackupCopyJob.class);
private Configuration conf;
// Accumulated progress within the whole backup process for the copy operation
private float progressDone = 0.1f;
private long bytesCopied = 0;
private static final float INIT_PROGRESS = 0.1f;
// The share of the whole backup task that the current copy task represents when multiple copies
// are needed. The default is 100%, meaning there is only one copy task for the whole backup.
private float subTaskPercntgInWholeTask = 1f;
public MapReduceBackupCopyJob() {
}
@Override
public Configuration getConf() {
return conf;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
/**
* Get the current copy task percentage within the whole task if multiple copies are needed.
* @return the current copy task percentage
*/
public float getSubTaskPercntgInWholeTask() {
return subTaskPercntgInWholeTask;
}
/**
* Set the current copy task percentage within the whole task if multiple copies are needed. Must
* be called before calling
* {@link #copy(BackupInfo, BackupManager, Configuration, BackupType, String[])}
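* <p>
* For example (illustrative): if a backup performs its copy in two passes, each pass could set
* this to 0.5f so that together they account for the entire copy phase of the overall progress.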
* @param subTaskPercntgInWholeTask The percentage of the copy subtask
*/
public void setSubTaskPercntgInWholeTask(float subTaskPercntgInWholeTask) {
this.subTaskPercntgInWholeTask = subTaskPercntgInWholeTask;
}
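/**
 * Thin ExportSnapshot subclass used for full-backup copies; it carries the BackupInfo and the
 * TableName of the table being copied so the export can be associated with its backup.
 */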
static class SnapshotCopy extends ExportSnapshot {
private BackupInfo backupInfo;
private TableName table;
public SnapshotCopy(BackupInfo backupInfo, TableName table) {
super();
this.backupInfo = backupInfo;
this.table = table;
}
public TableName getTable() {
return this.table;
}
public BackupInfo getBackupInfo() {
return this.backupInfo;
}
}
/**
* Update the ongoing backup with new progress.
* @param backupInfo backup info
* @param backupManager backup manager used to persist the progress to the backup system table
* @param newProgress progress
* @param bytesCopied bytes copied
* @throws IOException if the progress update cannot be written to the backup system table
*/
static void updateProgress(BackupInfo backupInfo, BackupManager backupManager,
int newProgress, long bytesCopied) throws IOException {
// compose the new backup progress data, using fake number for now
String backupProgressData = newProgress + "%";
backupInfo.setProgress(newProgress);
backupManager.updateBackupInfo(backupInfo);
LOG.debug("Backup progress data \"" + backupProgressData
+ "\" has been updated to backup system table for " + backupInfo.getBackupId());
}
/**
* Extends DistCp (DistCpV2, MAPREDUCE-2765) so that copy progress can be reported to the backup
* system table during backup. The execute() method is overridden to obtain the Job reference
* used for progress updates. Only the arguments "src1, [src2, [...]] dst" are supported; no
* other DistCp options are accepted.
*/
class BackupDistCp extends DistCp {
private BackupInfo backupInfo;
private BackupManager backupManager;
public BackupDistCp(Configuration conf, DistCpOptions options, BackupInfo backupInfo,
BackupManager backupManager) throws Exception {
super(conf, options);
this.backupInfo = backupInfo;
this.backupManager = backupManager;
}
@Override
public Job execute() throws Exception {
// reflection preparation for private methods and fields
Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
Field fieldInputOptions = getInputOptionsField(classDistCp);
Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
methodCleanup.setAccessible(true);
fieldInputOptions.setAccessible(true);
fieldSubmitted.setAccessible(true);
// execute() logic starts here
assert fieldInputOptions.get(this) != null;
Job job = null;
try {
List<Path> srcs = getSourcePaths(fieldInputOptions);
long totalSrcLgth = 0;
for (Path aSrc : srcs) {
totalSrcLgth +=
BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
}
// Async call
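// Because copy() passes the -async option, super.execute() submits the DistCp job and returns
// without waiting for completion; the loop below then polls the running Job to report progress.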
job = super.execute();
// Update the copy progress in the backup system table whenever the progress value changes,
// polling every progressReportFreq ms (default 500)
int progressReportFreq =
MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency",
500);
float lastProgress = progressDone;
while (!job.isComplete()) {
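// Overall progress = progress accumulated so far + this job's map progress, scaled by this
// copy's share of the whole task and by the portion of the bar not reserved for setup
// (INIT_PROGRESS). Illustrative example: progressDone = 0.1, mapProgress() = 0.5 and
// subTaskPercntgInWholeTask = 1.0 give 0.1 + 0.5 * 1.0 * 0.9 = 0.55, i.e. 55%.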
float newProgress =
progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
if (newProgress > lastProgress) {
BigDecimal progressData =
new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);
String newProgressStr = progressData + "%";
LOG.info("Progress: " + newProgressStr);
updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
LOG.debug("Backup progress data updated to backup system table: \"Progress: "
+ newProgressStr + ".\"");
lastProgress = newProgress;
}
Thread.sleep(progressReportFreq);
}
// update the progress data after copy job complete
float newProgress =
progressDone + job.mapProgress() * subTaskPercntgInWholeTask * (1 - INIT_PROGRESS);
BigDecimal progressData =
new BigDecimal(newProgress * 100).setScale(1, RoundingMode.HALF_UP);
String newProgressStr = progressData + "%";
LOG.info("Progress: " + newProgressStr + " subTask: " + subTaskPercntgInWholeTask
+ " mapProgress: " + job.mapProgress());
// accumulate the overall backup progress
progressDone = newProgress;
bytesCopied += totalSrcLgth;
updateProgress(backupInfo, backupManager, progressData.intValue(), bytesCopied);
LOG.debug("Backup progress data updated to backup system table: \"Progress: "
+ newProgressStr + " - " + bytesCopied + " bytes copied.\"");
} catch (Throwable t) {
LOG.error(t.toString(), t);
throw t;
}
String jobID = job.getJobID().toString();
job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID);
LOG.debug("DistCp job-id: " + jobID + " completed: " + job.isComplete() + " "
+ job.isSuccessful());
Counters ctrs = job.getCounters();
LOG.debug(Objects.toString(ctrs));
if (job.isComplete() && !job.isSuccessful()) {
throw new Exception("DistCp job-id: " + jobID + " failed");
}
return job;
}
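// DistCp keeps its options in a private "inputOptions" field on Hadoop 2 and wraps them in a
// DistCpContext held in a private "context" field on Hadoop 3. The reflection helpers below
// probe for either, so this class works against both Hadoop lines.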
private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
Field f = null;
try {
f = classDistCp.getDeclaredField("inputOptions");
} catch (Exception e) {
// Hadoop 3
try {
f = classDistCp.getDeclaredField("context");
} catch (NoSuchFieldException | SecurityException e1) {
throw new IOException(e1);
}
}
return f;
}
@SuppressWarnings("unchecked")
private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
Object options;
try {
options = fieldInputOptions.get(this);
if (options instanceof DistCpOptions) {
return ((DistCpOptions) options).getSourcePaths();
} else {
// Hadoop 3
Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
methodGetSourcePaths.setAccessible(true);
return (List<Path>) methodGetSourcePaths.invoke(options);
}
} catch (IllegalArgumentException | IllegalAccessException |
ClassNotFoundException | NoSuchMethodException |
SecurityException | InvocationTargetException e) {
throw new IOException(e);
}
}
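// When NUMBER_OF_LEVELS_TO_PRESERVE_KEY is set, build the DistCp copy listing ourselves so
// that the last N path components of every source file are preserved under the target
// directory; otherwise fall back to DistCp's default listing.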
@Override
protected Path createInputFileListing(Job job) throws IOException {
if (conf.get(NUMBER_OF_LEVELS_TO_PRESERVE_KEY) == null) {
return super.createInputFileListing(job);
}
long totalBytesExpected = 0;
int totalRecords = 0;
Path fileListingPath = getFileListingPath();
try (SequenceFile.Writer writer = getWriter(fileListingPath)) {
List<Path> srcFiles = getSourceFiles();
if (srcFiles.size() == 0) {
return fileListingPath;
}
totalRecords = srcFiles.size();
FileSystem fs = srcFiles.get(0).getFileSystem(conf);
for (Path path : srcFiles) {
FileStatus fst = fs.getFileStatus(path);
totalBytesExpected += fst.getLen();
Text key = getKey(path);
writer.append(key, new CopyListingFileStatus(fst));
}
writer.close();
// update the job's configuration
Configuration cfg = job.getConfiguration();
cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_BYTES_TO_BE_COPIED, totalBytesExpected);
cfg.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, fileListingPath.toString());
cfg.setLong(DistCpConstants.CONF_LABEL_TOTAL_NUMBER_OF_RECORDS, totalRecords);
} catch (NoSuchFieldException | SecurityException | IllegalArgumentException
| IllegalAccessException | NoSuchMethodException | ClassNotFoundException
| InvocationTargetException e) {
throw new IOException(e);
}
return fileListingPath;
}
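// Builds the copy-listing key from the last NUMBER_OF_LEVELS_TO_PRESERVE_KEY components of the
// source path. Illustrative example: with num.levels.preserve = 2 and a source file
// /wals/server1/wal.123, the key is "/server1/wal.123", so the last two components are kept.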
private Text getKey(Path path) {
int level = conf.getInt(NUMBER_OF_LEVELS_TO_PRESERVE_KEY, 1);
int count = 0;
String relPath = "";
while (count++ < level) {
relPath = Path.SEPARATOR + path.getName() + relPath;
path = path.getParent();
}
return new Text(relPath);
}
private List<Path> getSourceFiles() throws NoSuchFieldException, SecurityException,
IllegalArgumentException, IllegalAccessException, NoSuchMethodException,
ClassNotFoundException, InvocationTargetException, IOException {
Field options = null;
try {
options = DistCp.class.getDeclaredField("inputOptions");
} catch (NoSuchFieldException | SecurityException e) {
options = DistCp.class.getDeclaredField("context");
}
options.setAccessible(true);
return getSourcePaths(options);
}
private SequenceFile.Writer getWriter(Path pathToListFile) throws IOException {
FileSystem fs = pathToListFile.getFileSystem(conf);
fs.delete(pathToListFile, false);
return SequenceFile.createWriter(conf, SequenceFile.Writer.file(pathToListFile),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(CopyListingFileStatus.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.NONE));
}
}
/**
* Do the backup copy based on the backup type.
* @param context The backup info
* @param backupManager The backup manager
* @param conf The hadoop configuration
* @param copyType The backup copy type
* @param options Options for the customized ExportSnapshot or DistCp
* @return the exit code of the underlying copy tool (0 on success)
* @throws IOException if the copy fails
*/
@Override
public int copy(BackupInfo context, BackupManager backupManager, Configuration conf,
BackupType copyType, String[] options) throws IOException {
int res = 0;
try {
if (copyType == BackupType.FULL) {
SnapshotCopy snapshotCp = new SnapshotCopy(context, context.getTableBySnapshot(options[1]));
LOG.debug("Doing SNAPSHOT_COPY");
// Make a new instance of conf to be used by the snapshot copy class.
snapshotCp.setConf(new Configuration(conf));
res = snapshotCp.run(options);
} else if (copyType == BackupType.INCREMENTAL) {
LOG.debug("Doing COPY_TYPE_DISTCP");
setSubTaskPercntgInWholeTask(1f);
BackupDistCp distcp =
new BackupDistCp(new Configuration(conf), null, context, backupManager);
// Handle a special case where the source is a single file: DistCp then does not create the
// target directory, it treats the target as a file name and copies the source file onto it.
// Create the target directory before running DistCp to avoid that.
LOG.debug("DistCp options: " + Arrays.toString(options));
Path dest = new Path(options[options.length - 1]);
String[] newOptions = new String[options.length + 1];
System.arraycopy(options, 0, newOptions, 1, options.length);
newOptions[0] = "-async"; // run DisCp in async mode
FileSystem destfs = dest.getFileSystem(conf);
if (!destfs.exists(dest)) {
destfs.mkdirs(dest);
}
res = distcp.run(newOptions);
}
return res;
} catch (Exception e) {
throw new IOException(e);
}
}
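// Cancel a running copy by MapReduce job id. For DistCp-based copies the id is recorded in the
// job configuration under DistCpConstants.CONF_LABEL_DISTCP_JOB_ID by execute() above.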
@Override
public void cancel(String jobId) throws IOException {
JobID id = JobID.forName(jobId);
Cluster cluster = new Cluster(this.getConf());
try {
Job job = cluster.getJob(id);
if (job == null) {
LOG.error("No job found for " + id);
// should we throw an exception?
return;
}
if (job.isComplete() || job.isRetired()) {
return;
}
job.killJob();
LOG.debug("Killed copy job " + id);
} catch (InterruptedException e) {
throw new IOException(e);
}
}
}