blob: 235d65e9904ae8c2f3244861951cd62a9f95e488 [file] [log] [blame]
/**
* Copyright 2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.streaming;
import java.io.*;
import java.util.regex.*;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
/**
 * A way to interpret XML fragments as Mapper input records.
 * Values are XML subtrees delimited by configurable tags.
 * Keys could be the value of a certain attribute in the XML subtree,
 * but this is left to the stream processor application.
 *
 * <p>The name-value properties (each prefixed with {@code CONF_NS}) that
 * StreamXmlRecordReader understands are:
 * <ul>
 * <li>String begin: chars marking the beginning of a record (required)</li>
 * <li>String end: chars marking the end of a record (required)</li>
 * <li>int maxrec: maximum record size (default 50000)</li>
 * <li>int lookahead: maximum lookahead used to sync CDATA (default 2 * maxrec)</li>
 * <li>boolean slowmatch: use the regex/CDATA-aware matcher (default false)</li>
 * </ul>
 *
 * <p>In slowmatch mode the begin/end values are inserted into a regular
 * expression as-is, so they must already be regex-escaped. In both modes
 * marks are compared byte-by-byte (each byte viewed as one ISO-8859-1 char),
 * so marks should be plain ASCII.
 *
 * <p>Each emitted record (begin mark through end mark, inclusive) is stored
 * in the key; the value is always the empty string.
 *
 * @author Michel Tourn
 */
public class StreamXmlRecordReader extends StreamBaseRecordReader
{
  public StreamXmlRecordReader(
    FSDataInputStream in, long start, long end,
    String splitName, Reporter reporter, JobConf job)
    throws IOException
  {
    super(in, start, end, splitName, reporter, job);
    beginMark_ = checkJobGet(CONF_NS + "begin");
    endMark_ = checkJobGet(CONF_NS + "end");
    maxRecSize_ = job_.getInt(CONF_NS + "maxrec", 50*1000);
    lookAhead_ = job_.getInt(CONF_NS + "lookahead", 2*maxRecSize_);
    synched_ = false;
    slowMatch_ = job_.getBoolean(CONF_NS + "slowmatch", false);
    if(slowMatch_) {
      beginPat_ = makePatternCDataOrMark(beginMark_);
      endPat_ = makePatternCDataOrMark(endMark_);
    }
  }

  /** Number of calls to {@link #next}; only incremented in this class. */
  int numNext = 0;

  /**
   * Reads the next record: skips to the next begin mark, then copies
   * everything from that mark up to and including the end mark.
   *
   * @param key   receives the record text; must be a {@link UTF8}
   * @param value set to the empty string; must be a {@link UTF8}
   * @return false if the split end was reached or no complete record remains
   */
  public synchronized boolean next(Writable key, Writable value)
    throws IOException
  {
    long pos = in_.getPos();
    numNext++;
    if (pos >= end_) {
      return false;
    }
    StringBuffer buf = new StringBuffer();
    if(!readUntilMatchBegin()) {
      return false;
    }
    if(!readUntilMatchEnd(buf)) {
      return false;
    }
    numRecStats(buf);
    // There is only one elem..key/value splitting is not done here.
    ((UTF8)key).set(buf.toString());
    ((UTF8)value).set("");
    return true;
  }

  /** Positions the stream at the start of the next begin mark, if any. */
  public void seekNextRecordBoundary() throws IOException
  {
    readUntilMatchBegin();
  }

  /**
   * Advances the stream to the start of the next begin mark (the mark itself
   * is not consumed).
   *
   * @return true if a begin mark was found
   */
  boolean readUntilMatchBegin() throws IOException
  {
    if(slowMatch_) {
      return slowReadUntilMatch(beginPat_, false, null);
    } else {
      return fastReadUntilMatch(beginMark_, false, null);
    }
  }

  /**
   * Copies bytes into {@code buf} up to and including the next end mark.
   *
   * @return true if an end mark was found
   */
  boolean readUntilMatchEnd(StringBuffer buf) throws IOException
  {
    if(slowMatch_) {
      return slowReadUntilMatch(endPat_, true, buf);
    } else {
      return fastReadUntilMatch(endMark_, true, buf);
    }
  }

