blob: 522889533ce55031dc44263d927b965c8276dc47 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gora.mapreduce;
import java.io.IOException;
import org.apache.gora.persistency.Persistent;
import org.apache.gora.query.Query;
import org.apache.gora.store.DataStore;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* Base class for Gora based {@link Mapper}s.
*/
public class GoraMapper<K1, V1 extends Persistent, K2, V2>
extends Mapper<K1, V1, K2, V2> {
/**
* Initializes the Mapper, and sets input parameters for the job. All of
* the records in the dataStore are used as the input. If you want to
* include a specific subset, use one of the overloaded methods which takes
* query parameter.
* @param job the job to set the properties for
* @param dataStoreClass the datastore class
* @param inKeyClass Map input key class
* @param inValueClass Map input value class
* @param outKeyClass Map output key class
* @param outValueClass Map output value class
* @param mapperClass the mapper class extending GoraMapper
* @param partitionerClass optional partitioner class
* @param reuseObjects whether to reuse objects in serialization
*/
@SuppressWarnings("rawtypes")
public static <K1, V1 extends Persistent, K2, V2>
void initMapperJob(
Job job,
Class<? extends DataStore<K1,V1>> dataStoreClass,
Class<K1> inKeyClass,
Class<V1> inValueClass,
Class<K2> outKeyClass,
Class<V2> outValueClass,
Class<? extends GoraMapper> mapperClass,
Class<? extends Partitioner> partitionerClass,
boolean reuseObjects)
throws IOException {
//set the input via GoraInputFormat
GoraInputFormat.setInput(job, dataStoreClass, inKeyClass, inValueClass, reuseObjects);
job.setMapperClass(mapperClass);
job.setMapOutputKeyClass(outKeyClass);
job.setMapOutputValueClass(outValueClass);
if (partitionerClass != null) {
job.setPartitionerClass(partitionerClass);
}
}
/**
* Initializes the Mapper, and sets input parameters for the job. All of
* the records in the dataStore are used as the input. If you want to
* include a specific subset, use one of the overloaded methods which takes
* query parameter.
* @param job the job to set the properties for
* @param dataStoreClass the datastore class
* @param inKeyClass Map input key class
* @param inValueClass Map input value class
* @param outKeyClass Map output key class
* @param outValueClass Map output value class
* @param mapperClass the mapper class extending GoraMapper
* @param reuseObjects whether to reuse objects in serialization
*/
@SuppressWarnings("rawtypes")
public static <K1, V1 extends Persistent, K2, V2>
void initMapperJob(
Job job,
Class<? extends DataStore<K1,V1>> dataStoreClass,
Class<K1> inKeyClass,
Class<V1> inValueClass,
Class<K2> outKeyClass,
Class<V2> outValueClass,
Class<? extends GoraMapper> mapperClass,
boolean reuseObjects)
throws IOException {
initMapperJob(job, dataStoreClass, inKeyClass, inValueClass, outKeyClass
, outValueClass, mapperClass, null, reuseObjects);
}
/**
* Initializes the Mapper, and sets input parameters for the job
* @param job the job to set the properties for
* @param query the query to get the inputs from
* @param dataStore the datastore as the input
* @param outKeyClass Map output key class
* @param outValueClass Map output value class
* @param mapperClass the mapper class extending GoraMapper
* @param partitionerClass optional partitioner class
* @param reuseObjects whether to reuse objects in serialization
*/
@SuppressWarnings("rawtypes")
public static <K1, V1 extends Persistent, K2, V2>
void initMapperJob(
Job job,
Query<K1,V1> query,
DataStore<K1,V1> dataStore,
Class<K2> outKeyClass,
Class<V2> outValueClass,
Class<? extends GoraMapper> mapperClass,
Class<? extends Partitioner> partitionerClass,
boolean reuseObjects)
throws IOException {
//set the input via GoraInputFormat
GoraInputFormat.setInput(job, query, dataStore, reuseObjects);
job.setMapperClass(mapperClass);
job.setMapOutputKeyClass(outKeyClass);
job.setMapOutputValueClass(outValueClass);
if (partitionerClass != null) {
job.setPartitionerClass(partitionerClass);
}
}
/**
* Initializes the Mapper, and sets input parameters for the job
* @param job the job to set the properties for
* @param dataStore the datastore as the input
* @param outKeyClass Map output key class
* @param outValueClass Map output value class
* @param mapperClass the mapper class extending GoraMapper
* @param reuseObjects whether to reuse objects in serialization
*/
@SuppressWarnings({ "rawtypes" })
public static <K1, V1 extends Persistent, K2, V2>
void initMapperJob(
Job job,
DataStore<K1,V1> dataStore,
Class<K2> outKeyClass,
Class<V2> outValueClass,
Class<? extends GoraMapper> mapperClass,
boolean reuseObjects)
throws IOException {
initMapperJob(job, dataStore.newQuery(), dataStore,
outKeyClass, outValueClass, mapperClass, reuseObjects);
}
/**
* Initializes the Mapper, and sets input parameters for the job
* @param job the job to set the properties for
* @param query the query to get the inputs from
* @param dataStore the datastore as the input
* @param outKeyClass Map output key class
* @param outValueClass Map output value class
* @param mapperClass the mapper class extending GoraMapper
* @param reuseObjects whether to reuse objects in serialization
*/
@SuppressWarnings({ "rawtypes" })
public static <K1, V1 extends Persistent, K2, V2>
void initMapperJob(
Job job,
Query<K1,V1> query,
DataStore<K1,V1> dataStore,
Class<K2> outKeyClass,
Class<V2> outValueClass,
Class<? extends GoraMapper> mapperClass,
boolean reuseObjects)
throws IOException {
initMapperJob(job, query, dataStore, outKeyClass, outValueClass,
mapperClass, null, reuseObjects);
}
}