| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.query; |
| |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.google.common.base.Function; |
| import org.apache.druid.guice.annotations.ExtensionPoint; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.query.aggregation.AggregatorFactory; |
| |
| import java.util.Iterator; |
| import java.util.List; |
| |
| /** |
| * Handles caching-related tasks for a particular query type. |
| * |
| * Generally returned by the toolchest method {@link QueryToolChest#getCacheStrategy}. |
| */ |
| @ExtensionPoint |
| public interface CacheStrategy<T, CacheType, QueryType extends Query<T>> |
| { |
| /** |
| * Returns whether the given query is cacheable or not. |
| * The {@code willMergeRunners} parameter can be used for distinguishing the caller is a broker or a data node. |
| * |
| * @param query the query to be cached |
| * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)} will be |
| * called on the cached by-segment results |
| * |
| * @return true if the query is cacheable, otherwise false. |
| */ |
| boolean isCacheable(QueryType query, boolean willMergeRunners); |
| |
| /** |
| * Computes the per-segment cache key for the given query. Because this is a per-segment cache key, it should only |
| * include parts of the query that affect the results for a specific segment (i.e., the results returned from |
| * {@link QueryRunnerFactory#createRunner}). |
| * |
| * @param query the query to be cached |
| * |
| * @return the per-segment cache key |
| */ |
| byte[] computeCacheKey(QueryType query); |
| |
| /** |
| * Computes the result-level cache key for the given query. The result-level cache will tack on datasource and |
| * interval details, so this key does not need to include datasource and interval. But it should include anything |
| * else that might affect the results of the query. |
| * |
| * Some implementations will need to include query parameters that are not used in {@link #computeCacheKey} for the |
| * same query. |
| * |
| * @param query the query to be cached |
| * |
| * @return the result-level cache key |
| */ |
| byte[] computeResultLevelCacheKey(QueryType query); |
| |
| /** |
| * Returns the class type of what is used in the cache |
| * |
| * @return Returns the class type of what is used in the cache |
| */ |
| TypeReference<CacheType> getCacheObjectClazz(); |
| |
| /** |
| * Returns a function that converts from the QueryType's result type to something cacheable. |
| * <p> |
| * The resulting function must be thread-safe. |
| * |
| * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching |
| * |
| * @return a thread-safe function that converts the QueryType's result type into something cacheable |
| */ |
| Function<T, CacheType> prepareForCache(boolean isResultLevelCache); |
| |
| /** |
| * A function that does the inverse of the operation that the function prepareForCache returns |
| * |
| * @param isResultLevelCache indicates whether the function is invoked for result-level caching or segment-level caching |
| * |
| * @return A function that does the inverse of the operation that the function prepareForCache returns |
| */ |
| Function<CacheType, T> pullFromCache(boolean isResultLevelCache); |
| |
| |
| default Function<T, CacheType> prepareForSegmentLevelCache() |
| { |
| return prepareForCache(false); |
| } |
| |
| default Function<CacheType, T> pullFromSegmentLevelCache() |
| { |
| return pullFromCache(false); |
| } |
| |
| /** |
| * Helper function used by TopN, GroupBy, Timeseries queries in {@link #pullFromCache(boolean)}. |
| * When using the result level cache, the agg values seen here are |
| * finalized values generated by AggregatorFactory.finalizeComputation(). |
| * These finalized values are deserialized from the cache as generic Objects, which will |
| * later be reserialized and returned to the user without further modification. |
| * Because the agg values are deserialized as generic Objects, the values are subject to the same |
| * type consistency issues handled by DimensionHandlerUtils.convertObjectToType() in the pullFromCache implementations |
| * for dimension values (e.g., a Float would become Double). |
| */ |
| static void fetchAggregatorsFromCache( |
| List<AggregatorFactory> aggregators, |
| Iterator<Object> resultIter, |
| boolean isResultLevelCache, |
| AddToResultFunction addToResultFunction |
| ) |
| { |
| for (int i = 0; i < aggregators.size(); i++) { |
| final AggregatorFactory aggregator = aggregators.get(i); |
| if (!resultIter.hasNext()) { |
| throw new ISE("Ran out of objects while reading aggregators from cache!"); |
| } |
| |
| if (isResultLevelCache) { |
| addToResultFunction.apply(aggregator.getName(), i, resultIter.next()); |
| } else { |
| addToResultFunction.apply(aggregator.getName(), i, aggregator.deserialize(resultIter.next())); |
| } |
| } |
| } |
| |
| interface AddToResultFunction |
| { |
| void apply(String aggregatorName, int aggregatorIndex, Object object); |
| } |
| } |