blob: d0760b48a5cb41bbe1153ac5d6e428286d93d663 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.mrql;
import java.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/**
 * Common base class for the MRQL file input formats used in MapReduce mode.
 * Subclasses supply a Hadoop {@link RecordReader} for map-reduce jobs and a way
 * to load a whole input file into an in-memory {@link Bag}.
 */
abstract public class MapReduceMRQLFileInputFormat extends FileInputFormat<MRContainer,MRContainer> implements MRQLFileInputFormat {

    /** No-argument constructor. */
    public MapReduceMRQLFileInputFormat () {}

    /**
     * Build the record reader that Hadoop uses to read a split of this input.
     * @param split the input split to read
     * @param context the task attempt context supplied by Hadoop
     * @return a reader producing MRContainer key/value pairs
     */
    abstract public RecordReader<MRContainer,MRContainer>
        createRecordReader ( InputSplit split,
                             TaskAttemptContext context ) throws IOException, InterruptedException;

    /**
     * Load the contents of the file at {@code path} into an in-memory Bag.
     * @param path the file to read
     * @return a Bag holding the file's values
     */
    abstract public Bag materialize ( final Path path ) throws IOException;

    /**
     * Gather every data source of a DataSet into a single Bag.
     * Sources flagged {@code to_be_merged} are obtained through {@code Plan.merge};
     * all others are read by a fresh instance of their input format. The partial
     * results are combined with {@code Bag.union}, in the order of {@code x.source}.
     * @param x the DataSet in HDFS whose values are collected
     * @param strip ignored in MapReduce mode
     * @return the Bag built from all of the DataSet's sources
     * @throws Exception if merging, instantiating an input format, or reading fails
     */
    public final Bag collect ( final DataSet x, boolean strip ) throws Exception {
        Bag collected = new Bag();
        for ( DataSource source: x.source ) {
            if (!source.to_be_merged) {
                // plain source: create its input format and read the file directly
                collected = collected.union(source.inputFormat.newInstance().materialize(new Path(source.path)));
            } else {
                collected = collected.union(Plan.merge(source));
            }
        }
        return collected;
    }
}