/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gora.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.PartitionQuery;
import org.apache.gora.query.Query;
import org.apache.gora.query.impl.FileSplitPartitionQuery;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.store.FileBackedDataStore;
import org.apache.gora.util.IOUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

/**
* {@link InputFormat} to fetch the input from Gora data stores. The
* query to fetch the items from the datastore should be prepared and
* set via {@link #setQuery(Job, Query)} before submitting the job.
*
* <p> The {@link InputSplit}s are prepared from the {@link PartitionQuery}s
* obtained by calling {@link DataStore#getPartitions(Query)}.
* <p>
* Hadoop jobs can be configured either through the static
* <code>setInput()</code> methods or via {@link GoraMapper}.
*
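* <p>A minimal usage sketch is given below; <code>WebPage</code>,
* <code>MyStore</code> and <code>MyMapper</code> are hypothetical
* application classes, not part of this API:
* <pre>
* Job job = new Job(conf, "gora-input-example");
* // select everything in the store via a fresh query
* GoraInputFormat.setInput(
*     job, MyStore.class, String.class, WebPage.class, true);
* job.setMapperClass(MyMapper.class);
* // ... set output types and submit the job as usual
* </pre>
*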
* @see GoraMapper
*/
public class GoraInputFormat<K, T extends Persistent>
extends InputFormat<K, T> implements Configurable {

public static final String QUERY_KEY = "gora.inputformat.query";

private DataStore<K, T> dataStore;
private Configuration conf;
private Query<K, T> query;
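
/**
* If the partition query comes from a file-backed datastore, points the
* store's input path at the path of the underlying {@link FileSplit}.
*/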
@SuppressWarnings({ "rawtypes" })
private void setInputPath(PartitionQuery<K,T> partitionQuery
, TaskAttemptContext context) throws IOException {
// if the data store is file-based
if(partitionQuery instanceof FileSplitPartitionQuery) {
FileSplit split = ((FileSplitPartitionQuery<K,T>)partitionQuery).getSplit();
// set the input path to the FileSplit's path
((FileBackedDataStore)partitionQuery.getDataStore()).setInputPath(
split.getPath().toString());
}
}

@Override
@SuppressWarnings("unchecked")
public RecordReader<K, T> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException, InterruptedException {
PartitionQuery<K,T> partitionQuery = (PartitionQuery<K, T>)
((GoraInputSplit)split).getQuery();
setInputPath(partitionQuery, context);
return new GoraRecordReader<K, T>(partitionQuery, context);
}

@Override
public List<InputSplit> getSplits(JobContext context) throws IOException,
InterruptedException {
List<PartitionQuery<K, T>> queries = dataStore.getPartitions(query);
List<InputSplit> splits = new ArrayList<InputSplit>(queries.size());
for(PartitionQuery<K,T> query : queries) {
splits.add(new GoraInputSplit(context.getConfiguration(), query));
}
return splits;
}

@Override
public Configuration getConf() {
return conf;
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
try {
this.query = getQuery(conf);
this.dataStore = query.getDataStore();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
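
/**
* Serializes the given query into the job's configuration under
* {@link #QUERY_KEY}, so that it can be reloaded by the tasks.
* @param job the job to set the query for
* @param query the query to serialize into the configuration
* @throws IOException if the query cannot be stored in the job's configuration
*/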
public static<K, T extends Persistent> void setQuery(Job job
, Query<K, T> query) throws IOException {
IOUtils.storeToConf(query, job.getConfiguration(), QUERY_KEY);
}
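
/**
* Deserializes and returns the query that was stored in the
* configuration via {@link #setQuery(Job, Query)}.
* @param conf the configuration holding the serialized query
* @return the deserialized query
* @throws IOException if the query cannot be loaded from the configuration
*/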
public Query<K, T> getQuery(Configuration conf) throws IOException {
return IOUtils.loadFromConf(conf, QUERY_KEY);
}

/**
* Sets the input parameters for the job
* @param job the job to set the properties for
* @param query the query to get the inputs from
* @param reuseObjects whether to reuse objects in serialization
* @throws IOException if the query cannot be stored in the job's configuration
*/
public static <K1, V1 extends Persistent> void setInput(Job job
, Query<K1,V1> query, boolean reuseObjects) throws IOException {
setInput(job, query, query.getDataStore(), reuseObjects);
}

/**
* Sets the input parameters for the job
* @param job the job to set the properties for
* @param query the query to get the inputs from
* @param dataStore the datastore to use as the input source
* @param reuseObjects whether to reuse objects in serialization
* @throws IOException if the query cannot be stored in the job's configuration
*/
public static <K1, V1 extends Persistent> void setInput(Job job
, Query<K1,V1> query, DataStore<K1,V1> dataStore, boolean reuseObjects)
throws IOException {
Configuration conf = job.getConfiguration();
GoraMapReduceUtils.setIOSerializations(conf, reuseObjects);
job.setInputFormatClass(GoraInputFormat.class);
GoraInputFormat.setQuery(job, query);
}

/**
* Sets the input parameters for the job
* @param job the job to set the properties for
* @param dataStoreClass the datastore class
* @param inKeyClass Map input key class
* @param inValueClass Map input value class
* @param reuseObjects whether to reuse objects in serialization
* @throws IOException if the datastore cannot be created or the query
* cannot be stored in the job's configuration
*/
public static <K1, V1 extends Persistent> void setInput(
Job job,
Class<? extends DataStore<K1,V1>> dataStoreClass,
Class<K1> inKeyClass,
Class<V1> inValueClass,
boolean reuseObjects)
throws IOException {
DataStore<K1,V1> store = DataStoreFactory.getDataStore(dataStoreClass
, inKeyClass, inValueClass, job.getConfiguration());
setInput(job, store.newQuery(), store, reuseObjects);
}
}