blob: 509738ee6803aa8407fd5290d73b99ddc5b8aff1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Function;
import org.apache.druid.guice.annotations.ExtensionPoint;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.MetricManipulationFn;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.LogicalSegment;
import javax.annotation.Nullable;
import java.util.Comparator;
import java.util.List;
import java.util.function.BinaryOperator;
/**
* The broker-side (also used by server in some cases) API for a specific Query type.
*/
@ExtensionPoint
public abstract class QueryToolChest<ResultType, QueryType extends Query<ResultType>>
{
  private final JavaType baseResultType;
  private final JavaType bySegmentResultType;

  protected QueryToolChest()
  {
    final TypeFactory typeFactory = TypeFactory.defaultInstance();
    TypeReference<ResultType> resultTypeReference = getResultTypeReference();
    // resultTypeReference is null in MaterializedViewQueryQueryToolChest.
    // See https://github.com/apache/druid/issues/6977
    if (resultTypeReference != null) {
      baseResultType = typeFactory.constructType(resultTypeReference);
      // constructParametricType replaces the deprecated constructParametrizedType; the behavior is
      // identical because the "parametersFor" class was the same as the parametrized class in both calls.
      bySegmentResultType = typeFactory.constructParametricType(
          Result.class,
          typeFactory.constructParametricType(
              BySegmentResultValueClass.class,
              baseResultType
          )
      );
    } else {
      baseResultType = null;
      bySegmentResultType = null;
    }
  }

  /**
   * Returns the Jackson type of a single result object, or null if {@link #getResultTypeReference()} returned null.
   */
  public final JavaType getBaseResultType()
  {
    return baseResultType;
  }

  /**
   * Returns the Jackson type of a by-segment result, i.e. {@code Result<BySegmentResultValueClass<ResultType>>},
   * or null if {@link #getResultTypeReference()} returned null.
   */
  public final JavaType getBySegmentResultType()
  {
    return bySegmentResultType;
  }

  /**
   * Perform any per-query decoration of an {@link ObjectMapper} that enables it to read and write objects of the
   * query's {@link ResultType}. It is used by QueryResource on the write side, and DirectDruidClient on the read side.
   *
   * For most queries, this is a no-op, but it can be useful for query types that support more than one result
   * serialization format. Queries that implement this method must not modify the provided ObjectMapper, but instead
   * must return a copy.
   */
  public ObjectMapper decorateObjectMapper(final ObjectMapper objectMapper, final QueryType query)
  {
    return objectMapper;
  }

  /**
   * This method wraps a QueryRunner. The input QueryRunner, by contract, will provide a series of
   * ResultType objects in time order (ascending or descending). This method should return a new QueryRunner that
   * potentially merges the stream of ordered ResultType objects.
   *
   * A default implementation constructs a {@link ResultMergeQueryRunner} which creates a
   * {@link org.apache.druid.common.guava.CombiningSequence} using the supplied {@link QueryRunner} with
   * {@link QueryToolChest#createResultComparator(Query)} and {@link QueryToolChest#createMergeFn(Query)} supplied by this
   * toolchest.
   *
   * @param runner A QueryRunner that provides a series of ResultType objects in time order (ascending or descending)
   *
   * @return a QueryRunner that potentially merges the stream of ordered ResultType objects
   */
  public QueryRunner<ResultType> mergeResults(QueryRunner<ResultType> runner)
  {
    return new ResultMergeQueryRunner<>(runner, this::createResultComparator, this::createMergeFn);
  }

  /**
   * Creates a merge function that is used to merge intermediate aggregates from historicals in broker. This merge
   * function is used in the default {@link ResultMergeQueryRunner} provided by
   * {@link QueryToolChest#mergeResults(QueryRunner)} and also used in
   * {@link org.apache.druid.java.util.common.guava.ParallelMergeCombiningSequence} by 'CachingClusteredClient' if it
   * does not return null.
   *
   * Returning null from this function means that a query does not support result merging, at
   * least via the mechanisms that utilize this function.
   */
  @Nullable
  public BinaryOperator<ResultType> createMergeFn(Query<ResultType> query)
  {
    return null;
  }

