/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.contrib.index.example;

import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.contrib.index.mapred.DocumentAndOp;
import org.apache.hadoop.contrib.index.mapred.DocumentID;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

/**
 * A simple RecordReader for LineDoc: plain text files where each line is a
 * doc. Each line is of the form documentID<SPACE>op<SPACE>content<EOL>,
 * where op can be "i", "ins" or "insert" for insert, "d", "del" or "delete"
 * for delete, or "u", "upd" or "update" for update. An unrecognized op is
 * treated as an insert; for a delete, the content may be omitted.
 */
public class LineDocRecordReader implements
RecordReader<DocumentID, LineDocTextAndOp> {
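  // For illustration, a valid input file might contain lines such as the
  // following (the document IDs and contents here are made up):
  //
  //   doc1 i a new document to index
  //   doc2 del
  //   doc3 update replacement content for doc3
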
  private static final char SPACE = ' ';
  private static final char EOL = '\n';

  private long start;  // position of the first byte this reader returns
  private long pos;    // current position in the file
  private long end;    // first byte past the end of the split
  private BufferedInputStream in;
  private ByteArrayOutputStream buffer = new ByteArrayOutputStream(256);

  /**
   * Provide a bridge to get the bytes from the ByteArrayOutputStream into a
   * Text without materializing an intermediate byte array (as toByteArray()
   * would).
   */
private static class TextStuffer extends OutputStream {
public Text target;
    public void write(int b) {
      // ByteArrayOutputStream.writeTo(OutputStream) hands over the whole
      // buffer in a single write(byte[], int, int) call, so single-byte
      // writes should never happen here.
      throw new UnsupportedOperationException("write(byte) not supported");
    }
public void write(byte[] data, int offset, int len) throws IOException {
target.set(data, offset, len);
}
  }

  private TextStuffer bridge = new TextStuffer();

  /**
   * Constructor
   * @param job the job configuration
   * @param split the file split whose lines this reader returns
   * @throws IOException if the split's file cannot be opened or read
   */
public LineDocRecordReader(Configuration job, FileSplit split)
throws IOException {
long start = split.getStart();
long end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(file);
boolean skipFirstLine = false;
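    // A line may straddle a split boundary. Every split except the first
    // backs up one byte and then discards everything up to and including
    // the first EOL it sees: that first (possibly partial) line belongs to
    // the previous split's reader, which keeps reading past its own end
    // until it completes the line. Backing up one byte ensures that a line
    // starting exactly at the split boundary is not skipped.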
if (start != 0) {
      skipFirstLine = true; // defer the skip until the BufferedInputStream exists
--start;
fileIn.seek(start);
}
    this.in = new BufferedInputStream(fileIn);
if (skipFirstLine) { // skip first line and re-establish "start".
start += LineDocRecordReader.readData(this.in, null, EOL);
}
this.start = start;
this.pos = start;
this.end = end;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordReader#close()
*/
public void close() throws IOException {
in.close();
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordReader#createKey()
*/
public DocumentID createKey() {
return new DocumentID();
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordReader#createValue()
*/
public LineDocTextAndOp createValue() {
return new LineDocTextAndOp();
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordReader#getPos()
*/
public long getPos() throws IOException {
return pos;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordReader#getProgress()
*/
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float) (end - start));
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapred.RecordReader#next(java.lang.Object, java.lang.Object)
*/
public synchronized boolean next(DocumentID key, LineDocTextAndOp value)
throws IOException {
if (pos >= end) {
return false;
}
    // the key is the document id: the bytes up to the first space
if (!readInto(key.getText(), SPACE)) {
return false;
}
// read operation: i/d/u, or ins/del/upd, or insert/delete/update
Text opText = new Text();
if (!readInto(opText, SPACE)) {
return false;
}
String opStr = opText.toString();
DocumentAndOp.Op op;
if (opStr.equals("i") || opStr.equals("ins") || opStr.equals("insert")) {
op = DocumentAndOp.Op.INSERT;
} else if (opStr.equals("d") || opStr.equals("del")
|| opStr.equals("delete")) {
op = DocumentAndOp.Op.DELETE;
} else if (opStr.equals("u") || opStr.equals("upd")
|| opStr.equals("update")) {
op = DocumentAndOp.Op.UPDATE;
} else {
// default is insert
op = DocumentAndOp.Op.INSERT;
}
value.setOp(op);
if (op == DocumentAndOp.Op.DELETE) {
return true;
} else {
// read rest of the line
return readInto(value.getText(), EOL);
}
}
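
  /**
   * Reads bytes up to (and consuming, but not storing) the given delimiter
   * or EOL, whichever comes first, and stores them in the given Text.
   * Returns false if the stream was already exhausted.
   */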
private boolean readInto(Text text, char delimiter) throws IOException {
buffer.reset();
long bytesRead = readData(in, buffer, delimiter);
if (bytesRead == 0) {
return false;
}
pos += bytesRead;
bridge.target = text;
buffer.writeTo(bridge);
return true;
}
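
  /**
   * Copies bytes from in to out until EOL, the given delimiter, or end of
   * stream is reached. The terminating byte (or "\r\n" pair) is consumed
   * and counted, but not written to out. Returns the total number of bytes
   * consumed, including any terminator.
   */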
private static long readData(InputStream in, OutputStream out, char delimiter)
throws IOException {
long bytes = 0;
while (true) {
int b = in.read();
if (b == -1) {
break;
}
bytes += 1;
byte c = (byte) b;
if (c == EOL || c == delimiter) {
break;
}
      if (c == '\r') {
        // a bare '\r' ends the line; if it is followed by '\n', fold the
        // pair into a single EOL, otherwise push the lookahead byte back
        in.mark(1);
        byte nextC = (byte) in.read();
        if (nextC != EOL) {
          in.reset();
        } else {
          bytes += 1;
        }
        break;
      }
if (out != null) {
out.write(c);
}
}
return bytes;
}
}
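
/**
 * A minimal usage sketch: drives the reader over a single split spanning an
 * entire file whose path is taken from the command line. This helper class
 * and its setup are illustrative assumptions only; in a real MapReduce job
 * the framework creates the splits and instantiates the reader.
 */
class LineDocRecordReaderExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]); // assumed command-line argument
    FileSystem fs = path.getFileSystem(conf);
    long length = fs.getFileStatus(path).getLen();
    // one split covering the whole file, with no location hints
    FileSplit split = new FileSplit(path, 0, length, (String[]) null);
    LineDocRecordReader reader = new LineDocRecordReader(conf, split);
    DocumentID key = reader.createKey();
    LineDocTextAndOp value = reader.createValue();
    while (reader.next(key, value)) {
      // for insert/update records the value holds the line's content;
      // for delete records only the key is meaningful
      System.out.println(key.getText() + "\t" + value.getText());
    }
    reader.close();
  }
}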