/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.hadoop.tsfile;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import static org.apache.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
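/**
 * A Hadoop {@link FileInputFormat} that reads TsFiles and emits {@link MapWritable} records keyed
 * by {@link NullWritable}. Input directories are listed recursively, entries without the TsFile
 * suffix are skipped, and one split is generated per HDFS block of each file.
 *
 * <p>A minimal driver sketch; the input path, device and sensor names below are illustrative
 * placeholders, not part of this API:
 *
 * <pre>{@code
 * Job job = Job.getInstance(new Configuration());
 * job.setInputFormatClass(TSFInputFormat.class);
 * TSFInputFormat.setInputPaths(job, "/data/tsfiles");
 * TSFInputFormat.setReadDeviceIds(job, new String[] {"device_1"});
 * TSFInputFormat.setReadMeasurementIds(job, new String[] {"sensor_1", "sensor_2"});
 * TSFInputFormat.setReadDeviceId(job, true); // emit the deviceId column
 * TSFInputFormat.setReadTime(job, true); // emit the time column
 * }</pre>
 */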
public class TSFInputFormat extends FileInputFormat<NullWritable, MapWritable> {
/** key to configure whether reading the time column is enabled */
public static final String READ_TIME_ENABLE = "tsfile.read.time.enable";
/** key to configure whether reading the deltaObjectId column is enabled */
public static final String READ_DELTAOBJECT_ENABLE = "tsfile.read.deltaObjectId.enable";
/** key to configure the type of filter */
@Deprecated public static final String FILTER_TYPE = "tsfile.filter.type";
/** key to configure the filter */
@Deprecated public static final String FILTER_EXPRESSION = "tsfile.filter.expression";
/** key to configure whether filtering is enabled */
public static final String FILTER_EXIST = "tsfile.filter.exist";
/** key to configure the deltaObjectIds to be read */
public static final String READ_DELTAOBJECTS = "tsfile.read.deltaobject";
/** key to configure the measurementIds to be read */
public static final String READ_MEASUREMENTID = "tsfile.read.measurement";
private static final Logger logger = LoggerFactory.getLogger(TSFInputFormat.class);
private static final String SEPARATOR = ",";
/**
 * Set the deltaObjectIds to be read
 *
 * @param job hadoop job
 * @param value the deltaObjectIds to be read
 * @throws TSFHadoopException if the given value is null or empty
 */
public static void setReadDeviceIds(Job job, String[] value) throws TSFHadoopException {
    if (value == null || value.length < 1) {
      throw new TSFHadoopException("The selected devices are null or empty");
    } else {
      // store the deltaObjectIds as a comma-separated list
      job.getConfiguration().set(READ_DELTAOBJECTS, String.join(SEPARATOR, value));
    }
}
/**
 * Get the deltaObjectIds to be read
 *
 * @param configuration hadoop configuration
 * @return the list of deviceIds if the configuration has set them; an empty list otherwise
 */
public static List<String> getReadDeviceIds(Configuration configuration) {
String deviceIds = configuration.get(READ_DELTAOBJECTS);
if (deviceIds == null || deviceIds.length() < 1) {
return new LinkedList<>();
} else {
return Arrays.stream(deviceIds.split(SEPARATOR)).collect(Collectors.toList());
}
}
/**
 * Set the measurementIds to be read
 *
 * @param job hadoop job
 * @param value the measurementIds to be read
 * @throws TSFHadoopException if the given value is null or empty
 */
public static void setReadMeasurementIds(Job job, String[] value) throws TSFHadoopException {
    if (value == null || value.length < 1) {
      throw new TSFHadoopException("The selected sensors are null or empty");
    } else {
      // store the measurementIds as a comma-separated list
      job.getConfiguration().set(READ_MEASUREMENTID, String.join(SEPARATOR, value));
    }
}
/**
 * Get the measurementIds to be read
 *
 * @param configuration hadoop configuration
 * @return the list of measurementIds if the configuration has set them; an empty list otherwise
 */
public static List<String> getReadMeasurementIds(Configuration configuration) {
String measurementIds = configuration.get(READ_MEASUREMENTID);
if (measurementIds == null || measurementIds.length() < 1) {
return new LinkedList<>();
} else {
return Arrays.stream(measurementIds.split(SEPARATOR)).collect(Collectors.toList());
}
}
/**
 * Set whether the deviceId column should be emitted
 *
 * @param job hadoop job
 * @param value true to emit the deviceId of each row
 */
public static void setReadDeviceId(Job job, boolean value) {
job.getConfiguration().setBoolean(READ_DELTAOBJECT_ENABLE, value);
}
/**
 * Get whether the deviceId column should be emitted
 *
 * @param configuration hadoop configuration
 * @return true if the deviceId of each row should be emitted, default true
 */
public static boolean getReadDeviceId(Configuration configuration) {
return configuration.getBoolean(READ_DELTAOBJECT_ENABLE, true);
}
/**
 * Set whether the time column should be emitted
 *
 * @param job hadoop job
 * @param value true to emit the time value of each row
 */
public static void setReadTime(Job job, boolean value) {
job.getConfiguration().setBoolean(READ_TIME_ENABLE, value);
}
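/**
 * Get whether the time column should be emitted
 *
 * @param configuration hadoop configuration
 * @return true if the time value of each row should be emitted, default true
 */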
public static boolean getReadTime(Configuration configuration) {
return configuration.getBoolean(READ_TIME_ENABLE, true);
}
/**
 * Set whether a filter is used
 *
 * @param job hadoop job
 * @param value true if a filter has been configured
 */
@Deprecated
public static void setHasFilter(Job job, boolean value) {
job.getConfiguration().setBoolean(FILTER_EXIST, value);
}
/**
 * Get whether a filter is used
 *
 * @param configuration hadoop configuration
 * @return true if a filter has been configured; false if the key has not been set
 */
@Deprecated
public static boolean getHasFilter(Configuration configuration) {
return configuration.getBoolean(FILTER_EXIST, false);
}
/**
 * Set the filter type
 *
 * @param job hadoop job
 * @param value the filter type
 */
@Deprecated
public static void setFilterType(Job job, String value) {
job.getConfiguration().set(FILTER_TYPE, value);
}
/**
 * Get the filter type
 *
 * @param configuration hadoop configuration
 * @return the filter type, or null if the key has not been set
 */
@Deprecated
public static String getFilterType(Configuration configuration) {
return configuration.get(FILTER_TYPE);
}
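/**
 * Set the filter expression
 *
 * @param job hadoop job
 * @param value the string representation of the filter expression
 */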
@Deprecated
public static void setFilterExp(Job job, String value) {
job.getConfiguration().set(FILTER_EXPRESSION, value);
}
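/**
 * Get the filter expression
 *
 * @param configuration hadoop configuration
 * @return the filter expression, or null if the key has not been set
 */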
@Deprecated
public static String getFilterExp(Configuration configuration) {
return configuration.get(FILTER_EXPRESSION);
}
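/** Each split is read by a {@link TSFRecordReader}, which produces the {@link MapWritable} values. */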
@Override
public RecordReader<NullWritable, MapWritable> createRecordReader(
InputSplit split, TaskAttemptContext context) {
return new TSFRecordReader();
}
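/**
 * List the input files recursively and build one split per HDFS block of each TsFile.
 *
 * @param job the job context
 * @return the input splits of all TsFiles under the input directories
 * @throws IOException if listing the files or reading their block locations fails
 */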
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE, true);
List<FileStatus> listFileStatus = super.listStatus(job);
return new ArrayList<>(getTSFInputSplit(job.getConfiguration(), listFileStatus, logger));
}
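/**
 * Build a {@link TSFInputSplit} for every HDFS block of every TsFile in the given list. Files
 * without the TsFile suffix or with a non-positive length are skipped.
 *
 * @param configuration hadoop configuration
 * @param listFileStatus the file statuses of the input files
 * @param logger the logger used to report progress
 * @return the generated splits
 * @throws IOException if the block locations cannot be read
 */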
public static List<TSFInputSplit> getTSFInputSplit(
Configuration configuration, List<FileStatus> listFileStatus, Logger logger)
throws IOException {
BlockLocation[] blockLocations;
List<TSFInputSplit> splits = new ArrayList<>();
    // listFileStatus holds all files under the input directories
    logger.info("The number of input files is {}", listFileStatus.size());
// For each file
for (FileStatus fileStatus : listFileStatus) {
logger.info("The file path is {}", fileStatus.getPath());
// Get the file path
Path path = fileStatus.getPath();
if (!path.toString().endsWith(TSFILE_SUFFIX)) {
continue;
}
// Get the file length
long length = fileStatus.getLen();
      // skip files whose length is not positive; they produce no splits
      if (length > 0) {
FileSystem fileSystem = path.getFileSystem(configuration);
logger.info("The file status is {}", fileStatus.getClass().getName());
logger.info("The file system is {}", fileSystem.getClass());
blockLocations = fileSystem.getFileBlockLocations(fileStatus, 0, length);
String locationInfo = Arrays.toString(blockLocations);
logger.info("The block location information is {}", locationInfo);
splits.addAll(generateSplits(path, blockLocations));
} else {
        logger.warn("Skipping file {} because its length is {}", path, length);
}
}
configuration.setLong(NUM_INPUT_FILES, listFileStatus.size());
logger.info("The number of splits is {}", splits.size());
return splits;
}
/**
 * Generate one TSFInputSplit for each HDFS block location of the given file
 *
 * @param path the path of the TsFile
 * @param blockLocations the HDFS block locations of the file
 * @return the splits covering all blocks of the file
 * @throws IOException if the hosts of a block location cannot be read
 */
private static List<TSFInputSplit> generateSplits(Path path, BlockLocation[] blockLocations)
throws IOException {
List<TSFInputSplit> splits = new ArrayList<>();
for (BlockLocation blockLocation : blockLocations) {
splits.add(
new TSFInputSplit(
path,
blockLocation.getHosts(),
blockLocation.getOffset(),
blockLocation.getLength()));
}
return splits;
}
}