  /**
   * Creates an ordering comparator that is used to order results. This comparator is used in the default
   * {@link ResultMergeQueryRunner} provided by {@link QueryToolChest#mergeResults(QueryRunner)}
   */
  public Comparator<ResultType> createResultComparator(Query<ResultType> query)
  {
    throw new UOE("%s doesn't provide a result comparator", query.getClass().getName());
  }

  /**
   * Creates a {@link QueryMetrics} object that is used to generate metrics for this specific query type. This exists
   * to allow for query-specific dimensions and metrics. That is, the ToolChest is expected to set some
   * meaningful dimensions for metrics given this query type. Examples might be the topN threshold for
   * a TopN query or the number of dimensions included for a groupBy query.
   *
   * <p>QueryToolChests for query types in core (druid-processing) and public extensions (belonging to the Druid source
   * tree) should delegate this method to {@link GenericQueryMetricsFactory#makeMetrics(Query)} on an injected
   * instance of {@link GenericQueryMetricsFactory}, as long as they don't need to emit custom dimensions and/or
   * metrics.
   *
   * <p>If some custom dimensions and/or metrics should be emitted for a query type, a plan described in
   * "Making subinterfaces of QueryMetrics" section in {@link QueryMetrics}'s class-level Javadocs should be followed.
   *
   * <p>One way or another, this method should ensure that {@link QueryMetrics#query(Query)} is called with the given
   * query passed on the created QueryMetrics object before returning.
   *
   * @param query The query that is being processed
   *
   * @return A QueryMetrics that can be used to make metrics for the provided query
   */
  public abstract QueryMetrics<? super QueryType> makeMetrics(QueryType query);

  /**
   * Creates a Function that can take in a ResultType and return a new ResultType having applied
   * the MetricManipulatorFn to each of the metrics.
   * <p>
   * This exists because the QueryToolChest is the only thing that understands the internal serialization
   * format of ResultType, so its primary responsibility is to "decompose" that structure and apply the
   * given function to all metrics.
   * <p>
   * This function is called very early in the processing pipeline on the Broker.
   *
   * @param query The Query that is currently being processed
   * @param fn    The function that should be applied to all metrics in the results
   *
   * @return A function that will apply the provided fn to all metrics in the input ResultType object
   */
  public abstract Function<ResultType, ResultType> makePreComputeManipulatorFn(
      QueryType query,
      MetricManipulationFn fn
  );

  /**
   * Generally speaking this is the exact same thing as makePreComputeManipulatorFn. It is leveraged in
   * order to compute PostAggregators on results after they have been completely merged together, which
   * should actually be done in the mergeResults() call instead of here.
   * <p>
   * This should never actually be overridden and it should be removed as quickly as possible.
   *
   * @param query The Query that is currently being processed
   * @param fn    The function that should be applied to all metrics in the results
   *
   * @return A function that will apply the provided fn to all metrics in the input ResultType object
   */
  public Function<ResultType, ResultType> makePostComputeManipulatorFn(QueryType query, MetricManipulationFn fn)
  {
    return makePreComputeManipulatorFn(query, fn);
  }

  /**
   * Returns a TypeReference object that is just passed through to Jackson in order to deserialize
   * the results of this type of query.
   *
   * @return A TypeReference to indicate to Jackson what type of data will exist for this query
   */
  public abstract TypeReference<ResultType> getResultTypeReference();

  /**
   * Returns a CacheStrategy to be used to load data into the cache and remove it from the cache.
   * <p>
   * This is optional. If it returns null, caching is effectively disabled for the query.
   *
   * @param query The query whose results might be cached
   * @param <T>   The type of object that will be stored in the cache
   *
   * @return A CacheStrategy that can be used to populate and read from the Cache
   */
  @Nullable
  public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
  {
    return null;
  }

