/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.scheduler;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.common.util.concurrent.RateLimiter;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nullable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.request.context.TimerContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abstract class providing common scheduler functionality
* including query runner and query worker pool
*/
@SuppressWarnings("UnstableApiUsage")
public abstract class QueryScheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryScheduler.class);
private static final String INVALID_NUM_SCANNED = "-1";
private static final String INVALID_SEGMENTS_COUNT = "-1";
private static final String INVALID_FRESHNESS_MS = "-1";
private static final String INVALID_NUM_RESIZES = "-1";
private static final String INVALID_RESIZE_TIME_MS = "-1";
private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
protected final ServerMetrics _serverMetrics;
protected final QueryExecutor _queryExecutor;
protected final ResourceManager _resourceManager;
protected final LongAccumulator _latestQueryTime;
private final RateLimiter _queryLogRateLimiter;
private final RateLimiter _numDroppedLogRateLimiter;
private final AtomicInteger _numDroppedLogCounter;
protected volatile boolean _isRunning = false;
/**
* Constructor to initialize QueryScheduler
* @param config scheduler configuration
* @param queryExecutor QueryExecutor engine to use
* @param resourceManager for managing server thread resources
* @param serverMetrics server metrics collector
* @param latestQueryTime accumulator tracking the arrival time of the most recent query
*/
public QueryScheduler(PinotConfiguration config, QueryExecutor queryExecutor, ResourceManager resourceManager,
ServerMetrics serverMetrics, LongAccumulator latestQueryTime) {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(queryExecutor);
Preconditions.checkNotNull(resourceManager);
Preconditions.checkNotNull(serverMetrics);
Preconditions.checkNotNull(latestQueryTime);
_serverMetrics = serverMetrics;
_resourceManager = resourceManager;
_queryExecutor = queryExecutor;
_latestQueryTime = latestQueryTime;
_queryLogRateLimiter = RateLimiter.create(config.getProperty(QUERY_LOG_MAX_RATE_KEY, DEFAULT_QUERY_LOG_MAX_RATE));
_numDroppedLogRateLimiter = RateLimiter.create(1.0d);
_numDroppedLogCounter = new AtomicInteger(0);
LOGGER.info("Query log max rate: {}", _queryLogRateLimiter.getRate());
}
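// Illustrative sketch (not part of this class): the query-log rate limit above is read from the
// scheduler config under the key "query.log.maxRatePerSecond". A caller could lower it like this,
// assuming PinotConfiguration's Map-based constructor:
//
//   Map<String, Object> properties = new HashMap<>();
//   properties.put("query.log.maxRatePerSecond", 100.0d);
//   PinotConfiguration config = new PinotConfiguration(properties);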
/**
* Submit a query for execution. The query will be scheduled for execution as per the scheduling algorithm
* @param queryRequest query to schedule for execution
* @return Listenable future for the serialized query response. The future may complete immediately or the
* query may be scheduled for execution at a later time.
*/
public abstract ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest);
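// Usage sketch (illustrative, not prescribed by this class): callers typically attach a callback to
// the returned future and ship the serialized bytes back to the broker. FutureCallback is Guava's
// com.google.common.util.concurrent.FutureCallback; callbackExecutor is an assumed caller-provided executor.
//
//   ListenableFuture<byte[]> future = scheduler.submit(queryRequest);
//   Futures.addCallback(future, new FutureCallback<byte[]>() {
//     @Override
//     public void onSuccess(byte[] responseBytes) { /* send the bytes to the broker */ }
//     @Override
//     public void onFailure(Throwable t) { /* log and return an error response */ }
//   }, callbackExecutor);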
/**
* Query scheduler name for logging
*/
public abstract String name();
/**
* Start query scheduler thread
*/
public void start() {
_isRunning = true;
}
/**
* Stop the scheduler and shut down services
*/
public void stop() {
// Don't stop the resource manager yet; we need to wait for all running queries to finish
_isRunning = false;
}
/**
* Create a future task for the query
* @param queryRequest incoming query request
* @param executorService executor service to use for parallelizing the query. This is passed to the QueryExecutor
* @return Future task that can be scheduled for execution on an ExecutorService. Ideally, this future
* should be executed on a different executor service than {@code executorService} to avoid deadlock.
*/
protected ListenableFutureTask<byte[]> createQueryFutureTask(ServerQueryRequest queryRequest,
ExecutorService executorService) {
return ListenableFutureTask.create(() -> processQueryAndSerialize(queryRequest, executorService));
}
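// Sketch of the intended wiring (pool names are assumptions, not this class's API): a subclass would
// typically run the returned task on a "runner" pool while passing a separate "worker" pool for
// intra-query parallelism, so the task never blocks waiting on the executor it parallelizes onto.
//
//   ListenableFutureTask<byte[]> task = createQueryFutureTask(queryRequest, workerExecutorService);
//   runnerExecutorService.submit(task);  // must be a different pool than workerExecutorService
//   return task;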
/**
* Process query and serialize response
* @param queryRequest incoming query request
* @param executorService Executor service to use for parallelizing query processing
* @return serialized query response
*/
@Nullable
protected byte[] processQueryAndSerialize(ServerQueryRequest queryRequest, ExecutorService executorService) {
_latestQueryTime.accumulate(System.currentTimeMillis());
DataTable dataTable;
try {
dataTable = _queryExecutor.processQuery(queryRequest, executorService);
} catch (Exception e) {
LOGGER.error("Encountered exception while processing requestId {} from broker {}", queryRequest.getRequestId(),
queryRequest.getBrokerId(), e);
// For unhandled exceptions
_serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
dataTable = DataTableFactory.getEmptyDataTable();
dataTable.addException(QueryException.getException(QueryException.INTERNAL_ERROR, e));
}
long requestId = queryRequest.getRequestId();
Map<String, String> dataTableMetadata = dataTable.getMetadata();
dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(requestId));
byte[] responseBytes = serializeDataTable(queryRequest, dataTable);
// Log the statistics
String tableNameWithType = queryRequest.getTableNameWithType();
long numDocsScanned =
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.NUM_DOCS_SCANNED.getName(), INVALID_NUM_SCANNED));
long numEntriesScannedInFilter = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER.getName(), INVALID_NUM_SCANNED));
long numEntriesScannedPostFilter = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), INVALID_NUM_SCANNED));
long numSegmentsProcessed = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
long numSegmentsMatched = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
long numSegmentsPrunedInvalid = Long.parseLong(
dataTableMetadata.getOrDefault(
MetadataKey.NUM_SEGMENTS_PRUNED_INVALID.getName(), INVALID_SEGMENTS_COUNT));
long numSegmentsPrunedByLimit = Long.parseLong(
dataTableMetadata.getOrDefault(
MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT.getName(), INVALID_SEGMENTS_COUNT));
long numSegmentsPrunedByValue = Long.parseLong(
dataTableMetadata.getOrDefault(
MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE.getName(), INVALID_SEGMENTS_COUNT));
long numSegmentsConsuming = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED.getName(), INVALID_SEGMENTS_COUNT));
long numConsumingSegmentsProcessed = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED.getName(), INVALID_SEGMENTS_COUNT));
long numConsumingSegmentsMatched = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED.getName(), INVALID_SEGMENTS_COUNT));
long minConsumingFreshnessMs = Long.parseLong(
dataTableMetadata.getOrDefault(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), INVALID_FRESHNESS_MS));
int numResizes =
Integer.parseInt(dataTableMetadata.getOrDefault(MetadataKey.NUM_RESIZES.getName(), INVALID_NUM_RESIZES));
long resizeTimeMs =
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(), INVALID_RESIZE_TIME_MS));
long threadCpuTimeNs =
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(), "0"));
long systemActivitiesCpuTimeNs =
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), "0"));
long responseSerializationCpuTimeNs =
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), "0"));
long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs + responseSerializationCpuTimeNs;
if (numDocsScanned > 0) {
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
}
if (numEntriesScannedInFilter > 0) {
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_IN_FILTER,
numEntriesScannedInFilter);
}
if (numEntriesScannedPostFilter > 0) {
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER,
numEntriesScannedPostFilter);
}
if (numResizes > 0) {
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_RESIZES, numResizes);
}
if (resizeTimeMs > 0) {
_serverMetrics.addValueToTableGauge(tableNameWithType, ServerGauge.RESIZE_TIME_MS, resizeTimeMs);
}
if (threadCpuTimeNs > 0) {
_serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs,
TimeUnit.NANOSECONDS);
}
if (systemActivitiesCpuTimeNs > 0) {
_serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.SYSTEM_ACTIVITIES_CPU_TIME_NS,
systemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
}
if (responseSerializationCpuTimeNs > 0) {
_serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.RESPONSE_SER_CPU_TIME_NS,
responseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
}
if (totalCpuTimeNs > 0) {
_serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.TOTAL_CPU_TIME_NS, totalCpuTimeNs,
TimeUnit.NANOSECONDS);
}
TimerContext timerContext = queryRequest.getTimerContext();
int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
long schedulerWaitMs = timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
// Please keep the format as name=value comma-separated with no spaces
// Please add new entries at the end
if (_queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned, numSegmentsPrunedInvalid)) {
LOGGER.info("Processed requestId={},table={},"
+ "segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/"
+ "invalid/limit/value)={}/{}/{}/{}/{}/{}/{}/{}/{},"
+ "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},minConsumingFreshnessMs={},"
+ "broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
+ "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, tableNameWithType,
numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming,
numConsumingSegmentsProcessed, numConsumingSegmentsMatched,
numSegmentsPrunedInvalid, numSegmentsPrunedByLimit, numSegmentsPrunedByValue, schedulerWaitMs,
timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs,
queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name(),
totalCpuTimeNs, threadCpuTimeNs, systemActivitiesCpuTimeNs, responseSerializationCpuTimeNs);
// Limit the dropped-log message to at most once per second.
if (_numDroppedLogRateLimiter.tryAcquire()) {
// NOTE: the reported number may not be accurate since we may miss increments that happen between
// get() and set().
int numDroppedLog = _numDroppedLogCounter.get();
if (numDroppedLog > 0) {
LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
_queryLogRateLimiter.getRate());
_numDroppedLogCounter.set(0);
}
}
} else {
_numDroppedLogCounter.incrementAndGet();
}
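// For illustration, a log line emitted by the statement above looks roughly like this (all values
// made up):
//   Processed requestId=123,table=myTable_OFFLINE,segments(queried/processed/matched/consumingQueried/
//   consumingProcessed/consumingMatched/invalid/limit/value)=8/6/4/1/1/1/0/0/0,schedulerWaitMs=2,
//   reqDeserMs=1,totalExecMs=40,resSerMs=1,totalTimeMs=44,minConsumingFreshnessMs=1676000000000,
//   broker=Broker_broker-0,numDocsScanned=100000,scanInFilter=200000,scanPostFilter=150000,sched=fcfs,
//   threadCpuTimeNs(total/thread/sysActivity/resSer)=1200000/900000/200000/100000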
if (minConsumingFreshnessMs > -1) {
_serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.FRESHNESS_LAG_MS,
(System.currentTimeMillis() - minConsumingFreshnessMs), TimeUnit.MILLISECONDS);
}
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_MATCHED, numSegmentsMatched);
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_INVALID,
numSegmentsPrunedInvalid);
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_LIMIT,
numSegmentsPrunedByLimit);
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PRUNED_BY_VALUE,
numSegmentsPrunedByValue);
return responseBytes;
}
/**
* Helper function to decide whether to force query logging
*
* TODO: come up with other criteria for forcing a log and come up with better numbers
*/
private boolean forceLog(long schedulerWaitMs, long numDocsScanned, long numSegmentsPrunedInvalid) {
// If scheduler wait time is larger than 100ms, force the log
if (schedulerWaitMs > 100L) {
return true;
}
// If there are invalid segments, force the log
if (numSegmentsPrunedInvalid > 0) {
return true;
}
// If the number of documents scanned is larger than 1 million, force the log
return numDocsScanned > 1_000_000L;
}
/**
* Serialize the DataTable response for query request
* @param queryRequest Server query request for which response is serialized
* @param dataTable DataTable to serialize
* @return serialized response bytes
*/
@Nullable
private byte[] serializeDataTable(ServerQueryRequest queryRequest, DataTable dataTable) {
TimerContext timerContext = queryRequest.getTimerContext();
TimerContext.Timer responseSerializationTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.RESPONSE_SERIALIZATION);
byte[] responseBytes = null;
try {
responseBytes = dataTable.toBytes();
} catch (Exception e) {
_serverMetrics.addMeteredGlobalValue(ServerMeter.RESPONSE_SERIALIZATION_EXCEPTIONS, 1);
LOGGER.error("Caught exception while serializing response for requestId: {}, brokerId: {}",
queryRequest.getRequestId(), queryRequest.getBrokerId(), e);
}
responseSerializationTimer.stopAndRecord();
timerContext.startNewPhaseTimer(ServerQueryPhase.TOTAL_QUERY_TIME, timerContext.getQueryArrivalTimeMs())
.stopAndRecord();
return responseBytes;
}
/**
* Error response future in case of an internal error where the query response is not available. This can
* happen if the query cannot be executed.
*/
protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest,
ProcessingException error) {
DataTable result = DataTableFactory.getEmptyDataTable();
Map<String, String> dataTableMetadata = result.getMetadata();
dataTableMetadata.put(MetadataKey.REQUEST_ID.getName(), Long.toString(queryRequest.getRequestId()));
result.addException(error);
return Futures.immediateFuture(serializeDataTable(queryRequest, result));
}
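// A minimal concrete scheduler, for illustration only. The class name, pool fields, and the
// SERVER_SCHEDULER_DOWN_ERROR code are assumptions here; real implementations add queueing and
// per-query resource accounting.
//
//   public class PassThroughScheduler extends QueryScheduler {
//     // ... constructor delegating to super(config, queryExecutor, resourceManager, serverMetrics,
//     // latestQueryTime) ...
//     @Override
//     public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
//       if (!_isRunning) {
//         return immediateErrorResponse(queryRequest, QueryException.SERVER_SCHEDULER_DOWN_ERROR);
//       }
//       ListenableFutureTask<byte[]> task = createQueryFutureTask(queryRequest, _workerPool);
//       _runnerPool.submit(task);
//       return task;
//     }
//     @Override
//     public String name() {
//       return "passthrough";
//     }
//   }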
}