blob: aff8399296b53056d935b0c26f828608562813e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.CacheUtil;
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.SequenceWrapper;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.QueryResource;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
public class ResultLevelCachingQueryRunner<T> implements QueryRunner<T>
{
private static final Logger log = new Logger(ResultLevelCachingQueryRunner.class);
private final QueryRunner baseRunner;
private ObjectMapper objectMapper;
private final Cache cache;
private final CacheConfig cacheConfig;
private final boolean useResultCache;
private final boolean populateResultCache;
private Query<T> query;
private final CacheStrategy<T, Object, Query<T>> strategy;
public ResultLevelCachingQueryRunner(
QueryRunner baseRunner,
QueryToolChest queryToolChest,
Query<T> query,
ObjectMapper objectMapper,
Cache cache,
CacheConfig cacheConfig
)
{
this.baseRunner = baseRunner;
this.objectMapper = objectMapper;
this.cache = cache;
this.cacheConfig = cacheConfig;
this.query = query;
this.strategy = queryToolChest.getCacheStrategy(query);
this.populateResultCache = CacheUtil.isPopulateResultCache(
query,
strategy,
cacheConfig,
CacheUtil.ServerType.BROKER
);
this.useResultCache = CacheUtil.isUseResultCache(query, strategy, cacheConfig, CacheUtil.ServerType.BROKER);
}
@Override
public Sequence<T> run(QueryPlus queryPlus, ResponseContext responseContext)
{
if (useResultCache || populateResultCache) {
final String cacheKeyStr = StringUtils.fromUtf8(strategy.computeResultLevelCacheKey(query));
final byte[] cachedResultSet = fetchResultsFromResultLevelCache(cacheKeyStr);
String existingResultSetId = extractEtagFromResults(cachedResultSet);
existingResultSetId = existingResultSetId == null ? "" : existingResultSetId;
query = query.withOverriddenContext(
ImmutableMap.of(QueryResource.HEADER_IF_NONE_MATCH, existingResultSetId));
Sequence<T> resultFromClient = baseRunner.run(
QueryPlus.wrap(query),
responseContext
);
String newResultSetId = (String) responseContext.get(ResponseContext.Key.ETAG);
if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) {
log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId());
return deserializeResults(cachedResultSet, strategy, existingResultSetId);
} else {
@Nullable
ResultLevelCachePopulator resultLevelCachePopulator = createResultLevelCachePopulator(
cacheKeyStr,
newResultSetId
);
if (resultLevelCachePopulator == null) {
return resultFromClient;
}
final Function<T, Object> cacheFn = strategy.prepareForCache(true);
return Sequences.wrap(
Sequences.map(
resultFromClient,
new Function<T, T>()
{
@Override
public T apply(T input)
{
if (resultLevelCachePopulator.isShouldPopulate()) {
resultLevelCachePopulator.cacheResultEntry(input, cacheFn);
}
return input;
}
}
),
new SequenceWrapper()
{
@Override
public void after(boolean isDone, Throwable thrown)
{
Preconditions.checkNotNull(
resultLevelCachePopulator,
"ResultLevelCachePopulator cannot be null during cache population"
);
if (thrown != null) {
log.error(
thrown,
"Error while preparing for result level caching for query %s with error %s ",
query.getId(),
thrown.getMessage()
);
} else if (resultLevelCachePopulator.isShouldPopulate()) {
// The resultset identifier and its length is cached along with the resultset
resultLevelCachePopulator.populateResults();
log.debug("Cache population complete for query %s", query.getId());
}
resultLevelCachePopulator.stopPopulating();
}
}
);
}
} else {
return baseRunner.run(
queryPlus,
responseContext
);
}
}
@Nullable
private byte[] fetchResultsFromResultLevelCache(
final String queryCacheKey
)
{
if (useResultCache && queryCacheKey != null) {
return cache.get(CacheUtil.computeResultLevelCacheKey(queryCacheKey));
}
return null;
}
private String extractEtagFromResults(
final byte[] cachedResult
)
{
if (cachedResult == null) {
return null;
}
log.debug("Fetching result level cache identifier for query: %s", query.getId());
int etagLength = ByteBuffer.wrap(cachedResult, 0, Integer.BYTES).getInt();
return StringUtils.fromUtf8(Arrays.copyOfRange(cachedResult, Integer.BYTES, etagLength + Integer.BYTES));
}
private Sequence<T> deserializeResults(final byte[] cachedResult, CacheStrategy strategy, String resultSetId)
{
if (cachedResult == null) {
log.error("Cached result set is null");
}
final Function<Object, T> pullFromCacheFunction = strategy.pullFromCache(true);
final TypeReference<T> cacheObjectClazz = strategy.getCacheObjectClazz();
//Skip the resultsetID and its length bytes
Sequence<T> cachedSequence = Sequences.simple(() -> {
try {
int resultOffset = Integer.BYTES + resultSetId.length();
return objectMapper.readValues(
objectMapper.getFactory().createParser(
cachedResult,
resultOffset,
cachedResult.length - resultOffset
),
cacheObjectClazz
);
}
catch (IOException e) {
throw new RE(e, "Failed to retrieve results from cache for query ID [%s]", query.getId());
}
});
return Sequences.map(cachedSequence, pullFromCacheFunction);
}
private ResultLevelCachePopulator createResultLevelCachePopulator(
String cacheKeyStr,
String resultSetId
)
{
if (resultSetId != null && populateResultCache) {
ResultLevelCachePopulator resultLevelCachePopulator = new ResultLevelCachePopulator(
cache,
objectMapper,
CacheUtil.computeResultLevelCacheKey(cacheKeyStr),
cacheConfig,
true
);
try {
// Save the resultSetId and its length
resultLevelCachePopulator.cacheObjectStream.write(ByteBuffer.allocate(Integer.BYTES)
.putInt(resultSetId.length())
.array());
resultLevelCachePopulator.cacheObjectStream.write(StringUtils.toUtf8(resultSetId));
}
catch (IOException ioe) {
log.error(ioe, "Failed to write cached values for query %s", query.getId());
return null;
}
return resultLevelCachePopulator;
} else {
return null;
}
}
private class ResultLevelCachePopulator
{
private final Cache cache;
private final ObjectMapper mapper;
private final Cache.NamedKey key;
private final CacheConfig cacheConfig;
@Nullable
private ByteArrayOutputStream cacheObjectStream;
private ResultLevelCachePopulator(
Cache cache,
ObjectMapper mapper,
Cache.NamedKey key,
CacheConfig cacheConfig,
boolean shouldPopulate
)
{
this.cache = cache;
this.mapper = mapper;
this.key = key;
this.cacheConfig = cacheConfig;
this.cacheObjectStream = shouldPopulate ? new ByteArrayOutputStream() : null;
}
boolean isShouldPopulate()
{
return cacheObjectStream != null;
}
void stopPopulating()
{
cacheObjectStream = null;
}
private void cacheResultEntry(
T resultEntry,
Function<T, Object> cacheFn
)
{
Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream");
int cacheLimit = cacheConfig.getResultLevelCacheLimit();
try (JsonGenerator gen = mapper.getFactory().createGenerator(cacheObjectStream)) {
gen.writeObject(cacheFn.apply(resultEntry));
if (cacheLimit > 0 && cacheObjectStream.size() > cacheLimit) {
stopPopulating();
}
}
catch (IOException ex) {
log.error(ex, "Failed to retrieve entry to be cached. Result Level caching will not be performed!");
stopPopulating();
}
}
public void populateResults()
{
CacheUtil.populateResultCache(
cache,
key,
Preconditions.checkNotNull(cacheObjectStream, "cacheObjectStream").toByteArray()
);
}
}
}