/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexer.path.PathSpec;
import org.apache.druid.initialization.Initialization;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.FunctionalIterable;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.server.DruidNode;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.ShardSpecLookup;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
/**
 * Configuration holder for a Hadoop-based Druid indexing job.
 *
 * Wraps a {@link HadoopIngestionSpec} and exposes convenience accessors for its data schema, IO config and tuning
 * config, helpers that map an {@link InputRow} to a {@link Bucket}, and helpers that build the intermediate paths
 * used by the job underneath the working path.
 *
 * The class also owns a statically-initialized Guice {@link Injector} from which the shared {@link ObjectMapper},
 * {@link IndexIO}, {@link IndexMergerV9}, {@link DataSegmentPusher}, {@link HadoopKerberosConfig} and
 * {@link Properties} instances are obtained.
 *
 * An instance can be carried through a Hadoop {@link Configuration} as JSON: see {@link #intoConfiguration(Job)}
 * and {@link #fromConfiguration(Configuration)}.
 */
public class HadoopDruidIndexerConfig
{
  private static final Injector INJECTOR;

  /**
   * Key under which the JSON form of this config is stored in a Hadoop {@link Configuration}.
   */
  static final String CONFIG_PROPERTY = "druid.indexer.config";
  static final Charset JAVA_NATIVE_CHARSET = Charset.forName("Unicode");
  static final Splitter TAB_SPLITTER = Splitter.on("\t");
  static final Joiner TAB_JOINER = Joiner.on("\t");
  public static final ObjectMapper JSON_MAPPER;
  public static final IndexIO INDEX_IO;
  static final IndexMerger INDEX_MERGER_V9;
  static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
  static final DataSegmentPusher DATA_SEGMENT_PUSHER;

  /**
   * Working path used by {@link #getWorkingPath()} when the tuning config does not specify one.
   */
  private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";

  /**
   * Hadoop tasks running in an Indexer process need a reference to the Properties instance created
   * in PropertiesModule so that the task sees properties that were specified in Druid's config files.
   *
   * This is not strictly necessary for Peon-based tasks which have all properties, including config file properties,
   * specified on their command line by ForkingTaskRunner (so they could use System.getProperties() only),
   * but we always use the injected Properties for consistency.
   */
  public static final Properties PROPERTIES;

