blob: a399233e70b19aba4f2fe8b112d184bae54fa216 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.context;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.SegmentDescriptor;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
/**
* The context for storing and passing data between chains of {@link org.apache.druid.query.QueryRunner}s.
* The context is also transferred between Druid nodes with all the data it contains.
*/
@PublicApi
public abstract class ResponseContext
{
/**
* The base interface of a response context key.
* Should be implemented by every context key.
*/
public interface BaseKey
{
@JsonValue
String getName();
/**
* Merge function associated with a key: Object (Object oldValue, Object newValue)
*/
BiFunction<Object, Object, Object> getMergeFunction();
}
/**
* Keys associated with objects in the context.
* <p>
* If it's necessary to have some new keys in the context then they might be listed in a separate enum:
* <pre>{@code
* public enum ExtensionResponseContextKey implements BaseKey
* {
* EXTENSION_KEY_1("extension_key_1"), EXTENSION_KEY_2("extension_key_2");
*
* static {
* for (BaseKey key : values()) ResponseContext.Key.registerKey(key);
* }
*
* private final String name;
* private final BiFunction<Object, Object, Object> mergeFunction;
*
* ExtensionResponseContextKey(String name)
* {
* this.name = name;
* this.mergeFunction = (oldValue, newValue) -> newValue;
* }
*
* @Override public String getName() { return name; }
*
* @Override public BiFunction<Object, Object, Object> getMergeFunction() { return mergeFunction; }
* }
* }</pre>
* Make sure all extension enum values added with {@link Key#registerKey} method.
*/
public enum Key implements BaseKey
{
/**
* Lists intervals for which NO segment is present.
*/
UNCOVERED_INTERVALS(
"uncoveredIntervals",
(oldValue, newValue) -> {
final ArrayList<Interval> result = new ArrayList<Interval>((List) oldValue);
result.addAll((List) newValue);
return result;
}
),
/**
* Indicates if the number of uncovered intervals exceeded the limit (true/false).
*/
UNCOVERED_INTERVALS_OVERFLOWED(
"uncoveredIntervalsOverflowed",
(oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
),
/**
* Lists missing segments.
*/
MISSING_SEGMENTS(
"missingSegments",
(oldValue, newValue) -> {
final ArrayList<SegmentDescriptor> result = new ArrayList<SegmentDescriptor>((List) oldValue);
result.addAll((List) newValue);
return result;
}
),
/**
* Entity tag. A part of HTTP cache validation mechanism.
* Is being removed from the context before sending and used as a separate HTTP header.
*/
ETAG("ETag"),
/**
* Query fail time (current time + timeout).
* It is not updated continuously as {@link Key#TIMEOUT_AT}.
*/
QUERY_FAIL_DEADLINE_MILLIS("queryFailTime"),
/**
* Query total bytes gathered.
*/
QUERY_TOTAL_BYTES_GATHERED("queryTotalBytesGathered"),
/**
* This variable indicates when a running query should be expired,
* and is effective only when 'timeout' of queryContext has a positive value.
* Continuously updated by {@link org.apache.druid.query.scan.ScanQueryEngine}
* by reducing its value on the time of every scan iteration.
*/
TIMEOUT_AT("timeoutAt"),
/**
* The number of scanned rows.
* For backward compatibility the context key name still equals to "count".
*/
NUM_SCANNED_ROWS(
"count",
(oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
),
/**
* The total CPU time for threads related to Sequence processing of the query.
* Resulting value on a Broker is a sum of downstream values from historicals / realtime nodes.
* For additional information see {@link org.apache.druid.query.CPUTimeMetricQueryRunner}
*/
CPU_CONSUMED_NANOS(
"cpuConsumed",
(oldValue, newValue) -> ((Number) oldValue).longValue() + ((Number) newValue).longValue()
),
/**
* Indicates if a {@link ResponseContext} was truncated during serialization.
*/
TRUNCATED(
"truncated",
(oldValue, newValue) -> (boolean) oldValue || (boolean) newValue
);
/**
* TreeMap is used to have the natural ordering of its keys
*/
private static final Map<String, BaseKey> REGISTERED_KEYS = new TreeMap<>();
static {
for (BaseKey key : values()) {
registerKey(key);
}
}
/**
* Primary way of registering context keys.
* @throws IllegalArgumentException if the key has already been registered.
*/
public static synchronized void registerKey(BaseKey key)
{
Preconditions.checkArgument(
!REGISTERED_KEYS.containsKey(key.getName()),
"Key [%s] has already been registered as a context key",
key.getName()
);
REGISTERED_KEYS.put(key.getName(), key);
}
/**
* Returns a registered key associated with the name {@param name}.
* @throws IllegalStateException if a corresponding key has not been registered.
*/
public static BaseKey keyOf(String name)
{
Preconditions.checkState(
REGISTERED_KEYS.containsKey(name),
"Key [%s] has not yet been registered as a context key",
name
);
return REGISTERED_KEYS.get(name);
}
/**
* Returns all keys registered via {@link Key#registerKey}.
*/
public static Collection<BaseKey> getAllRegisteredKeys()
{
return Collections.unmodifiableCollection(REGISTERED_KEYS.values());
}
private final String name;
private final BiFunction<Object, Object, Object> mergeFunction;
Key(String name)
{
this.name = name;
this.mergeFunction = (oldValue, newValue) -> newValue;
}
Key(String name, BiFunction<Object, Object, Object> mergeFunction)
{
this.name = name;
this.mergeFunction = mergeFunction;
}
@Override
public String getName()
{
return name;
}
@Override
public BiFunction<Object, Object, Object> getMergeFunction()
{
return mergeFunction;
}
}
protected abstract Map<BaseKey, Object> getDelegate();
private static final Comparator<Map.Entry<String, JsonNode>> VALUE_LENGTH_REVERSED_COMPARATOR =
Comparator.comparing((Map.Entry<String, JsonNode> e) -> e.getValue().toString().length()).reversed();
/**
* Create an empty DefaultResponseContext instance
* @return empty DefaultResponseContext instance
*/
public static ResponseContext createEmpty()
{
return DefaultResponseContext.createEmpty();
}
/**
* Deserializes a string into {@link ResponseContext} using given {@link ObjectMapper}.
* @throws IllegalStateException if one of the deserialized map keys has not been registered.
*/
public static ResponseContext deserialize(String responseContext, ObjectMapper objectMapper) throws IOException
{
final Map<String, Object> keyNameToObjects = objectMapper.readValue(
responseContext,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);
final ResponseContext context = ResponseContext.createEmpty();
keyNameToObjects.forEach((keyName, value) -> {
final BaseKey key = Key.keyOf(keyName);
context.add(key, value);
});
return context;
}
/**
* Associates the specified object with the specified key.
* @throws IllegalStateException if the key has not been registered.
*/
public Object put(BaseKey key, Object value)
{
final BaseKey registeredKey = Key.keyOf(key.getName());
return getDelegate().put(registeredKey, value);
}
public Object get(BaseKey key)
{
return getDelegate().get(key);
}
public Object remove(BaseKey key)
{
return getDelegate().remove(key);
}
/**
* Adds (merges) a new value associated with a key to an old value.
* See merge function of a context key for a specific implementation.
* @throws IllegalStateException if the key has not been registered.
*/
public Object add(BaseKey key, Object value)
{
final BaseKey registeredKey = Key.keyOf(key.getName());
return getDelegate().merge(registeredKey, value, key.getMergeFunction());
}
/**
* Merges a response context into the current.
* @throws IllegalStateException If a key of the {@code responseContext} has not been registered.
*/
public void merge(ResponseContext responseContext)
{
responseContext.getDelegate().forEach((key, newValue) -> {
if (newValue != null) {
add(key, newValue);
}
});
}
/**
* Serializes the context given that the resulting string length is less than the provided limit.
* This method removes some elements from context collections if it's needed to satisfy the limit.
* There is no explicit priorities of keys which values are being truncated because for now there are only
* two potential limit breaking keys ({@link Key#UNCOVERED_INTERVALS}
* and {@link Key#MISSING_SEGMENTS}) and their values are arrays.
* Thus current implementation considers these arrays as equal prioritized and starts removing elements from
* the array which serialized value length is the biggest.
* The resulting string might be correctly deserialized to {@link ResponseContext}.
*/
public SerializationResult serializeWith(ObjectMapper objectMapper, int maxCharsNumber) throws JsonProcessingException
{
final String fullSerializedString = objectMapper.writeValueAsString(getDelegate());
if (fullSerializedString.length() <= maxCharsNumber) {
return new SerializationResult(fullSerializedString, fullSerializedString);
} else {
// Indicates that the context is truncated during serialization.
add(Key.TRUNCATED, true);
final ObjectNode contextJsonNode = objectMapper.valueToTree(getDelegate());
final ArrayList<Map.Entry<String, JsonNode>> sortedNodesByLength = Lists.newArrayList(contextJsonNode.fields());
sortedNodesByLength.sort(VALUE_LENGTH_REVERSED_COMPARATOR);
int needToRemoveCharsNumber = fullSerializedString.length() - maxCharsNumber;
// The complexity of this block is O(n*m*log(m)) where n - context size, m - context's array size
for (Map.Entry<String, JsonNode> e : sortedNodesByLength) {
final String fieldName = e.getKey();
final JsonNode node = e.getValue();
if (node.isArray()) {
if (needToRemoveCharsNumber >= node.toString().length()) {
// We need to remove more chars than the field's length so removing it completely
contextJsonNode.remove(fieldName);
// Since the field is completely removed (name + value) we need to do a recalculation
needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
} else {
final ArrayNode arrayNode = (ArrayNode) node;
needToRemoveCharsNumber -= removeNodeElementsToSatisfyCharsLimit(arrayNode, needToRemoveCharsNumber);
if (arrayNode.size() == 0) {
// The field is empty, removing it because an empty array field may be misleading
// for the recipients of the truncated response context.
contextJsonNode.remove(fieldName);
// Since the field is completely removed (name + value) we need to do a recalculation
needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
}
} // node is not an array
} else {
// A context should not contain nulls so we completely remove the field.
contextJsonNode.remove(fieldName);
// Since the field is completely removed (name + value) we need to do a recalculation
needToRemoveCharsNumber = contextJsonNode.toString().length() - maxCharsNumber;
}
if (needToRemoveCharsNumber <= 0) {
break;
}
}
return new SerializationResult(contextJsonNode.toString(), fullSerializedString);
}
}
/**
* Removes {@code node}'s elements which total length of serialized values is greater or equal to the passed limit.
* If it is impossible to satisfy the limit the method removes all {@code node}'s elements.
* On every iteration it removes exactly half of the remained elements to reduce the overall complexity.
* @param node {@link ArrayNode} which elements are being removed.
* @param needToRemoveCharsNumber the number of chars need to be removed.
* @return the number of removed chars.
*/
private static int removeNodeElementsToSatisfyCharsLimit(ArrayNode node, int needToRemoveCharsNumber)
{
int removedCharsNumber = 0;
while (node.size() > 0 && needToRemoveCharsNumber > removedCharsNumber) {
final int lengthBeforeRemove = node.toString().length();
// Reducing complexity by removing half of array's elements
final int removeUntil = node.size() / 2;
for (int removeAt = node.size() - 1; removeAt >= removeUntil; removeAt--) {
node.remove(removeAt);
}
final int lengthAfterRemove = node.toString().length();
removedCharsNumber += lengthBeforeRemove - lengthAfterRemove;
}
return removedCharsNumber;
}
/**
* Serialization result of {@link ResponseContext}.
* Response context might be serialized using max legth limit, in this case the context might be reduced
* by removing max-length fields one by one unless serialization result length is less than the limit.
* This structure has a reduced serialization result along with full result and boolean property
* indicating if some fields were removed from the context.
*/
public static class SerializationResult
{
private final String truncatedResult;
private final String fullResult;
SerializationResult(String truncatedResult, String fullResult)
{
this.truncatedResult = truncatedResult;
this.fullResult = fullResult;
}
public String getTruncatedResult()
{
return truncatedResult;
}
public String getFullResult()
{
return fullResult;
}
public Boolean isReduced()
{
return !truncatedResult.equals(fullResult);
}
}
}