blob: 284f90f78bc99668317d282928f64189a6c7036a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.core.query.scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Priority queues of scheduler groups that determines query priority based on tokens
*
* This is a multi-level query scheduling queue with each sublevel maintaining a waitlist of
* queries for the group. The priority between groups is provided by specific SchedulerGroup
* implementation. If two groups have the same priority then the group with lower
* resource utilization is selected first. Oldest query from the winning SchedulerGroup
* is selected for execution.
*/
public class MultiLevelPriorityQueue implements SchedulerPriorityQueue {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiLevelPriorityQueue.class);
public static final String QUERY_DEADLINE_SECONDS_KEY = "query_deadline_seconds";
public static final String MAX_PENDING_PER_GROUP_KEY = "max_pending_per_group";
public static final String QUEUE_WAKEUP_MICROS = "queue_wakeup_micros";
private static final int DEFAULT_WAKEUP_MICROS = 1000;
private static int _wakeUpTimeMicros = DEFAULT_WAKEUP_MICROS;
private final int _maxPendingPerGroup;
private final Map<String, SchedulerGroup> _schedulerGroups = new HashMap<>();
private final Lock _queueLock = new ReentrantLock();
private final Condition _queryReaderCondition = _queueLock.newCondition();
private final ResourceManager _resourceManager;
private final SchedulerGroupMapper _groupSelector;
private final int _queryDeadlineMillis;
private final SchedulerGroupFactory _groupFactory;
private final PinotConfiguration _config;
public MultiLevelPriorityQueue(PinotConfiguration config, ResourceManager resourceManager,
SchedulerGroupFactory groupFactory, SchedulerGroupMapper groupMapper) {
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(resourceManager);
Preconditions.checkNotNull(groupFactory);
Preconditions.checkNotNull(groupMapper);
// max available tokens per millisecond equals number of threads (total execution capacity)
// we are over provisioning tokens here because its better to keep pipe full rather than empty
_queryDeadlineMillis = config.getProperty(QUERY_DEADLINE_SECONDS_KEY, 30) * 1000;
_wakeUpTimeMicros = config.getProperty(QUEUE_WAKEUP_MICROS, DEFAULT_WAKEUP_MICROS);
_maxPendingPerGroup = config.getProperty(MAX_PENDING_PER_GROUP_KEY, 10);
_config = config;
_resourceManager = resourceManager;
_groupFactory = groupFactory;
_groupSelector = groupMapper;
}
@Override
public void put(SchedulerQueryContext query)
throws OutOfCapacityException {
Preconditions.checkNotNull(query);
_queueLock.lock();
String groupName = _groupSelector.getSchedulerGroupName(query);
try {
SchedulerGroup groupContext = getOrCreateGroupContext(groupName);
checkGroupHasCapacity(groupContext);
query.setSchedulerGroupContext(groupContext);
groupContext.addLast(query);
_queryReaderCondition.signal();
} finally {
_queueLock.unlock();
}
}
/**
* Blocking call to read the next query in order of priority
* @return
*/
@Nullable
@Override
public SchedulerQueryContext take() {
_queueLock.lock();
try {
while (true) {
SchedulerQueryContext schedulerQueryContext;
while ((schedulerQueryContext = takeNextInternal()) == null) {
try {
_queryReaderCondition.await(_wakeUpTimeMicros, TimeUnit.MICROSECONDS);
} catch (InterruptedException e) {
return null;
}
}
return schedulerQueryContext;
}
} finally {
_queueLock.unlock();
}
}
@Override
public List<SchedulerQueryContext> drain() {
List<SchedulerQueryContext> pending = new ArrayList<>();
_queueLock.lock();
try {
for (Map.Entry<String, SchedulerGroup> groupEntry : _schedulerGroups.entrySet()) {
SchedulerGroup group = groupEntry.getValue();
while (!group.isEmpty()) {
pending.add(group.removeFirst());
}
}
} finally {
_queueLock.unlock();
}
return pending;
}
private SchedulerQueryContext takeNextInternal() {
SchedulerGroup currentWinnerGroup = null;
long startTime = System.nanoTime();
StringBuilder sb = new StringBuilder("SchedulerInfo:");
long deadlineEpochMillis = currentTimeMillis() - _queryDeadlineMillis;
for (Map.Entry<String, SchedulerGroup> groupInfoEntry : _schedulerGroups.entrySet()) {
SchedulerGroup group = groupInfoEntry.getValue();
if (LOGGER.isDebugEnabled()) {
sb.append(group.toString());
}
group.trimExpired(deadlineEpochMillis);
if (group.isEmpty() || !_resourceManager.canSchedule(group)) {
continue;
}
if (currentWinnerGroup == null) {
currentWinnerGroup = group;
continue;
}
// Preconditions:
// a. currentGroupResources <= hardLimit
// b. selectedGroupResources <= hardLimit
// We prefer group with higher priority but with resource limits.
// If current group priority are greater than currently winning priority then we choose current
// group over currentWinnerGroup if
// a. current group is using less than softLimit resources
// b. if softLimit < currentGroupResources <= hardLimit then
// i. choose group if softLimit <= currentWinnerGroup <= hardLimit
// ii. continue with currentWinnerGroup otherwise
int comparison = group.compareTo(currentWinnerGroup);
if (comparison < 0) {
if (currentWinnerGroup.totalReservedThreads() > _resourceManager.getTableThreadsSoftLimit()
&& group.totalReservedThreads() < _resourceManager.getTableThreadsSoftLimit()) {
currentWinnerGroup = group;
}
continue;
}
if (comparison >= 0) {
if (group.totalReservedThreads() < _resourceManager.getTableThreadsSoftLimit()
|| group.totalReservedThreads() < currentWinnerGroup.totalReservedThreads()) {
currentWinnerGroup = group;
}
}
}
SchedulerQueryContext query = null;
if (currentWinnerGroup != null) {
ServerQueryRequest queryRequest = currentWinnerGroup.peekFirst().getQueryRequest();
if (LOGGER.isDebugEnabled()) {
sb.append(String.format(" Winner: %s: [%d,%d,%d,%d]", currentWinnerGroup.name(),
queryRequest.getTimerContext().getQueryArrivalTimeMs(), queryRequest.getRequestId(),
queryRequest.getSegmentsToQuery().size(), startTime));
}
query = currentWinnerGroup.removeFirst();
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(sb.toString());
}
return query;
}
private void checkGroupHasCapacity(SchedulerGroup groupContext)
throws OutOfCapacityException {
if (groupContext.numPending() >= _maxPendingPerGroup
&& groupContext.totalReservedThreads() >= _resourceManager.getTableThreadsHardLimit()) {
throw new OutOfCapacityException(String.format(
"SchedulerGroup %s is out of capacity. numPending: %d, maxPending: %d, reservedThreads: %d "
+ "threadsHardLimit: %d", groupContext.name(), groupContext.numPending(), _maxPendingPerGroup,
groupContext.totalReservedThreads(), _resourceManager.getTableThreadsHardLimit()));
}
}
private SchedulerGroup getOrCreateGroupContext(String groupName) {
SchedulerGroup groupContext = _schedulerGroups.get(groupName);
if (groupContext == null) {
groupContext = _groupFactory.create(_config, groupName);
_schedulerGroups.put(groupName, groupContext);
}
return groupContext;
}
// separate method to allow mocking for unit testing
private long currentTimeMillis() {
return System.currentTimeMillis();
}
@VisibleForTesting
long getWakeupTimeMicros() {
return _wakeUpTimeMicros;
}
}