  static {
    // Build the injector once per JVM; every static singleton below is pulled from it.
    INJECTOR = Initialization.makeInjectorWithModules(
        GuiceInjectors.makeStartupInjector(),
        ImmutableList.of(
            (Module) binder -> {
              JsonConfigProvider.bindInstance(
                  binder,
                  Key.get(DruidNode.class, Self.class),
                  new DruidNode("hadoop-indexer", null, false, null, null, true, false)
              );
              JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HadoopKerberosConfig.class);
            },
            new IndexingHadoopModule()
        )
    );
    JSON_MAPPER = INJECTOR.getInstance(ObjectMapper.class);
    INDEX_IO = INJECTOR.getInstance(IndexIO.class);
    INDEX_MERGER_V9 = INJECTOR.getInstance(IndexMergerV9.class);
    HADOOP_KERBEROS_CONFIG = INJECTOR.getInstance(HadoopKerberosConfig.class);
    DATA_SEGMENT_PUSHER = INJECTOR.getInstance(DataSegmentPusher.class);
    PROPERTIES = INJECTOR.getInstance(Properties.class);
  }

  /**
   * Names of the counters tracked by the indexing job.
   */
  public enum IndexJobCounters
  {
    INVALID_ROW_COUNTER,
    ROWS_PROCESSED_COUNTER,
    ROWS_PROCESSED_WITH_ERRORS_COUNTER,
    ROWS_UNPARSEABLE_COUNTER,
    ROWS_THROWN_AWAY_COUNTER
  }

  /**
   * Creates a config wrapping the given ingestion spec.
   *
   * @param spec the ingestion spec to wrap
   *
   * @return a new config backed by {@code spec}
   */
  public static HadoopDruidIndexerConfig fromSpec(HadoopIngestionSpec spec)
  {
    return new HadoopDruidIndexerConfig(spec);
  }

  /**
   * Creates a config from a generic JSON map.
   *
   * If the map has a top-level "spec" key it is converted directly to a {@link HadoopDruidIndexerConfig};
   * otherwise the whole map is converted to a {@link HadoopIngestionSpec} and wrapped.
   *
   * @param argSpec the parsed JSON document
   *
   * @return the resulting config
   */
  private static HadoopDruidIndexerConfig fromMap(Map<String, Object> argSpec)
  {
    // Eventually PathSpec needs to get rid of its Hadoop dependency, then maybe this can be ingested directly without
    // the Map<> intermediary
    if (argSpec.containsKey("spec")) {
      return HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
          argSpec,
          HadoopDruidIndexerConfig.class
      );
    }
    return new HadoopDruidIndexerConfig(
        HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
            argSpec,
            HadoopIngestionSpec.class
        )
    );
  }

  /**
   * Reads a config from a local JSON file.
   *
   * @param file the file to read
   *
   * @return the parsed config
   *
   * @throws RuntimeException wrapping any {@link IOException} raised while reading or parsing
   */
  @SuppressWarnings("unchecked")
  public static HadoopDruidIndexerConfig fromFile(File file)
  {
    try {
      return fromMap(
          HadoopDruidIndexerConfig.JSON_MAPPER.readValue(file, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
      );
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Reads a config from a JSON string.
   *
   * @param str the JSON document
   *
   * @return the parsed config
   *
   * @throws RuntimeException wrapping any {@link IOException} raised while parsing
   */
  @SuppressWarnings("unchecked")
  public static HadoopDruidIndexerConfig fromString(String str)
  {
    // This is a map to try and prevent dependency screwbally-ness
    try {
      return fromMap(
          HadoopDruidIndexerConfig.JSON_MAPPER.readValue(str, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
      );
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Reads a config from a JSON file located on a Hadoop-accessible file system. The file is decoded as UTF-8.
   *
   * @param path path of the file, resolved through a default Hadoop {@link Configuration}
   *
   * @return the parsed config
   *
   * @throws RuntimeException wrapping any exception raised while opening, reading or parsing the file
   */
  @SuppressWarnings("unchecked")
  public static HadoopDruidIndexerConfig fromDistributedFileSystem(String path)
  {
    try {
      Path pt = new Path(path);
      FileSystem fs = pt.getFileSystem(new Configuration());
      // try-with-resources guarantees the stream is released even when parsing or conversion fails.
      // The FileSystem itself is intentionally not closed here.
      try (Reader reader = new InputStreamReader(fs.open(pt), StandardCharsets.UTF_8)) {
        return fromMap(
            HadoopDruidIndexerConfig.JSON_MAPPER.readValue(reader, JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT)
        );
      }
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Restores a config previously stored under {@link #CONFIG_PROPERTY} and runs {@link #verify()} on it.
   *
   * @param conf the Hadoop configuration holding the JSON form of the config
   *
   * @return the verified config
   */
  public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)
  {
    final HadoopDruidIndexerConfig retVal = fromString(conf.get(HadoopDruidIndexerConfig.CONFIG_PROPERTY));
    retVal.verify();
    return retVal;
  }

  private HadoopIngestionSpec schema;
  private PathSpec pathSpec;
  private String hadoopJobIdFileName;

  // Both maps are keyed by the same Long keys as the tuning config's shardSpecs map and are always rebuilt together.
  private final Map<Long, ShardSpecLookup> shardSpecLookups = new HashMap<>();
  private final Map<Long, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = new HashMap<>();
  private final Granularity rollupGran;
  private final List<String> allowedHadoopPrefix;

  /**
   * Creates a config wrapping the given ingestion spec. Also used by Jackson, which reads the spec from the
   * "spec" property.
   *
   * @param spec the ingestion spec to wrap
   */
  @JsonCreator
  public HadoopDruidIndexerConfig(
      final @JsonProperty("spec") HadoopIngestionSpec spec
  )
  {
    this.schema = spec;
    this.pathSpec = JSON_MAPPER.convertValue(spec.getIOConfig().getPathSpec(), PathSpec.class);
    rebuildShardSpecLookups(spec.getTuningConfig().getShardSpecs());
    this.rollupGran = spec.getDataSchema().getGranularitySpec().getQueryGranularity();

    // User-specified list plus our additional bonus list.
    this.allowedHadoopPrefix = new ArrayList<>();
    this.allowedHadoopPrefix.add("druid.storage");
    this.allowedHadoopPrefix.add("druid.javascript");
    this.allowedHadoopPrefix.addAll(DATA_SEGMENT_PUSHER.getAllowedPropertyPrefixesForHadoop());
    this.allowedHadoopPrefix.addAll(spec.getTuningConfig().getUserAllowedHadoopPrefix());
  }

  /**
   * Rebuilds {@link #shardSpecLookups} and {@link #hadoopShardSpecLookup} from the given shard specs so that
   * {@link #getBucket(InputRow)} resolves rows against the same shard specs the schema currently holds.
   *
   * Entries whose list is null or empty are skipped; a null map leaves both lookups empty.
   *
   * @param shardSpecs shard specs per map key, as held by the tuning config
   */
  private void rebuildShardSpecLookups(Map<Long, List<HadoopyShardSpec>> shardSpecs)
  {
    shardSpecLookups.clear();
    hadoopShardSpecLookup.clear();
    if (shardSpecs == null) {
      return;
    }

    for (Map.Entry<Long, List<HadoopyShardSpec>> entry : shardSpecs.entrySet()) {
      if (entry.getValue() == null || entry.getValue().isEmpty()) {
        continue;
      }

      // The first spec of the list builds the lookup over all actual specs of that list.
      final ShardSpec actualSpec = entry.getValue().get(0).getActualSpec();
      shardSpecLookups.put(
          entry.getKey(), actualSpec.getLookup(
              Lists.transform(
                  entry.getValue(), HadoopyShardSpec::getActualSpec
              )
          )
      );

      // Reverse mapping from the actual spec back to its Hadoop wrapper.
      Map<ShardSpec, HadoopyShardSpec> innerHadoopShardSpecLookup = new HashMap<>();
      for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) {
        innerHadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
      }
      hadoopShardSpecLookup.put(entry.getKey(), innerHadoopShardSpecLookup);
    }
  }

  /**
   * @return the wrapped ingestion spec, serialized under the "spec" property
   */
  @JsonProperty(value = "spec")
  public HadoopIngestionSpec getSchema()
  {
    return schema;
  }

  /**
   * @return the path spec converted from the IO config's path spec; excluded from JSON
   */
  @JsonIgnore
  public PathSpec getPathSpec()
  {
    return pathSpec;
  }

  /**
   * @return the data source name from the data schema
   */
  public String getDataSource()
  {
    return schema.getDataSchema().getDataSource();
  }

  /**
   * @return the granularity spec from the data schema
   */
  public GranularitySpec getGranularitySpec()
  {
    return schema.getDataSchema().getGranularitySpec();
  }

  /**
   * Replaces the granularity spec in the wrapped schema and re-derives the path spec from the updated schema.
   *
   * @param granularitySpec the new granularity spec
   */
  public void setGranularitySpec(GranularitySpec granularitySpec)
  {
    this.schema = schema.withDataSchema(schema.getDataSchema().withGranularitySpec(granularitySpec));
    this.pathSpec = JSON_MAPPER.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
  }

  /**
   * @return the partitions spec from the tuning config
   */
  public DimensionBasedPartitionsSpec getPartitionsSpec()
  {
    return schema.getTuningConfig().getPartitionsSpec();
  }

  /**
   * @return the index spec from the tuning config
   */
  public IndexSpec getIndexSpec()
  {
    return schema.getTuningConfig().getIndexSpec();
  }

  /**
   * @return the index spec for intermediate persists from the tuning config
   */
  public IndexSpec getIndexSpecForIntermediatePersists()
  {
    return schema.getTuningConfig().getIndexSpecForIntermediatePersists();
  }

  /**
   * @return the tuning config's overwriteFiles flag
   */
  boolean isOverwriteFiles()
  {
    return schema.getTuningConfig().isOverwriteFiles();
  }

  /**
   * Replaces the shard specs in the wrapped schema, re-derives the path spec, and rebuilds the shard spec lookups
   * used by {@link #getBucket(InputRow)} so they stay in sync with the schema.
   *
   * @param shardSpecs the new shard specs
   */
  public void setShardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
  {
    this.schema = schema.withTuningConfig(schema.getTuningConfig().withShardSpecs(shardSpecs));
    this.pathSpec = JSON_MAPPER.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
    // Read the specs back from the schema so the lookups reflect exactly what the tuning config now holds.
    rebuildShardSpecLookups(schema.getTuningConfig().getShardSpecs());
  }

  /**
   * @return the granularity spec's bucket intervals after condensing, or absent if the spec has none
   */
  public Optional<List<Interval>> getIntervals()
  {
    Optional<SortedSet<Interval>> setOptional = schema.getDataSchema().getGranularitySpec().bucketIntervals();
    if (setOptional.isPresent()) {
      return Optional.of(JodaUtils.condenseIntervals(setOptional.get()));
    } else {
      return Optional.absent();
    }
  }

  /**
   * @return whether the partitions spec reports that partitions need to be determined
   */
  boolean isDeterminingPartitions()
  {
    return schema.getTuningConfig().getPartitionsSpec().needsDeterminePartitions(true);
  }

  /**
   * @return the partitions spec's targetRowsPerSegment if set, otherwise its maxRowsPerSegment,
   * otherwise -1 when neither is set
   */
  public int getTargetPartitionSize()
  {
    DimensionBasedPartitionsSpec spec = schema.getTuningConfig().getPartitionsSpec();
    if (spec.getTargetRowsPerSegment() != null) {
      return spec.getTargetRowsPerSegment();
    }
    final Integer targetPartitionSize = spec.getMaxRowsPerSegment();
    return targetPartitionSize == null ? -1 : targetPartitionSize;
  }

  /**
   * @return the tuning config's forceExtendableShardSpecs flag
   */
  boolean isForceExtendableShardSpecs()
  {
    return schema.getTuningConfig().isForceExtendableShardSpecs();
  }

  /**
   * @return true if the IO config has a metadata update spec
   */
  public boolean isUpdaterJobSpecSet()
  {
    return (schema.getIOConfig().getMetadataUpdateSpec() != null);
  }

  /**
   * @return the tuning config's combineText flag
   */
  public boolean isCombineText()
  {
    return schema.getTuningConfig().isCombineText();
  }

  /**
   * @return the data schema's input row parser
   *
   * @throws NullPointerException if the data schema has no parser
   */
  public InputRowParser getParser()
  {
    return Preconditions.checkNotNull(schema.getDataSchema().getParser(), "inputRowParser");
  }

  /**
   * Looks up the shard spec for a bucket, using the bucket's time (in millis) as the key into the tuning config's
   * shard specs and the bucket's partition number as the index into the resulting list.
   *
   * @param bucket the bucket to look up
   *
   * @return the shard spec registered for that bucket
   */
  public HadoopyShardSpec getShardSpec(Bucket bucket)
  {
    return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).get(bucket.partitionNum);
  }

  /**
   * @param bucket the bucket whose time (in millis) keys the lookup
   *
   * @return the number of shard specs registered under the bucket's time
   */
  int getShardSpecCount(Bucket bucket)
  {
    return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).size();
  }

  /**
   * @return the tuning config's logParseExceptions flag
   */
  public boolean isLogParseExceptions()
  {
    return schema.getTuningConfig().isLogParseExceptions();
  }

  /**
   * @return the tuning config's maxParseExceptions value
   */
  public int getMaxParseExceptions()
  {
    return schema.getTuningConfig().getMaxParseExceptions();
  }

  /**
   * Collects the entries of {@link #PROPERTIES} whose name either equals one of the allowed prefixes or starts with
   * an allowed prefix followed by a dot.
   *
   * @return a new map of matching property names to their values
   */
  public Map<String, String> getAllowedProperties()
  {
    Map<String, String> allowedPropertiesMap = new HashMap<>();
    for (String propName : PROPERTIES.stringPropertyNames()) {
      for (String prefix : allowedHadoopPrefix) {
        if (propName.equals(prefix) || propName.startsWith(prefix + ".")) {
          allowedPropertiesMap.put(propName, PROPERTIES.getProperty(propName));
          // One matching prefix is enough; move on to the next property.
          break;
        }
      }
    }
    return allowedPropertiesMap;
  }

  /**
   * @return the tuning config's useYarnRMJobStatusFallback flag
   */
  boolean isUseYarnRMJobStatusFallback()
  {
    return schema.getTuningConfig().isUseYarnRMJobStatusFallback();
  }

  void setHadoopJobIdFileName(String hadoopJobIdFileName)
  {
    this.hadoopJobIdFileName = hadoopJobIdFileName;
  }

  String getHadoopJobIdFileName()
  {
    return hadoopJobIdFileName;
  }

  /**
   * Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
   * or via injected system properties) before this method is called. The {@link PathSpec} may
   * create objects which depend on the values of these configurations.
   */
  public Job addInputPaths(Job job) throws IOException
  {
    return pathSpec.addInputPaths(this, job);
  }

  /********************************************
   Granularity/Bucket Helper Methods
   ********************************************/

  /**
   * Get the proper bucket for some input row.
   *
   * @param inputRow an InputRow
   *
   * @return the Bucket that this row belongs to, or absent if the granularity spec has no bucket interval for the
   * row's timestamp
   *
   * @throws NullPointerException if no shard specs are registered for the row's bucket, or if the shard spec chosen
   *                              for the row has no matching {@link HadoopyShardSpec}
   */
  Optional<Bucket> getBucket(InputRow inputRow)
  {
    final Optional<Interval> timeBucket = schema.getDataSchema().getGranularitySpec().bucketInterval(
        DateTimes.utc(inputRow.getTimestampFromEpoch())
    );
    if (!timeBucket.isPresent()) {
      return Optional.absent();
    }

    final DateTime bucketStart = timeBucket.get().getStart();
    final long bucketStartMillis = bucketStart.getMillis();

    // Fail with a message naming the bucket instead of a bare NPE when the bucket has no shard specs.
    final ShardSpecLookup lookup = Preconditions.checkNotNull(
        shardSpecLookups.get(bucketStartMillis),
        "no shardSpecs registered for bucket starting at [%s]",
        bucketStart
    );
    final ShardSpec actualSpec = lookup.getShardSpec(
        rollupGran.bucketStart(inputRow.getTimestamp()).getMillis(),
        inputRow
    );
    final HadoopyShardSpec hadoopyShardSpec = Preconditions.checkNotNull(
        hadoopShardSpecLookup.get(bucketStartMillis).get(actualSpec),
        "no HadoopyShardSpec found for shardSpec[%s] in bucket starting at [%s]",
        actualSpec,
        bucketStart
    );

    return Optional.of(
        new Bucket(
            hadoopyShardSpec.getShardNum(),
            bucketStart,
            actualSpec.getPartitionNum()
        )
    );
  }

  /**
   * @return the granularity spec's bucket intervals, or absent if it has none
   */
  Optional<Set<Interval>> getSegmentGranularIntervals()
  {
    return Optional.fromNullable(
        schema.getDataSchema()
              .getGranularitySpec()
              .bucketIntervals()
              .orNull()
    );
  }

  /**
   * @return the granularity spec's input intervals
   */
  public List<Interval> getInputIntervals()
  {
    return schema.getDataSchema()
                 .getGranularitySpec()
                 .inputIntervals();
  }

  /**
   * Enumerates one {@link Bucket} per shard spec for every segment-granular interval. Intervals with no shard specs
   * registered under their start time contribute no buckets. Within an interval, buckets are numbered from 0 in the
   * order of the shard spec list.
   *
   * @return the buckets, or absent if there are no segment-granular intervals
   */
  Optional<Iterable<Bucket>> getAllBuckets()
  {
    Optional<Set<Interval>> intervals = getSegmentGranularIntervals();
    if (intervals.isPresent()) {
      return Optional.of(
          FunctionalIterable
              .create(intervals.get())
              .transformCat(
                  input -> {
                    final DateTime bucketTime = input.getStart();
                    final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
                    if (specs == null) {
                      return ImmutableList.of();
                    }
                    return FunctionalIterable
                        .create(specs)
                        .transform(
                            new Function<HadoopyShardSpec, Bucket>()
                            {
                              // Running partition number; this function instance is stateful.
                              int i = 0;

                              @Override
                              public Bucket apply(HadoopyShardSpec input)
                              {
                                return new Bucket(input.getShardNum(), bucketTime, i++);
                              }
                            }
                        );
                  }
              )
      );
    } else {
      return Optional.absent();
    }
  }

  /**
   * @return the tuning config's working path, or {@link #DEFAULT_WORKING_PATH} when it is null
   */
  public String getWorkingPath()
  {
    final String workingPath = schema.getTuningConfig().getWorkingPath();
    return workingPath == null ? DEFAULT_WORKING_PATH : workingPath;
  }

  /******************************************
   Path helper logic
   ******************************************/

  /**
   * Make the intermediate path for this job run.
   *
   * The path has the form {@code <workingPath>/<dataSource>/<version without ':'>_<uniqueId>}.
   *
   * @return the intermediate path for this job run.
   */
  Path makeIntermediatePath()
  {
    return new Path(
        StringUtils.format(
            "%s/%s/%s_%s",
            getWorkingPath(),
            schema.getDataSchema().getDataSource(),
            StringUtils.removeChar(schema.getTuningConfig().getVersion(), ':'),
            schema.getUniqueId()
        )
    );
  }

  /**
   * @param bucketInterval the interval whose start and end are printed in ISO basic date-time format
   *
   * @return {@code <intermediatePath>/<start>_<end>/partitions.json}
   */
  Path makeSegmentPartitionInfoPath(Interval bucketInterval)
  {
    return new Path(
        StringUtils.format(
            "%s/%s_%s/partitions.json",
            makeIntermediatePath(),
            ISODateTimeFormat.basicDateTime().print(bucketInterval.getStart()),
            ISODateTimeFormat.basicDateTime().print(bucketInterval.getEnd())
        )
    );
  }

  /**
   * @return {@code <intermediatePath>/intervals.json}
   */
  Path makeIntervalInfoPath()
  {
    return new Path(
        StringUtils.format(
            "%s/intervals.json",
            makeIntermediatePath()
        )
    );
  }

  /**
   * @return the "segmentDescriptorInfo" directory under the intermediate path
   */
  Path makeDescriptorInfoDir()
  {
    return new Path(makeIntermediatePath(), "segmentDescriptorInfo");
  }

  /**
   * @return the "groupedData" directory under the intermediate path
   */
  Path makeGroupedDataDir()
  {
    return new Path(makeIntermediatePath(), "groupedData");
  }

  /**
   * @param segment the segment whose id names the descriptor file
   *
   * @return {@code <descriptorInfoDir>/<segmentId>.json} with every ':' removed from the file name
   */
  Path makeDescriptorInfoPath(DataSegment segment)
  {
    return new Path(makeDescriptorInfoDir(), StringUtils.removeChar(segment.getId() + ".json", ':'));
  }

  /**
   * Copies the tuning config's job properties into the job's configuration.
   *
   * @param job the job to configure
   */
  void addJobProperties(Job job)
  {
    addJobProperties(job.getConfiguration());
  }

  /**
   * Copies every entry of the tuning config's job properties into the given configuration.
   *
   * @param conf the configuration to update
   */
  void addJobProperties(Configuration conf)
  {
    for (final Map.Entry<String, String> entry : schema.getTuningConfig().getJobProperties().entrySet()) {
      conf.set(entry.getKey(), entry.getValue());
    }
  }

  /**
   * Serializes this config to JSON and stores it in the job's configuration under {@link #CONFIG_PROPERTY}.
   *
   * @param job the job whose configuration receives the serialized config
   *
   * @throws RuntimeException wrapping any {@link IOException} raised during serialization
   */
  public void intoConfiguration(Job job)
  {
    Configuration conf = job.getConfiguration();
    try {
      conf.set(HadoopDruidIndexerConfig.CONFIG_PROPERTY, HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(this));
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Checks that the pieces of the spec required to run a job are present.
   *
   * @throws NullPointerException naming the first missing piece
   */
  public void verify()
  {
    Preconditions.checkNotNull(schema.getDataSchema().getDataSource(), "dataSource");
    Preconditions.checkNotNull(schema.getDataSchema().getParser(), "inputRowParser");
    Preconditions.checkNotNull(schema.getDataSchema().getParser().getParseSpec(), "parseSpec");
    Preconditions.checkNotNull(schema.getDataSchema().getGranularitySpec(), "granularitySpec");
    Preconditions.checkNotNull(pathSpec, "inputSpec");
    Preconditions.checkNotNull(schema.getTuningConfig().getWorkingPath(), "workingPath");
    Preconditions.checkNotNull(schema.getIOConfig().getSegmentOutputPath(), "segmentOutputPath");
    Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version");
  }
}