| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.tools; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.tools.util.DistCpUtils; |
| |
| import java.util.EnumSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| |
| /** |
| * The Options class encapsulates all DistCp options. |
| * These may be set from command-line (via the OptionsParser) |
| * or may be set manually. |
| */ |
| public class DistCpOptions { |
| |
| private boolean atomicCommit = false; |
| private boolean syncFolder = false; |
| private boolean deleteMissing = false; |
| private boolean ignoreFailures = false; |
| private boolean overwrite = false; |
| private boolean skipCRC = false; |
| private boolean blocking = true; |
| |
| private int maxMaps = DistCpConstants.DEFAULT_MAPS; |
| private int mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB; |
| |
| private String sslConfigurationFile; |
| |
| private String copyStrategy = DistCpConstants.UNIFORMSIZE; |
| |
| private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class); |
| |
| private Path atomicWorkPath; |
| |
| private Path logPath; |
| |
| private Path sourceFileListing; |
| private List<Path> sourcePaths; |
| |
| private Path targetPath; |
| |
| public static enum FileAttribute{ |
| REPLICATION, BLOCKSIZE, USER, GROUP, PERMISSION; |
| |
| public static FileAttribute getAttribute(char symbol) { |
| for (FileAttribute attribute : values()) { |
| if (attribute.name().charAt(0) == Character.toUpperCase(symbol)) { |
| return attribute; |
| } |
| } |
| throw new NoSuchElementException("No attribute for " + symbol); |
| } |
| } |
| |
| /** |
| * Constructor, to initialize source/target paths. |
| * @param sourcePaths List of source-paths (including wildcards) |
| * to be copied to target. |
| * @param targetPath Destination path for the dist-copy. |
| */ |
| public DistCpOptions(List<Path> sourcePaths, Path targetPath) { |
| assert sourcePaths != null && !sourcePaths.isEmpty() : "Invalid source paths"; |
| assert targetPath != null : "Invalid Target path"; |
| |
| this.sourcePaths = sourcePaths; |
| this.targetPath = targetPath; |
| } |
| |
| /** |
| * Constructor, to initialize source/target paths. |
| * @param sourceFileListing File containing list of source paths |
| * @param targetPath Destination path for the dist-copy. |
| */ |
| public DistCpOptions(Path sourceFileListing, Path targetPath) { |
| assert sourceFileListing != null : "Invalid source paths"; |
| assert targetPath != null : "Invalid Target path"; |
| |
| this.sourceFileListing = sourceFileListing; |
| this.targetPath = targetPath; |
| } |
| |
| /** |
| * Copy constructor. |
| * @param that DistCpOptions being copied from. |
| */ |
| public DistCpOptions(DistCpOptions that) { |
| if (this != that && that != null) { |
| this.atomicCommit = that.atomicCommit; |
| this.syncFolder = that.syncFolder; |
| this.deleteMissing = that.deleteMissing; |
| this.ignoreFailures = that.ignoreFailures; |
| this.overwrite = that.overwrite; |
| this.skipCRC = that.skipCRC; |
| this.blocking = that.blocking; |
| this.maxMaps = that.maxMaps; |
| this.mapBandwidth = that.mapBandwidth; |
| this.sslConfigurationFile = that.getSslConfigurationFile(); |
| this.copyStrategy = that.copyStrategy; |
| this.preserveStatus = that.preserveStatus; |
| this.atomicWorkPath = that.getAtomicWorkPath(); |
| this.logPath = that.getLogPath(); |
| this.sourceFileListing = that.getSourceFileListing(); |
| this.sourcePaths = that.getSourcePaths(); |
| this.targetPath = that.getTargetPath(); |
| } |
| } |
| |
| /** |
| * Should the data be committed atomically? |
| * |
| * @return true if data should be committed automically. false otherwise |
| */ |
| public boolean shouldAtomicCommit() { |
| return atomicCommit; |
| } |
| |
| /** |
| * Set if data need to be committed automatically |
| * |
| * @param atomicCommit - boolean switch |
| */ |
| public void setAtomicCommit(boolean atomicCommit) { |
| validate(DistCpOptionSwitch.ATOMIC_COMMIT, atomicCommit); |
| this.atomicCommit = atomicCommit; |
| } |
| |
| /** |
| * Should the data be sync'ed between source and target paths? |
| * |
| * @return true if data should be sync'ed up. false otherwise |
| */ |
| public boolean shouldSyncFolder() { |
| return syncFolder; |
| } |
| |
| /** |
| * Set if source and target folder contents be sync'ed up |
| * |
| * @param syncFolder - boolean switch |
| */ |
| public void setSyncFolder(boolean syncFolder) { |
| validate(DistCpOptionSwitch.SYNC_FOLDERS, syncFolder); |
| this.syncFolder = syncFolder; |
| } |
| |
| /** |
| * Should target files missing in source should be deleted? |
| * |
| * @return true if zoombie target files to be removed. false otherwise |
| */ |
| public boolean shouldDeleteMissing() { |
| return deleteMissing; |
| } |
| |
| /** |
| * Set if files only present in target should be deleted |
| * |
| * @param deleteMissing - boolean switch |
| */ |
| public void setDeleteMissing(boolean deleteMissing) { |
| validate(DistCpOptionSwitch.DELETE_MISSING, deleteMissing); |
| this.deleteMissing = deleteMissing; |
| } |
| |
| /** |
| * Should failures be logged and ignored during copy? |
| * |
| * @return true if failures are to be logged and ignored. false otherwise |
| */ |
| public boolean shouldIgnoreFailures() { |
| return ignoreFailures; |
| } |
| |
| /** |
| * Set if failures during copy be ignored |
| * |
| * @param ignoreFailures - boolean switch |
| */ |
| public void setIgnoreFailures(boolean ignoreFailures) { |
| this.ignoreFailures = ignoreFailures; |
| } |
| |
| /** |
| * Should DistCp be running in blocking mode |
| * |
| * @return true if should run in blocking, false otherwise |
| */ |
| public boolean shouldBlock() { |
| return blocking; |
| } |
| |
| /** |
| * Set if Disctp should run blocking or non-blocking |
| * |
| * @param blocking - boolean switch |
| */ |
| public void setBlocking(boolean blocking) { |
| this.blocking = blocking; |
| } |
| |
| /** |
| * Should files be overwritten always? |
| * |
| * @return true if files in target that may exist before distcp, should always |
| * be overwritten. false otherwise |
| */ |
| public boolean shouldOverwrite() { |
| return overwrite; |
| } |
| |
| /** |
| * Set if files should always be overwritten on target |
| * |
| * @param overwrite - boolean switch |
| */ |
| public void setOverwrite(boolean overwrite) { |
| validate(DistCpOptionSwitch.OVERWRITE, overwrite); |
| this.overwrite = overwrite; |
| } |
| |
| /** |
| * Should CRC/checksum check be skipped while checking files are identical |
| * |
| * @return true if checksum check should be skipped while checking files are |
| * identical. false otherwise |
| */ |
| public boolean shouldSkipCRC() { |
| return skipCRC; |
| } |
| |
| /** |
| * Set if checksum comparison should be skipped while determining if |
| * source and destination files are identical |
| * |
| * @param skipCRC - boolean switch |
| */ |
| public void setSkipCRC(boolean skipCRC) { |
| validate(DistCpOptionSwitch.SKIP_CRC, skipCRC); |
| this.skipCRC = skipCRC; |
| } |
| |
| /** Get the max number of maps to use for this copy |
| * |
| * @return Max number of maps |
| */ |
| public int getMaxMaps() { |
| return maxMaps; |
| } |
| |
| /** |
| * Set the max number of maps to use for copy |
| * |
| * @param maxMaps - Number of maps |
| */ |
| public void setMaxMaps(int maxMaps) { |
| this.maxMaps = Math.max(maxMaps, 1); |
| } |
| |
| /** Get the map bandwidth in MB |
| * |
| * @return Bandwidth in MB |
| */ |
| public int getMapBandwidth() { |
| return mapBandwidth; |
| } |
| |
| /** |
| * Set per map bandwidth |
| * |
| * @param mapBandwidth - per map bandwidth |
| */ |
| public void setMapBandwidth(int mapBandwidth) { |
| assert mapBandwidth > 0 : "Bandwidth " + mapBandwidth + " is invalid (should be > 0)"; |
| this.mapBandwidth = mapBandwidth; |
| } |
| |
| /** |
| * Get path where the ssl configuration file is present to use for hftps:// |
| * |
| * @return Path on local file system |
| */ |
| public String getSslConfigurationFile() { |
| return sslConfigurationFile; |
| } |
| |
| /** |
| * Set the SSL configuration file path to use with hftps:// (local path) |
| * |
| * @param sslConfigurationFile - Local ssl config file path |
| */ |
| public void setSslConfigurationFile(String sslConfigurationFile) { |
| this.sslConfigurationFile = sslConfigurationFile; |
| } |
| |
| /** |
| * Returns an iterator with the list of file attributes to preserve |
| * |
| * @return iterator of file attributes to preserve |
| */ |
| public Iterator<FileAttribute> preserveAttributes() { |
| return preserveStatus.iterator(); |
| } |
| |
| /** |
| * Checks if the input attibute should be preserved or not |
| * |
| * @param attribute - Attribute to check |
| * @return True if attribute should be preserved, false otherwise |
| */ |
| public boolean shouldPreserve(FileAttribute attribute) { |
| return preserveStatus.contains(attribute); |
| } |
| |
| /** |
| * Add file attributes that need to be preserved. This method may be |
| * called multiple times to add attributes. |
| * |
| * @param fileAttribute - Attribute to add, one at a time |
| */ |
| public void preserve(FileAttribute fileAttribute) { |
| for (FileAttribute attribute : preserveStatus) { |
| if (attribute.equals(fileAttribute)) { |
| return; |
| } |
| } |
| preserveStatus.add(fileAttribute); |
| } |
| |
| /** Get work path for atomic commit. If null, the work |
| * path would be parentOf(targetPath) + "/._WIP_" + nameOf(targetPath) |
| * |
| * @return Atomic work path on the target cluster. Null if not set |
| */ |
| public Path getAtomicWorkPath() { |
| return atomicWorkPath; |
| } |
| |
| /** |
| * Set the work path for atomic commit |
| * |
| * @param atomicWorkPath - Path on the target cluster |
| */ |
| public void setAtomicWorkPath(Path atomicWorkPath) { |
| this.atomicWorkPath = atomicWorkPath; |
| } |
| |
| /** Get output directory for writing distcp logs. Otherwise logs |
| * are temporarily written to JobStagingDir/_logs and deleted |
| * upon job completion |
| * |
| * @return Log output path on the cluster where distcp job is run |
| */ |
| public Path getLogPath() { |
| return logPath; |
| } |
| |
| /** |
| * Set the log path where distcp output logs are stored |
| * Uses JobStagingDir/_logs by default |
| * |
| * @param logPath - Path where logs will be saved |
| */ |
| public void setLogPath(Path logPath) { |
| this.logPath = logPath; |
| } |
| |
| /** |
| * Get the copy strategy to use. Uses appropriate input format |
| * |
| * @return copy strategy to use |
| */ |
| public String getCopyStrategy() { |
| return copyStrategy; |
| } |
| |
| /** |
| * Set the copy strategy to use. Should map to a strategy implementation |
| * in distp-default.xml |
| * |
| * @param copyStrategy - copy Strategy to use |
| */ |
| public void setCopyStrategy(String copyStrategy) { |
| this.copyStrategy = copyStrategy; |
| } |
| |
| /** |
| * File path (hdfs:// or file://) that contains the list of actual |
| * files to copy |
| * |
| * @return - Source listing file path |
| */ |
| public Path getSourceFileListing() { |
| return sourceFileListing; |
| } |
| |
| /** |
| * Getter for sourcePaths. |
| * @return List of source-paths. |
| */ |
| public List<Path> getSourcePaths() { |
| return sourcePaths; |
| } |
| |
| /** |
| * Setter for sourcePaths. |
| * @param sourcePaths The new list of source-paths. |
| */ |
| public void setSourcePaths(List<Path> sourcePaths) { |
| assert sourcePaths != null && sourcePaths.size() != 0; |
| this.sourcePaths = sourcePaths; |
| } |
| |
| /** |
| * Getter for the targetPath. |
| * @return The target-path. |
| */ |
| public Path getTargetPath() { |
| return targetPath; |
| } |
| |
| public void validate(DistCpOptionSwitch option, boolean value) { |
| |
| boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ? |
| value : this.syncFolder); |
| boolean overwrite = (option == DistCpOptionSwitch.OVERWRITE ? |
| value : this.overwrite); |
| boolean deleteMissing = (option == DistCpOptionSwitch.DELETE_MISSING ? |
| value : this.deleteMissing); |
| boolean atomicCommit = (option == DistCpOptionSwitch.ATOMIC_COMMIT ? |
| value : this.atomicCommit); |
| boolean skipCRC = (option == DistCpOptionSwitch.SKIP_CRC ? |
| value : this.skipCRC); |
| |
| if (syncFolder && atomicCommit) { |
| throw new IllegalArgumentException("Atomic commit can't be used with " + |
| "sync folder or overwrite options"); |
| } |
| |
| if (deleteMissing && !(overwrite || syncFolder)) { |
| throw new IllegalArgumentException("Delete missing is applicable " + |
| "only with update or overwrite options"); |
| } |
| |
| if (overwrite && syncFolder) { |
| throw new IllegalArgumentException("Overwrite and update options are " + |
| "mutually exclusive"); |
| } |
| |
| if (!syncFolder && skipCRC) { |
| throw new IllegalArgumentException("Skip CRC is valid only with update options"); |
| } |
| |
| } |
| |
| /** |
| * Add options to configuration. These will be used in the Mapper/committer |
| * |
| * @param conf - Configruation object to which the options need to be added |
| */ |
| public void appendToConf(Configuration conf) { |
| DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.ATOMIC_COMMIT, |
| String.valueOf(atomicCommit)); |
| DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.IGNORE_FAILURES, |
| String.valueOf(ignoreFailures)); |
| DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SYNC_FOLDERS, |
| String.valueOf(syncFolder)); |
| DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DELETE_MISSING, |
| String.valueOf(deleteMissing)); |
| DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.OVERWRITE, |
| String.valueOf(overwrite)); |
| DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.SKIP_CRC, |
| String.valueOf(skipCRC)); |
| DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.BANDWIDTH, |
| String.valueOf(mapBandwidth)); |
| DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS, |
| DistCpUtils.packAttributes(preserveStatus)); |
| } |
| |
| /** |
| * Utility to easily string-ify Options, for logging. |
| * |
| * @return String representation of the Options. |
| */ |
| @Override |
| public String toString() { |
| return "DistCpOptions{" + |
| "atomicCommit=" + atomicCommit + |
| ", syncFolder=" + syncFolder + |
| ", deleteMissing=" + deleteMissing + |
| ", ignoreFailures=" + ignoreFailures + |
| ", maxMaps=" + maxMaps + |
| ", sslConfigurationFile='" + sslConfigurationFile + '\'' + |
| ", copyStrategy='" + copyStrategy + '\'' + |
| ", sourceFileListing=" + sourceFileListing + |
| ", sourcePaths=" + sourcePaths + |
| ", targetPath=" + targetPath + |
| '}'; |
| } |
| |
| @Override |
| protected DistCpOptions clone() throws CloneNotSupportedException { |
| return (DistCpOptions) super.clone(); |
| } |
| } |