blob: a399233e70b19aba4f2fe8b112d184bae54fa216 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.druid.query.context;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
* The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
* The context is also transferred between Druid nodes with all the data it contains.
public abstract class ResponseContext
/**
 * Contract that every response context key has to fulfil.
 * A key is identified by its name and knows how to combine an already stored value with a new one.
 */
public interface BaseKey
{
  /**
   * Returns the name of the key.
   */
  String getName();

  /**
   * Returns the merge function associated with the key.
   * It is called as {@code Object apply(Object oldValue, Object newValue)} and yields the combined value.
   */
  BiFunction<Object, Object, Object> getMergeFunction();
}
* Keys associated with objects in the context.
* <p>
* If it's necessary to have some new keys in the context then they might be listed in a separate enum:
* <pre>{@code
* public enum ExtensionResponseContextKey implements BaseKey
* {
* EXTENSION_KEY_1("extension_key_1"), EXTENSION_KEY_2("extension_key_2");
* static {
* for (BaseKey key : values()) ResponseContext.Key.registerKey(key);
* }
* private final String name;
* private final BiFunction<Object, Object, Object> mergeFunction;
* ExtensionResponseContextKey(String name)
* {
* this.name = name;
* this.mergeFunction = (oldValue, newValue) -> newValue;
* }
* @Override public String getName() { return name; }
* @Override public BiFunction<Object, Object, Object> getMergeFunction() { return mergeFunction; }
* }
* }</pre>
* Make sure all extension enum values are registered with the {@link Key#registerKey} method.
public enum Key implements BaseKey
* Lists intervals for which NO segment is present.
(oldValue, newValue) -> {
final ArrayList<Interval> result = new ArrayList<Interval>((List) oldValue);
result.addAll((List) newValue);
return result;
* Indicates if the number of uncovered intervals exceeded the limit (true/false).
(oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
* Lists missing segments.
(oldValue, newValue) -> {
final ArrayList<SegmentDescriptor> result = new ArrayList<SegmentDescriptor>((List) oldValue);
result.addAll((List) newValue);
return result;
* Entity tag. A part of HTTP cache validation mechanism.
* Is being removed from the context before sending and used as a separate HTTP header.
* Query fail time (current time + timeout).
* Unlike {@link Key#TIMEOUT_AT}, it is not updated continuously.
* Query total bytes gathered.
* This variable indicates when a running query should be expired,
* and is effective only when 'timeout' of queryContext has a positive value.
* Continuously updated by {@link org.apache.druid.query.scan.ScanQueryEngine}
* by reducing its value on the time of every scan iteration.
* The number of scanned rows.
* For backward compatibility the context key name is still "count".
(oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
* The total CPU time for threads related to Sequence processing of the query.
* Resulting value on a Broker is a sum of downstream values from historicals / realtime nodes.
* For additional information see {@link org.apache.druid.query.CPUTimeMetricQueryRunner}
(oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
* Indicates if a {@link ResponseContext} was truncated during serialization.
(oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
* TreeMap is used to have the natural ordering of its keys
// Registry of all known context keys, indexed by key name.
private static final Map<String, BaseKey> REGISTERED_KEYS = new TreeMap<>();
// Static initializer that visits every constant declared by this enum.
// NOTE(review): the loop body is not visible in this excerpt; presumably it registers each constant — confirm.
static {
for (BaseKey key : values()) {
* Primary way of registering context keys.
* @throws IllegalArgumentException if the key has already been registered.
// Adds the key to REGISTERED_KEYS under key.getName(); the method is synchronized.
// NOTE(review): the duplicate-name guard that produces the message below is not visible in this excerpt — confirm.
public static synchronized void registerKey(BaseKey key)
"Key [%s] has already been registered as a context key",
// Index the key by its name so that keyOf(String) can find it later.
REGISTERED_KEYS.put(key.getName(), key);
* Returns the registered key associated with the given {@code name}.
* @throws IllegalStateException if a corresponding key has not been registered.
// Looks a key up by name in REGISTERED_KEYS.
// NOTE(review): the guard that produces the message below is not visible in this excerpt — confirm.
// NOTE(review): registerKey is synchronized but this read is not; confirm that registration finishes before lookups start.
public static BaseKey keyOf(String name)
"Key [%s] has not yet been registered as a context key",
return REGISTERED_KEYS.get(name);
* Returns all keys registered via {@link Key#registerKey}.
public static Collection<BaseKey> getAllRegisteredKeys()
return Collections.unmodifiableCollection(REGISTERED_KEYS.values());
// Name of the key, returned by getName().
private final String name;
// Function returned by getMergeFunction(); it combines an old value with a new one.
private final BiFunction<Object, Object, Object> mergeFunction;
/**
 * Creates a key whose merge function replaces the old value with the new one.
 *
 * @param name the name of the key, as returned by {@link #getName()}
 */
Key(String name)
{
  this.name = name;
  this.mergeFunction = (oldValue, newValue) -> newValue;
}
/**
 * Creates a key with an explicit merge function.
 *
 * @param name          the name of the key, as returned by {@link #getName()}
 * @param mergeFunction the function used to combine an old value with a new one,
 *                      as returned by {@link #getMergeFunction()}
 */
Key(String name, BiFunction<Object, Object, Object> mergeFunction)
{
  this.name = name;
  this.mergeFunction = mergeFunction;
}
public String getName()
return name;
public BiFunction<Object, Object, Object> getMergeFunction()
return mergeFunction;
/**
 * Returns the map that backs this context.
 * The put, get, remove, add, merge and serialization methods of this class all operate on it.
 */
protected abstract Map<BaseKey, Object> getDelegate();
/**
 * Orders JSON fields from the longest value to the shortest one.
 * The length of a value is the length of its {@code toString()} form.
 * {@code comparingInt} is used so that lengths are compared as primitives without boxing.
 */
private static final Comparator<Map.Entry<String, JsonNode>> VALUE_LENGTH_REVERSED_COMPARATOR =
    Comparator.comparingInt((Map.Entry<String, JsonNode> e) -> e.getValue().toString().length()).reversed();
* Create an empty DefaultResponseContext instance
* @return empty DefaultResponseContext instance
public static ResponseContext createEmpty()
return DefaultResponseContext.createEmpty();
* Deserializes a string into {@link ResponseContext} using given {@link ObjectMapper}.
* @throws IllegalStateException if one of the deserialized map keys has not been registered.
// Reads the string into a key-name -> value map and merges every entry into a fresh context via add(...).
// NOTE(review): the remaining arguments of readValue(...) and the closing lines are not visible in this excerpt.
public static ResponseContext deserialize(String responseContext, ObjectMapper objectMapper) throws IOException
final Map<String, Object> keyNameToObjects = objectMapper.readValue(
final ResponseContext context = ResponseContext.createEmpty();
keyNameToObjects.forEach((keyName, value) -> {
// Resolve the serialized name back to its registered key before storing the value.
final BaseKey key = Key.keyOf(keyName);
context.add(key, value);
return context;
* Associates the specified object with the specified key.
* @throws IllegalStateException if the key has not been registered.
public Object put(BaseKey key, Object value)
final BaseKey registeredKey = Key.keyOf(key.getName());
return getDelegate().put(registeredKey, value);
public Object get(BaseKey key)
return getDelegate().get(key);
public Object remove(BaseKey key)
return getDelegate().remove(key);
* Adds (merges) a new value associated with a key to an old value.
* See merge function of a context key for a specific implementation.
* @throws IllegalStateException if the key has not been registered.
public Object add(BaseKey key, Object value)
final BaseKey registeredKey = Key.keyOf(key.getName());
return getDelegate().merge(registeredKey, value, key.getMergeFunction());
* Merges a response context into the current.
* @throws IllegalStateException If a key of the {@code responseContext} has not been registered.
public void merge(ResponseContext responseContext)
responseContext.getDelegate().forEach((key, newValue) -> {
if (newValue != null) {
add(key, newValue);
* Serializes the context so that the resulting string is no longer than the provided limit.
* This method removes some elements from context collections if that is needed to satisfy the limit.
* There are no explicit priorities among the keys whose values are truncated, because for now there are only
* two keys that can potentially break the limit ({@link Key#UNCOVERED_INTERVALS}
* and {@link Key#MISSING_SEGMENTS}) and their values are arrays.
* Thus the current implementation treats these arrays as having equal priority and starts removing elements from
* the array whose serialized value is the longest.
* The resulting string can still be deserialized to a {@link ResponseContext}.
// Serializes the backing map; if the string is longer than maxCharsNumber, shrinks a JSON-tree copy
// of the context until it fits and returns both the shrunk and the full string.
// NOTE(review): several statements (the sort of sortedNodesByLength, the field removals and the closing
// braces) are not visible in this excerpt — confirm against the full file before relying on these notes.
public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException
final String fullSerializedString = objectMapper.writeValueAsString(getDelegate());
if (fullSerializedString.length() <= maxCharsNumber) {
// Nothing to cut: the truncated and the full results are the same string.
return new SerializationResult(fullSerializedString, fullSerializedString);
} else {
// Indicates that the context is truncated during serialization.
// Note that add(...) writes to the backing map, so the context itself is modified, not only the copy.
add(Key.TRUNCATED, true);
// Work on a JSON tree built from the backing map; fullSerializedString was captured before TRUNCATED was added.
final ObjectNode contextJsonNode = objectMapper.valueToTree(getDelegate());
// NOTE(review): presumably ordered with VALUE_LENGTH_REVERSED_COMPARATOR; the sort call is not visible here.
final ArrayList<Map.Entry<String, JsonNode>> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields());
int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber;
// The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size
for (Map.Entry<String, JsonNode> e : sortedNodesByLength) {
final String fieldName = e.getKey();
final JsonNode node = e.getValue();
if (node.isArray()) {
if (needToRemoveCharsNumber >= node.toString().length()) {
// We need to remove more chars than the field's length so removing it completely
// Since the field is completely removed (name + value) we need to do a recalculation
needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
} else {
final ArrayNode arrayNode = (ArrayNode) node;
// Drop array elements and reduce the outstanding amount by the number of chars actually removed.
needToRemoveCharsNumber -= removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber);
if (arrayNode.size() == 0) {
// The field is empty, removing it because an empty array field may be misleading
// for the recipients of the truncated response context.
// Since the field is completely removed (name + value) we need to do a recalculation
needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
} // node is not an array
} else {
// A context should not contain nulls so we completely remove the field.
// Since the field is completely removed (name + value) we need to do a recalculation
needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
// The limit is satisfied once there is nothing left to remove.
if (needToRemoveCharsNumber <= 0) {
return new SerializationResult(contextJsonNode.toString(), fullSerializedString);
* Removes elements of {@code node} until the total serialized length of the removed elements is greater than or equal to the passed limit.
* If it is impossible to satisfy the limit, the method removes all of {@code node}'s elements.
* On every iteration it removes half (rounded up) of the remaining elements to reduce the overall complexity.
* @param node {@link ArrayNode} which elements are being removed.
* @param needToRemoveCharsNumber the number of chars need to be removed.
* @return the number of removed chars.
// Loops while the array is non-empty and fewer than needToRemoveCharsNumber chars have been removed,
// and returns the number of chars removed in total.
// NOTE(review): the statement that actually removes an element is not visible in this excerpt — confirm.
private static int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int needToRemoveCharsNumber)
int removedCharsNumber = 0;
while (node.size() > 0 && needToRemoveCharsNumber > removedCharsNumber) {
// Serialized length before this round, used to measure how many chars the round removes.
final int lengthBeforeRemove = node.toString().length();
// Reducing complexity by removing half of array's elements
final int removeUntil = node.size() / 2;
// Walks from the last index down to removeUntil, i.e. over the upper half of the array.
for (int removeAt = node.size() - 1; removeAt >= removeUntil; removeAt--) {
final int lengthAfterRemove = node.toString().length();
removedCharsNumber += lengthBeforeRemove - lengthAfterRemove;
return removedCharsNumber;
* Serialization result of {@link ResponseContext}.
* Response context might be serialized using max legth limit, in this case the context might be reduced
* by removing max-length fields one by one unless serialization result length is less than the limit.
* This structure has a reduced serialization result along with full result and boolean property
* indicating if some fields were removed from the context.
public static class SerializationResult
private final String truncatedResult;
private final String fullResult;
SerializationResult(String truncatedResult, String fullResult)
this.truncatedResult = truncatedResult;
this.fullResult = fullResult;
public String getTruncatedResult()
return truncatedResult;
public String getFullResult()
return fullResult;
public Boolean isReduced()
return !truncatedResult.equals(fullResult);