| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.tools.mapred; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.SequenceFile; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.ipc.RemoteException; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.JobStatus; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; |
| import org.apache.hadoop.tools.CopyListing; |
| import org.apache.hadoop.tools.CopyListingFileStatus; |
| import org.apache.hadoop.tools.DistCpConstants; |
| import org.apache.hadoop.tools.DistCpOptionSwitch; |
| import org.apache.hadoop.tools.DistCpOptions; |
| import org.apache.hadoop.tools.DistCpOptions.FileAttribute; |
| import org.apache.hadoop.tools.GlobbedCopyListing; |
| import org.apache.hadoop.tools.util.DistCpUtils; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| |
| /** |
| * The CopyCommitter class is DistCp's OutputCommitter implementation. It is |
| * responsible for handling the completion/cleanup of the DistCp run. |
| * Specifically, it does the following: |
| * 1. Cleanup of the meta-folder (where DistCp maintains its file-list, etc.) |
| * 2. Preservation of user/group/replication-factor on any directories that |
| * have been copied. (Files are taken care of in their map-tasks.) |
| * 3. Atomic-move of data from the temporary work-folder to the final path |
| * (if atomic-commit was opted for). |
| * 4. Deletion of files from the target that are missing at source (if opted for). |
| * 5. Cleanup of any partially copied files, from previous, failed attempts. |
| */ |
| public class CopyCommitter extends FileOutputCommitter { |
| private static final Log LOG = LogFactory.getLog(CopyCommitter.class); |
| |
| private final TaskAttemptContext taskAttemptContext; |
| private boolean syncFolder = false; |
| private boolean overwrite = false; |
| private boolean targetPathExists = true; |
| private boolean ignoreFailures = false; |
| private int blocksPerChunk = 0; |
| |
| /** |
| * Create a output committer |
| * |
| * @param outputPath the job's output path |
| * @param context the task's context |
| * @throws IOException - Exception if any |
| */ |
| public CopyCommitter(Path outputPath, TaskAttemptContext context) throws IOException { |
| super(outputPath, context); |
| blocksPerChunk = context.getConfiguration().getInt( |
| DistCpOptionSwitch.BLOCKS_PER_CHUNK.getConfigLabel(), 0); |
| LOG.debug("blocks per chunk " + blocksPerChunk); |
| this.taskAttemptContext = context; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void commitJob(JobContext jobContext) throws IOException { |
| Configuration conf = jobContext.getConfiguration(); |
| syncFolder = conf.getBoolean(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, false); |
| overwrite = conf.getBoolean(DistCpConstants.CONF_LABEL_OVERWRITE, false); |
| targetPathExists = conf.getBoolean( |
| DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, true); |
| ignoreFailures = conf.getBoolean( |
| DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false); |
| |
| if (blocksPerChunk > 0) { |
| concatFileChunks(conf); |
| } |
| |
| super.commitJob(jobContext); |
| |
| cleanupTempFiles(jobContext); |
| |
| String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); |
| final boolean preserveRawXattrs = |
| conf.getBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false); |
| if ((attributes != null && !attributes.isEmpty()) || preserveRawXattrs) { |
| preserveFileAttributesForDirectories(conf); |
| } |
| |
| try { |
| if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) { |
| deleteMissing(conf); |
| } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) { |
| commitData(conf); |
| } |
| taskAttemptContext.setStatus("Commit Successful"); |
| } |
| finally { |
| cleanup(conf); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void abortJob(JobContext jobContext, |
| JobStatus.State state) throws IOException { |
| try { |
| super.abortJob(jobContext, state); |
| } finally { |
| cleanupTempFiles(jobContext); |
| cleanup(jobContext.getConfiguration()); |
| } |
| } |
| |
| private void cleanupTempFiles(JobContext context) { |
| try { |
| Configuration conf = context.getConfiguration(); |
| |
| Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); |
| FileSystem targetFS = targetWorkPath.getFileSystem(conf); |
| |
| String jobId = context.getJobID().toString(); |
| deleteAttemptTempFiles(targetWorkPath, targetFS, jobId); |
| deleteAttemptTempFiles(targetWorkPath.getParent(), targetFS, jobId); |
| } catch (Throwable t) { |
| LOG.warn("Unable to cleanup temp files", t); |
| } |
| } |
| |
| private void deleteAttemptTempFiles(Path targetWorkPath, |
| FileSystem targetFS, |
| String jobId) throws IOException { |
| if (targetWorkPath == null) { |
| return; |
| } |
| |
| FileStatus[] tempFiles = targetFS.globStatus( |
| new Path(targetWorkPath, ".distcp.tmp." + jobId.replaceAll("job","attempt") + "*")); |
| |
| if (tempFiles != null && tempFiles.length > 0) { |
| for (FileStatus file : tempFiles) { |
| LOG.info("Cleaning up " + file.getPath()); |
| targetFS.delete(file.getPath(), false); |
| } |
| } |
| } |
| |
| /** |
| * Cleanup meta folder and other temporary files |
| * |
| * @param conf - Job Configuration |
| */ |
| private void cleanup(Configuration conf) { |
| Path metaFolder = new Path(conf.get(DistCpConstants.CONF_LABEL_META_FOLDER)); |
| try { |
| FileSystem fs = metaFolder.getFileSystem(conf); |
| LOG.info("Cleaning up temporary work folder: " + metaFolder); |
| fs.delete(metaFolder, true); |
| } catch (IOException ignore) { |
| LOG.error("Exception encountered ", ignore); |
| } |
| } |
| |
| private boolean isFileNotFoundException(IOException e) { |
| if (e instanceof FileNotFoundException) { |
| return true; |
| } |
| |
| if (e instanceof RemoteException) { |
| return ((RemoteException)e).unwrapRemoteException() |
| instanceof FileNotFoundException; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Concat chunk files for the same file into one. |
| * Iterate through copy listing, identify chunk files for the same file, |
| * concat them into one. |
| */ |
| private void concatFileChunks(Configuration conf) throws IOException { |
| |
| LOG.info("concat file chunks ..."); |
| |
| String spath = conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH); |
| if (spath == null || spath.isEmpty()) { |
| return; |
| } |
| Path sourceListing = new Path(spath); |
| SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, |
| SequenceFile.Reader.file(sourceListing)); |
| Path targetRoot = |
| new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); |
| |
| try { |
| CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); |
| Text srcRelPath = new Text(); |
| CopyListingFileStatus lastFileStatus = null; |
| LinkedList<Path> allChunkPaths = new LinkedList<Path>(); |
| |
| // Iterate over every source path that was copied. |
| while (sourceReader.next(srcRelPath, srcFileStatus)) { |
| if (srcFileStatus.isDirectory()) { |
| continue; |
| } |
| Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath); |
| Path targetFileChunkPath = |
| DistCpUtils.getSplitChunkPath(targetFile, srcFileStatus); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(" add " + targetFileChunkPath + " to concat."); |
| } |
| allChunkPaths.add(targetFileChunkPath); |
| if (srcFileStatus.getChunkOffset() + srcFileStatus.getChunkLength() |
| == srcFileStatus.getLen()) { |
| // This is the last chunk of the splits, consolidate allChunkPaths |
| try { |
| concatFileChunks(conf, targetFile, allChunkPaths); |
| } catch (IOException e) { |
| // If the concat failed because a chunk file doesn't exist, |
| // then we assume that the CopyMapper has skipped copying this |
| // file, and we ignore the exception here. |
| // If a chunk file should have been created but it was not, then |
| // the CopyMapper would have failed. |
| if (!isFileNotFoundException(e)) { |
| String emsg = "Failed to concat chunk files for " + targetFile; |
| if (!ignoreFailures) { |
| throw new IOException(emsg, e); |
| } else { |
| LOG.warn(emsg, e); |
| } |
| } |
| } |
| allChunkPaths.clear(); |
| lastFileStatus = null; |
| } else { |
| if (lastFileStatus == null) { |
| lastFileStatus = new CopyListingFileStatus(srcFileStatus); |
| } else { |
| // Two neighboring chunks have to be consecutive ones for the same |
| // file, for them to be merged |
| if (!srcFileStatus.getPath().equals(lastFileStatus.getPath()) || |
| srcFileStatus.getChunkOffset() != |
| (lastFileStatus.getChunkOffset() + |
| lastFileStatus.getChunkLength())) { |
| String emsg = "Inconsistent sequence file: current " + |
| "chunk file " + srcFileStatus + " doesnt match prior " + |
| "entry " + lastFileStatus; |
| if (!ignoreFailures) { |
| throw new IOException(emsg); |
| } else { |
| LOG.warn(emsg + ", skipping concat this set."); |
| } |
| } else { |
| lastFileStatus.setChunkOffset(srcFileStatus.getChunkOffset()); |
| lastFileStatus.setChunkLength(srcFileStatus.getChunkLength()); |
| } |
| } |
| } |
| } |
| } finally { |
| IOUtils.closeStream(sourceReader); |
| } |
| } |
| |
| // This method changes the target-directories' file-attributes (owner, |
| // user/group permissions, etc.) based on the corresponding source directories. |
| private void preserveFileAttributesForDirectories(Configuration conf) |
| throws IOException { |
| String attrSymbols = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); |
| final boolean syncOrOverwrite = syncFolder || overwrite; |
| |
| LOG.info("About to preserve attributes: " + attrSymbols); |
| |
| EnumSet<FileAttribute> attributes = DistCpUtils.unpackAttributes(attrSymbols); |
| final boolean preserveRawXattrs = |
| conf.getBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false); |
| |
| Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); |
| FileSystem clusterFS = sourceListing.getFileSystem(conf); |
| SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, |
| SequenceFile.Reader.file(sourceListing)); |
| long totalLen = clusterFS.getFileStatus(sourceListing).getLen(); |
| |
| Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); |
| |
| long preservedEntries = 0; |
| try { |
| CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); |
| Text srcRelPath = new Text(); |
| |
| // Iterate over every source path that was copied. |
| while (sourceReader.next(srcRelPath, srcFileStatus)) { |
| // File-attributes for files are set at the time of copy, |
| // in the map-task. |
| if (! srcFileStatus.isDirectory()) continue; |
| |
| Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath); |
| // |
| // Skip the root folder when syncOrOverwrite is true. |
| // |
| if (targetRoot.equals(targetFile) && syncOrOverwrite) continue; |
| |
| FileSystem targetFS = targetFile.getFileSystem(conf); |
| DistCpUtils.preserve(targetFS, targetFile, srcFileStatus, attributes, |
| preserveRawXattrs); |
| |
| taskAttemptContext.progress(); |
| taskAttemptContext.setStatus("Preserving status on directory entries. [" + |
| sourceReader.getPosition() * 100 / totalLen + "%]"); |
| } |
| } finally { |
| IOUtils.closeStream(sourceReader); |
| } |
| LOG.info("Preserved status on " + preservedEntries + " dir entries on target"); |
| } |
| |
| // This method deletes "extra" files from the target, if they're not |
| // available at the source. |
| private void deleteMissing(Configuration conf) throws IOException { |
| LOG.info("-delete option is enabled. About to remove entries from " + |
| "target that are missing in source"); |
| |
| // Sort the source-file listing alphabetically. |
| Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); |
| FileSystem clusterFS = sourceListing.getFileSystem(conf); |
| Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing); |
| |
| // Similarly, create the listing of target-files. Sort alphabetically. |
| Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq"); |
| CopyListing target = new GlobbedCopyListing(new Configuration(conf), null); |
| |
| List<Path> targets = new ArrayList<Path>(1); |
| Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); |
| targets.add(targetFinalPath); |
| Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath) |
| .toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME) |
| ? DistCpConstants.RAW_NONE_PATH : DistCpConstants.NONE_PATH; |
| DistCpOptions options = new DistCpOptions(targets, resultNonePath); |
| // |
| // Set up options to be the same from the CopyListing.buildListing's perspective, |
| // so to collect similar listings as when doing the copy |
| // |
| options.setOverwrite(overwrite); |
| options.setSyncFolder(syncFolder); |
| options.setTargetPathExists(targetPathExists); |
| |
| target.buildListing(targetListing, options); |
| Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing); |
| long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen(); |
| |
| SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, |
| SequenceFile.Reader.file(sortedSourceListing)); |
| SequenceFile.Reader targetReader = new SequenceFile.Reader(conf, |
| SequenceFile.Reader.file(sortedTargetListing)); |
| |
| // Walk both source and target file listings. |
| // Delete all from target that doesn't also exist on source. |
| long deletedEntries = 0; |
| try { |
| CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); |
| Text srcRelPath = new Text(); |
| CopyListingFileStatus trgtFileStatus = new CopyListingFileStatus(); |
| Text trgtRelPath = new Text(); |
| |
| FileSystem targetFS = targetFinalPath.getFileSystem(conf); |
| boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus); |
| while (targetReader.next(trgtRelPath, trgtFileStatus)) { |
| // Skip sources that don't exist on target. |
| while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) { |
| srcAvailable = sourceReader.next(srcRelPath, srcFileStatus); |
| } |
| |
| if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue; |
| |
| // Target doesn't exist at source. Delete. |
| boolean result = targetFS.delete(trgtFileStatus.getPath(), true) |
| || !targetFS.exists(trgtFileStatus.getPath()); |
| if (result) { |
| LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source"); |
| deletedEntries++; |
| } else { |
| throw new IOException("Unable to delete " + trgtFileStatus.getPath()); |
| } |
| taskAttemptContext.progress(); |
| taskAttemptContext.setStatus("Deleting missing files from target. [" + |
| targetReader.getPosition() * 100 / totalLen + "%]"); |
| } |
| } finally { |
| IOUtils.closeStream(sourceReader); |
| IOUtils.closeStream(targetReader); |
| } |
| LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0)); |
| } |
| |
| private void commitData(Configuration conf) throws IOException { |
| |
| Path workDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); |
| Path finalDir = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); |
| FileSystem targetFS = workDir.getFileSystem(conf); |
| |
| LOG.info("Atomic commit enabled. Moving " + workDir + " to " + finalDir); |
| if (targetFS.exists(finalDir) && targetFS.exists(workDir)) { |
| LOG.error("Pre-existing final-path found at: " + finalDir); |
| throw new IOException("Target-path can't be committed to because it " + |
| "exists at " + finalDir + ". Copied data is in temp-dir: " + workDir + ". "); |
| } |
| |
| boolean result = targetFS.rename(workDir, finalDir); |
| if (!result) { |
| LOG.warn("Rename failed. Perhaps data already moved. Verifying..."); |
| result = targetFS.exists(finalDir) && !targetFS.exists(workDir); |
| } |
| if (result) { |
| LOG.info("Data committed successfully to " + finalDir); |
| taskAttemptContext.setStatus("Data committed successfully to " + finalDir); |
| } else { |
| LOG.error("Unable to commit data to " + finalDir); |
| throw new IOException("Atomic commit failed. Temporary data in " + workDir + |
| ", Unable to move to " + finalDir); |
| } |
| } |
| |
| /** |
| * Concat the passed chunk files into one and rename it the targetFile. |
| */ |
| private void concatFileChunks(Configuration conf, Path targetFile, |
| LinkedList<Path> allChunkPaths) throws IOException { |
| if (allChunkPaths.size() == 1) { |
| return; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("concat " + targetFile + " allChunkSize+ " |
| + allChunkPaths.size()); |
| } |
| FileSystem dstfs = targetFile.getFileSystem(conf); |
| |
| Path firstChunkFile = allChunkPaths.removeFirst(); |
| Path[] restChunkFiles = new Path[allChunkPaths.size()]; |
| allChunkPaths.toArray(restChunkFiles); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("concat: firstchunk: " + dstfs.getFileStatus(firstChunkFile)); |
| int i = 0; |
| for (Path f : restChunkFiles) { |
| LOG.debug("concat: other chunk: " + i + ": " + dstfs.getFileStatus(f)); |
| ++i; |
| } |
| } |
| dstfs.concat(firstChunkFile, restChunkFiles); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("concat: result: " + dstfs.getFileStatus(firstChunkFile)); |
| } |
| rename(dstfs, firstChunkFile, targetFile); |
| } |
| |
| /** |
| * Rename tmp to dst on destFileSys. |
| * @param destFileSys the file ssystem |
| * @param tmp the source path |
| * @param dst the destination path |
| * @throws IOException if renaming failed |
| */ |
| private static void rename(FileSystem destFileSys, Path tmp, Path dst) |
| throws IOException { |
| try { |
| if (destFileSys.exists(dst)) { |
| destFileSys.delete(dst, true); |
| } |
| destFileSys.rename(tmp, dst); |
| } catch (IOException ioe) { |
| throw new IOException("Fail to rename tmp file (=" + tmp |
| + ") to destination file (=" + dst + ")", ioe); |
| } |
| } |
| |
| } |