| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.mrql; |
| |
| import org.apache.mrql.gen.*; |
| import java.io.*; |
| import java.net.Socket; |
| import java.util.*; |
| import org.apache.hadoop.fs.*; |
| import org.apache.hadoop.io.*; |
| import org.apache.hadoop.util.LineReader; |
| |
| |
| /** A parser for line-oriented, character delimited text files (such as CVS) */ |
| public class LineParser implements Parser { |
| final static int maxLineLength = 1000; |
| boolean in_memory; |
| FSDataInputStream fsin; // for HDFS processing |
| LineReader in; |
| BufferedReader buffered_in; // for in-memory processing |
| Text line; |
| long start; |
| long end; |
| long pos; |
| String delimiter; |
| Tree type; |
| byte[] types; // a vector of basic type ids (see MRContainer in MapReduceData) |
| int type_length; |
| |
| static byte[] relational_record ( Tree tp ) { |
| match tp { |
| case record(...al): |
| Trees attrs = #[]; |
| byte[] types = new byte[al.length()]; |
| for ( int i = 0; i < types.length; i++ ) |
| match al.nth(i) { |
| case bind(`v,any): |
| types[i] = -1; |
| if (attrs.member(v)) |
| TypeInference.type_error(tp,"Duplicate record attribute name: "+v); |
| attrs = attrs.append(v); |
| case bind(`v,`t): |
| if (!t.is_variable()) |
| fail; |
| types[i] = MRContainer.type_code(t.toString()); |
| if (attrs.member(v)) |
| TypeInference.type_error(tp,"Duplicate record attribute name: "+v); |
| attrs = attrs.append(v); |
| if (!MRContainer.basic_type(types[i])) |
| TypeInference.error("Expected a basic type for a relational attribute: "+t); |
| case `t: TypeInference.error("Expected a basic type for a relational attribute: " |
| +TypeInference.print_type(t)); |
| }; |
| return types; |
| case tuple(...al): |
| byte[] types = new byte[al.length()]; |
| for ( int i = 0; i < types.length; i++ ) |
| match al.nth(i) { |
| case any: |
| types[i] = -1; |
| case `t: |
| if (!t.is_variable()) |
| fail; |
| types[i] = MRContainer.type_code(t.toString()); |
| if (!MRContainer.basic_type(types[i])) |
| TypeInference.error("Expected a basic type for a relational attribute: "+t); |
| case `t: TypeInference.error("Expected a basic type for a relational attribute: " |
| +TypeInference.print_type(t)); |
| }; |
| return types; |
| }; |
| TypeInference.error("Expected a relational record or a tuple type: " |
| +TypeInference.print_type(tp)); |
| return null; |
| } |
| |
| static Tree relational_record_type ( Tree tp ) { |
| match tp { |
| case record(...al): |
| Trees ts = #[]; |
| for ( Tree a: al ) |
| match a { |
| case bind(_,any): ; |
| case `t: ts = ts.append(t); |
| }; |
| return #<record(...ts)>; |
| case tuple(...al): |
| Trees ts = #[]; |
| for ( Tree a: al ) |
| if (!a.equals(#<any>)) |
| ts = ts.append(a); |
| return #<tuple(...ts)>; |
| }; |
| TypeInference.error("Expected a relational record type: " |
| +TypeInference.print_type(tp)); |
| return null; |
| } |
| |
| public Tree type () { |
| return relational_record_type(type); |
| } |
| |
| public void initialize ( Trees args ) { |
| if (Config.hadoop_mode && Plan.conf == null) |
| Plan.conf = Evaluator.evaluator.new_configuration(); |
| if (args.length() != 2) |
| throw new Error("The line parser must have two arguments: "+args); |
| if (!(args.nth(0) instanceof StringLeaf)) |
| throw new Error("Expected a delimiter: "+args.nth(0)); |
| delimiter = ((StringLeaf)args.nth(0)).value(); |
| if (delimiter.length() == 0) |
| throw new Error("Expected a delimiter with at least one character: "+delimiter); |
| type = ((Node)args.nth(1)).children().nth(0); |
| types = relational_record(type); |
| type_length = 0; |
| for ( int i = 0; i < types.length; i++ ) |
| if (types[i] >= 0) |
| type_length++; |
| if (type_length < 1) |
| TypeInference.error("A relational record type must have at least one component: " |
| +TypeInference.print_type(type)); |
| } |
| |
| public void open ( String file ) { |
| in_memory = true; |
| try { |
| buffered_in = new BufferedReader(new InputStreamReader(new FileInputStream(file)), |
| 10000); |
| } catch ( Exception e ) { |
| throw new Error("Cannot open the file: "+file); |
| } |
| } |
| |
| public void open ( String host, int port ) { |
| in_memory = true; |
| try { |
| Socket s = new Socket(host,port); |
| buffered_in = new BufferedReader(new InputStreamReader(s.getInputStream()), |
| 10000); |
| } catch ( Exception e ) { |
| throw new Error("Cannot open the socket: "+host+":"+port); |
| } |
| } |
| |
| public void open ( FSDataInputStream fsin, long fstart, long fend ) { |
| in_memory = false; |
| this.fsin = fsin; |
| start = fstart; |
| end = fend; |
| line = new Text(); |
| try { |
| if (start != 0) { // for all but the first data split, skip the first record |
| --start; |
| fsin.seek(start); |
| in = new LineReader(fsin,Plan.conf); |
| start += in.readLine(new Text(),0,(int) Math.min(Integer.MAX_VALUE,end-start)); |
| } else in = new LineReader(fsin,Plan.conf); |
| pos = start; |
| } catch ( IOException e ) { |
| System.err.println("*** Cannot parse the data split: "+fsin); |
| this.start = end; |
| } |
| } |
| |
| public String slice () { |
| try { |
| if (in_memory) |
| return buffered_in.readLine(); |
| while (pos < end) { |
| int newSize = in.readLine(line,maxLineLength, |
| Math.max((int)Math.min(Integer.MAX_VALUE,end-pos), |
| maxLineLength)); |
| if (newSize == 0) |
| return null; |
| pos += newSize; |
| if (newSize < maxLineLength) |
| return line.toString(); |
| }; |
| return null; |
| } catch ( Exception e ) { |
| System.err.println("*** Cannot slice the text: "+e); |
| return ""; |
| } |
| } |
| |
| private static MRData parse_value ( String text, byte type ) { |
| switch (type) { |
| case MRContainer.BYTE: return new MR_byte(Byte.parseByte(text)); |
| case MRContainer.SHORT: return new MR_short(Short.parseShort(text)); |
| case MRContainer.INT: return new MR_int(Integer.parseInt(text)); |
| case MRContainer.LONG: return new MR_long(Long.parseLong(text)); |
| case MRContainer.FLOAT: return new MR_float(Float.parseFloat(text)); |
| case MRContainer.DOUBLE: return new MR_double(Double.parseDouble(text)); |
| case MRContainer.CHAR: return new MR_char(text.charAt(0)); |
| case MRContainer.STRING: return new MR_string(text); |
| }; |
| System.err.println("*** Cannot parse the type "+MRContainer.type_names[type]+" in '"+text+"'"); |
| return null; |
| } |
| |
| public Bag parse ( String line ) { |
| try { |
| if (line == null) |
| return new Bag(); |
| Tuple t = new Tuple(type_length); |
| int loc = 0; |
| int j = 0; |
| for ( int i = 0; i < types.length; i++ ) { |
| int k = line.indexOf(delimiter,loc); |
| if (types[i] >= 0) { |
| String s = (k > 0) ? line.substring(loc,k) : line.substring(loc); |
| MRData v = parse_value(s,types[i]); |
| if (v == null) |
| return new Bag(); |
| t.set(j++,v); |
| }; |
| loc = k+delimiter.length(); |
| if (k < 0 && i+1 < types.length) { |
| System.err.println("*** Incomplete parsed text line: "+line); |
| return new Bag(); |
| } |
| }; |
| return new Bag(t); |
| } catch ( Exception e ) { |
| System.err.println("*** Cannot parse the text line: "+line); |
| return new Bag(); |
| } |
| } |
| } |