  /**
   * CDATA-aware regex matcher. Reads up to max(lookahead, maxrec) bytes,
   * scans them for CDATA begin/end tokens and record marks, and uses
   * {@link #nextState} to skip marks that appear inside CDATA.
   *
   * <p>On a match the stream is left just before the mark
   * ({@code includePat == false}) or just after it ({@code includePat == true}).
   * If {@code outBufOrNull} is non-null, the bytes from the starting position
   * up to that point are decoded with the platform default charset and
   * appended to it. Without a match the lookahead bytes stay consumed.
   *
   * @param markPattern  pattern built by {@link #makePatternCDataOrMark}
   * @param includePat   whether the matched mark is consumed/included
   * @param outBufOrNull receives the record text, or null to just skip
   * @return true if a record mark was accepted
   */
  boolean slowReadUntilMatch(Pattern markPattern, boolean includePat, StringBuffer outBufOrNull)
    throws IOException
  {
    try {
      long inStart = in_.getPos();
      byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
      int avail = readUntilFullOrEof(buf);
      // ISO-8859-1 maps every byte to exactly one char, so the regex offsets
      // below are byte offsets and can be used directly with seek() and as
      // byte counts. (The platform charset would shift offsets after any
      // multi-byte character.)
      String sbuf = new String(buf, 0, avail, "ISO-8859-1");
      Matcher match = markPattern.matcher(sbuf);
      firstMatchStart_ = NA;
      firstMatchEnd_ = NA;
      int bufPos = 0;
      int state = synched_ ? CDATA_OUT : CDATA_UNK;
      while (bufPos <= sbuf.length() && match.find(bufPos)) {
        int input;
        if(match.group(1) != null) {
          input = CDATA_BEGIN;
        } else if(match.group(2) != null) {
          input = CDATA_END;
          // Any candidate mark seen before this CDATA end was inside CDATA
          // (e.g. |<DOC CDATA[ </DOC> ]]>), so it is discarded.
          firstMatchStart_ = NA;
        } else {
          input = RECORD_MAYBE;
          if(firstMatchStart_ == NA) {
            firstMatchStart_ = match.start();
            firstMatchEnd_ = match.end();
          }
        }
        state = nextState(state, input, match.start());
        if(state == RECORD_ACCEPT) {
          break;
        }
        // Step past the match; a zero-length match must still advance,
        // otherwise find() would return the same match forever.
        bufPos = (match.end() > match.start()) ? match.end() : match.end() + 1;
      }
      if(state != CDATA_UNK) {
        synched_ = true;
      }
      boolean matched = (firstMatchStart_ != NA) && (state == RECORD_ACCEPT || state == CDATA_UNK);
      if(matched) {
        int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
        if(outBufOrNull != null) {
          // The record bytes are already in buf; no need to reset and re-read.
          outBufOrNull.append(new String(buf, 0, endPos));
        }
        in_.seek(inStart + endPos);
      }
      return matched;
    } catch(Exception e) {
      // NOTE(review): best-effort as before; any failure is reported and
      // treated as "no match". Consider propagating IOException instead.
      e.printStackTrace();
    }
    return false;
  }

  /**
   * Reads from {@code in_} until {@code buf} is full or EOF is reached.
   * A single read() may legitimately return fewer bytes than requested.
   *
   * @return number of bytes stored at the start of buf
   */
  private int readUntilFullOrEof(byte[] buf) throws IOException
  {
    int total = 0;
    while (total < buf.length) {
      int n = in_.read(buf, total, buf.length - total);
      if (n < 0) {
        break;
      }
      total += n;
    }
    return total;
  }

  // states
  final static int CDATA_IN = 10;
  final static int CDATA_OUT = 11;
  final static int CDATA_UNK = 12;
  final static int RECORD_ACCEPT = 13;
  // inputs
  final static int CDATA_BEGIN = 20;
  final static int CDATA_END = 21;
  final static int RECORD_MAYBE = 22;

