blob: 326a8c2ad8e9ff8bb52d15cecd0208e9519de702 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.vxquery.hdfs2;
import com.google.common.io.Closeables;
import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import java.io.IOException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
/**
 * Input format that extracts XML fragments delimited by a configurable start tag and end tag.
 * <p>
 * The tags are read from the job configuration keys {@code start_tag} and {@code end_tag}. Each value produced by
 * {@link XmlRecordReader} is the concatenation of every start-tag...end-tag fragment found in one split, not a
 * single fragment.
 */
public class XmlCollectionWithTagInputFormat extends TextInputFormat {
    /** Configuration key holding the opening tag to search for. */
    private static final String START_TAG_KEY = "start_tag";
    /** Configuration key holding the closing tag to search for. */
    private static final String END_TAG_KEY = "end_tag";

    /**
     * Start tag seen by the most recent {@link #createRecordReader} call. Kept for backward compatibility only.
     * Readers now take their tags from the {@link Configuration}, because this mutable static is shared by every
     * reader in the JVM.
     */
    public static String STARTING_TAG;
    /** End tag seen by the most recent {@link #createRecordReader} call; kept for backward compatibility only. */
    public static String ENDING_TAG;

    /**
     * Creates an {@link XmlRecordReader} for the given split.
     *
     * @param split
     *            the split to read; must be a {@link FileSplit}
     * @param context
     *            task context whose configuration supplies {@code start_tag} and {@code end_tag}
     * @return a reader positioned at the start of the split
     * @throws IllegalStateException
     *             if the split's file cannot be opened (the overridden signature does not allow a checked
     *             {@link IOException}, so it is wrapped with the original cause)
     * @throws IllegalArgumentException
     *             if a tag is missing or empty
     */
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        Configuration conf = context.getConfiguration();
        STARTING_TAG = conf.get(START_TAG_KEY);
        ENDING_TAG = conf.get(END_TAG_KEY);
        try {
            return new XmlRecordReader((FileSplit) split, conf);
        } catch (IOException ioe) {
            // Returning null here only deferred the failure to an uninformative NullPointerException in the caller.
            throw new IllegalStateException("Unable to open XML input split " + split, ioe);
        }
    }

    /**
     * Record reader that scans a file split for start-tag...end-tag fragments.
     * <p>
     * A single call to {@link #nextKeyValue()} collects every fragment whose start tag begins inside the split.
     * It concatenates them, each fragment including its start and end tags, into one value. The closing tag of
     * the last fragment may be read past the split end.
     */
    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
        private final byte[] endTag;
        private final byte[] startTag;
        /** KMP prefix-function tables for the tags, so overlapping partial matches are not lost. */
        private final int[] endTagFailure;
        private final int[] startTagFailure;
        /** Byte offset of the first byte of the split. */
        private final long start;
        /** Byte offset one past the last byte of the split. */
        private final long end;
        private final FSDataInputStream fsin;
        /** Accumulates the fragments read by one {@link #next} call; reset afterwards. */
        private final DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable currentKey;
        private Text currentValue;
        /** NOTE(review): not read anywhere in this class; kept only because it is package-visible. */
        BlockLocation[] blocks;
        protected static byte[] nl = "\n".getBytes(Charsets.UTF_8);

        /**
         * Opens the split's file and seeks to the start of the split.
         * <p>
         * Tags come from the configuration keys {@code start_tag} and {@code end_tag}. If a key is absent, the
         * legacy static {@link XmlCollectionWithTagInputFormat#STARTING_TAG} or
         * {@link XmlCollectionWithTagInputFormat#ENDING_TAG} is used instead.
         *
         * @param split
         *            the file split to read
         * @param conf
         *            configuration used to resolve the file system and the tags
         * @throws IOException
         *             if the file cannot be inspected, opened or positioned
         * @throws IllegalArgumentException
         *             if a tag is missing or empty
         */
        public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
            startTag = toTagBytes(conf.get(START_TAG_KEY, STARTING_TAG), START_TAG_KEY);
            endTag = toTagBytes(conf.get(END_TAG_KEY, ENDING_TAG), END_TAG_KEY);
            startTagFailure = failureTable(startTag);
            endTagFailure = failureTable(endTag);
            start = split.getStart();
            end = start + split.getLength();
            Path file = split.getPath();
            FileSystem fs = file.getFileSystem(conf);
            FileStatus fStatus = fs.getFileStatus(file);
            blocks = fs.getFileBlockLocations(fStatus, 0, fStatus.getLen());
            fsin = fs.open(file);
            try {
                fsin.seek(start);
            } catch (IOException e) {
                // Do not leak the freshly opened stream when positioning fails.
                Closeables.close(fsin, true);
                throw e;
            }
        }

        /**
         * Encodes a tag as UTF-8, rejecting missing or empty tags that would make matching meaningless.
         *
         * @param tag
         *            the tag text, possibly null
         * @param key
         *            configuration key the tag came from, used in the error message
         * @return the UTF-8 bytes of the tag
         */
        private static byte[] toTagBytes(String tag, String key) {
            if (tag == null || tag.isEmpty()) {
                throw new IllegalArgumentException("Missing or empty XML tag; set configuration key '" + key + "'");
            }
            return tag.getBytes(Charsets.UTF_8);
        }

        /**
         * Computes the KMP prefix function of a pattern. Entry {@code i} is the length of the longest proper prefix
         * of {@code pattern[0..i]} that is also a suffix of it.
         *
         * @param pattern
         *            non-empty byte pattern
         * @return the prefix-function table
         */
        private static int[] failureTable(byte[] pattern) {
            int[] failure = new int[pattern.length];
            int k = 0;
            for (int i = 1; i < pattern.length; i++) {
                while (k > 0 && pattern[i] != pattern[k]) {
                    k = failure[k - 1];
                }
                if (pattern[i] == pattern[k]) {
                    k++;
                }
                failure[i] = k;
            }
            return failure;
        }

        /**
         * Reads all fragments remaining in the split into {@code value}.
         *
         * @param key
         *            receives the stream position after the last byte consumed
         * @param value
         *            receives the concatenated fragments
         * @return true if at least one start tag was found
         * @throws IOException
         *             on read failure
         */
        private boolean next(LongWritable key, Text value) throws IOException {
            if (fsin.getPos() >= end) {
                return false;
            }
            try {
                if (!readBlock()) {
                    return false;
                }
                key.set(fsin.getPos());
                value.set(buffer.getData(), 0, buffer.getLength());
                return true;
            } finally {
                buffer.reset();
            }
        }

        /**
         * Closes the input stream. IOExceptions raised while closing are swallowed by
         * {@code Closeables.close(fsin, true)}.
         */
        @Override
        public void close() throws IOException {
            Closeables.close(fsin, true);
        }

        /**
         * Returns the fraction of the split consumed, clamped to [0, 1].
         * <p>
         * Returns 0 for an empty split. The cap is needed because the closing tag of the last fragment may lie past
         * the split end.
         */
        @Override
        public float getProgress() throws IOException {
            if (end == start) {
                return 0.0f;
            }
            return Math.min(1.0f, (fsin.getPos() - start) / (float) (end - start));
        }

        /**
         * Appends to {@link #buffer} every fragment whose start tag begins before the split end.
         * <p>
         * Each appended fragment consists of the start tag, the body, and the end tag. Stops at end of file instead
         * of spinning when the file ends before the split does.
         *
         * @return true if at least one start tag was found
         * @throws IOException
         *             on read failure
         */
        private boolean readBlock() throws IOException {
            boolean read = false;
            while (fsin.getPos() < end && readUntilMatch(startTag, startTagFailure, false)) {
                buffer.write(startTag);
                read = true;
                if (!readUntilMatch(endTag, endTagFailure, true)) {
                    // End of file before the closing tag: nothing more can be read.
                    break;
                }
            }
            return read;
        }

        /**
         * Reads bytes until {@code match} has been consumed or end of file is reached.
         * <p>
         * When {@code withinBlock} is true, every byte read, including the matched bytes, is appended to
         * {@link #buffer}. When it is false, the search stops once the current candidate match would start at or
         * after the split end, because such a match belongs to the next split.
         *
         * @param match
         *            non-empty byte pattern to find
         * @param failure
         *            prefix-function table of {@code match}
         * @param withinBlock
         *            whether bytes are being collected into the current fragment
         * @return true if the pattern was found, false at end of file or at the split end
         * @throws IOException
         *             on read failure
         */
        private boolean readUntilMatch(byte[] match, int[] failure, boolean withinBlock) throws IOException {
            int matched = 0;
            while (true) {
                int b = fsin.read();
                if (b == -1) {
                    return false;
                }
                if (withinBlock) {
                    buffer.write(b);
                }
                // read() yields 0..255 while byte[] elements are signed; compare as bytes so non-ASCII tags match.
                byte current = (byte) b;
                while (matched > 0 && current != match[matched]) {
                    matched = failure[matched - 1];
                }
                if (current == match[matched]) {
                    matched++;
                    if (matched == match.length) {
                        return true;
                    }
                }
                // The candidate starts at (pos - matched); once that is past the split end, it is the next split's.
                if (!withinBlock && fsin.getPos() - matched >= end) {
                    return false;
                }
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return currentKey;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return currentValue;
        }

        /** No-op: the reader is fully initialized by its constructor. */
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        }

        /**
         * Advances to the next value, allocating fresh key/value objects on each call.
         *
         * @return true if a value was produced
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            currentKey = new LongWritable();
            currentValue = new Text();
            return next(currentKey, currentValue);
        }
    }
}