/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.io.hadoop.format;

import java.lang.reflect.InvocationTargetException;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

/** Utility class for working with Hadoop-related objects. */
final class HadoopFormats {
private static final int DEFAULT_JOB_NUMBER = 0;
static final Class<HashPartitioner> DEFAULT_PARTITIONER_CLASS_ATTR = HashPartitioner.class;
private static final int DEFAULT_NUM_REDUCERS = 1;

private HadoopFormats() {}

/**
* Creates a {@link JobID} with a random jtIdentifier and the default job number.
*
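* <p>A minimal usage sketch (illustrative only; the jtIdentifier is a random UUID, so the
* resulting id looks like {@code job_<uuid>_0000}):
*
* <pre>{@code
* JobID jobId = HadoopFormats.createJobId();
* }</pre>
*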
* @return new {@link JobID}
*/
public static JobID createJobId() {
return new JobID(UUID.randomUUID().toString(), DEFAULT_JOB_NUMBER);
}

/**
* Creates a {@link JobID} with the specified jtIdentifier and the default job number.
*
* @param jtIdentifier jtIdentifier to specify
* @return new {@link JobID}
*/
public static JobID createJobId(String jtIdentifier) {
return new JobID(jtIdentifier, DEFAULT_JOB_NUMBER);
}

/**
* Creates a new job-setup {@link TaskAttemptContext} from a Hadoop {@link Configuration} and
* {@link JobID}.
*
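* <p>A minimal sketch of typical use (illustrative only; {@code outputFormat} stands for any
* configured {@link OutputFormat}):
*
* <pre>{@code
* Configuration conf = new Configuration();
* JobID jobId = HadoopFormats.createJobId();
* TaskAttemptContext setupContext = HadoopFormats.createSetupTaskContext(conf, jobId);
* outputFormat.getOutputCommitter(setupContext).setupJob(setupContext);
* }</pre>
*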
* @param conf Hadoop {@link Configuration}
* @param jobID jobId of the created {@link TaskAttemptContext}
* @return new job-setup {@link TaskAttemptContext}
*/
static TaskAttemptContext createSetupTaskContext(Configuration conf, JobID jobID) {
final TaskID taskId = new TaskID(jobID, TaskType.JOB_SETUP, 0);
return createTaskAttemptContext(conf, new TaskAttemptID(taskId, 0));
}

/**
* Creates a new {@link TaskAttemptContext} from a Hadoop {@link Configuration}, {@link JobID},
* and the specified taskNumber.
*
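* <p>Illustrative sketch ({@code conf} and {@code jobId} are assumed to exist; task numbers
* must be unique within the job):
*
* <pre>{@code
* // one context per task, attempt id fixed to 0
* TaskAttemptContext taskContext = HadoopFormats.createTaskAttemptContext(conf, jobId, 3);
* }</pre>
*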
* @param conf Hadoop {@link Configuration}
* @param jobID jobId of the created {@link TaskAttemptContext}
* @param taskNumber number of the task (must be unique within a single job)
* @return new {@link TaskAttemptContext}
*/
static TaskAttemptContext createTaskAttemptContext(
Configuration conf, JobID jobID, int taskNumber) {
TaskAttemptID taskAttemptID = createTaskAttemptID(jobID, taskNumber, 0);
return createTaskAttemptContext(conf, taskAttemptID);
}

/**
* Creates {@link TaskAttemptContext}.
*
* @param conf configuration
* @param taskAttemptID taskAttemptId
* @return new {@link TaskAttemptContext}
*/
static TaskAttemptContext createTaskAttemptContext(
Configuration conf, TaskAttemptID taskAttemptID) {
return new TaskAttemptContextImpl(conf, taskAttemptID);
}

/**
* Creates new {@link TaskAttemptID}.
*
* @param jobID jobId
* @param taskId taskId
* @param attemptId attemptId
* @return new {@link TaskAttemptID}
*/
static TaskAttemptID createTaskAttemptID(JobID jobID, int taskId, int attemptId) {
final TaskID tId = createTaskID(jobID, taskId);
return new TaskAttemptID(tId, attemptId);
}

/**
* Creates a new {@link TaskID} of type {@link TaskType#REDUCE} with the specified {@code
* taskNumber} for the given {@link JobID}.
*
* @param jobID jobId of the created {@link TaskID}
* @param taskNumber number of the task (must be unique within a single job)
* @return new {@link TaskID} for given {@link JobID}
*/
static TaskID createTaskID(JobID jobID, int taskNumber) {
return new TaskID(jobID, TaskType.REDUCE, taskNumber);
}

/**
* Creates a job-cleanup {@link TaskAttemptContext} for the given {@link JobID}.
*
* @param conf Hadoop configuration
* @param jobID jobId of the created {@link TaskAttemptContext}
* @return new job-cleanup {@link TaskAttemptContext} for the given {@link JobID}
*/
static TaskAttemptContext createCleanupTaskContext(Configuration conf, JobID jobID) {
final TaskID taskId = new TaskID(jobID, TaskType.JOB_CLEANUP, 0);
return createTaskAttemptContext(conf, new TaskAttemptID(taskId, 0));
}

/**
* Returns an instance of the {@link OutputFormat} whose class name is stored in the
* configuration under the {@link MRJobConfig#OUTPUT_FORMAT_CLASS_ATTR} key.
*
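* <p>Illustrative sketch (assumes Hadoop's {@code TextOutputFormat} with {@code Text} keys and
* {@code LongWritable} values; any configured {@link OutputFormat} works the same way):
*
* <pre>{@code
* Configuration conf = new Configuration();
* conf.setClass(
*     MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class, OutputFormat.class);
* OutputFormat<Text, LongWritable> outputFormat =
*     HadoopFormats.createOutputFormatFromConfig(conf);
* }</pre>
*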
* @param conf Hadoop configuration
* @param <KeyT> KeyType of output format
* @param <ValueT> ValueType of output format
* @return new {@link OutputFormat} instance
* @throws IllegalArgumentException if no value was found under the key or the {@link
*     OutputFormat} could not be constructed
*/
@SuppressWarnings("unchecked")
static <KeyT, ValueT> OutputFormat<KeyT, ValueT> createOutputFormatFromConfig(Configuration conf)
throws IllegalArgumentException {
return (OutputFormat<KeyT, ValueT>)
createInstanceFromConfig(
conf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, null, OutputFormat.class);
}

/**
* Creates a new instance of the {@link Partitioner} class specified in the Hadoop {@link
* Configuration}.
*
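* <p>Sketch of the fallback behavior (illustrative only; {@code Text} keys and values are
* assumed):
*
* <pre>{@code
* // no partitioner class configured, so the HashPartitioner default is returned
* Partitioner<Text, Text> partitioner = HadoopFormats.getPartitioner(new Configuration());
* }</pre>
*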
* @param conf Hadoop {@link Configuration}
* @param <KeyT> KeyType of {@link Partitioner}
* @param <ValueT> ValueType of {@link Partitioner}
* @return new {@link Partitioner}
*/
@SuppressWarnings("unchecked")
static <KeyT, ValueT> Partitioner<KeyT, ValueT> getPartitioner(Configuration conf) {
return (Partitioner<KeyT, ValueT>)
createInstanceFromConfig(
conf,
MRJobConfig.PARTITIONER_CLASS_ATTR,
DEFAULT_PARTITIONER_CLASS_ATTR,
Partitioner.class);
}

/**
* Creates an object of the class that is stored in the configuration under the specified {@code
* configClassKey}.
*
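* <p>Hypothetical sketch of the lookup semantics ({@code "example.class.key"} and {@code
* MyFormat} are illustrative names only):
*
* <pre>{@code
* conf.set("example.class.key", MyFormat.class.getName());
* // resolves the key to MyFormat; defaultClass would be used if the key were unset
* OutputFormat<?, ?> instance =
*     createInstanceFromConfig(conf, "example.class.key", null, OutputFormat.class);
* }</pre>
*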
* @param conf Hadoop Configuration in which the class name of the created object is stored
* @param configClassKey key under which the class name is stored
* @param defaultClass default class used when no value is found under the specified {@code
*     configClassKey}
* @param xface interface that the class must implement
* @return created object
*/
private static <T> T createInstanceFromConfig(
Configuration conf,
String configClassKey,
@Nullable Class<? extends T> defaultClass,
Class<T> xface) {
try {
String className = conf.get(configClassKey);
Preconditions.checkArgument(
className != null || defaultClass != null,
"Configuration does not contain any value under key %s. Unable to initialize class instance from the configuration.",
configClassKey);
Class<? extends T> requiredClass = conf.getClass(configClassKey, defaultClass, xface);
return requiredClass.getConstructor().newInstance();
} catch (InstantiationException
| IllegalAccessException
| NoSuchMethodException
| InvocationTargetException e) {
throw new IllegalArgumentException(
String.format(
"Unable to create instance of object from configuration under key %s.",
configClassKey),
e);
}
}

/**
* Creates a {@link JobID} with the {@code jtIdentifier} stored in the Hadoop {@link
* Configuration} under the {@link MRJobConfig#ID} key.
*
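* <p>Illustrative sketch:
*
* <pre>{@code
* Configuration conf = new Configuration();
* conf.set(MRJobConfig.ID, "custom_identifier");
* JobID jobId = HadoopFormats.getJobId(conf); // "job_custom_identifier_0000"
* }</pre>
*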
* @param conf Hadoop {@link Configuration}
* @return JobID created from {@link Configuration}
*/
static JobID getJobId(Configuration conf) {
String jobJtIdentifier =
Preconditions.checkNotNull(
conf.get(MRJobConfig.ID),
"Configuration must contain jobID under key \"%s\".",
HadoopFormatIO.JOB_ID);
return new JobID(jobJtIdentifier, DEFAULT_JOB_NUMBER);
}

/**
* Returns the number of reducers specified under the {@link MRJobConfig#NUM_REDUCES} key in the
* Hadoop {@link Configuration}, or the default of 1 when the key is not set.
*
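* <p>Illustrative sketch:
*
* <pre>{@code
* Configuration conf = new Configuration();
* conf.setInt(MRJobConfig.NUM_REDUCES, 5);
* int reducers = HadoopFormats.getReducersCount(conf); // 5; defaults to 1 when unset
* }</pre>
*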
* @param conf Hadoop {@link Configuration}
* @return configured count of reducers
*/
static int getReducersCount(Configuration conf) {
return conf.getInt(MRJobConfig.NUM_REDUCES, DEFAULT_NUM_REDUCERS);
}
}