blob: 443955e3577bf38324dc879921492908c5d779f2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.inputtools;
import java.io.IOException;
import java.util.regex.PatternSyntaxException;
import org.apache.hadoop.mapred.Reporter;
import junit.framework.TestCase;
import org.apache.hadoop.chukwa.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
public class TestInputFormat extends TestCase {
String[] lines = { "the rain", "in spain", "falls mainly", "in the plain" };
public void testInputFormat() {
verifyInputFormatForSequenceFile();
verifyInputFormatIllegalRegex();
}
private void verifyInputFormatForSequenceFile() {
try {
JobConf conf = new JobConf();
String TMP_DIR = System.getProperty("test.build.data", "/tmp");
Path filename = new Path("file:///" + TMP_DIR + "/tmpSeqFile");
SequenceFile.Writer sfw = SequenceFile.createWriter(FileSystem
.getLocal(conf), conf, filename, ChukwaArchiveKey.class,
ChunkImpl.class, SequenceFile.CompressionType.NONE, Reporter.NULL);
StringBuilder buf = new StringBuilder();
int offsets[] = new int[lines.length];
for (int i = 0; i < lines.length; ++i) {
buf.append(lines[i]);
buf.append("\n");
offsets[i] = buf.length() - 1;
}
ChukwaArchiveKey key = new ChukwaArchiveKey(0, "datatype", "sname", 0);
ChunkImpl val = new ChunkImpl("datatype", "sname", 0, buf.toString()
.getBytes(), null);
val.setRecordOffsets(offsets);
sfw.append(key, val);
sfw.append(key, val); // write it twice
sfw.close();
long len = FileSystem.getLocal(conf).getFileStatus(filename).getLen();
InputSplit split = new FileSplit(filename, 0, len, (String[]) null);
ChukwaInputFormat in = new ChukwaInputFormat();
RecordReader<LongWritable, Text> r = in.getRecordReader(split, conf,
Reporter.NULL);
LongWritable l = r.createKey();
Text line = r.createValue();
for (int i = 0; i < lines.length * 2; ++i) {
boolean succeeded = r.next(l, line);
assertTrue(succeeded);
assertEquals(i, l.get());
assertEquals(lines[i % lines.length], line.toString());
System.out.println("read line: " + l.get() + " " + line);
}
boolean succeeded = r.next(l, line);
assertFalse(succeeded);
} catch (IOException e) {
e.printStackTrace();
fail("IO exception " + e);
}
}
private void verifyInputFormatIllegalRegex() {
try {
JobConf conf = new JobConf();
conf.set("chukwa.inputfilter.datatype", "(");
String TMP_DIR = System.getProperty("test.build.data", "/tmp");
Path filename = new Path("file:///" + TMP_DIR + "/tmpSeqFile");
long len = FileSystem.getLocal(conf).getFileStatus(filename).getLen();
InputSplit split = new FileSplit(filename, 0, len, (String[]) null);
ChukwaInputFormat in = new ChukwaInputFormat();
RecordReader<LongWritable, Text> r = in.getRecordReader(split, conf,
Reporter.NULL);
} catch (PatternSyntaxException e) {
e.printStackTrace();
fail("Illegal regular expression caused PatternSyntaxException: " + e);
} catch (IOException e) {
e.printStackTrace();
fail("IO exception " + e);
}
}
}