/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.hadoop.job;

import com.google.common.base.Preconditions;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.data.Schema;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.hadoop.job.mapper.SegmentCreationMapper;
import org.apache.pinot.hadoop.utils.PushLocation;
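
/**
 * Hadoop job that creates Pinot segments from input data files (Avro, CSV, JSON or Thrift) with a
 * map-only MapReduce job and moves the generated segment tar files into the output directory.
 *
 * Required properties: input path, output path and raw table name (see {@link JobConfigConstants}).
 * Optional properties: dependency jar directory, schema file, default permissions mask, and push
 * hosts/port for fetching the table config and schema from the controllers.
 *
 * Illustrative usage sketch (the properties file name below is hypothetical):
 * <pre>
 *   Properties properties = new Properties();
 *   try (InputStream inputStream = new FileInputStream("segment_creation_job.properties")) {
 *     properties.load(inputStream);
 *   }
 *   new SegmentCreationJob(properties).run();
 * </pre>
 */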
public class SegmentCreationJob extends BaseSegmentJob {
protected static final String APPEND = "APPEND";
protected final Path _inputPattern;
protected final Path _outputDir;
protected final Path _stagingDir;
protected final String _rawTableName;
// Optional
protected final Path _depsJarDir;
protected final Path _schemaFile;
protected final String _defaultPermissionsMask;
protected final List<PushLocation> _pushLocations;
protected FileSystem _fileSystem;
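
/**
 * Reads the required input path, output path and raw table name, the optional dependency jar,
 * schema file and permissions mask properties, and the optional push hosts/port used to fetch the
 * table config and schema from the controllers.
 */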
public SegmentCreationJob(Properties properties) {
super(properties);
_conf.set("mapreduce.job.user.classpath.first", "true");
_inputPattern = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_INPUT));
_outputDir = Preconditions.checkNotNull(getPathFromProperty(JobConfigConstants.PATH_TO_OUTPUT));
_stagingDir = new Path(_outputDir, UUID.randomUUID().toString());
_rawTableName = Preconditions.checkNotNull(_properties.getProperty(JobConfigConstants.SEGMENT_TABLE_NAME));
// Optional
_depsJarDir = getPathFromProperty(JobConfigConstants.PATH_TO_DEPS_JAR);
_schemaFile = getPathFromProperty(JobConfigConstants.PATH_TO_SCHEMA);
_defaultPermissionsMask = _properties.getProperty(JobConfigConstants.DEFAULT_PERMISSIONS_MASK);
// Optional push location parameters. If set, the table config and schema are fetched from the push hosts.
String pushHostsString = _properties.getProperty(JobConfigConstants.PUSH_TO_HOSTS);
String pushPortString = _properties.getProperty(JobConfigConstants.PUSH_TO_PORT);
if (pushHostsString != null && pushPortString != null) {
_pushLocations =
PushLocation.getPushLocations(StringUtils.split(pushHostsString, ','), Integer.parseInt(pushPortString));
} else {
_pushLocations = null;
}
_logger.info("*********************************************************************");
_logger.info("Input Pattern: {}", _inputPattern);
_logger.info("Output Directory: {}", _outputDir);
_logger.info("Staging Directory: {}", _stagingDir);
_logger.info("Raw Table Name: {}", _rawTableName);
_logger.info("Dependencies Directory: {}", _depsJarDir);
_logger.info("Schema File: {}", _schemaFile);
_logger.info("Default Permissions Mask: {}", _defaultPermissionsMask);
_logger.info("Push Locations: {}", _pushLocations);
_logger.info("*********************************************************************");
}
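
/**
 * Returns true for data files with a supported extension: .avro, .csv, .json or .thrift.
 */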
@Override
protected boolean isDataFile(String fileName) {
return fileName.endsWith(".avro") || fileName.endsWith(".csv") || fileName.endsWith(".json") || fileName
.endsWith(".thrift");
}
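
/**
 * Runs the segment creation job: sets up the staging and output directories, writes one staging
 * input file per data file matching the input pattern, configures and submits a map-only job (one
 * mapper per data file) carrying the table config, schema and all job properties, then moves the
 * generated segment tar files to the output directory and deletes the staging directory.
 */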
public void run()
throws Exception {
_logger.info("Starting {}", getClass().getSimpleName());
// Initialize all directories
_fileSystem = FileSystem.get(_conf);
mkdirs(_outputDir);
mkdirs(_stagingDir);
Path stagingInputDir = new Path(_stagingDir, "input");
mkdirs(stagingInputDir);
// Gather all data files
List<Path> dataFilePaths = getDataFilePaths(_inputPattern);
int numDataFiles = dataFilePaths.size();
if (numDataFiles == 0) {
String errorMessage = "No data file found with pattern: " + _inputPattern;
_logger.error(errorMessage);
throw new RuntimeException(errorMessage);
} else {
_logger.info("Creating segments with data files: {}", dataFilePaths);
for (int i = 0; i < numDataFiles; i++) {
Path dataFilePath = dataFilePaths.get(i);
try (DataOutputStream dataOutputStream = _fileSystem.create(new Path(stagingInputDir, Integer.toString(i)))) {
dataOutputStream.write(StringUtil.encodeUtf8(dataFilePath.toString() + " " + i));
dataOutputStream.flush();
}
}
}
// Set up the job
Job job = Job.getInstance(_conf);
job.setJarByClass(getClass());
job.setJobName(getClass().getName());
Configuration jobConf = job.getConfiguration();
String hadoopTokenFileLocation = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
if (hadoopTokenFileLocation != null) {
jobConf.set("mapreduce.job.credentials.binary", hadoopTokenFileLocation);
}
jobConf.setInt(JobContext.NUM_MAPS, numDataFiles);
// Set table config and schema
TableConfig tableConfig = getTableConfig();
if (tableConfig != null) {
validateTableConfig(tableConfig);
jobConf.set(JobConfigConstants.TABLE_CONFIG, tableConfig.toJSONConfigString());
}
jobConf.set(JobConfigConstants.SCHEMA, getSchema().toSingleLineJsonString());
// Set additional configurations
for (Map.Entry<Object, Object> entry : _properties.entrySet()) {
jobConf.set(entry.getKey().toString(), entry.getValue().toString());
}
job.setMapperClass(getMapperClass());
job.setNumReduceTasks(0);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, stagingInputDir);
FileOutputFormat.setOutputPath(job, new Path(_stagingDir, "output"));
addDepsJarToDistributedCache(job);
addAdditionalJobProperties(job);
// Submit the job
job.waitForCompletion(true);
if (!job.isSuccessful()) {
throw new RuntimeException("Job failed: " + job);
}
moveSegmentsToOutputDir();
// Delete the staging directory
_logger.info("Deleting the staging directory: {}", _stagingDir);
_fileSystem.delete(_stagingDir, true);
}
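
/**
 * Creates the given directory, deleting anything that already exists at that path, and applies the
 * default permissions mask if one is configured.
 */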
protected void mkdirs(Path dirPath)
throws IOException {
if (_fileSystem.exists(dirPath)) {
_logger.warn("Deleting existing file or directory: {}", dirPath);
_fileSystem.delete(dirPath, true);
}
_logger.info("Making directory: {}", dirPath);
_fileSystem.mkdirs(dirPath);
setDirPermission(dirPath);
}
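
/**
 * Applies the default permissions mask (if configured) to the given directory.
 */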
protected void setDirPermission(Path dirPath)
throws IOException {
if (_defaultPermissionsMask != null) {
FsPermission permission = FsPermission.getDirDefault().applyUMask(new FsPermission(_defaultPermissionsMask));
_logger.info("Setting permission: {} to directory: {}", permission, dirPath);
_fileSystem.setPermission(dirPath, permission);
}
}
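
/**
 * Returns the table config fetched via the controller REST API, or null if no push locations are
 * configured.
 */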
@Nullable
protected TableConfig getTableConfig()
throws IOException {
try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
return controllerRestApi != null ? controllerRestApi.getTableConfig() : null;
}
}
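
/**
 * Returns the schema, fetched via the controller REST API if push locations are configured,
 * otherwise read from the schema file.
 */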
protected Schema getSchema()
throws IOException {
try (ControllerRestApi controllerRestApi = getControllerRestApi()) {
if (controllerRestApi != null) {
return controllerRestApi.getSchema();
} else {
try (InputStream inputStream = _fileSystem.open(_schemaFile)) {
return Schema.fromInputSteam(inputStream);
}
}
}
}

/**
* Can be overridden to provide custom controller Rest API.
*/
@Nullable
protected ControllerRestApi getControllerRestApi() {
return _pushLocations != null ? new DefaultControllerRestApi(_pushLocations, _rawTableName) : null;
}
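
/**
 * Validates the table config: for the APPEND push type, both the time column name and the time
 * type must be set.
 */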
protected void validateTableConfig(TableConfig tableConfig) {
SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
// For APPEND use case, timeColumnName and timeType must be set
if (APPEND.equalsIgnoreCase(validationConfig.getSegmentPushType())) {
Preconditions.checkState(validationConfig.getTimeColumnName() != null && validationConfig.getTimeType() != null,
"For APPEND use case, time column and type must be set");
}
}

/**
* Can be overridden to plug in custom mapper.
*/
protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
return SegmentCreationMapper.class;
}
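
/**
 * Adds the jars under the dependency jar directory (if configured) to the job's distributed cache.
 */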
protected void addDepsJarToDistributedCache(Job job)
throws IOException {
if (_depsJarDir != null) {
addDepsJarToDistributedCacheHelper(job, _depsJarDir);
}
}
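
/**
 * Recursively walks the given directory and adds every jar file found to the job's distributed
 * cache.
 */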
protected void addDepsJarToDistributedCacheHelper(Job job, Path depsJarDir)
throws IOException {
FileStatus[] fileStatuses = _fileSystem.listStatus(depsJarDir);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
addDepsJarToDistributedCacheHelper(job, fileStatus.getPath());
} else {
Path depJarPath = fileStatus.getPath();
if (depJarPath.getName().endsWith(".jar")) {
_logger.info("Adding deps jar: {} to distributed cache", depJarPath);
job.addCacheArchive(depJarPath.toUri());
}
}
}
}

/**
* Can be overridden to set additional job properties.
*/
@SuppressWarnings("unused")
protected void addAdditionalJobProperties(Job job) {
}
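
/**
 * Moves the segment tar files generated under the staging output directory into the final output
 * directory.
 */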
protected void moveSegmentsToOutputDir()
throws IOException {
Path segmentTarDir = new Path(new Path(_stagingDir, "output"), JobConfigConstants.SEGMENT_TAR_DIR);
for (FileStatus segmentTarStatus : _fileSystem.listStatus(segmentTarDir)) {
Path segmentTarPath = segmentTarStatus.getPath();
Path dest = new Path(_outputDir, segmentTarPath.getName());
_logger.info("Moving segment tar file from: {} to: {}", segmentTarPath, dest);
_fileSystem.rename(segmentTarPath, dest);
}
}
}