/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.io.*;
import java.net.Socket;
import java.util.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.util.LineReader;
/** A parser for line-oriented, character-delimited text files (such as CSV) */
public class LineParser implements Parser {
final static int maxLineLength = 1000;
boolean in_memory;
FSDataInputStream fsin; // for HDFS processing
LineReader in;
BufferedReader buffered_in; // for in-memory processing
Text line;
long start;
long end;
long pos;
String delimiter;
Tree type;
byte[] types; // a vector of basic type ids (see MRContainer in MapReduceData)
int type_length;
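    /** Map a relational record or tuple type to a vector of basic type codes;
     *  a component of type 'any' gets code -1 and is skipped during parsing */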
static byte[] relational_record ( Tree tp ) {
match tp {
case record(...al):
Trees attrs = #[];
byte[] types = new byte[al.length()];
for ( int i = 0; i < types.length; i++ )
match al.nth(i) {
case bind(`v,any):
types[i] = -1;
if (attrs.member(v))
TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
attrs = attrs.append(v);
              case bind(`v,`t):
                  if (!t.is_variable())
                      fail;   // not a basic type name: fall through to the error case
                  types[i] = MRContainer.type_code(t.toString());
if (attrs.member(v))
TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
attrs = attrs.append(v);
if (!MRContainer.basic_type(types[i]))
TypeInference.error("Expected a basic type for a relational attribute: "+t);
case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+TypeInference.print_type(t));
};
return types;
case tuple(...al):
byte[] types = new byte[al.length()];
for ( int i = 0; i < types.length; i++ )
match al.nth(i) {
case any:
types[i] = -1;
case `t:
if (!t.is_variable())
fail;
types[i] = MRContainer.type_code(t.toString());
if (!MRContainer.basic_type(types[i]))
TypeInference.error("Expected a basic type for a relational attribute: "+t);
case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+TypeInference.print_type(t));
};
return types;
};
TypeInference.error("Expected a relational record or a tuple type: "
+TypeInference.print_type(tp));
return null;
}
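    /** Remove the 'any' components from a relational record or tuple type */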
static Tree relational_record_type ( Tree tp ) {
match tp {
case record(...al):
Trees ts = #[];
for ( Tree a: al )
match a {
case bind(_,any): ;
case `t: ts = ts.append(t);
};
return #<record(...ts)>;
case tuple(...al):
Trees ts = #[];
for ( Tree a: al )
if (!a.equals(#<any>))
ts = ts.append(a);
return #<tuple(...ts)>;
};
TypeInference.error("Expected a relational record type: "
+TypeInference.print_type(tp));
return null;
}
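    /** The type of the parsed data (the relational type without its 'any' components) */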
public Tree type () {
return relational_record_type(type);
}
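    /** Initialize the parser from its two arguments: a delimiter string
     *  and a node whose first child is the type of the parsed data */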
public void initialize ( Trees args ) {
if (Config.hadoop_mode && Plan.conf == null)
Plan.conf = Evaluator.evaluator.new_configuration();
if (args.length() != 2)
throw new Error("The line parser must have two arguments: "+args);
if (!(args.nth(0) instanceof StringLeaf))
throw new Error("Expected a delimiter: "+args.nth(0));
delimiter = ((StringLeaf)args.nth(0)).value();
if (delimiter.length() == 0)
throw new Error("Expected a delimiter with at least one character: "+delimiter);
type = ((Node)args.nth(1)).children().nth(0);
types = relational_record(type);
type_length = 0;
for ( int i = 0; i < types.length; i++ )
if (types[i] >= 0)
type_length++;
if (type_length < 1)
TypeInference.error("A relational record type must have at least one component: "
+TypeInference.print_type(type));
}
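    /** In-memory parsing of a local file */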
public void open ( String file ) {
in_memory = true;
try {
buffered_in = new BufferedReader(new InputStreamReader(new FileInputStream(file)),
10000);
} catch ( Exception e ) {
throw new Error("Cannot open the file: "+file);
}
}
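    /** In-memory parsing of a stream read from a socket */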
public void open ( String host, int port ) {
in_memory = true;
try {
Socket s = new Socket(host,port);
buffered_in = new BufferedReader(new InputStreamReader(s.getInputStream()),
10000);
} catch ( Exception e ) {
throw new Error("Cannot open the socket: "+host+":"+port);
}
}
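    /** Parse an HDFS data split; every split except the first skips its first
     *  (possibly partial) line, which is consumed by the previous split */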
public void open ( FSDataInputStream fsin, long fstart, long fend ) {
in_memory = false;
this.fsin = fsin;
start = fstart;
end = fend;
line = new Text();
try {
if (start != 0) { // for all but the first data split, skip the first record
--start;
fsin.seek(start);
in = new LineReader(fsin,Plan.conf);
start += in.readLine(new Text(),0,(int) Math.min(Integer.MAX_VALUE,end-start));
} else in = new LineReader(fsin,Plan.conf);
pos = start;
} catch ( IOException e ) {
System.err.println("*** Cannot parse the data split: "+fsin);
this.start = end;
}
}
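    /** Return the next line of text from the input, or null at the end of the split */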
public String slice () {
try {
if (in_memory)
return buffered_in.readLine();
while (pos < end) {
int newSize = in.readLine(line,maxLineLength,
Math.max((int)Math.min(Integer.MAX_VALUE,end-pos),
maxLineLength));
if (newSize == 0)
return null;
pos += newSize;
if (newSize < maxLineLength)
return line.toString();
};
return null;
} catch ( Exception e ) {
System.err.println("*** Cannot slice the text: "+e);
return "";
}
}
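    /** Parse a single basic value (byte, short, int, long, float, double, char, or string) from text */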
private static MRData parse_value ( String text, byte type ) {
switch (type) {
case MRContainer.BYTE: return new MR_byte(Byte.parseByte(text));
case MRContainer.SHORT: return new MR_short(Short.parseShort(text));
case MRContainer.INT: return new MR_int(Integer.parseInt(text));
case MRContainer.LONG: return new MR_long(Long.parseLong(text));
case MRContainer.FLOAT: return new MR_float(Float.parseFloat(text));
case MRContainer.DOUBLE: return new MR_double(Double.parseDouble(text));
case MRContainer.CHAR: return new MR_char(text.charAt(0));
case MRContainer.STRING: return new MR_string(text);
};
System.err.println("*** Cannot parse the type "+MRContainer.type_names[type]+" in '"+text+"'");
return null;
}
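    /** Break a text line on the delimiter and parse each component that is not
     *  skipped as a basic value; return a Bag with one tuple, or an empty Bag on error */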
public Bag parse ( String line ) {
try {
if (line == null)
return new Bag();
Tuple t = new Tuple(type_length);
int loc = 0;
int j = 0;
for ( int i = 0; i < types.length; i++ ) {
int k = line.indexOf(delimiter,loc);
                if (types[i] >= 0) {
                    // k >= 0: the component ends at the delimiter (and may be empty);
                    // otherwise the last component extends to the end of the line
                    String s = (k >= 0) ? line.substring(loc,k) : line.substring(loc);
                    MRData v = parse_value(s,types[i]);
                    if (v == null)
                        return new Bag();
                    t.set(j++,v);
};
loc = k+delimiter.length();
if (k < 0 && i+1 < types.length) {
System.err.println("*** Incomplete parsed text line: "+line);
return new Bag();
}
};
return new Bag(t);
} catch ( Exception e ) {
System.err.println("*** Cannot parse the text line: "+line);
return new Bag();
}
}
}
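
/* A minimal usage sketch (hypothetical, not part of this file): the parser is
   normally created and initialized by the MRQL query processor, but driven by
   hand it would look roughly like this for a comma-delimited local file
   (the file name "data.csv" and the args value are assumptions):

     LineParser parser = new LineParser();
     parser.initialize(args);        // args hold the delimiter "," and the data type
     parser.open("data.csv");
     for ( String line = parser.slice(); line != null; line = parser.slice() )
         for ( MRData value: parser.parse(line) )
             System.out.println(value);
*/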