/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.io.impl;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.OutputHandler;
import org.apache.crunch.io.PathTarget;
import org.apache.crunch.io.SourceTargetHelper;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
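/**
 * A {@link PathTarget} implementation that writes pipeline output to a file
 * system path using a configurable {@link FileOutputFormat}.
 *
 * <p>A minimal usage sketch. The pipeline and collection variables are
 * hypothetical, and it assumes a Crunch version where
 * {@code SequentialFileNamingScheme.getInstance()} is available:
 *
 * <pre>{@code
 * Target target = new FileTargetImpl(new Path("/out"),
 *     SequenceFileOutputFormat.class, SequentialFileNamingScheme.getInstance());
 * pipeline.write(collection, target, Target.WriteMode.OVERWRITE);
 * }</pre>
 */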
public class FileTargetImpl implements PathTarget {
private static final Logger LOG = LoggerFactory.getLogger(FileTargetImpl.class);
protected Path path;
private final FormatBundle<? extends FileOutputFormat> formatBundle;
private final FileNamingScheme fileNamingScheme;
private FileSystem fileSystem;
public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
FileNamingScheme fileNamingScheme) {
this(path, outputFormatClass, fileNamingScheme, ImmutableMap.<String, String>of());
}
public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
FileNamingScheme fileNamingScheme, Map<String, String> extraConf) {
this.path = path;
this.formatBundle = FormatBundle.forOutput(outputFormatClass);
this.fileNamingScheme = fileNamingScheme;
if (extraConf != null && !extraConf.isEmpty()) {
for (Map.Entry<String, String> e : extraConf.entrySet()) {
formatBundle.set(e.getKey(), e.getValue());
}
}
}
@Override
public Target outputConf(String key, String value) {
formatBundle.set(key, value);
return this;
}
@Override
public Target fileSystem(FileSystem fileSystem) {
if (this.fileSystem != null) {
throw new IllegalStateException("Filesystem already set. Change is not supported.");
}
if (fileSystem != null) {
path = fileSystem.makeQualified(path);
this.fileSystem = fileSystem;
Configuration fsConf = fileSystem.getConf();
for (Entry<String, String> entry : fsConf) {
formatBundle.set(entry.getKey(), entry.getValue());
}
}
return this;
}
@Override
public FileSystem getFileSystem() {
return fileSystem;
}
@Override
public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
Converter converter = getConverter(ptype);
Class keyClass = converter.getKeyClass();
Class valueClass = converter.getValueClass();
configureForMapReduce(job, keyClass, valueClass, formatBundle, outputPath, name);
}
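/**
 * @deprecated use {@link #configureForMapReduce(Job, Class, Class, FormatBundle, Path, String)},
 * which carries the output format's extra configuration in a {@link FormatBundle}.
 */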
@Deprecated
protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
Class outputFormatClass, Path outputPath, String name) {
configureForMapReduce(job, keyClass, valueClass, FormatBundle.forOutput(outputFormatClass), outputPath, name);
}
protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
FormatBundle formatBundle, Path outputPath, String name) {
try {
FileOutputFormat.setOutputPath(job, outputPath);
} catch (Exception e) {
throw new RuntimeException(e);
}
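// A null name means this target is the job's single output; a non-null name registers it
// as a named output to be handled by CrunchOutputs.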
if (name == null) {
job.setOutputFormatClass(formatBundle.getFormatClass());
formatBundle.configure(job.getConfiguration());
job.setOutputKeyClass(keyClass);
job.setOutputValueClass(valueClass);
} else {
CrunchOutputs.addNamedOutput(job, name, formatBundle, keyClass, valueClass);
}
}
@Override
public boolean accept(OutputHandler handler, PType<?> ptype) {
handler.configure(this, ptype);
return true;
}
@Override
public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
return ptype.getConverter();
}
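/**
 * Moves a single file from the job's working directory to the target directory,
 * using a rename when both paths are on the same file system and a copy otherwise.
 */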
private class WorkingPathFileMover implements Callable<Boolean> {
private final Configuration conf;
private final Path src;
private final Path dst;
private final FileSystem srcFs;
private final FileSystem dstFs;
private final boolean sameFs;
public WorkingPathFileMover(Configuration conf, Path src, Path dst,
FileSystem srcFs, FileSystem dstFs, boolean sameFs) {
this.conf = conf;
this.src = src;
this.dst = dst;
this.srcFs = srcFs;
this.dstFs = dstFs;
this.sameFs = sameFs;
}
@Override
public Boolean call() throws IOException {
if (sameFs) {
return srcFs.rename(src, dst);
} else {
return FileUtil.copy(srcFs, src, dstFs, dst, true, true, conf);
}
}
}
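/**
 * Moves the output files matching this target's source pattern from the working path
 * to the target path. Uses asynchronous renames when source and destination share a
 * file system; otherwise copies the files, either via DistCp (the default) or via
 * asynchronous copy threads.
 */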
@Override
public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
FileSystem srcFs = workingPath.getFileSystem(conf);
Configuration dstFsConf = getEffectiveBundleConfig(conf);
FileSystem dstFs = path.getFileSystem(dstFsConf);
if (!dstFs.exists(path)) {
dstFs.mkdirs(path);
}
Path srcPattern = getSourcePattern(workingPath, index);
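// If the target path can be qualified against the source file system, a cheap
// rename is possible; otherwise the files must be copied between file systems.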
boolean sameFs = isCompatible(srcFs, path);
boolean useDistributedCopy = conf.getBoolean(RuntimeParameters.FILE_TARGET_USE_DISTCP, true);
int maxDistributedCopyTasks = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_DISTCP_TASKS, 1000);
int maxThreads = conf.getInt(RuntimeParameters.FILE_TARGET_MAX_THREADS, 1);
if (!sameFs) {
if (useDistributedCopy) {
LOG.info("Source and destination are in different file systems, performing distributed copy from {} to {}", srcPattern,
path);
handleOutputsDistributedCopy(dstFsConf, srcPattern, srcFs, dstFs, maxDistributedCopyTasks);
} else {
LOG.info("Source and destination are in different file systems, performing asynch copies from {} to {}", srcPattern, path);
handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
}
} else {
LOG.info("Source and destination are in the same file system, performing asynch renames from {} to {}", srcPattern, path);
handleOutputsAsynchronously(conf, srcPattern, srcFs, dstFs, sameFs, maxThreads);
}
}
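/**
 * Renames (same file system) or copies (different file systems) the matched files on a
 * fixed-size thread pool, then writes the _SUCCESS marker if every move succeeded.
 */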
private void handleOutputsAsynchronously(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
boolean sameFs, int maxThreads) throws IOException {
Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
List<ListenableFuture<Boolean>> renameFutures = Lists.newArrayList();
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
maxThreads));
Configuration dstFsConf = getEffectiveBundleConfig(conf);
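// Files with "-m-" in their names are map outputs; everything else is treated as
// reduce output, whose partition number is carried over into the destination name.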
for (Path s : srcs) {
Path d = getDestFile(dstFsConf, s, path, s.getName().contains("-m-"));
renameFutures.add(
executorService.submit(
new WorkingPathFileMover(conf, s, d, srcFs, dstFs, sameFs)));
}
if (sameFs) {
LOG.info("Renaming {} files using at most {} threads.", renameFutures.size(), maxThreads);
} else {
LOG.info("Copying {} files using at most {} threads.", renameFutures.size(), maxThreads);
}
ListenableFuture<List<Boolean>> future =
Futures.successfulAsList(renameFutures);
List<Boolean> renameResults = null;
try {
renameResults = future.get();
} catch (InterruptedException | ExecutionException e) {
throw Throwables.propagate(e);
} finally {
executorService.shutdownNow();
}
// successfulAsList substitutes null for any input future that failed, so a null
// entry also indicates a failed rename/copy.
if (renameResults != null && !renameResults.contains(false) && !renameResults.contains(null)) {
if (sameFs) {
LOG.info("Renamed {} files.", renameFutures.size());
} else {
LOG.info("Copied {} files.", renameFutures.size());
}
dstFs.create(getSuccessIndicator(), true).close();
LOG.info("Created success indicator file");
}
}
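/**
 * Copies the matched files to the target path with a blocking DistCp job capped at
 * {@code maxDistributedCopyTasks} map tasks, then writes the _SUCCESS marker.
 */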
private void handleOutputsDistributedCopy(Configuration conf, Path srcPattern, FileSystem srcFs, FileSystem dstFs,
int maxDistributedCopyTasks) throws IOException {
Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(srcPattern), srcPattern);
if (srcs.length > 0) {
LOG.info("Distributed copying {} files using at most {} tasks", srcs.length, maxDistributedCopyTasks);
// Once https://issues.apache.org/jira/browse/HADOOP-15281 is available, we can use the direct write
// distcp optimization if the target path is in S3
DistCpOptions options = new DistCpOptions(Arrays.asList(srcs), path);
options.setMaxMaps(maxDistributedCopyTasks);
options.setOverwrite(true);
options.setBlocking(true);
Configuration distCpConf = new Configuration(conf);
// Remove unnecessary and problematic properties from the DistCp configuration, since files
// referenced by these properties may already have been deleted by the time the DistCp job starts.
distCpConf.unset("mapreduce.job.cache.files");
distCpConf.unset("mapreduce.job.classpath.files");
distCpConf.unset("tmpjars");
try {
DistCp distCp = new DistCp(distCpConf, options);
if (!distCp.execute().isSuccessful()) {
throw new CrunchRuntimeException("Distributed copy failed from " + srcPattern + " to " + path);
}
LOG.info("Distributed copy completed for {} files", srcs.length);
} catch (Exception e) {
throw new CrunchRuntimeException("Distributed copy failed from " + srcPattern + " to " + path, e);
}
} else {
LOG.info("No files found to distributed copy at {}", srcPattern);
}
dstFs.create(getSuccessIndicator(), true).close();
LOG.info("Created success indicator file");
}
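// The _SUCCESS marker follows the same convention as Hadoop's FileOutputCommitter.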
protected Path getSuccessIndicator() {
return new Path(path, "_SUCCESS");
}
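// A negative index selects the "part-*" files of a single-output job; a non-negative
// index selects the files of the corresponding named (multi-)output.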
protected Path getSourcePattern(Path workingPath, int index) {
if (index < 0) {
return new Path(workingPath, "part-*");
} else {
return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "-*");
}
}
@Override
public Path getPath() {
return path;
}
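/**
 * Returns true if the given path can be qualified against the given file system,
 * i.e. {@code makeQualified} does not reject the path's scheme or authority.
 */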
protected static boolean isCompatible(FileSystem fs, Path path) {
try {
fs.makeQualified(path);
return true;
} catch (IllegalArgumentException e) {
return false;
}
}
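/**
 * Computes the destination file for a source file, applying this target's
 * {@link FileNamingScheme} and preserving the source file's extension, if any.
 */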
protected Path getDestFile(Configuration conf, Path src, Path dir, boolean mapOnlyJob)
throws IOException {
String outputFilename;
String sourceFilename = src.getName();
if (mapOnlyJob) {
outputFilename = getFileNamingScheme().getMapOutputName(conf, dir);
} else {
outputFilename = getFileNamingScheme().getReduceOutputName(conf, dir, extractPartitionNumber(sourceFilename));
}
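// Carry over everything from the first dot so extensions like ".avro" survive the move.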
if (sourceFilename.contains(".")) {
outputFilename += sourceFilename.substring(sourceFilename.indexOf("."));
}
return new Path(dir, outputFilename);
}
/**
* Extract the partition number from a raw reducer output filename.
*
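* For example, {@code extractPartitionNumber("part-r-00023")} returns 23.
*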
* @param reduceOutputFileName The raw reducer output file name
* @return The partition number encoded in the filename
*/
public static int extractPartitionNumber(String reduceOutputFileName) {
Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName);
if (matcher.find()) {
return Integer.parseInt(matcher.group(1), 10);
} else {
throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
}
}
@Override
public FileNamingScheme getFileNamingScheme() {
return fileNamingScheme;
}
@Override
public boolean equals(Object other) {
if (other == null || !getClass().equals(other.getClass())) {
return false;
}
FileTargetImpl o = (FileTargetImpl) other;
return path.equals(o.path);
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(path).toHashCode();
}
@Override
public String toString() {
return new StringBuilder()
.append(formatBundle.getFormatClass().getSimpleName())
.append("(")
.append(path)
.append(")")
.toString();
}
@Override
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
// By default, assume that we cannot do this.
return null;
}
private Configuration getEffectiveBundleConfig(Configuration configuration) {
// overlay the bundle config on top of a copy of the supplied config
return formatBundle.configure(new Configuration(configuration));
}
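/**
 * Handles existing output at the target path according to the {@link WriteMode}:
 * DEFAULT fails, OVERWRITE deletes the existing data, APPEND keeps it, and
 * CHECKPOINT keeps it only when a _SUCCESS marker is present and the target is
 * newer than the source. Returns whether output already existed at the path,
 * except for a stale or unsuccessful checkpoint, which is cleared and reported
 * as absent.
 */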
@Override
public boolean handleExisting(WriteMode strategy, long lastModForSource, Configuration conf) {
FileSystem fs = null;
try {
fs = path.getFileSystem(getEffectiveBundleConfig(conf));
} catch (IOException e) {
LOG.error("Could not retrieve FileSystem object to check for existing path", e);
throw new CrunchRuntimeException(e);
}
boolean exists = false;
boolean successful = false;
long lastModForTarget = -1;
try {
exists = fs.exists(path);
if (exists) {
successful = fs.exists(getSuccessIndicator());
// Last modified time is only relevant when the path exists and the
// write mode is checkpoint
if (successful && strategy == WriteMode.CHECKPOINT) {
lastModForTarget = SourceTargetHelper.getLastModifiedAt(fs, path);
}
}
} catch (IOException e) {
LOG.error("Exception checking existence of path: {}", path, e);
throw new CrunchRuntimeException(e);
}
if (exists) {
switch (strategy) {
case DEFAULT:
LOG.error("Path {} already exists!", path);
throw new CrunchRuntimeException("Path already exists: " + path);
case OVERWRITE:
LOG.info("Removing data at existing path: {}", path);
try {
fs.delete(path, true);
} catch (IOException e) {
LOG.error("Exception thrown removing data at path: {}", path, e);
}
break;
case APPEND:
LOG.info("Adding output files to existing path: {}", path);
break;
case CHECKPOINT:
if (successful && lastModForTarget > lastModForSource) {
LOG.info("Re-starting pipeline from checkpoint path: {}", path);
break;
} else {
if (!successful) {
LOG.info("_SUCCESS file not found, Removing data at existing checkpoint path: {}", path);
} else {
LOG.info("Source data has recent updates. Removing data at existing checkpoint path: {}", path);
}
try {
fs.delete(path, true);
} catch (IOException e) {
LOG.error("Exception thrown removing data at checkpoint path: {}", path, e);
}
return false;
}
default:
throw new CrunchRuntimeException("Unknown WriteMode: " + strategy);
}
} else {
LOG.info("Will write output files to new path: {}", path);
}
return exists;
}
}