blob: ab4d7f8a359a3983eca785449bdfe147aaa3562e [file] [log] [blame]
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.streaming;
import java.io.*;
import java.lang.reflect.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;
/**
 * An input format that performs globbing on DFS paths and selects a
 * RecordReader based on a JobConf property.
 *
 * <p>The last component of each configured input path is treated as a glob
 * and matched against the entries of that path's parent directory. Record
 * readers are instantiated reflectively from the class named by the
 * {@code stream.recordreader.class} job property, defaulting to
 * {@link StreamLineRecordReader}.
 *
 * @author Michel Tourn
 */
public class StreamInputFormat extends InputFormatBase
{
  // An InputFormat should be public with the synthetic public default constructor:
  // JobTracker's JobInProgress will instantiate it with clazz.newInstance() (and a custom ClassLoader).

  protected static final Log LOG = LogFactory.getLog(StreamInputFormat.class.getName());

  /** Job property naming the RecordReader implementation to instantiate. */
  private static final String READER_CLASS_KEY = "stream.recordreader.class";

  /** Constructor signature every configurable record reader must provide. */
  private static final Class[] READER_CTOR_ARGS = new Class[]{
    FSDataInputStream.class, long.class, long.class, String.class, Reporter.class, JobConf.class};

  /**
   * Expands the job's input paths.
   *
   * <p>For each input path, its last component is used as a glob pattern.
   * That pattern is matched against the listing of the path's parent
   * directory, or of {@code "."} when the path has no parent.
   *
   * @param fs  file system to list
   * @param job job whose input paths are expanded
   * @return all matching paths, grouped in input-path order
   * @throws IOException if listing a directory fails
   */
  protected Path[] listPaths(FileSystem fs, JobConf job)
    throws IOException
  {
    Path[] globs = job.getInputPaths();
    ArrayList list = new ArrayList();
    for(int d = 0; d < globs.length; d++) {
      String leafName = globs[d].getName();
      LOG.info("StreamInputFormat: globs[" + d + "] leafName = " + leafName);
      PathFilter filter = new GlobFilter(fs, leafName);

      // getParent() may return null (a path with no parent component); check
      // it before dereferencing rather than the freshly constructed Path.
      Path parent = globs[d].getParent();
      Path dir = (parent == null) ? new Path(".") : new Path(parent.toString());

      Path[] paths = fs.listPaths(dir, filter);
      if(paths != null) {
        list.addAll(Arrays.asList(paths));
      }
    }
    return (Path[])list.toArray(new Path[list.size()]);
  }

  /**
   * Accepts non-checksum paths whose final name component matches a
   * shell-style glob.
   */
  class GlobFilter implements PathFilter
  {
    /**
     * @param fs   file system used to recognise checksum files
     * @param glob glob matched against {@link Path#getName()}
     */
    public GlobFilter(FileSystem fs, String glob)
    {
      fs_ = fs;
      pat_ = Pattern.compile(globToRegexp(glob));
    }

    /**
     * Translates a shell-style glob into a {@link Pattern} regex.
     *
     * <p>Rules:
     * <ul>
     * <li>{@code *} becomes {@code .*}.</li>
     * <li>{@code ?} becomes {@code .}.</li>
     * <li>A backslash makes the following character literal.</li>
     * <li>{@code [...]} bracket expressions are copied through verbatim.</li>
     * <li>Every other regex metacharacter is escaped, so characters such as
     * {@code ( ) $ | { }} in file names match literally instead of being
     * interpreted as regex syntax.</li>
     * </ul>
     *
     * @param glob the glob pattern
     * @return an equivalent regular expression
     */
    String globToRegexp(String glob)
    {
      int len = glob.length();
      StringBuffer re = new StringBuffer(len * 2);
      boolean inClass = false;
      for(int i = 0; i < len; i++) {
        char ch = glob.charAt(i);
        if(ch == '\\' && i + 1 < len) {
          // A backslash makes the next glob character literal.
          appendLiteral(re, glob.charAt(++i));
        } else if(inClass) {
          // Glob and regex bracket expressions are close enough (ranges,
          // '^' negation) that the contents are copied through unchanged.
          if(ch == ']') {
            inClass = false;
          }
          re.append(ch);
        } else if(ch == '[') {
          inClass = true;
          re.append(ch);
        } else if(ch == '*') {
          re.append(".*");
        } else if(ch == '?') {
          re.append('.');
        } else {
          appendLiteral(re, ch);
        }
      }
      String result = re.toString();
      LOG.info("globToRegexp: |" + glob + "| -> |" + result + "|");
      return result;
    }

