/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.indexing;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Multiset;
import com.google.common.collect.TreeMultiset;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.transform.TransformSpec;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
* Specifies the schema of data ingested into a Druid datasource: the datasource name, the
* {@link TimestampSpec}, the {@link DimensionsSpec}, the metrics ({@link AggregatorFactory}s),
* the {@link GranularitySpec}, and the {@link TransformSpec}. A deprecated "parser" map is also
* accepted for backward compatibility with old task specs.
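*
* <p>A minimal construction sketch using the convenience constructor; the datasource and column
* names here are hypothetical:
*
* <pre>{@code
* DataSchema schema = new DataSchema(
*     "wikipedia",
*     new TimestampSpec("timestamp", "iso", null),
*     new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Arrays.asList("page", "language"))),
*     new AggregatorFactory[]{new CountAggregatorFactory("count")},
*     new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
*     TransformSpec.NONE
* );
* }</pre>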
*/
public class DataSchema
{
private static final Logger log = new Logger(DataSchema.class);

private final String dataSource;
private final AggregatorFactory[] aggregators;
private final GranularitySpec granularitySpec;
private final TransformSpec transformSpec;
private final Map<String, Object> parserMap;
private final ObjectMapper objectMapper;

// The fields below can be initialized lazily from the parser, for backward compatibility.
private TimestampSpec timestampSpec;
private DimensionsSpec dimensionsSpec;

// This is used for backward compatibility
private InputRowParser inputRowParser;

@JsonCreator
public DataSchema(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("timestampSpec") @Nullable TimestampSpec timestampSpec, // can be null in old task spec
@JsonProperty("dimensionsSpec") @Nullable DimensionsSpec dimensionsSpec, // can be null in old task spec
@JsonProperty("metricsSpec") AggregatorFactory[] aggregators,
@JsonProperty("granularitySpec") GranularitySpec granularitySpec,
@JsonProperty("transformSpec") TransformSpec transformSpec,
@Deprecated @JsonProperty("parser") @Nullable Map<String, Object> parserMap,
@JacksonInject ObjectMapper objectMapper
)
{
validateDatasourceName(dataSource);
this.dataSource = dataSource;
this.timestampSpec = timestampSpec;
this.aggregators = aggregators == null ? new AggregatorFactory[]{} : aggregators;
this.dimensionsSpec = dimensionsSpec == null
? null
: computeDimensionsSpec(
Preconditions.checkNotNull(timestampSpec, "timestampSpec"),
dimensionsSpec,
this.aggregators
);
if (granularitySpec == null) {
log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default.");
this.granularitySpec = new UniformGranularitySpec(null, null, null);
} else {
this.granularitySpec = granularitySpec;
}
this.transformSpec = transformSpec == null ? TransformSpec.NONE : transformSpec;
this.parserMap = parserMap;
this.objectMapper = objectMapper;
// Fail-fast if there are output name collisions. Note: because of the pull-from-parser magic in getDimensionsSpec,
// this validation is not necessarily going to be able to catch everything. It will run again in getDimensionsSpec.
computeAndValidateOutputFieldNames(this.dimensionsSpec, this.aggregators);
if (this.granularitySpec.isRollup() && this.aggregators.length == 0) {
log.warn(
"Rollup is enabled for dataSource [%s] but no metricsSpec has been provided. "
+ "Are you sure this is what you want?",
dataSource
);
}
}

@VisibleForTesting
public DataSchema(
String dataSource,
TimestampSpec timestampSpec,
DimensionsSpec dimensionsSpec,
AggregatorFactory[] aggregators,
GranularitySpec granularitySpec,
TransformSpec transformSpec
)
{
this(dataSource, timestampSpec, dimensionsSpec, aggregators, granularitySpec, transformSpec, null, null);
}

// old constructor for backward compatibility
@Deprecated
public DataSchema(
String dataSource,
Map<String, Object> parserMap,
AggregatorFactory[] aggregators,
GranularitySpec granularitySpec,
TransformSpec transformSpec,
ObjectMapper objectMapper
)
{
this(dataSource, null, null, aggregators, granularitySpec, transformSpec, parserMap, objectMapper);
}

private static void validateDatasourceName(String dataSource)
{
IdUtils.validateId("dataSource", dataSource);
}

/**
* Computes the {@link DimensionsSpec} that we will actually use. It is derived from, but not necessarily identical
* to, the one that we were given.
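*
* <p>An illustrative sketch with hypothetical names: given timestamp column {@code ts}, dimensions
* {@code [page]}, and an aggregator reading {@code added} into output {@code sum_added}, the
* additional exclusions added here are {@code ts}, {@code added}, and {@code sum_added}, because
* each is an input or output field that does not appear in the dimensions list.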
*/
private static DimensionsSpec computeDimensionsSpec(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final AggregatorFactory[] aggregators
)
{
final Set<String> inputFieldNames = computeInputFieldNames(timestampSpec, dimensionsSpec, aggregators);
final Set<String> outputFieldNames = computeAndValidateOutputFieldNames(dimensionsSpec, aggregators);
// Set up additional exclusions: all inputs and outputs, minus defined dimensions.
final Set<String> additionalDimensionExclusions = new HashSet<>();
additionalDimensionExclusions.addAll(inputFieldNames);
additionalDimensionExclusions.addAll(outputFieldNames);
additionalDimensionExclusions.removeAll(dimensionsSpec.getDimensionNames());
return dimensionsSpec.withDimensionExclusions(additionalDimensionExclusions);
}
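
/**
* Collects every input field referenced by the schema: the timestamp column, the declared
* dimension names, and all fields required by the aggregators.
*/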
private static Set<String> computeInputFieldNames(
final TimestampSpec timestampSpec,
final DimensionsSpec dimensionsSpec,
final AggregatorFactory[] aggregators
)
{
final Set<String> fields = new HashSet<>();
fields.add(timestampSpec.getTimestampColumn());
fields.addAll(dimensionsSpec.getDimensionNames());
Arrays.stream(aggregators)
.flatMap(aggregator -> aggregator.requiredFields().stream())
.forEach(fields::add);
return fields;
}

/**
* Computes the set of field names that are specified by the provided dimensions and aggregator lists.
*
* If either list is null, it is ignored.
*
* @throws IllegalArgumentException if there are duplicate field names, or if any dimension or aggregator
* has a null name
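*
* <p>For example (hypothetical name), declaring {@code sum_added} both in the dimensions list and
* in metricsSpec fails with "Cannot specify a column more than once: [sum_added] seen in
* dimensions list, metricsSpec list". The reserved {@code __time} column is rejected the same way.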
*/
private static Set<String> computeAndValidateOutputFieldNames(
@Nullable final DimensionsSpec dimensionsSpec,
@Nullable final AggregatorFactory[] aggregators
)
{
// Field name -> where it was seen
final Map<String, Multiset<String>> fields = new TreeMap<>();
fields.computeIfAbsent(ColumnHolder.TIME_COLUMN_NAME, k -> TreeMultiset.create()).add(
StringUtils.format(
"primary timestamp (%s cannot appear as a dimension or metric)",
ColumnHolder.TIME_COLUMN_NAME
)
);
if (dimensionsSpec != null) {
for (int i = 0; i < dimensionsSpec.getDimensions().size(); i++) {
final String field = dimensionsSpec.getDimensions().get(i).getName();
if (Strings.isNullOrEmpty(field)) {
throw new IAE("Encountered dimension with null or empty name at position %d", i);
}
fields.computeIfAbsent(field, k -> TreeMultiset.create()).add("dimensions list");
}
}
if (aggregators != null) {
for (int i = 0; i < aggregators.length; i++) {
final String field = aggregators[i].getName();
if (Strings.isNullOrEmpty(field)) {
throw new IAE("Encountered metric with null or empty name at position %d", i);
}
fields.computeIfAbsent(field, k -> TreeMultiset.create()).add("metricsSpec list");
}
}
final List<String> errors = new ArrayList<>();
for (Map.Entry<String, Multiset<String>> fieldEntry : fields.entrySet()) {
if (fieldEntry.getValue().entrySet().stream().mapToInt(Multiset.Entry::getCount).sum() > 1) {
errors.add(
StringUtils.format(
"[%s] seen in %s",
fieldEntry.getKey(),
fieldEntry.getValue().entrySet().stream().map(
entry ->
StringUtils.format(
"%s%s",
entry.getElement(),
entry.getCount() == 1 ? "" : StringUtils.format(
" (%d occurrences)",
entry.getCount()
)
)
).collect(Collectors.joining(", "))
)
);
}
}
if (errors.isEmpty()) {
return fields.keySet();
} else {
throw new IAE("Cannot specify a column more than once: %s", String.join("; ", errors));
}
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

@Nullable
@JsonProperty("timestampSpec")
private TimestampSpec getGivenTimestampSpec()
{
return timestampSpec;
}
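
/**
* Returns the {@link TimestampSpec} to use. If none was provided directly (as in old task specs),
* it is pulled lazily from the deprecated parser's {@link ParseSpec}.
*/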
public TimestampSpec getTimestampSpec()
{
if (timestampSpec == null) {
timestampSpec = Preconditions.checkNotNull(getParser(), "inputRowParser").getParseSpec().getTimestampSpec();
}
return timestampSpec;
}

@Nullable
@JsonProperty("dimensionsSpec")
private DimensionsSpec getGivenDimensionsSpec()
{
return dimensionsSpec;
}
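
/**
* Returns the {@link DimensionsSpec} to use. If none was provided directly (as in old task specs),
* it is computed lazily from the deprecated parser's {@link ParseSpec} and the aggregators.
*/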
public DimensionsSpec getDimensionsSpec()
{
if (dimensionsSpec == null) {
dimensionsSpec = computeDimensionsSpec(
getTimestampSpec(),
Preconditions.checkNotNull(getParser(), "inputRowParser").getParseSpec().getDimensionsSpec(),
aggregators
);
}
return dimensionsSpec;
}

@JsonProperty("metricsSpec")
public AggregatorFactory[] getAggregators()
{
return aggregators;
}

@JsonProperty
public GranularitySpec getGranularitySpec()
{
return granularitySpec;
}

@JsonProperty
public TransformSpec getTransformSpec()
{
return transformSpec;
}

@Deprecated
@JsonProperty("parser")
@Nullable
@JsonInclude(Include.NON_NULL)
public Map<String, Object> getParserMap()
{
return parserMap;
}
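
/**
* Returns the deprecated {@link InputRowParser}, or null if no "parser" map was provided. The
* parser is built lazily from the map and decorated with the {@link TransformSpec}; an explicitly
* provided timestampSpec or dimensionsSpec overrides the corresponding part of its
* {@link ParseSpec}.
*/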
@Nullable
public InputRowParser getParser()
{
if (inputRowParser == null) {
if (parserMap == null) {
return null;
}
//noinspection unchecked
inputRowParser = transformSpec.decorate(objectMapper.convertValue(this.parserMap, InputRowParser.class));
ParseSpec parseSpec = inputRowParser.getParseSpec();
parseSpec = parseSpec.withDimensionsSpec(
computeDimensionsSpec(parseSpec.getTimestampSpec(), parseSpec.getDimensionsSpec(), aggregators)
);
if (timestampSpec != null) {
parseSpec = parseSpec.withTimestampSpec(timestampSpec);
}
if (dimensionsSpec != null) {
parseSpec = parseSpec.withDimensionsSpec(dimensionsSpec);
}
inputRowParser = inputRowParser.withParseSpec(parseSpec);
}
return inputRowParser;
}

public DataSchema withGranularitySpec(GranularitySpec granularitySpec)
{
return new DataSchema(
dataSource,
timestampSpec,
dimensionsSpec,
aggregators,
granularitySpec,
transformSpec,
parserMap,
objectMapper
);
}

public DataSchema withTransformSpec(TransformSpec transformSpec)
{
return new DataSchema(
dataSource,
timestampSpec,
dimensionsSpec,
aggregators,
granularitySpec,
transformSpec,
parserMap,
objectMapper
);
}

public DataSchema withDimensionsSpec(DimensionsSpec dimensionsSpec)
{
return new DataSchema(
dataSource,
timestampSpec,
dimensionsSpec,
aggregators,
granularitySpec,
transformSpec,
parserMap,
objectMapper
);
}

@Override
public String toString()
{
return "DataSchema{" +
"dataSource='" + dataSource + '\'' +
", aggregators=" + Arrays.toString(aggregators) +
", granularitySpec=" + granularitySpec +
", transformSpec=" + transformSpec +
", parserMap=" + parserMap +
", timestampSpec=" + timestampSpec +
", dimensionsSpec=" + dimensionsSpec +
", inputRowParser=" + inputRowParser +
'}';
}
}