blob: 067bdfff91324e9d26fd524df6485c5745951997 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.scan;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import com.google.common.collect.Ordering;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Queries;
import org.apache.druid.query.Query;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.spec.QuerySegmentSpec;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
 * Native "scan" query: streams raw rows (optionally time-ordered, offset, and limited)
 * from the queried segments without aggregation.
 *
 * Equality, hash code, and string form include every serialized field, including
 * {@link #getOrder()}, so that queries differing only in time-ordering are not conflated
 * (e.g. by result-level caches keyed on the query object).
 */
public class ScanQuery extends BaseQuery<ScanResultValue>
{
  /**
   * Wire format used to serialize each {@link ScanResultValue}.
   */
  public enum ResultFormat
  {
    RESULT_FORMAT_LIST,
    RESULT_FORMAT_COMPACTED_LIST,
    RESULT_FORMAT_VALUE_VECTOR;

    /**
     * JSON representation of this format ("list", "compactedList", "valueVector").
     */
    @JsonValue
    @Override
    public String toString()
    {
      switch (this) {
        case RESULT_FORMAT_LIST:
          return "list";
        case RESULT_FORMAT_COMPACTED_LIST:
          return "compactedList";
        case RESULT_FORMAT_VALUE_VECTOR:
          return "valueVector";
        default:
          return "";
      }
    }

    /**
     * Inverse of {@link #toString()}, used by Jackson when deserializing queries.
     *
     * @throws UOE if {@code name} is not one of the supported format strings
     */
    @JsonCreator
    public static ResultFormat fromString(String name)
    {
      switch (name) {
        case "compactedList":
          return RESULT_FORMAT_COMPACTED_LIST;
        case "valueVector":
          return RESULT_FORMAT_VALUE_VECTOR;
        case "list":
          return RESULT_FORMAT_LIST;
        default:
          throw new UOE("Scan query result format [%s] is not supported.", name);
      }
    }
  }

  /**
   * Time-ordering of scan results. {@link #NONE} means rows are returned in whatever
   * order the underlying segments yield them.
   */
  public enum Order
  {
    ASCENDING,
    DESCENDING,
    NONE;

    // Serialized in lowercase ("ascending", "descending", "none") for JSON compatibility.
    @JsonValue
    @Override
    public String toString()
    {
      return StringUtils.toLowerCase(this.name());
    }

    @JsonCreator
    public static Order fromString(String name)
    {
      return valueOf(StringUtils.toUpperCase(name));
    }
  }

  /**
   * This context flag corresponds to whether the query is running on the "outermost" process (i.e. the process
   * the query is sent to).
   */
  public static final String CTX_KEY_OUTERMOST = "scanOutermost";

  private final VirtualColumns virtualColumns;
  private final ResultFormat resultFormat;
  private final int batchSize;
  private final long scanRowsOffset;
  private final long scanRowsLimit;
  private final DimFilter dimFilter;
  private final List<String> columns;
  private final Boolean legacy;
  private final Order order;
  // Derived from query context, not serialized directly (covered by the context in super.equals).
  private final Integer maxRowsQueuedForOrdering;
  private final Integer maxSegmentPartitionsOrderedInMemory;

  /**
   * Jackson-facing constructor. Null/zero values of optional parameters are normalized to
   * defaults: resultFormat → list, batchSize → 20480, limit 0 → unlimited, order → NONE.
   *
   * @throws IllegalArgumentException if batchSize, offset, or limit are out of range, or if
   *                                  results are time-ordered but __time is not selected
   */
  @JsonCreator
  public ScanQuery(
      @JsonProperty("dataSource") DataSource dataSource,
      @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
      @JsonProperty("virtualColumns") VirtualColumns virtualColumns,
      @JsonProperty("resultFormat") ResultFormat resultFormat,
      @JsonProperty("batchSize") int batchSize,
      @JsonProperty("offset") long scanRowsOffset,
      @JsonProperty("limit") long scanRowsLimit,
      @JsonProperty("order") Order order,
      @JsonProperty("filter") DimFilter dimFilter,
      @JsonProperty("columns") List<String> columns,
      @JsonProperty("legacy") Boolean legacy,
      @JsonProperty("context") Map<String, Object> context
  )
  {
    super(dataSource, querySegmentSpec, false, context);
    this.virtualColumns = VirtualColumns.nullToEmpty(virtualColumns);
    this.resultFormat = (resultFormat == null) ? ResultFormat.RESULT_FORMAT_LIST : resultFormat;
    this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
    Preconditions.checkArgument(
        this.batchSize > 0,
        "batchSize must be greater than 0"
    );
    this.scanRowsOffset = scanRowsOffset;
    Preconditions.checkArgument(
        this.scanRowsOffset >= 0,
        "offset must be greater than or equal to 0"
    );
    // Limit 0 is treated as "unlimited" (Long.MAX_VALUE sentinel); see isLimited().
    this.scanRowsLimit = (scanRowsLimit == 0) ? Long.MAX_VALUE : scanRowsLimit;
    Preconditions.checkArgument(
        this.scanRowsLimit > 0,
        "limit must be greater than 0"
    );
    this.dimFilter = dimFilter;
    this.columns = columns;
    this.legacy = legacy;
    this.order = (order == null) ? Order.NONE : order;
    if (this.order != Order.NONE) {
      // Time-ordering is implemented by comparing __time values, so the column must be present
      // in the results (an empty/null column list means "all columns", which includes __time).
      Preconditions.checkArgument(
          columns == null || columns.isEmpty() || columns.contains(ColumnHolder.TIME_COLUMN_NAME),
          "The __time column must be selected if the results are time-ordered."
      );
    }
    this.maxRowsQueuedForOrdering = validateAndGetMaxRowsQueuedForOrdering();
    this.maxSegmentPartitionsOrderedInMemory = validateAndGetMaxSegmentPartitionsOrderedInMemory();
  }

  /**
   * Reads and validates the optional "maxRowsQueuedForOrdering" context value.
   */
  private Integer validateAndGetMaxRowsQueuedForOrdering()
  {
    final Integer maxRowsQueuedForOrdering =
        getContextValue(ScanQueryConfig.CTX_KEY_MAX_ROWS_QUEUED_FOR_ORDERING, null);
    Preconditions.checkArgument(
        maxRowsQueuedForOrdering == null || maxRowsQueuedForOrdering > 0,
        "maxRowsQueuedForOrdering must be greater than 0"
    );
    return maxRowsQueuedForOrdering;
  }

  /**
   * Reads and validates the optional "maxSegmentPartitionsOrderedInMemory" context value.
   */
  private Integer validateAndGetMaxSegmentPartitionsOrderedInMemory()
  {
    final Integer maxSegmentPartitionsOrderedInMemory =
        getContextValue(ScanQueryConfig.CTX_KEY_MAX_SEGMENT_PARTITIONS_FOR_ORDERING, null);
    Preconditions.checkArgument(
        maxSegmentPartitionsOrderedInMemory == null || maxSegmentPartitionsOrderedInMemory > 0,
        // Fixed copy-paste error: previously reported the wrong context key name.
        "maxSegmentPartitionsOrderedInMemory must be greater than 0"
    );
    return maxSegmentPartitionsOrderedInMemory;
  }

  @JsonProperty
  @Override
  public VirtualColumns getVirtualColumns()
  {
    return virtualColumns;
  }

  @JsonProperty
  public ResultFormat getResultFormat()
  {
    return resultFormat;
  }

  @JsonProperty
  public int getBatchSize()
  {
    return batchSize;
  }

  /**
   * Offset for this query; behaves like SQL "OFFSET". Zero means no offset. Negative values are invalid.
   */
  @JsonProperty("offset")
  @JsonInclude(JsonInclude.Include.NON_DEFAULT)
  public long getScanRowsOffset()
  {
    return scanRowsOffset;
  }

  /**
   * Limit for this query; behaves like SQL "LIMIT". Will always be positive. {@link Long#MAX_VALUE} is used in
   * situations where the user wants an effectively unlimited resultset.
   */
  @JsonProperty("limit")
  @JsonInclude(value = JsonInclude.Include.CUSTOM, valueFilter = ScanRowsLimitJsonIncludeFilter.class)
  public long getScanRowsLimit()
  {
    return scanRowsLimit;
  }

  /**
   * Returns whether this query is limited or not. Because {@link Long#MAX_VALUE} is used to signify unlimitedness,
   * this is equivalent to {@code getScanRowsLimit() != Long.MAX_VALUE}.
   *
   * @see #getScanRowsLimit()
   */
  public boolean isLimited()
  {
    return scanRowsLimit != Long.MAX_VALUE;
  }

  @JsonProperty
  public Order getOrder()
  {
    return order;
  }

  @Nullable
  @JsonIgnore
  public Integer getMaxRowsQueuedForOrdering()
  {
    return maxRowsQueuedForOrdering;
  }

  @Nullable
  @JsonIgnore
  public Integer getMaxSegmentPartitionsOrderedInMemory()
  {
    return maxSegmentPartitionsOrderedInMemory;
  }

  @Override
  public boolean hasFilters()
  {
    return dimFilter != null;
  }

  @Override
  @JsonProperty
  public DimFilter getFilter()
  {
    return dimFilter;
  }

  @Override
  public String getType()
  {
    return SCAN;
  }

  @JsonProperty
  public List<String> getColumns()
  {
    return columns;
  }

  /**
   * Compatibility mode with the legacy scan-query extension.
   */
  @JsonProperty
  public Boolean isLegacy()
  {
    return legacy;
  }

  /**
   * Ordering over result batches: natural (i.e. arbitrary, stable) when unordered;
   * otherwise by the batch timestamp in the requested direction.
   */
  @Override
  public Ordering<ScanResultValue> getResultOrdering()
  {
    if (order == Order.NONE) {
      return Ordering.natural();
    }
    return Ordering.from(
        new ScanResultValueTimestampComparator(this).thenComparing(
            order == Order.ASCENDING
            ? Comparator.naturalOrder()
            : Comparator.<ScanResultValue>naturalOrder().reversed()
        )
    );
  }

  @Nullable
  @Override
  public Set<String> getRequiredColumns()
  {
    if (columns == null || columns.isEmpty()) {
      // We don't know what columns we require. We'll find out when the segment shows up.
      return null;
    } else {
      return Queries.computeRequiredColumns(
          virtualColumns,
          dimFilter,
          Collections.emptyList(),
          Collections.emptyList(),
          columns
      );
    }
  }

  public ScanQuery withOffset(final long newOffset)
  {
    return Druids.ScanQueryBuilder.copy(this).offset(newOffset).build();
  }

  public ScanQuery withLimit(final long newLimit)
  {
    return Druids.ScanQueryBuilder.copy(this).limit(newLimit).build();
  }

  public ScanQuery withNonNullLegacy(final ScanQueryConfig scanQueryConfig)
  {
    return Druids.ScanQueryBuilder.copy(this).legacy(legacy != null ? legacy : scanQueryConfig.isLegacy()).build();
  }

  @Override
  public Query<ScanResultValue> withQuerySegmentSpec(QuerySegmentSpec querySegmentSpec)
  {
    return Druids.ScanQueryBuilder.copy(this).intervals(querySegmentSpec).build();
  }

  @Override
  public Query<ScanResultValue> withDataSource(DataSource dataSource)
  {
    return Druids.ScanQueryBuilder.copy(this).dataSource(dataSource).build();
  }

  @Override
  public Query<ScanResultValue> withOverriddenContext(Map<String, Object> contextOverrides)
  {
    return Druids.ScanQueryBuilder.copy(this).context(computeOverriddenContext(getContext(), contextOverrides)).build();
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    if (!super.equals(o)) {
      return false;
    }
    final ScanQuery scanQuery = (ScanQuery) o;
    // "order" was previously missing here, making queries that differ only in time-ordering
    // compare equal. The context-derived maxRows/maxSegmentPartitions fields are intentionally
    // omitted: they are covered by the context comparison in super.equals.
    return batchSize == scanQuery.batchSize &&
           scanRowsOffset == scanQuery.scanRowsOffset &&
           scanRowsLimit == scanQuery.scanRowsLimit &&
           order == scanQuery.order &&
           Objects.equals(legacy, scanQuery.legacy) &&
           Objects.equals(virtualColumns, scanQuery.virtualColumns) &&
           Objects.equals(resultFormat, scanQuery.resultFormat) &&
           Objects.equals(dimFilter, scanQuery.dimFilter) &&
           Objects.equals(columns, scanQuery.columns);
  }

  @Override
  public int hashCode()
  {
    // Must hash every field compared in equals (including "order") to keep the
    // equals/hashCode contract.
    return Objects.hash(
        super.hashCode(),
        virtualColumns,
        resultFormat,
        batchSize,
        scanRowsOffset,
        scanRowsLimit,
        order,
        dimFilter,
        columns,
        legacy
    );
  }

  @Override
  public String toString()
  {
    return "ScanQuery{" +
           "dataSource='" + getDataSource() + '\'' +
           ", querySegmentSpec=" + getQuerySegmentSpec() +
           ", virtualColumns=" + getVirtualColumns() +
           ", resultFormat='" + resultFormat + '\'' +
           ", batchSize=" + batchSize +
           ", offset=" + scanRowsOffset +
           ", limit=" + scanRowsLimit +
           ", order=" + order +
           ", dimFilter=" + dimFilter +
           ", columns=" + columns +
           ", legacy=" + legacy +
           '}';
  }

  /**
   * {@link JsonInclude} filter for {@link #getScanRowsLimit()}.
   *
   * This API works by "creative" use of equals. It requires warnings to be suppressed and also requires spotbugs
   * exclusions (see spotbugs-exclude.xml).
   */
  @SuppressWarnings({"EqualsAndHashcode"})
  static class ScanRowsLimitJsonIncludeFilter // lgtm [java/inconsistent-equals-and-hashcode]
  {
    @Override
    public boolean equals(Object obj)
    {
      if (obj == null) {
        return false;
      }
      if (obj.getClass() == this.getClass()) {
        return true;
      }
      // Suppress serialization of the limit when it is the "unlimited" sentinel.
      return obj instanceof Long && (long) obj == Long.MAX_VALUE;
    }
  }
}