  /**
   * Transition function of the CDATA tracking state machine.
   * A record mark is accepted only when the reader is known to be outside
   * CDATA; while the state is still unknown, marks leave it unknown.
   *
   * @param state  current state (CDATA_IN, CDATA_OUT or CDATA_UNK)
   * @param input  token seen (CDATA_BEGIN, CDATA_END or RECORD_MAYBE)
   * @param bufPos offset of the token, used only in the error message
   * @return the next state
   * @throws IllegalStateException for an unexpected state/input pair
   */
  int nextState(int state, int input, int bufPos)
  {
    switch(state) {
    case CDATA_UNK:
    case CDATA_OUT:
      switch(input) {
      case CDATA_BEGIN:
        return CDATA_IN;
      case CDATA_END:
        // From CDATA_OUT this indicates malformed XML; it is tolerated.
        return CDATA_OUT;
      case RECORD_MAYBE:
        return (state==CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
      }
      break;
    case CDATA_IN:
      return (input==CDATA_END) ? CDATA_OUT : CDATA_IN;
    }
    throw new IllegalStateException(state + " " + input + " " + bufPos + " " + splitName_);
  }

  /**
   * Builds the pattern {@code (CDATA\[)|(\]\]>)|(mark)}: group 1 is a CDATA
   * begin, group 2 a CDATA end, group 3 the (already escaped) record mark.
   */
  Pattern makePatternCDataOrMark(String escapedMark)
  {
    StringBuffer pat = new StringBuffer();
    addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN
    addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END
    addGroup(pat, escapedMark); // RECORD_MAYBE
    return Pattern.compile(pat.toString());
  }

  /** Appends {@code (escapedGroup)} to pat, separated by '|' from earlier groups. */
  void addGroup(StringBuffer pat, String escapedGroup)
  {
    if(pat.length() > 0) {
      pat.append("|");
    }
    pat.append("(");
    pat.append(escapedGroup);
    pat.append(")");
  }

  /**
   * Literal streaming matcher: reads byte by byte until {@code textPat} has
   * been seen, using Knuth-Morris-Pratt fallback so overlapping partial
   * matches (e.g. "&lt;&lt;doc&gt;" for mark "&lt;doc&gt;") are not missed.
   *
   * <p>Each byte is compared as a single char, so the mark should be ASCII.
   * If {@code outBufOrNull} is non-null, every byte read is appended to it as
   * one char. With {@code includePat == false} and a match, the stream is
   * rewound to the start of the mark and the mark is removed from the buffer.
   *
   * @return true if the mark was found before EOF
   * @throws IllegalArgumentException if textPat is empty
   */
  boolean fastReadUntilMatch(String textPat, boolean includePat, StringBuffer outBufOrNull) throws IOException
  {
    char[] cpat = textPat.toCharArray();
    int msup = cpat.length;
    if(msup == 0) {
      throw new IllegalArgumentException("empty record mark");
    }
    int[] fallback = kmpFailure(cpat);
    if(!includePat) {
      int markLimit = 120000 * 10; // lookAhead_
      in_.mark(markLimit);
    }
    boolean match = false;
    int m = 0; // number of pattern chars currently matched
    while (true) {
      int b = in_.read();
      if (b == -1) {
        break;
      }
      char c = (char)b; // this assumes eight-bit matching. OK with UTF-8
      if(outBufOrNull != null) {
        outBufOrNull.append(c);
      }
      // On mismatch fall back to the longest prefix that is still a suffix,
      // instead of restarting from 0 and skipping a possible match start.
      while (m > 0 && c != cpat[m]) {
        m = fallback[m - 1];
      }
      if (c == cpat[m]) {
        m++;
      }
      if(m == msup) {
        match = true;
        break;
      }
    }
    if(!includePat && match) {
      if(outBufOrNull != null) {
        outBufOrNull.setLength(outBufOrNull.length() - textPat.length());
      }
      long pos = in_.getPos() - textPat.length();
      in_.reset();
      in_.seek(pos);
    }
    return match;
  }

  /**
   * Builds the KMP failure table for pat: entry i is the length of the
   * longest proper prefix of pat[0..i] that is also a suffix of it.
   */
  private static int[] kmpFailure(char[] pat)
  {
    int[] fail = new int[pat.length];
    int k = 0;
    for (int i = 1; i < pat.length; i++) {
      while (k > 0 && pat[i] != pat[k]) {
        k = fail[k - 1];
      }
      if (pat[i] == pat[k]) {
        k++;
      }
      fail[i] = k;
    }
    return fail;
  }

  /**
   * Returns the job property {@code prop}.
   *
   * @throws IOException if the property is not set
   */
  String checkJobGet(String prop) throws IOException
  {
    String val = job_.get(prop);
    if(val == null) {
      throw new IOException("JobConf: missing required property: " + prop);
    }
    return val;
  }

  String beginMark_;
  String endMark_;
  Pattern beginPat_;   // only set when slowMatch_
  Pattern endPat_;     // only set when slowMatch_
  boolean slowMatch_;
  int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA. Should be more than max record size
  int maxRecSize_;
  final static int NA = -1;
  int firstMatchStart_ = 0; // candidate record boundary (offset in lookahead buffer, or NA). Might just be CDATA.
  int firstMatchEnd_ = 0;
  boolean isRecordMatch_; // not read within this class
  boolean synched_;       // true once the slow matcher has established CDATA in/out state
}