  /**
   * Wraps a QueryRunner. The input QueryRunner is the QueryRunner as it exists *before* being passed to
   * mergeResults().
   * <p>
   * In fact, the return value of this method is always passed to mergeResults, so it is equivalent to
   * just implement this functionality as extra decoration on the QueryRunner during mergeResults().
   * <p>
   * In the interests of potentially simplifying these interfaces, the recommendation is to actually not
   * override this method and instead apply anything that might be needed here in the mergeResults() call.
   *
   * @param runner The runner to be wrapped
   *
   * @return The wrapped runner
   */
  public QueryRunner<ResultType> preMergeQueryDecoration(QueryRunner<ResultType> runner)
  {
    return runner;
  }

  /**
   * Wraps a QueryRunner. The input QueryRunner is the QueryRunner as it exists coming out of mergeResults()
   * <p>
   * In fact, the input value of this method is always the return value from mergeResults, so it is equivalent
   * to just implement this functionality as extra decoration on the QueryRunner during mergeResults().
   * <p>
   * In the interests of potentially simplifying these interfaces, the recommendation is to actually not
   * override this method and instead apply anything that might be needed here in the mergeResults() call.
   *
   * @param runner The runner to be wrapped
   *
   * @return The wrapped runner
   */
  public QueryRunner<ResultType> postMergeQueryDecoration(QueryRunner<ResultType> runner)
  {
    return runner;
  }

  /**
   * This method is called to allow the query to prune segments that it does not believe need to actually
   * be queried. It can use whatever criteria it wants in order to do the pruning, it just needs to
   * return the list of Segments it actually wants to see queried.
   *
   * @param query    The query being processed
   * @param segments The list of candidate segments to be queried
   * @param <T>      A Generic parameter because Java is cool
   *
   * @return The list of segments to actually query
   */
  public <T extends LogicalSegment> List<T> filterSegments(QueryType query, List<T> segments)
  {
    return segments;
  }

  /**
   * Returns whether this toolchest is able to handle the provided subquery.
   *
   * When this method returns true, the core query stack will pass subquery datasources over to the toolchest and will
   * assume they are properly handled.
   *
   * When this method returns false, the core query stack will throw an error if subqueries are present. In the future,
   * instead of throwing an error, the core query stack will handle the subqueries on its own.
   */
  public boolean canPerformSubquery(final Query<?> subquery)
  {
    return false;
  }

  /**
   * Returns a {@link RowSignature} for the arrays returned by {@link #resultsAsArrays}. The returned signature will
   * be the same length as each array returned by {@link #resultsAsArrays}.
   *
   * @param query same query passed to {@link #resultsAsArrays}
   *
   * @return row signature
   *
   * @throws UnsupportedOperationException if this query type does not support returning results as arrays
   */
  public RowSignature resultArraySignature(QueryType query)
  {
    throw new UOE("Query type '%s' does not support returning results as arrays", query.getType());
  }

  /**
   * Converts a sequence of this query's ResultType into arrays. The array signature is given by
   * {@link #resultArraySignature}. This functionality is useful because it allows higher-level processors to operate on
   * the results of any query in a consistent way. This is useful for the SQL layer and for any algorithm that might
   * operate on the results of an inner query.
   *
   * Not all query types support this method. They will throw {@link UnsupportedOperationException}, and they cannot
   * be used by the SQL layer or by generic higher-level algorithms.
   *
   * Some query types return less information after translating their results into arrays, especially in situations
   * where there is no clear way to translate fully rich results into flat arrays. For example, the scan query does not
   * include the segmentId in its array-based results, because it could potentially conflict with a 'segmentId' field
   * in the actual datasource being scanned.
   *
   * It is possible that there will be multiple arrays returned for a single result object. For example, in the topN
   * query, each {@link org.apache.druid.query.topn.TopNResultValue} will generate a separate array for each of its
   * {@code values}.
   *
   * By convention, the array form should include the __time column, if present, as a long (milliseconds since epoch).
   *
   * @param resultSequence results of the form returned by {@link #mergeResults}
   *
   * @return results in array form
   *
   * @throws UnsupportedOperationException if this query type does not support returning results as arrays
   */
  public Sequence<Object[]> resultsAsArrays(QueryType query, Sequence<ResultType> resultSequence)
  {
    throw new UOE("Query type '%s' does not support returning results as arrays", query.getType());
  }
}