    /** Appends {@code ch} to {@code re} so that it matches only itself. */
    private void appendLiteral(StringBuffer re, char ch)
    {
      // A backslash before a non-alphabetic character is always legal in
      // java.util.regex, so escaping these metacharacters is safe.
      if("\\^$.|?*+()[]{}".indexOf(ch) >= 0) {
        re.append('\\');
      }
      re.append(ch);
    }

    /**
     * @return true if {@code pathname} is not a checksum file and its name
     *         matches the glob
     */
    public boolean accept(Path pathname)
    {
      boolean acc = !fs_.isChecksumFile(pathname);
      if(acc) {
        acc = pat_.matcher(pathname.getName()).matches();
      }
      LOG.info("matches " + pat_ + ", " + pathname + " = " + acc);
      return acc;
    }

    Pattern pat_;
    FileSystem fs_;
  }

  /**
   * Creates and initialises a record reader for one split.
   *
   * <p>The reader class is resolved before the file is opened. If
   * construction or {@code init()} fails, the opened stream is closed rather
   * than leaked.
   *
   * @throws IOException if the file cannot be opened, or if the reader's
   *         constructor or {@code init()} throws one
   * @throws RuntimeException if the configured reader class cannot be found
   *         or lacks the required constructor
   */
  public RecordReader getRecordReader(FileSystem fs, final FileSplit split,
                                      JobConf job, Reporter reporter)
    throws IOException {
    LOG.info("getRecordReader start.....");
    reporter.setStatus(split.toString());
    final long start = split.getStart();
    final long end = start + split.getLength();
    String splitName = split.getFile() + ":" + start + "-" + end;

    // Resolve configuration before acquiring the stream, so a bad class
    // name cannot leak an open file.
    Constructor ctor = findReaderConstructor(job);

    final FSDataInputStream in = fs.open(split.getFile());
    boolean initialised = false;
    try {
      // The reader receives the whole stream plus the split bounds.
      StreamBaseRecordReader reader = newReader(ctor, new Object[]{
        in, new Long(start), new Long(end), splitName, reporter, job});
      reader.init();
      initialised = true;
      return reader;
    } finally {
      if(!initialised) {
        closeQuietly(in);
      }
    }
  }

  /**
   * Looks up the configured reader class and its
   * (FSDataInputStream, long, long, String, Reporter, JobConf) constructor.
   */
  private static Constructor findReaderConstructor(JobConf job)
  {
    Class readerClass;
    String c = job.get(READER_CLASS_KEY);
    if(c == null) {
      readerClass = StreamLineRecordReader.class;
    } else {
      readerClass = StreamUtil.goodClassOrNull(c, null);
      if(readerClass == null) {
        throw new RuntimeException("Class not found: " + c);
      }
    }
    try {
      return readerClass.getConstructor(READER_CTOR_ARGS);
    } catch(NoSuchMethodException nsm) {
      throw new RuntimeException(nsm);
    }
  }

  /**
   * Invokes the reader constructor. Exceptions thrown by the constructor
   * itself are unwrapped from InvocationTargetException, so that an
   * IOException keeps its declared type.
   */
  private static StreamBaseRecordReader newReader(Constructor ctor, Object[] args)
    throws IOException
  {
    try {
      return (StreamBaseRecordReader) ctor.newInstance(args);
    } catch(InvocationTargetException ite) {
      Throwable cause = ite.getTargetException();
      if(cause instanceof IOException) {
        throw (IOException) cause;
      }
      if(cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
      if(cause instanceof Error) {
        throw (Error) cause;
      }
      throw new RuntimeException(cause);
    } catch(Exception e) {
      // InstantiationException, IllegalAccessException, ClassCastException, ...
      throw new RuntimeException(e);
    }
  }

  /** Closes {@code in} on a failure path, logging rather than masking the original error. */
  private static void closeQuietly(FSDataInputStream in)
  {
    try {
      in.close();
    } catch(IOException e) {
      LOG.warn("Failed to close input stream after reader setup failure", e);
    }
  }
}