blob: 04d2bbf452f1ff5fdb25803aceb5ab1179099a78 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import org.apache.mrql.gen.*;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.Configuration;
/** A DataSource is any input data source, such as a text file, a key/value map, a data base, an intermediate file, etc */
public class DataSource {
final public static String separator = "%%%";
public static DataSourceDirectory dataSourceDirectory = new DataSourceDirectory();
public static ParserDirectory parserDirectory = new ParserDirectory();
private static boolean loaded = false;
public String path;
public Class<? extends MRQLFileInputFormat> inputFormat;
public int source_num;
public boolean to_be_merged; // if the path is a directory with multiple files, merge them
final static class ParserDirectory extends HashMap<String,Class<? extends Parser>> {
}
/** A dictionary that maps data source paths to DataSource data.
* It assumes that each path can only be associated with a single data source format and parser
*/
final static class DataSourceDirectory extends HashMap<String,DataSource> {
public void read ( Configuration conf ) {
clear();
for ( String s: conf.get("mrql.data.source.directory").split("@@@") ) {
String[] p = s.split("===");
put(p[0],DataSource.read(p[1],conf));
}
}
public String toString () {
String s = "";
for ( String k: keySet() )
s += "@@@"+k+"==="+get(k);
if (s.equals(""))
return s;
else return s.substring(3);
}
public DataSource get ( String name ) {
for ( Map.Entry<String,DataSource> e: entrySet() )
if (name.startsWith(e.getKey()))
return e.getValue();
return null;
}
public void distribute ( Configuration conf ) {
conf.set("mrql.data.source.directory",toString());
}
}
DataSource () {}
DataSource ( int source_num,
String path,
Class<? extends MRQLFileInputFormat> inputFormat,
Configuration conf ) {
this.source_num = source_num;
this.path = path;
this.inputFormat = inputFormat;
to_be_merged = false;
if (Config.flink_mode)
dataSourceDirectory.put(path,this);
else try {
Path p = new Path(path);
FileSystem fs = p.getFileSystem(conf);
String complete_path = fs.getFileStatus(p).getPath().toString();
this.path = complete_path;
dataSourceDirectory.put(this.path,this);
} catch (IOException e) {
throw new Error(e);
}
}
public static void loadParsers() {
if (!loaded) {
DataSource.parserDirectory.put("xml",XMLParser.class);
DataSource.parserDirectory.put("json",JsonFormatParser.class);
DataSource.parserDirectory.put("line",LineParser.class);
loaded = true;
}
}
static {
loadParsers();
}
private static long size ( Path path, Configuration conf ) throws IOException {
FileStatus s = path.getFileSystem(conf).getFileStatus(path);
if (!s.isDir())
return s.getLen();
long size = 0;
for ( FileStatus fs: path.getFileSystem(conf).listStatus(path) )
size += fs.getLen();
return size;
}
/** data set size in bytes */
public long size ( Configuration conf ) {
try {
return size(new Path(path),conf);
} catch (IOException e) {
throw new Error(e);
}
}
public static DataSource read ( String buffer, Configuration conf ) {
try {
String[] s = buffer.split(separator);
int n = Integer.parseInt(s[1]);
Main.initialize(); // the worker node may not have initialized MRQL
if (s[0].equals("Binary"))
return new BinaryDataSource(n,s[2],conf);
else if (s[0].equals("Generator"))
return new GeneratorDataSource(n,s[2],conf);
else if (s[0].equals("Text"))
return new ParsedDataSource(n,s[3],parserDirectory.get(s[2]),((Node)Tree.parse(s[4])).children(),conf);
else throw new Error("Unrecognized data source: "+s[0]);
} catch (Exception e) {
throw new Error(e);
}
}
public static DataSource get ( String path, Configuration conf ) {
if (dataSourceDirectory.isEmpty())
dataSourceDirectory.read(conf);
return dataSourceDirectory.get(path);
}
public static DataSource getCached ( String remote_path, String local_path, Configuration conf ) {
DataSource ds = get(remote_path,conf);
ds.path = local_path;
dataSourceDirectory.put(local_path,ds);
return ds;
}
/** return the first num values */
public List<MRData> take ( int num ) {
int count = num;
try {
ArrayList<MRData> res = new ArrayList<MRData>();
Iterator<MRData> it = inputFormat.newInstance().collect(new DataSet(this,0,0),false).iterator();
for ( int i = num; (num < 0 || i > 0) && it.hasNext(); i-- )
if (Config.hadoop_mode && Config.bsp_mode && !to_be_merged)
res.add(((Tuple)it.next()).get(1)); // strip tag in BSP mode
else res.add(it.next());
return res;
} catch (Exception ex) {
throw new Error(ex);
}
}
static Tuple tuple_container = new Tuple(new Tuple(),new Tuple());
/** accumulate all datasource values */
public MRData reduce ( MRData zero, final Function acc ) {
try {
MRData res = zero;
for ( MRData x: inputFormat.newInstance().materialize(new Path(path)) ) {
if (Config.hadoop_mode && Config.bsp_mode)
x = ((Tuple)x).get(1); // strip tag in BSP mode
tuple_container.set(0,res).set(1,x);
res = acc.eval(tuple_container);
};
return res;
} catch (Exception ex) {
throw new Error(ex);
}
}
}