/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.crunch.io.impl;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.FileNamingScheme;
import org.apache.crunch.io.OutputHandler;
import org.apache.crunch.io.PathTarget;
import org.apache.crunch.io.SourceTargetHelper;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
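
/**
 * A {@link PathTarget} implementation that writes pipeline output to a
 * filesystem path via a {@link FileOutputFormat}, then renames the raw
 * part-files according to a {@link FileNamingScheme} once the job completes.
 *
 * <p>A sketch of typical usage (illustrative only; the output format and
 * naming scheme shown here are assumptions, not requirements of this class):
 *
 * <pre>{@code
 * PathTarget target = new FileTargetImpl(new Path("/data/out"),
 *     SequenceFileOutputFormat.class, new SequentialFileNamingScheme());
 * pipeline.write(collection, target);
 * }</pre>
 */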
public class FileTargetImpl implements PathTarget {

  private static final Log LOG = LogFactory.getLog(FileTargetImpl.class);

  protected final Path path;
  private final Class<? extends FileOutputFormat> outputFormatClass;
  private final FileNamingScheme fileNamingScheme;

  public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
      FileNamingScheme fileNamingScheme) {
    this.path = path;
    this.outputFormatClass = outputFormatClass;
    this.fileNamingScheme = fileNamingScheme;
  }

  @Override
  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
    Converter converter = ptype.getConverter();
    Class keyClass = converter.getKeyClass();
    Class valueClass = converter.getValueClass();
    configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name);
  }
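
  /**
   * Wires the output path into the given job and registers either the
   * job-wide output format (when {@code name} is null) or a Crunch named
   * output for multi-output jobs.
   */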
  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
      Class outputFormatClass, Path outputPath, String name) {
    try {
      FileOutputFormat.setOutputPath(job, outputPath);
    } catch (Exception e) {
      // Surface any failure to set the output path as an unchecked exception.
      throw new RuntimeException(e);
    }
    if (name == null) {
      // Single-output job: configure the output directly on the job.
      job.setOutputFormatClass(outputFormatClass);
      job.setOutputKeyClass(keyClass);
      job.setOutputValueClass(valueClass);
    } else {
      // Multi-output job: register this target as a named output.
      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
    }
  }

  @Override
  public boolean accept(OutputHandler handler, PType<?> ptype) {
    handler.configure(this, ptype);
    return true;
  }

  @Override
  public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) {
    return ptype.getConverter();
  }
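
  /**
   * Moves the job's temporary part files for the output with the given index
   * from the working directory into this target's path, renaming them via
   * the configured {@link FileNamingScheme}, and then writes a _SUCCESS
   * marker file.
   */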
  @Override
  public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException {
    FileSystem srcFs = workingPath.getFileSystem(conf);
    Path src = getSourcePattern(workingPath, index);
    Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src);
    FileSystem dstFs = path.getFileSystem(conf);
    if (!dstFs.exists(path)) {
      dstFs.mkdirs(path);
    }
    boolean sameFs = isCompatible(srcFs, path);
    for (Path s : srcs) {
      // Part files containing "-m-" come from a map-only job.
      Path d = getDestFile(conf, s, path, s.getName().contains("-m-"));
      if (sameFs) {
        // Same filesystem: a cheap rename is sufficient.
        if (!srcFs.rename(s, d)) {
          throw new IOException("Failed to rename " + s + " to " + d);
        }
      } else {
        // Different filesystems: copy the file across, deleting the source.
        FileUtil.copy(srcFs, s, dstFs, d, true, true, conf);
      }
    }
    // Mark the target directory as complete.
    dstFs.create(getSuccessIndicator(), true).close();
  }

  private Path getSuccessIndicator() {
    return new Path(path, "_SUCCESS");
  }

  protected Path getSourcePattern(Path workingPath, int index) {
    return new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + index + "-*");
  }

  @Override
  public Path getPath() {
    return path;
  }

  protected static boolean isCompatible(FileSystem fs, Path path) {
    try {
      // makeQualified throws IllegalArgumentException if the path does not
      // belong to this filesystem, which tells us a rename will not work.
      fs.makeQualified(path);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }
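
  /**
   * Computes the final destination for a raw part file, applying the file
   * naming scheme and preserving any extension from the source file name.
   */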
  protected Path getDestFile(Configuration conf, Path src, Path dir, boolean mapOnlyJob)
      throws IOException {
    String outputFilename;
    String sourceFilename = src.getName();
    if (mapOnlyJob) {
      outputFilename = getFileNamingScheme().getMapOutputName(conf, dir);
    } else {
      outputFilename = getFileNamingScheme().getReduceOutputName(conf, dir,
          extractPartitionNumber(sourceFilename));
    }
    // Carry over any file extension (everything from the first '.') so that
    // formats which depend on it still recognize the renamed file.
    if (sourceFilename.contains(".")) {
      outputFilename += sourceFilename.substring(sourceFilename.indexOf("."));
    }
    return new Path(dir, outputFilename);
  }

  /**
   * Extracts the partition number from a raw reducer output file name;
   * for example, "part-r-00002" yields 2.
   *
   * @param reduceOutputFileName The raw reducer output file name
   * @return The partition number encoded in the file name
   * @throws IllegalArgumentException if the file name cannot be parsed
   */
  public static int extractPartitionNumber(String reduceOutputFileName) {
    Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName);
    if (matcher.find()) {
      return Integer.parseInt(matcher.group(1), 10);
    } else {
      throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed");
    }
  }

  @Override
  public FileNamingScheme getFileNamingScheme() {
    return fileNamingScheme;
  }

  @Override
  public boolean equals(Object other) {
    if (other == null || !getClass().equals(other.getClass())) {
      return false;
    }
    FileTargetImpl o = (FileTargetImpl) other;
    // Equality is based on the output path alone, consistent with hashCode.
    return path.equals(o.path);
  }

  @Override
  public int hashCode() {
    return new HashCodeBuilder().append(path).toHashCode();
  }

  @Override
  public String toString() {
    return outputFormatClass.getSimpleName() + "(" + path + ")";
  }

  @Override
  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
    // By default, assume that we cannot do this.
    return null;
  }
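
  /**
   * Applies the given {@link WriteMode} to any data already at this target's
   * path and returns whether the target already existed. The CHECKPOINT case
   * returns false after discarding a stale or incomplete checkpoint, so that
   * the pipeline recomputes it.
   */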
  @Override
  public boolean handleExisting(WriteMode strategy, long lastModForSource, Configuration conf) {
    FileSystem fs = null;
    try {
      fs = path.getFileSystem(conf);
    } catch (IOException e) {
      LOG.error("Could not retrieve FileSystem object to check for existing path", e);
      throw new CrunchRuntimeException(e);
    }

    boolean exists = false;
    boolean successful = false;
    long lastModForTarget = -1;
    try {
      exists = fs.exists(path);
      if (exists) {
        successful = fs.exists(getSuccessIndicator());
        lastModForTarget = SourceTargetHelper.getLastModifiedAt(fs, path);
      }
    } catch (IOException e) {
      LOG.error("Exception checking existence of path: " + path, e);
      throw new CrunchRuntimeException(e);
    }

    if (exists) {
      switch (strategy) {
        case DEFAULT:
          LOG.error("Path " + path + " already exists!");
          throw new CrunchRuntimeException("Path already exists: " + path);
        case OVERWRITE:
          LOG.info("Removing data at existing path: " + path);
          try {
            fs.delete(path, true);
          } catch (IOException e) {
            LOG.error("Exception thrown removing data at path: " + path, e);
          }
          break;
        case APPEND:
          LOG.info("Adding output files to existing path: " + path);
          break;
        case CHECKPOINT:
          if (successful && lastModForTarget > lastModForSource) {
            // The checkpoint is complete and newer than its source: reuse it.
            LOG.info("Re-starting pipeline from checkpoint path: " + path);
            break;
          } else {
            if (!successful) {
              LOG.info("_SUCCESS file not found, removing data at existing checkpoint path: " + path);
            } else {
              LOG.info("Source data has more recent updates, removing data at existing checkpoint path: " + path);
            }
            try {
              fs.delete(path, true);
            } catch (IOException e) {
              LOG.error("Exception thrown removing data at checkpoint path: " + path, e);
            }
            return false;
          }
        default:
          throw new CrunchRuntimeException("Unknown WriteMode: " + strategy);
      }
    } else {
      LOG.info("Will write output files to new path: " + path);
    }
    return exists;
  }
}