/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
* Remembers which activeTasks have locked which intervals or which segments. Tasks are permitted to lock an interval
* or a segment if no other task outside their group has locked an overlapping interval for the same datasource or
* the same segments. Note that TaskLockbox is also responsible for allocating segmentIds when a task requests to lock
* a new segment. Acquiring a lock may also involve assigning a version string:
*
* - When a task locks an interval or a new segment, it is assigned a new version string that it can use to publish
* segments.
* - When a task locks an existing segment, it doesn't need to be assigned a new version.
*
* Note that tasks of higher priorities can revoke locks of tasks of lower priorities.
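*
* A minimal usage sketch (the {@code lockbox}, {@code task}, and {@code request} instances are illustrative):
* <pre>{@code
* lockbox.add(task);                                // register the task as active
* LockResult result = lockbox.lock(task, request);  // blocks until granted or revoked
* if (result.isOk()) {
*   // do work under the acquired lock, then release it
*   lockbox.unlock(task, request.getInterval());
* }
* lockbox.remove(task);                             // drops the task and any locks it still holds
* }</pre>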
*/
public class TaskLockbox
{
// Datasource -> startTime -> Interval -> list of (Tasks + TaskLock)
// Multiple shared locks can be acquired for the same dataSource and interval.
// Note that revoked locks are also kept in this map so that callers are notified of the revocation when they try
// to acquire the same locks again.
// Also, the key of the second inner map is the start time, so that all intervals starting at the same instant can
// be found together.
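// For example, running.get(dataSource).get(interval.getStart()).get(interval) yields the posses registered for that exact interval.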
private final Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> running = new HashMap<>();
private final TaskStorage taskStorage;
private final IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private final ReentrantLock giant = new ReentrantLock(true);
private final Condition lockReleaseCondition = giant.newCondition();
private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
/**
* Set of active tasks. Locks can be granted only to a task present in this set.
* Should be accessed only under the giant lock.
*/
private final Set<String> activeTasks = new HashSet<>();
/**
* Map from a taskAllocatorId to the set of active taskIds using that allocator id.
* Used to clean up pending segments for a taskAllocatorId as soon as the set
* of corresponding active taskIds becomes empty.
*/
@GuardedBy("giant")
private final Map<String, Set<String>> activeAllocatorIdToTaskIds = new HashMap<>();
@Inject
public TaskLockbox(
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator metadataStorageCoordinator
)
{
this.taskStorage = taskStorage;
this.metadataStorageCoordinator = metadataStorageCoordinator;
}
/**
* Wipe out our current in-memory state and resync it from our bundled {@link TaskStorage}.
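*
* An illustrative call (typically performed when the owner of this lockbox starts up):
* <pre>{@code
* TaskLockboxSyncResult result = lockbox.syncFromStorage();
* // tasks reported in the result failed to reacquire at least one lock and must be handled by the caller
* }</pre>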
*
* @return {@link TaskLockboxSyncResult} which needs to be processed by the caller
*/
public TaskLockboxSyncResult syncFromStorage()
{
giant.lock();
try {
// Load stuff from taskStorage first. If this fails, we don't want to lose all our locks.
final Set<Task> storedActiveTasks = new HashSet<>();
final List<Pair<Task, TaskLock>> storedLocks = new ArrayList<>();
for (final Task task : taskStorage.getActiveTasks()) {
storedActiveTasks.add(task);
for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) {
storedLocks.add(Pair.of(task, taskLock));
}
}
// Sort locks by version, so we add them back in the order they were acquired.
final Ordering<Pair<Task, TaskLock>> byVersionOrdering = new Ordering<Pair<Task, TaskLock>>()
{
@Override
public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
{
// The second compare shouldn't be necessary, but it keeps the ordering deterministic when versions tie.
return ComparisonChain.start()
.compare(left.rhs.getVersion(), right.rhs.getVersion())
.compare(left.lhs.getId(), right.lhs.getId())
.result();
}
};
running.clear();
activeTasks.clear();
activeTasks.addAll(storedActiveTasks.stream()
.map(Task::getId)
.collect(Collectors.toSet())
);
// Set of task groups in which at least one task failed to re-acquire a lock
final Set<String> failedToReacquireLockTaskGroups = new HashSet<>();
// Bookkeeping for a log message at the end
int taskLockCount = 0;
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
final Task task = Preconditions.checkNotNull(taskAndLock.lhs, "task");
final TaskLock savedTaskLock = Preconditions.checkNotNull(taskAndLock.rhs, "savedTaskLock");
if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
// "Impossible", but you never know what crazy stuff can be restored from storage.
log.warn("Ignoring lock[%s] with empty interval for task: %s", savedTaskLock, task.getId());
continue;
}
// Create a new taskLock if it doesn't have a proper priority,
// so that every taskLock in memory has a priority.
final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null
? savedTaskLock.withPriority(task.getPriority())
: savedTaskLock;
final TaskLockPosse taskLockPosse = verifyAndCreateOrFindLockPosse(
task,
savedTaskLockWithPriority
);
if (taskLockPosse != null) {
final TaskLock taskLock = taskLockPosse.getTaskLock();
if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
taskLockCount++;
log.info(
"Reacquired lock[%s] for task: %s",
taskLock,
task.getId()
);
} else {
taskLockCount++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
taskLock.getVersion(),
task.getId()
);
}
} else {
failedToReacquireLockTaskGroups.add(task.getGroupId());
log.error(
"Could not reacquire lock on interval[%s] version[%s] for task: %s from group %s.",
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
task.getId(),
task.getGroupId()
);
}
}
Set<Task> tasksToFail = new HashSet<>();
for (Task task : storedActiveTasks) {
if (failedToReacquireLockTaskGroups.contains(task.getGroupId())) {
tasksToFail.add(task);
activeTasks.remove(task.getId());
}
}
activeAllocatorIdToTaskIds.clear();
for (Task task : storedActiveTasks) {
if (activeTasks.contains(task.getId())) {
trackAppendingTask(task);
}
}
log.info(
"Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).",
taskLockCount,
activeTasks.size(),
storedLocks.size() - taskLockCount
);
if (!failedToReacquireLockTaskGroups.isEmpty()) {
log.warn("Marking all tasks from task groups[%s] to be failed "
+ "as they failed to reacquire at least one lock.", failedToReacquireLockTaskGroups);
}
return new TaskLockboxSyncResult(tasksToFail);
}
finally {
giant.unlock();
}
}
/**
* This method is called only in {@link #syncFromStorage()} and verifies that the given task and taskLock have the same
* groupId, dataSource, and priority.
*/
@VisibleForTesting
@Nullable
protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskLock)
{
giant.lock();
try {
Preconditions.checkArgument(
task.getGroupId().equals(taskLock.getGroupId()),
"lock groupId[%s] is different from task groupId[%s]",
taskLock.getGroupId(),
task.getGroupId()
);
Preconditions.checkArgument(
task.getDataSource().equals(taskLock.getDataSource()),
"lock dataSource[%s] is different from task dataSource[%s]",
taskLock.getDataSource(),
task.getDataSource()
);
final int taskPriority = task.getPriority();
final int lockPriority = taskLock.getNonNullPriority();
Preconditions.checkArgument(
lockPriority == taskPriority,
"lock priority[%s] is different from task priority[%s]",
lockPriority,
taskPriority
);
final LockRequest request;
switch (taskLock.getGranularity()) {
case SEGMENT:
final SegmentLock segmentLock = (SegmentLock) taskLock;
request = new SpecificSegmentLockRequest(
segmentLock.getType(),
segmentLock.getGroupId(),
segmentLock.getDataSource(),
segmentLock.getInterval(),
segmentLock.getVersion(),
segmentLock.getPartitionId(),
taskPriority,
segmentLock.isRevoked()
);
break;
case TIME_CHUNK:
final TimeChunkLock timeChunkLock = (TimeChunkLock) taskLock;
request = new TimeChunkLockRequest(
timeChunkLock.getType(),
timeChunkLock.getGroupId(),
timeChunkLock.getDataSource(),
timeChunkLock.getInterval(),
timeChunkLock.getVersion(),
taskPriority,
timeChunkLock.isRevoked()
);
break;
default:
throw new ISE("Unknown lockGranularity[%s]", taskLock.getGranularity());
}
return createOrFindLockPosse(request, task, false);
}
catch (Exception e) {
log.error(e,
"Could not reacquire lock for task: %s from metadata store", task.getId()
);
return null;
}
finally {
giant.unlock();
}
}
/**
* Acquires a lock on behalf of a task. Blocks until the lock is acquired.
*
* @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a
* {@link LockResult#revoked} flag.
*
* @throws InterruptedException if the current thread is interrupted
*/
public LockResult lock(final Task task, final LockRequest request) throws InterruptedException
{
giant.lockInterruptibly();
try {
LockResult lockResult;
while (!(lockResult = tryLock(task, request)).isOk()) {
if (lockResult.isRevoked()) {
return lockResult;
}
lockReleaseCondition.await();
}
return lockResult;
}
finally {
giant.unlock();
}
}
/**
* Acquires a lock on behalf of a task, waiting up to the specified wait time if necessary.
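*
* A sketch of a timed acquisition (the 30 second timeout is illustrative):
* <pre>{@code
* LockResult result = lockbox.lock(task, request, 30_000);
* if (!result.isOk()) {
*   // timed out or the lock was revoked; result.isRevoked() distinguishes the two
* }
* }</pre>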
*
* @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a
* {@link LockResult#revoked} flag.
*
* @throws InterruptedException if the current thread is interrupted
*/
public LockResult lock(final Task task, final LockRequest request, long timeoutMs) throws InterruptedException
{
long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
giant.lockInterruptibly();
try {
LockResult lockResult;
while (!(lockResult = tryLock(task, request)).isOk()) {
if (nanos <= 0 || lockResult.isRevoked()) {
return lockResult;
}
nanos = lockReleaseCondition.awaitNanos(nanos);
}
return lockResult;
}
finally {
giant.unlock();
}
}
/**
* Attempt to acquire a lock for a task, without removing it from the queue. Can safely be called multiple times on
* the same task until the lock is preempted.
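*
* A non-blocking sketch (the {@code task} and {@code request} here are illustrative):
* <pre>{@code
* LockResult result = lockbox.tryLock(task, request);
* if (result.isRevoked()) {
*   // give up: a higher-priority task has revoked this lock
* } else if (!result.isOk()) {
*   // a conflicting task currently holds the lock; retry later
* }
* }</pre>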
*
* @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a
* {@link LockResult#revoked} flag.
*
* @throws IllegalStateException if the task is not a valid active task
*/
public LockResult tryLock(final Task task, final LockRequest request)
{
giant.lock();
try {
if (!activeTasks.contains(task.getId())) {
throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId());
}
Preconditions.checkArgument(request.getInterval().toDurationMillis() > 0, "interval empty");
SegmentIdWithShardSpec newSegmentId = null;
final LockRequest convertedRequest;
if (request instanceof LockRequestForNewSegment) {
final LockRequestForNewSegment lockRequestForNewSegment = (LockRequestForNewSegment) request;
if (lockRequestForNewSegment.getGranularity() == LockGranularity.SEGMENT) {
newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion(), null);
if (newSegmentId == null) {
return LockResult.fail();
}
convertedRequest = new SpecificSegmentLockRequest(lockRequestForNewSegment, newSegmentId);
} else {
convertedRequest = new TimeChunkLockRequest(lockRequestForNewSegment);
}
} else {
convertedRequest = request;
}
final TaskLockPosse posseToUse = createOrFindLockPosse(convertedRequest, task, true);
if (posseToUse != null && !posseToUse.getTaskLock().isRevoked()) {
if (request instanceof LockRequestForNewSegment) {
final LockRequestForNewSegment lockRequestForNewSegment = (LockRequestForNewSegment) request;
if (lockRequestForNewSegment.getGranularity() == LockGranularity.TIME_CHUNK) {
if (newSegmentId != null) {
throw new ISE(
"SegmentId must be allocated after getting a timeChunk lock,"
+ " but we already have [%s] before getting the lock?",
newSegmentId
);
}
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
newSegmentId = allocateSegmentId(
lockRequestForNewSegment,
posseToUse.getTaskLock().getVersion(),
taskAllocatorId
);
}
}
return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
} else {
final boolean lockRevoked = posseToUse != null && posseToUse.getTaskLock().isRevoked();
if (lockRevoked) {
return LockResult.revoked(posseToUse.getTaskLock());
}
return LockResult.fail();
}
}
finally {
giant.unlock();
}
}
/**
* Attempts to allocate segments for the given requests. Each request contains
* a {@link Task} and a {@link SegmentAllocateAction}. This method tries to
* acquire the task locks on the required intervals/segments and then performs
* a batch allocation of segments. It is possible that some requests succeed
* while others fail. In that case, only the failed ones should be retried.
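*
* An illustrative call, assuming {@code SegmentAllocateResult#isSuccess} reports per-request success and
* "wiki" stands in for a real datasource:
* <pre>{@code
* List<SegmentAllocateResult> results =
*     lockbox.allocateSegments(requests, "wiki", interval, true, LockGranularity.TIME_CHUNK);
* for (SegmentAllocateResult result : results) {
*   if (!result.isSuccess()) {
*     // only the failed requests need to be retried
*   }
* }
* }</pre>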
*
* @param requests List of allocation requests
* @param dataSource Datasource for which segment is to be allocated.
* @param interval Interval for which segment is to be allocated.
* @param skipSegmentLineageCheck Whether lineage check is to be skipped
* (this is true for streaming ingestion)
* @param lockGranularity Granularity of task lock
* @return List of allocation results in the same order as the requests.
*/
public List<SegmentAllocateResult> allocateSegments(
List<SegmentAllocateRequest> requests,
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
LockGranularity lockGranularity
)
{
log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval);
final boolean isTimeChunkLock = lockGranularity == LockGranularity.TIME_CHUNK;
final AllocationHolderList holderList = new AllocationHolderList(requests, interval);
holderList.getPending().forEach(this::verifyTaskIsActive);
giant.lock();
try {
if (isTimeChunkLock) {
// For time-chunk locking, segment must be allocated only after acquiring the lock
holderList.getPending().forEach(holder -> acquireTaskLock(holder, true));
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
} else {
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
holderList.getPending().forEach(holder -> acquireTaskLock(holder, false));
}
holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded);
}
finally {
giant.unlock();
}
return holderList.getResults();
}
/**
* Marks the segment allocation as failed if the underlying task is not active.
*/
private void verifyTaskIsActive(SegmentAllocationHolder holder)
{
final String taskId = holder.task.getId();
if (!activeTasks.contains(taskId)) {
holder.markFailed("Unable to grant lock to inactive Task [%s]", taskId);
}
}
/**
* Creates a task lock request and creates or finds the lock for that request.
* Marks the segment allocation as failed if the lock could not be acquired or
* was revoked.
*/
private void acquireTaskLock(SegmentAllocationHolder holder, boolean isTimeChunkLock)
{
final LockRequest lockRequest;
if (isTimeChunkLock) {
lockRequest = new TimeChunkLockRequest(holder.lockRequest);
} else {
lockRequest = new SpecificSegmentLockRequest(holder.lockRequest, holder.allocatedSegment);
}
// Create or find the task lock for the created lock request
final TaskLockPosse posseToUse = createOrFindLockPosse(lockRequest, holder.task, true);
final TaskLock acquiredLock = posseToUse == null ? null : posseToUse.getTaskLock();
if (posseToUse == null) {
holder.markFailed("Could not find or create lock posse.");
} else if (acquiredLock.isRevoked()) {
holder.markFailed("Lock was revoked.");
} else {
holder.setAcquiredLock(posseToUse, lockRequest.getInterval());
}
}
@Nullable
private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, boolean persist)
{
Preconditions.checkState(!(request instanceof LockRequestForNewSegment), "Can't handle LockRequestForNewSegment");
giant.lock();
try {
final TaskLockPosse posseToUse;
final List<TaskLockPosse> foundPosses = findLockPossesOverlapsInterval(
request.getDataSource(),
request.getInterval()
);
final List<TaskLockPosse> conflictPosses = foundPosses
.stream()
.filter(taskLockPosse -> taskLockPosse.getTaskLock().conflict(request))
.collect(Collectors.toList());
if (!conflictPosses.isEmpty()) {
// If we have some locks for dataSource and interval, check they can be reused.
// If they can't be reused, check lock priority and revoke existing locks if possible.
final List<TaskLockPosse> reusablePosses = foundPosses
.stream()
.filter(posse -> posse.reusableFor(request))
.collect(Collectors.toList());
if (reusablePosses.isEmpty()) {
// case 1) this task doesn't have any lock, but others do
if ((request.getType().equals(TaskLockType.APPEND) || request.getType().equals(TaskLockType.REPLACE))
&& !request.getGranularity().equals(LockGranularity.TIME_CHUNK)) {
// APPEND and REPLACE locks can be used only with time chunk granularity
return null;
}
// First, check if the lock can coexist with its conflicting posses
// If not, revoke all lower priority locks of different types if the request has a greater priority
if (canLockCoexist(conflictPosses, request)
|| revokeAllIncompatibleActiveLocksIfPossible(conflictPosses, request)) {
posseToUse = createNewTaskLockPosse(request);
} else {
// During a rolling update, tasks of mixed versions can be run at the same time. Old tasks would request
// timeChunkLocks while new tasks would ask for segmentLocks. The below check allows old and new tasks
// to get locks of different granularities if they have the same groupId.
final boolean allDifferentGranularity = conflictPosses
.stream()
.allMatch(
conflictPosse -> conflictPosse.taskLock.getGranularity() != request.getGranularity()
&& conflictPosse.getTaskLock().getGroupId().equals(request.getGroupId())
&& conflictPosse.getTaskLock().getInterval().equals(request.getInterval())
);
if (allDifferentGranularity) {
// Lock collision was because of the different granularity in the same group.
// We can add a new taskLockPosse.
posseToUse = createNewTaskLockPosse(request);
} else {
log.info(
"Cannot create a new taskLockPosse for request[%s] because existing locks[%s] have same or higher priorities",
request,
conflictPosses
);
return null;
}
}
} else if (reusablePosses.size() == 1) {
// case 2) we found a lock posse for the given request
posseToUse = reusablePosses.get(0);
} else {
// case 3) we found multiple lock posses for the given task
throw new ISE(
"Task group[%s] has multiple locks for the same interval[%s]?",
request.getGroupId(),
request.getInterval()
);
}
} else {
// We don't have any locks for dataSource and interval.
// Let's make a new one.
posseToUse = createNewTaskLockPosse(request);
}
if (posseToUse == null || posseToUse.getTaskLock() == null) {
return null;
}
// Add to existing TaskLockPosse
if (posseToUse.addTask(task)) {
log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock());
// If the task lock can be used instead of the conflicting posses, their locks can be released
for (TaskLockPosse conflictPosse : conflictPosses) {
if (conflictPosse.containsTask(task) && posseToUse.supersedes(conflictPosse)) {
unlock(task, conflictPosse.getTaskLock().getInterval());
}
}
if (persist) {
// Update task storage facility. If it fails, unlock it
try {
taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
}
catch (Exception e) {
log.makeAlert("Failed to persist lock in storage")
.addData("task", task.getId())
.addData("dataSource", posseToUse.getTaskLock().getDataSource())
.addData("interval", posseToUse.getTaskLock().getInterval())
.addData("version", posseToUse.getTaskLock().getVersion())
.emit();
unlock(
task,
posseToUse.getTaskLock().getInterval(),
posseToUse.getTaskLock().getGranularity() == LockGranularity.SEGMENT
? ((SegmentLock) posseToUse.taskLock).getPartitionId()
: null
);
return null;
}
}
} else {
log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
}
return posseToUse;
}
finally {
giant.unlock();
}
}
/**
* Create a new {@link TaskLockPosse} for a new {@link TaskLock}. This method will attempt to assign version strings
* that obey the invariant that every version string is lexicographically greater than any other version string
* previously assigned to the same interval. This invariant is only mostly guaranteed, however; we assume clock
* monotonicity and that callers specifying {@code preferredVersion} are doing the right thing.
*
* @param request request to lock
* @return a new {@link TaskLockPosse}
*/
private TaskLockPosse createNewTaskLockPosse(LockRequest request)
{
giant.lock();
try {
final TaskLockPosse posseToUse = new TaskLockPosse(request.toLock());
running.computeIfAbsent(request.getDataSource(), k -> new TreeMap<>())
.computeIfAbsent(
request.getInterval().getStart(),
k -> new TreeMap<>(Comparators.intervalsByStartThenEnd())
)
.computeIfAbsent(request.getInterval(), k -> new ArrayList<>())
.add(posseToUse);
return posseToUse;
}
finally {
giant.unlock();
}
}
/**
* Makes a call to the {@link #metadataStorageCoordinator} to allocate segments
* for the given requests. Updates the holder with the allocated segment if
* the allocation succeeds, otherwise marks it as failed.
*/
@VisibleForTesting
void allocateSegmentIds(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
Collection<SegmentAllocationHolder> holders
)
{
if (holders.isEmpty()) {
return;
}
final List<SegmentCreateRequest> createRequests =
holders.stream()
.map(SegmentAllocationHolder::getSegmentRequest)
.collect(Collectors.toList());
Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatedSegments =
metadataStorageCoordinator.allocatePendingSegments(
dataSource,
interval,
skipSegmentLineageCheck,
createRequests
);
for (SegmentAllocationHolder holder : holders) {
SegmentIdWithShardSpec segmentId = allocatedSegments.get(holder.getSegmentRequest());
if (segmentId == null) {
holder.markFailed("Storage coordinator could not allocate segment.");
} else {
holder.setAllocatedSegment(segmentId);
}
}
}
private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version, String allocatorId)
{
return metadataStorageCoordinator.allocatePendingSegment(
request.getDataSource(),
request.getSequenceName(),
request.getPreviousSegmentId(),
request.getInterval(),
request.getPartialShardSpec(),
version,
request.isSkipSegmentLineageCheck(),
allocatorId
);
}
/**
* Performs the given action with a guarantee that the locks of the task are not revoked in the middle of the action.
* This method first checks that all locks for the given task and intervals are valid, and then performs the action.
* <p>
* The given action should be finished as soon as possible because all other methods in this class are blocked until
* this method is finished.
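*
* A sketch assuming the builder-style {@code CriticalAction} API ({@code publishSegments} is illustrative):
* <pre>{@code
* boolean published = lockbox.doInCriticalSection(
*     task,
*     ImmutableSet.of(interval),
*     CriticalAction.<Boolean>builder()
*         .onValidLocks(() -> publishSegments())   // runs only if none of the task's locks were revoked
*         .onInvalidLocks(() -> false)
*         .build()
* );
* }</pre>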
*
* @param task task performing a critical action
* @param intervals intervals for which the task's locks must remain valid while the action runs
* @param action action to be performed inside of the critical section
*/
public <T> T doInCriticalSection(Task task, Set<Interval> intervals, CriticalAction<T> action) throws Exception
{
giant.lock();
try {
return action.perform(isTaskLocksValid(task, intervals));
}
finally {
giant.unlock();
}
}
/**
* Checks that all locks the task has acquired for the given intervals are still valid.
* It doesn't check other semantics, such as whether the acquired locks are sufficient to overwrite existing segments.
* That kind of check should be done by each caller of {@link #doInCriticalSection}.
*/
private boolean isTaskLocksValid(Task task, Set<Interval> intervals)
{
giant.lock();
try {
return intervals
.stream()
.allMatch(interval -> {
final List<TaskLockPosse> lockPosses = getOnlyTaskLockPosseContainingInterval(task, interval);
return lockPosses.stream().map(TaskLockPosse::getTaskLock).noneMatch(
TaskLock::isRevoked
);
});
}
finally {
giant.unlock();
}
}
private void revokeLock(TaskLockPosse lockPosse)
{
giant.lock();
try {
lockPosse.forEachTask(taskId -> revokeLock(taskId, lockPosse.getTaskLock()));
}
finally {
giant.unlock();
}
}
/**
* Marks the lock as revoked. Note that revoked locks are NOT removed. Instead, they are maintained in {@link #running}
* and {@link #taskStorage} just like normal locks, so that callers are notified of the revocation when they request
* the same locks again. Revoked locks are removed by calling {@link #unlock(Task, Interval)}.
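*
* A sketch of revoking every lock currently held by a task (illustrative):
* <pre>{@code
* for (TaskLock lock : lockbox.findLocksForTask(task)) {
*   lockbox.revokeLock(task.getId(), lock);
* }
* }</pre>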
*
* @param taskId an id of the task holding the lock
* @param lock lock to be revoked
*/
@VisibleForTesting
public void revokeLock(String taskId, TaskLock lock)
{
giant.lock();
try {
if (!activeTasks.contains(taskId)) {
throw new ISE("Cannot revoke lock for inactive task[%s]", taskId);
}
final Task task = taskStorage.getTask(taskId).orNull();
if (task == null) {
throw new ISE("Cannot revoke lock for unknown task[%s]", taskId);
}
log.info("Revoking task lock[%s] for task[%s]", lock, taskId);
if (lock.isRevoked()) {
log.warn("TaskLock[%s] is already revoked", lock);
} else {
final TaskLock revokedLock = lock.revokedCopy();
taskStorage.replaceLock(taskId, lock, revokedLock);
final List<TaskLockPosse> possesHolder = running.get(task.getDataSource())
.get(lock.getInterval().getStart())
.get(lock.getInterval());
final TaskLockPosse foundPosse = possesHolder.stream()
.filter(posse -> posse.getTaskLock().equals(lock))
.findFirst()
.orElseThrow(
() -> new ISE("Failed to find lock posse for lock[%s]", lock)
);
possesHolder.remove(foundPosse);
possesHolder.add(foundPosse.withTaskLock(revokedLock));
log.info("Revoked taskLock[%s]", lock);
}
}
finally {
giant.unlock();
}
}
/**
* Return the currently-active locks for some task.
*
* @param task task for which to locate locks
* @return currently-active locks for the given task
*/
public List<TaskLock> findLocksForTask(final Task task)
{
giant.lock();
try {
return Lists.transform(findLockPossesForTask(task), TaskLockPosse::getTaskLock);
}
finally {
giant.unlock();
}
}
/**
* Finds the active non-revoked REPLACE locks held by the given task.
*/
public Set<ReplaceTaskLock> findReplaceLocksForTask(Task task)
{
giant.lock();
try {
return getNonRevokedReplaceLocks(findLockPossesForTask(task), task.getDataSource());
}
finally {
giant.unlock();
}
}
/**
* Finds all the active non-revoked REPLACE locks for the given datasource.
*/
public Set<ReplaceTaskLock> getAllReplaceLocksForDatasource(String datasource)
{
giant.lock();
try {
final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> activeLocks = running.get(datasource);
if (activeLocks == null) {
return ImmutableSet.of();
}
List<TaskLockPosse> lockPosses
= activeLocks.values()
.stream()
.flatMap(map -> map.values().stream())
.flatMap(Collection::stream)
.collect(Collectors.toList());
return getNonRevokedReplaceLocks(lockPosses, datasource);
}
finally {
giant.unlock();
}
}
private Set<ReplaceTaskLock> getNonRevokedReplaceLocks(List<TaskLockPosse> posses, String datasource)
{
final Set<ReplaceTaskLock> replaceLocks = new HashSet<>();
for (TaskLockPosse posse : posses) {
final TaskLock lock = posse.getTaskLock();
if (lock.isRevoked() || !TaskLockType.REPLACE.equals(posse.getTaskLock().getType())) {
continue;
}
// Replace locks are always held by the supervisor task
if (posse.taskIds.size() > 1) {
throw DruidException.defensive(
"Replace lock[%s] for datasource[%s] is held by multiple tasks[%s]",
lock, datasource, posse.taskIds
);
}
String supervisorTaskId = posse.taskIds.iterator().next();
replaceLocks.add(
new ReplaceTaskLock(supervisorTaskId, lock.getInterval(), lock.getVersion())
);
}
return replaceLocks;
}
/**
* @param lockFilterPolicies Lock filters for the given datasources
* @return Map from datasource to intervals locked by tasks satisfying the lock filter conditions
*/
public Map<String, List<Interval>> getLockedIntervals(List<LockFilterPolicy> lockFilterPolicies)
{
final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
// Take a lock and populate the maps
giant.lock();
try {
lockFilterPolicies.forEach(
lockFilter -> {
final String datasource = lockFilter.getDatasource();
if (!running.containsKey(datasource)) {
return;
}
final int priority = lockFilter.getPriority();
final boolean isReplaceLock = TaskLockType.REPLACE.name().equals(
lockFilter.getContext().getOrDefault(
Tasks.TASK_LOCK_TYPE,
Tasks.DEFAULT_TASK_LOCK_TYPE
)
);
final boolean isUsingConcurrentLocks = Boolean.TRUE.equals(
lockFilter.getContext().getOrDefault(
Tasks.USE_CONCURRENT_LOCKS,
Tasks.DEFAULT_USE_CONCURRENT_LOCKS
)
);
final boolean ignoreAppendLocks = isUsingConcurrentLocks || isReplaceLock;
running.get(datasource).forEach(
(startTime, startTimeLocks) -> startTimeLocks.forEach(
(interval, taskLockPosses) -> taskLockPosses.forEach(
taskLockPosse -> {
if (taskLockPosse.getTaskLock().isRevoked()) {
// do nothing
} else if (ignoreAppendLocks
&& TaskLockType.APPEND.equals(taskLockPosse.getTaskLock().getType())) {
// do nothing
} else if (taskLockPosse.getTaskLock().getPriority() == null
|| taskLockPosse.getTaskLock().getPriority() < priority) {
// do nothing
} else {
datasourceToIntervals.computeIfAbsent(datasource, k -> new HashSet<>())
.add(interval);
}
}
)
)
);
}
);
}
finally {
giant.unlock();
}
return datasourceToIntervals.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new ArrayList<>(entry.getValue())
));
}
/**
* Gets a List of Intervals locked by higher priority tasks for each datasource.
* Here, Segment Locks are being treated the same as Time Chunk Locks, i.e.
* a Task with a Segment Lock is assumed to lock the whole Interval and not just
* the corresponding Segment.
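*
* An illustrative call that finds intervals of a datasource named "wiki" locked by tasks with priority 50 or higher:
* <pre>{@code
* Map<String, List<Interval>> lockedIntervals =
*     lockbox.getLockedIntervals(Collections.singletonMap("wiki", 50));
* }</pre>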
*
* @param minTaskPriority Minimum task priority for each datasource. Only the
* Intervals that are locked by Tasks with equal or
* higher priority than this are returned. Locked intervals
* for datasources that are not present in this Map are
* not returned.
* @return Map from Datasource to List of Intervals locked by Tasks that have
* priority greater than or equal to the {@code minTaskPriority} for that datasource.
*/
public Map<String, List<Interval>> getLockedIntervals(Map<String, Integer> minTaskPriority)
{
final Map<String, Set<Interval>> datasourceToIntervals = new HashMap<>();
// Take a lock and populate the maps
giant.lock();
try {
running.forEach(
(datasource, datasourceLocks) -> {
// If this datasource is not requested, do not proceed
if (!minTaskPriority.containsKey(datasource)) {
return;
}
datasourceLocks.forEach(
(startTime, startTimeLocks) -> startTimeLocks.forEach(
(interval, taskLockPosses) -> taskLockPosses.forEach(
taskLockPosse -> {
if (taskLockPosse.getTaskLock().isRevoked()) {
// Do not proceed if the lock is revoked
return;
} else if (taskLockPosse.getTaskLock().getPriority() == null
|| taskLockPosse.getTaskLock().getPriority() < minTaskPriority.get(datasource)) {
// Do not proceed if the lock has a priority strictly less than the minimum
return;
}
datasourceToIntervals
.computeIfAbsent(datasource, k -> new HashSet<>())
.add(interval);
})
)
);
}
);
}
finally {
giant.unlock();
}
return datasourceToIntervals.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> new ArrayList<>(entry.getValue())
));
}
public void unlock(final Task task, final Interval interval)
{
unlock(task, interval, null);
}
/**
* Releases the lock held by a task on a particular interval. Does nothing if the task does not currently
* hold the mentioned lock.
*
* @param task        task to unlock
* @param interval    interval to unlock
* @param partitionId partition id of the segment lock to release; null for a time chunk lock
*/
public void unlock(final Task task, final Interval interval, @Nullable Integer partitionId)
{
giant.lock();
try {
final String dataSource = task.getDataSource();
final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(
task.getDataSource()
);
if (dsRunning == null || dsRunning.isEmpty()) {
return;
}
final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses = dsRunning.get(interval.getStart());
if (intervalToPosses == null || intervalToPosses.isEmpty()) {
return;
}
final List<TaskLockPosse> possesHolder = intervalToPosses.get(interval);
if (possesHolder == null || possesHolder.isEmpty()) {
return;
}
final List<TaskLockPosse> posses = possesHolder.stream()
.filter(posse -> posse.containsTask(task))
.collect(Collectors.toList());
for (TaskLockPosse taskLockPosse : posses) {
final TaskLock taskLock = taskLockPosse.getTaskLock();
final boolean match = (partitionId == null && taskLock.getGranularity() == LockGranularity.TIME_CHUNK)
|| (partitionId != null
&& taskLock.getGranularity() == LockGranularity.SEGMENT
&& ((SegmentLock) taskLock).getPartitionId() == partitionId);
if (match) {
// Remove task from live list
log.info("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock);
final boolean removed = taskLockPosse.removeTask(task);
if (taskLockPosse.isTasksEmpty()) {
log.info("TaskLock is now empty: %s", taskLock);
possesHolder.remove(taskLockPosse);
}
if (possesHolder.isEmpty()) {
intervalToPosses.remove(interval);
}
if (intervalToPosses.isEmpty()) {
dsRunning.remove(interval.getStart());
}
if (running.get(dataSource).isEmpty()) {
running.remove(dataSource);
}
// Wake up blocking-lock waiters
lockReleaseCondition.signalAll();
// Remove lock from storage. If it cannot be removed, just ignore the failure.
try {
taskStorage.removeLock(task.getId(), taskLock);
}
catch (Exception e) {
log.makeAlert(e, "Failed to clean up lock from storage")
.addData("task", task.getId())
.addData("dataSource", taskLock.getDataSource())
.addData("interval", taskLock.getInterval())
.addData("version", taskLock.getVersion())
.emit();
}
if (!removed) {
log.makeAlert("Lock release without acquire")
.addData("task", task.getId())
.addData("interval", interval)
.emit();
}
}
}
}
finally {
giant.unlock();
}
}
public void unlockAll(Task task)
{
giant.lock();
try {
for (final TaskLockPosse taskLockPosse : findLockPossesForTask(task)) {
unlock(
task,
taskLockPosse.getTaskLock().getInterval(),
taskLockPosse.getTaskLock().getGranularity() == LockGranularity.SEGMENT
? ((SegmentLock) taskLockPosse.taskLock).getPartitionId()
: null
);
}
}
finally {
giant.unlock();
}
}
public void add(Task task)
{
giant.lock();
try {
log.info("Adding task[%s] to activeTasks", task.getId());
activeTasks.add(task.getId());
trackAppendingTask(task);
}
finally {
giant.unlock();
}
}
@GuardedBy("giant")
private void trackAppendingTask(Task task)
{
if (task instanceof PendingSegmentAllocatingTask) {
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
if (taskAllocatorId != null) {
activeAllocatorIdToTaskIds.computeIfAbsent(taskAllocatorId, s -> new HashSet<>())
.add(task.getId());
}
}
}
/**
* Releases all locks for a task and removes the task from the set of active tasks.
* Does nothing if the task is not currently locked or not an active task.
*
* @param task task to unlock
*/
public void remove(final Task task)
{
giant.lock();
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
try {
// Clean upgrade segments table for entries associated with replacing task
if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
log.info(
"Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
upgradeSegmentsDeleted, task.getId()
);
}
// Clean pending segments associated with the appending task
if (task instanceof PendingSegmentAllocatingTask) {
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
final Set<String> idsInSameGroup = activeAllocatorIdToTaskIds.get(taskAllocatorId);
idsInSameGroup.remove(task.getId());
if (idsInSameGroup.isEmpty()) {
final int pendingSegmentsDeleted
= metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId);
log.info(
"Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
pendingSegmentsDeleted, taskAllocatorId
);
}
activeAllocatorIdToTaskIds.remove(taskAllocatorId);
}
}
}
catch (Exception e) {
log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments tables.");
}
unlockAll(task);
}
finally {
activeTasks.remove(task.getId());
}
}
finally {
giant.unlock();
}
}
/**
* Return the currently-active lock posses for some task.
*
* @param task task for which to locate locks
*/
private List<TaskLockPosse> findLockPossesForTask(final Task task)
{
giant.lock();
try {
// Scan through all locks for this datasource
final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(task.getDataSource());
if (dsRunning == null) {
return ImmutableList.of();
} else {
return dsRunning.values().stream()
.flatMap(map -> map.values().stream())
.flatMap(Collection::stream)
.filter(taskLockPosse -> taskLockPosse.containsTask(task))
.collect(Collectors.toList());
}
}
finally {
giant.unlock();
}
}
private List<TaskLockPosse> findLockPossesContainingInterval(final String dataSource, final Interval interval)
{
giant.lock();
try {
final List<TaskLockPosse> intervalOverlapsPosses = findLockPossesOverlapsInterval(dataSource, interval);
return intervalOverlapsPosses.stream()
.filter(taskLockPosse -> taskLockPosse.taskLock.getInterval().contains(interval))
.collect(Collectors.toList());
}
finally {
giant.unlock();
}
}
/**
* Return all locks that overlap some search interval.
*/
private List<TaskLockPosse> findLockPossesOverlapsInterval(final String dataSource, final Interval interval)
{
giant.lock();
try {
final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> dsRunning = running.get(dataSource);
if (dsRunning == null) {
// No locks at all
return Collections.emptyList();
} else {
return dsRunning.navigableKeySet().stream()
.filter(java.util.Objects::nonNull)
.map(dsRunning::get)
.filter(java.util.Objects::nonNull)
.flatMap(sortedMap -> sortedMap.entrySet().stream())
.filter(entry -> entry.getKey().overlaps(interval))
.flatMap(entry -> entry.getValue().stream())
.collect(Collectors.toList());
}
}
finally {
giant.unlock();
}
}
@VisibleForTesting
List<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
{
giant.lock();
try {
return getOnlyTaskLockPosseContainingInterval(task, interval, Collections.emptySet());
}
finally {
giant.unlock();
}
}
@VisibleForTesting
List<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task, Interval interval, Set<Integer> partitionIds)
{
giant.lock();
try {
final List<TaskLockPosse> filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval)
.stream()
.filter(lockPosse -> lockPosse.containsTask(task))
.collect(Collectors.toList());
if (filteredPosses.isEmpty()) {
throw new ISE("Cannot find locks for task[%s] and interval[%s]", task.getId(), interval);
} else if (filteredPosses.size() > 1) {
if (filteredPosses.stream()
.anyMatch(posse -> posse.getTaskLock().getGranularity() == LockGranularity.TIME_CHUNK)) {
throw new ISE(
"There are multiple timeChunk lockPosses for task[%s] and interval[%s]?",
task.getId(),
interval
);
} else {
final Map<Integer, TaskLockPosse> partitionIdsOfLocks = new HashMap<>();
for (TaskLockPosse posse : filteredPosses) {
final SegmentLock segmentLock = (SegmentLock) posse.getTaskLock();
partitionIdsOfLocks.put(segmentLock.getPartitionId(), posse);
}
if (partitionIds.stream().allMatch(partitionIdsOfLocks::containsKey)) {
return partitionIds.stream().map(partitionIdsOfLocks::get).collect(Collectors.toList());
} else {
throw new ISE(
"Task[%s] doesn't have locks for interval[%s] partitions[%]",
task.getId(),
interval,
partitionIds.stream().filter(pid -> !partitionIdsOfLocks.containsKey(pid)).collect(Collectors.toList())
);
}
}
} else {
return filteredPosses;
}
}
finally {
giant.unlock();
}
}
@VisibleForTesting
Set<String> getActiveTasks()
{
return activeTasks;
}
@VisibleForTesting
Map<String, NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>> getAllLocks()
{
return running;
}
/**
* Check if the lock for a given request can coexist with a given set of conflicting posses without any revocation.
* @param conflictPosses conflict lock posses
* @param request lock request
* @return true iff the lock can coexist with all its conflicting locks
*/
private boolean canLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest request)
{
switch (request.getType()) {
case APPEND:
return canAppendLockCoexist(conflictPosses, request);
case REPLACE:
return canReplaceLockCoexist(conflictPosses, request);
case SHARED:
return canSharedLockCoexist(conflictPosses);
case EXCLUSIVE:
return canExclusiveLockCoexist(conflictPosses);
default:
throw new UOE("Unsupported lock type: " + request.getType());
}
}
/**
* Check if an APPEND lock can coexist with a given set of conflicting posses.
* An APPEND lock can coexist with any number of other APPEND locks,
* and with at most one REPLACE lock whose interval encloses the requested interval.
* @param conflictPosses conflicting lock posses
* @param appendRequest append lock request
* @return true iff append lock can coexist with all its conflicting locks
*/
private boolean canAppendLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest appendRequest)
{
TaskLock replaceLock = null;
for (TaskLockPosse posse : conflictPosses) {
if (posse.getTaskLock().isRevoked()) {
continue;
}
if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
|| posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
return false;
}
if (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
if (replaceLock != null) {
return false;
}
replaceLock = posse.getTaskLock();
if (!replaceLock.getInterval().contains(appendRequest.getInterval())) {
return false;
}
}
}
return true;
}
/**
* Check if a REPLACE lock can coexist with a given set of conflicting posses.
* A REPLACE lock can coexist with any number of APPEND locks over intervals enclosed within its own interval, and with revoked locks.
* @param conflictPosses conflicting lock posses
* @param replaceLock replace lock request
* @return true iff replace lock can coexist with all its conflicting locks
*/
private boolean canReplaceLockCoexist(List<TaskLockPosse> conflictPosses, LockRequest replaceLock)
{
for (TaskLockPosse posse : conflictPosses) {
if (posse.getTaskLock().isRevoked()) {
continue;
}
if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
|| posse.getTaskLock().getType().equals(TaskLockType.SHARED)
|| posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
return false;
}
if (posse.getTaskLock().getType().equals(TaskLockType.APPEND)
&& !replaceLock.getInterval().contains(posse.getTaskLock().getInterval())) {
return false;
}
}
return true;
}
/**
* Check if a SHARED lock can coexist with a given set of conflicting posses.
* A SHARED lock can coexist with any number of other active SHARED locks
* @param conflictPosses conflicting lock posses
* @return true iff shared lock can coexist with all its conflicting locks
*/
private boolean canSharedLockCoexist(List<TaskLockPosse> conflictPosses)
{
for (TaskLockPosse posse : conflictPosses) {
if (posse.getTaskLock().isRevoked()) {
continue;
}
if (posse.getTaskLock().getType().equals(TaskLockType.EXCLUSIVE)
|| posse.getTaskLock().getType().equals(TaskLockType.APPEND)
|| posse.getTaskLock().getType().equals(TaskLockType.REPLACE)) {
return false;
}
}
return true;
}
/**
* Check if an EXCLUSIVE lock can coexist with a given set of conflicting posses.
* An EXCLUSIVE lock cannot coexist with any other overlapping active locks
* @param conflictPosses conflicting lock posses
* @return true iff the exclusive lock can coexist with all its conflicting locks
*/
private boolean canExclusiveLockCoexist(List<TaskLockPosse> conflictPosses)
{
for (TaskLockPosse posse : conflictPosses) {
if (posse.getTaskLock().isRevoked()) {
continue;
}
return false;
}
return true;
}
/**
* Verifies that every incompatible active lock is revocable and, if so, revokes all of them.
* - EXCLUSIVE locks are incompatible with every other conflicting lock
* - SHARED locks are incompatible with conflicting locks of every other type
* - REPLACE locks are incompatible with every conflicting lock that is not an APPEND lock enclosed within the REPLACE interval
* - APPEND locks are incompatible with every EXCLUSIVE and SHARED lock, and with every conflicting REPLACE lock
*   whose interval does not enclose the APPEND interval.
* @param conflictPosses conflicting lock posses
* @param request lock request
* @return true iff every incompatible lock is revocable.
*/
private boolean revokeAllIncompatibleActiveLocksIfPossible(
List<TaskLockPosse> conflictPosses,
LockRequest request
)
{
final int priority = request.getPriority();
final TaskLockType type = request.getType();
final List<TaskLockPosse> possesToRevoke = new ArrayList<>();
for (TaskLockPosse posse : conflictPosses) {
if (posse.getTaskLock().isRevoked()) {
continue;
}
switch (type) {
case EXCLUSIVE:
if (posse.getTaskLock().getNonNullPriority() >= priority) {
return false;
}
possesToRevoke.add(posse);
break;
case SHARED:
if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
if (posse.getTaskLock().getNonNullPriority() >= priority) {
return false;
}
possesToRevoke.add(posse);
}
break;
case REPLACE:
if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
&& request.getInterval().contains(posse.getTaskLock().getInterval()))) {
if (posse.getTaskLock().getNonNullPriority() >= priority) {
return false;
}
possesToRevoke.add(posse);
}
break;
case APPEND:
if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
|| (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
&& posse.getTaskLock().getInterval().contains(request.getInterval())))) {
if (posse.getTaskLock().getNonNullPriority() >= priority) {
return false;
}
possesToRevoke.add(posse);
}
break;
default:
throw new UOE("Unsupported lock type: " + type);
}
}
for (TaskLockPosse revokablePosse : possesToRevoke) {
revokeLock(revokablePosse);
}
return true;
}
/**
* Task locks for tasks of the same groupId
*/
static class TaskLockPosse
{
private final TaskLock taskLock;
private final Set<String> taskIds;
TaskLockPosse(TaskLock taskLock)
{
this.taskLock = taskLock;
this.taskIds = new HashSet<>();
}
private TaskLockPosse(TaskLock taskLock, Set<String> taskIds)
{
this.taskLock = taskLock;
this.taskIds = new HashSet<>(taskIds);
}
TaskLockPosse withTaskLock(TaskLock taskLock)
{
return new TaskLockPosse(taskLock, taskIds);
}
TaskLock getTaskLock()
{
return taskLock;
}
boolean addTask(Task task)
{
if (taskLock.getType() == TaskLockType.EXCLUSIVE) {
Preconditions.checkArgument(
taskLock.getGroupId().equals(task.getGroupId()),
"groupId[%s] of task[%s] is different from the existing lockPosse's groupId[%s]",
task.getGroupId(),
task.getId(),
taskLock.getGroupId()
);
}
Preconditions.checkArgument(
taskLock.getNonNullPriority() == task.getPriority(),
"priority[%s] of task[%s] is different from the existing lockPosse's priority[%s]",
task.getPriority(),
task.getId(),
taskLock.getNonNullPriority()
);
return taskIds.add(task.getId());
}
boolean containsTask(Task task)
{
Preconditions.checkNotNull(task, "task");
return taskIds.contains(task.getId());
}
boolean removeTask(Task task)
{
Preconditions.checkNotNull(task, "task");
return taskIds.remove(task.getId());
}
boolean isTasksEmpty()
{
return taskIds.isEmpty();
}
/**
* Checks if this APPEND time chunk lock supersedes another APPEND time chunk lock of the same group, i.e. whether
* this lock's interval strictly contains the other lock's interval.
* We do not expect multiple locks to exist with the same interval, as the existing lock would be reused.
* A new append lock with a strictly encompassing interval can be created when a concurrent replace
* with a coarser granularity commits its segments and the appending task makes subsequent allocations.
* @param other the conflicting lockPosse that already exists
* @return true if the task can be unlocked from the other posse after it has been added to the newly created posse.
*/
boolean supersedes(TaskLockPosse other)
{
final TaskLock otherLock = other.taskLock;
return !taskLock.isRevoked()
&& taskLock.getGranularity() == LockGranularity.TIME_CHUNK
&& taskLock.getGranularity() == otherLock.getGranularity()
&& taskLock.getType() == TaskLockType.APPEND
&& taskLock.getType() == otherLock.getType()
&& taskLock.getVersion().compareTo(otherLock.getVersion()) >= 0
&& !taskLock.getInterval().equals(otherLock.getInterval())
&& taskLock.getInterval().contains(otherLock.getInterval())
&& taskLock.getGroupId().equals(otherLock.getGroupId());
}
boolean reusableFor(LockRequest request)
{
if (taskLock.getType() == request.getType() && taskLock.getGranularity() == request.getGranularity()) {
switch (taskLock.getType()) {
case REPLACE:
case APPEND:
case SHARED:
if (request instanceof TimeChunkLockRequest) {
return taskLock.getInterval().contains(request.getInterval())
&& taskLock.getGroupId().equals(request.getGroupId());
}
return false;
case EXCLUSIVE:
if (request instanceof TimeChunkLockRequest) {
return taskLock.getInterval().contains(request.getInterval())
&& taskLock.getGroupId().equals(request.getGroupId());
} else if (request instanceof SpecificSegmentLockRequest) {
final SegmentLock segmentLock = (SegmentLock) taskLock;
final SpecificSegmentLockRequest specificSegmentLockRequest = (SpecificSegmentLockRequest) request;
return segmentLock.getInterval().contains(specificSegmentLockRequest.getInterval())
&& segmentLock.getGroupId().equals(specificSegmentLockRequest.getGroupId())
&& specificSegmentLockRequest.getPartitionId() == segmentLock.getPartitionId();
} else {
throw new ISE("Unknown request type[%s]", request);
}
//noinspection SuspiciousIndentAfterControlStatement
default:
throw new ISE("Unknown lock type[%s]", taskLock.getType());
}
}
return false;
}
void forEachTask(Consumer<String> action)
{
Preconditions.checkNotNull(action, "action");
taskIds.forEach(action);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || !getClass().equals(o.getClass())) {
return false;
}
TaskLockPosse that = (TaskLockPosse) o;
return java.util.Objects.equals(taskLock, that.taskLock) &&
java.util.Objects.equals(taskIds, that.taskIds);
}
@Override
public int hashCode()
{
return Objects.hashCode(taskLock, taskIds);
}
@Override
public String toString()
{
return MoreObjects.toStringHelper(this)
.add("taskLock", taskLock)
.add("taskIds", taskIds)
.toString();
}
}
/**
* Maintains a list of pending allocation holders.
*/
private static class AllocationHolderList
{
final List<SegmentAllocationHolder> all = new ArrayList<>();
final Set<SegmentAllocationHolder> pending = new HashSet<>();
final Set<SegmentAllocationHolder> recentlyCompleted = new HashSet<>();
AllocationHolderList(List<SegmentAllocateRequest> requests, Interval interval)
{
for (SegmentAllocateRequest request : requests) {
SegmentAllocationHolder holder = new SegmentAllocationHolder(request, interval, this);
all.add(holder);
pending.add(holder);
}
}
void markCompleted(SegmentAllocationHolder holder)
{
recentlyCompleted.add(holder);
}
Set<SegmentAllocationHolder> getPending()
{
pending.removeAll(recentlyCompleted);
recentlyCompleted.clear();
return pending;
}
List<SegmentAllocateResult> getResults()
{
return all.stream().map(holder -> holder.result).collect(Collectors.toList());
}
}
/**
* Contains the task, request, lock and final result for a segment allocation.
*/
@VisibleForTesting
static class SegmentAllocationHolder
{
final AllocationHolderList list;
final Task task;
final Interval allocateInterval;
final SegmentAllocateAction action;
final LockRequestForNewSegment lockRequest;
SegmentCreateRequest segmentRequest;
TaskLock acquiredLock;
TaskLockPosse taskLockPosse;
Interval lockRequestInterval;
SegmentIdWithShardSpec allocatedSegment;
SegmentAllocateResult result;
SegmentAllocationHolder(SegmentAllocateRequest request, Interval allocateInterval, AllocationHolderList list)
{
this.list = list;
this.allocateInterval = allocateInterval;
this.task = request.getTask();
this.action = request.getAction();
this.lockRequest = new LockRequestForNewSegment(
action.getLockGranularity(),
action.getTaskLockType(),
task.getGroupId(),
action.getDataSource(),
allocateInterval,
action.getPartialShardSpec(),
task.getPriority(),
action.getSequenceName(),
action.getPreviousSegmentId(),
action.isSkipSegmentLineageCheck()
);
}
SegmentCreateRequest getSegmentRequest()
{
// Initialize the first time this is requested
if (segmentRequest == null) {
segmentRequest = new SegmentCreateRequest(
action.getSequenceName(),
action.getPreviousSegmentId(),
acquiredLock == null ? lockRequest.getVersion() : acquiredLock.getVersion(),
action.getPartialShardSpec(),
null,
((PendingSegmentAllocatingTask) task).getTaskAllocatorId()
);
}
return segmentRequest;
}
void markFailed(String msgFormat, Object... args)
{
list.markCompleted(this);
result = new SegmentAllocateResult(null, StringUtils.format(msgFormat, args));
}
void markSucceeded()
{
list.markCompleted(this);
result = new SegmentAllocateResult(allocatedSegment, null);
}
void setAllocatedSegment(SegmentIdWithShardSpec segmentId)
{
this.allocatedSegment = segmentId;
}
void setAcquiredLock(TaskLockPosse lockPosse, Interval lockRequestInterval)
{
this.taskLockPosse = lockPosse;
this.acquiredLock = lockPosse == null ? null : lockPosse.getTaskLock();
this.lockRequestInterval = lockRequestInterval;
}
}
}