blob: 24f0444df7d3f6c81ffd8e6e0e9a11c4f5c51441 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.scheduler;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* Shared slot implementation for the {@link SlotSharingExecutionSlotAllocator}.
*
* <p>The shared slots are owned and tracked by {@link SlotSharingExecutionSlotAllocator}. The
* shared slot represents a collection of {@link SingleLogicalSlot} requests which share one
* physical slot. The shared slot is created by the {@link SlotSharingExecutionSlotAllocator} from
* the physical slot request. Afterwards, {@link SlotSharingExecutionSlotAllocator} requests logical
* slots from the underlying physical slot for {@link Execution executions} which share it.
*
* <p>The shared slot becomes a {@link PhysicalSlot.Payload} of its underlying physical slot once
* the physical slot is obtained. If the allocated physical slot gets released then it calls back
* the shared slot to release the logical slots which fail their execution payloads.
*
* <p>A logical slot request can be cancelled if it is not completed yet or returned by the
* execution if it has been completed and given to the execution by {@link
* SlotSharingExecutionSlotAllocator}. If the underlying physical slot fails, it fails all logical
* slot requests. The failed, cancelled or returned logical slot requests are removed from the
* shared slot. Once the shared slot has no registered logical slot requests, it calls back its
* {@link SlotSharingExecutionSlotAllocator} to remove it from the allocator and cancel its
* underlying physical slot request if the request is not fulfilled yet.
*/
class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
    private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);

    /** Id of the request for the underlying physical slot; used in log and error messages. */
    private final SlotRequestId physicalSlotRequestId;

    private final ResourceProfile physicalSlotResourceProfile;

    /** The group of executions which are allowed to request logical slots from this slot. */
    private final ExecutionSlotSharingGroup executionSlotSharingGroup;

    /**
     * Future of the underlying physical slot. It is derived from the future given to the
     * constructor and completes only after this shared slot has been assigned as the payload of
     * the physical slot.
     */
    private final CompletableFuture<PhysicalSlot> slotContextFuture;

    /**
     * Registered logical slot requests, addressable either by the {@link ExecutionVertexID} of the
     * requesting execution or by the {@link SlotRequestId} of the logical slot request.
     */
    private final DualKeyLinkedMap<
                    ExecutionVertexID, SlotRequestId, CompletableFuture<SingleLogicalSlot>>
            requestedLogicalSlots;

    private final boolean slotWillBeOccupiedIndefinitely;

    /** Called with the sharing group once this shared slot has no logical slot requests left. */
    private final Consumer<ExecutionSlotSharingGroup> externalReleaseCallback;

    /** {@link State#ALLOCATED} until {@link #releaseExternally()} fires the release callback. */
    private State state;

    /**
     * Creates a shared slot on top of a (possibly still pending) physical slot request.
     *
     * @param physicalSlotRequestId id of the physical slot request
     * @param physicalSlotResourceProfile resource profile of the physical slot
     * @param executionSlotSharingGroup executions which may share this slot
     * @param slotContextFuture future of the physical slot; once it completes, this shared slot
     *     is assigned as the payload of the physical slot
     * @param slotWillBeOccupiedIndefinitely value passed on to every created logical slot and
     *     returned by {@link #willOccupySlotIndefinitely()}
     * @param externalReleaseCallback callback invoked when the last logical slot request is gone
     */
    SharedSlot(
            SlotRequestId physicalSlotRequestId,
            ResourceProfile physicalSlotResourceProfile,
            ExecutionSlotSharingGroup executionSlotSharingGroup,
            CompletableFuture<PhysicalSlot> slotContextFuture,
            boolean slotWillBeOccupiedIndefinitely,
            Consumer<ExecutionSlotSharingGroup> externalReleaseCallback) {
        this.physicalSlotRequestId = physicalSlotRequestId;
        this.physicalSlotResourceProfile = physicalSlotResourceProfile;
        this.executionSlotSharingGroup = executionSlotSharingGroup;
        // The stored future fails with an IllegalStateException if the payload cannot be assigned.
        this.slotContextFuture =
                slotContextFuture.thenApply(
                        physicalSlot -> {
                            Preconditions.checkState(
                                    physicalSlot.tryAssignPayload(this),
                                    "Unexpected physical slot payload assignment failure!");
                            return physicalSlot;
                        });
        this.requestedLogicalSlots =
                new DualKeyLinkedMap<>(executionSlotSharingGroup.getExecutionVertexIds().size());
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.externalReleaseCallback = externalReleaseCallback;
        this.state = State.ALLOCATED;
    }

    SlotRequestId getPhysicalSlotRequestId() {
        return physicalSlotRequestId;
    }

    ResourceProfile getPhysicalSlotResourceProfile() {
        return physicalSlotResourceProfile;
    }

    public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() {
        return executionSlotSharingGroup;
    }

    CompletableFuture<PhysicalSlot> getSlotContextFuture() {
        return slotContextFuture;
    }

    /**
     * Registers an allocation request for a logical slot.
     *
     * <p>The logical slot request is complete once the underlying physical slot request is
     * complete. If a request for the given execution vertex is already registered, its existing
     * future is reused instead of creating a second request.
     *
     * @param executionVertexId {@link ExecutionVertexID} of the execution for which to allocate the
     *     logical slot
     * @return the logical slot future
     * @throws IllegalArgumentException if the execution vertex does not belong to the {@link
     *     ExecutionSlotSharingGroup} of this shared slot
     */
    CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID executionVertexId) {
        Preconditions.checkArgument(
                executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId),
                "Trying to allocate a logical slot for execution %s which is not in the ExecutionSlotSharingGroup",
                executionVertexId);
        CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
                requestedLogicalSlots.getValueByKeyA(executionVertexId);
        if (logicalSlotFuture != null) {
            LOG.debug("Request for {} already exists", getLogicalSlotString(executionVertexId));
        } else {
            logicalSlotFuture = allocateNonExistentLogicalSlot(executionVertexId);
        }
        // The identity mapping only widens the future type from SingleLogicalSlot to LogicalSlot.
        return logicalSlotFuture.thenApply(Function.identity());
    }

    /**
     * Creates and registers a new logical slot request for the given execution vertex.
     *
     * @param executionVertexId execution vertex which has no registered request yet
     * @return future of the logical slot, completed after the physical slot future
     */
    private CompletableFuture<SingleLogicalSlot> allocateNonExistentLogicalSlot(
            ExecutionVertexID executionVertexId) {
        CompletableFuture<SingleLogicalSlot> logicalSlotFuture;
        SlotRequestId logicalSlotRequestId = new SlotRequestId();
        String logMessageBase = getLogicalSlotString(logicalSlotRequestId, executionVertexId);
        LOG.debug("Request a {}", logMessageBase);

        logicalSlotFuture =
                slotContextFuture.thenApply(
                        physicalSlot -> {
                            LOG.debug("Allocated {}", logMessageBase);
                            return createLogicalSlot(physicalSlot, logicalSlotRequestId);
                        });
        requestedLogicalSlots.put(executionVertexId, logicalSlotRequestId, logicalSlotFuture);

        // If the physical slot request fails (slotContextFuture), it will also fail the
        // logicalSlotFuture. Therefore, the next `exceptionally` callback will call
        // removeLogicalSlotRequest and do the cleanup in requestedLogicalSlots and, through the
        // external release callback, in the owner of this shared slot.
        logicalSlotFuture.exceptionally(
                cause -> {
                    LOG.debug("Failed {}", logMessageBase, cause);
                    removeLogicalSlotRequest(logicalSlotRequestId);
                    return null;
                });
        return logicalSlotFuture;
    }

    /** Wraps the physical slot into a logical slot which is owned by this shared slot. */
    private SingleLogicalSlot createLogicalSlot(
            PhysicalSlot physicalSlot, SlotRequestId logicalSlotRequestId) {
        return new SingleLogicalSlot(
                logicalSlotRequestId,
                physicalSlot,
                Locality.UNKNOWN,
                this,
                slotWillBeOccupiedIndefinitely);
    }

    /**
     * Cancels a logical slot request.
     *
     * <p>If the logical slot request is already complete, nothing happens because the logical slot
     * is already given to the execution and it is the responsibility of the execution to call
     * {@link #returnLogicalSlot(LogicalSlot)}.
     *
     * <p>If the logical slot request is not complete yet, its future gets cancelled or failed.
     *
     * @param executionVertexID {@link ExecutionVertexID} of the execution for which to cancel the
     *     logical slot
     * @param cause the reason of cancellation or null if it is not available
     * @throws IllegalStateException if this shared slot has already been released
     */
    void cancelLogicalSlotRequest(ExecutionVertexID executionVertexID, @Nullable Throwable cause) {
        Preconditions.checkState(
                state == State.ALLOCATED,
                "SharedSlot (physical request %s) has been released",
                physicalSlotRequestId);
        CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
                requestedLogicalSlots.getValueByKeyA(executionVertexID);
        SlotRequestId logicalSlotRequestId = requestedLogicalSlots.getKeyBByKeyA(executionVertexID);
        if (logicalSlotFuture != null) {
            LOG.debug(
                    "Cancel {} from {}",
                    getLogicalSlotString(logicalSlotRequestId),
                    executionVertexID);
            // If the logicalSlotFuture was not completed and now it fails, the exceptionally
            // callback will also call removeLogicalSlotRequest
            if (cause == null) {
                logicalSlotFuture.cancel(false);
            } else {
                logicalSlotFuture.completeExceptionally(cause);
            }
        } else {
            // NOTE(review): the request id is looked up by the same execution vertex id as the
            // missing future, so it is presumably null here as well.
            LOG.debug(
                    "No SlotExecutionVertexAssignment for logical {} from physical {}",
                    logicalSlotRequestId,
                    physicalSlotRequestId);
        }
    }

    /**
     * Removes the request of the returned logical slot from this shared slot.
     *
     * @param logicalSlot logical slot which is no longer needed by its execution
     */
    @Override
    public void returnLogicalSlot(LogicalSlot logicalSlot) {
        removeLogicalSlotRequest(logicalSlot.getSlotRequestId());
    }

    /**
     * Unregisters a logical slot request and releases this shared slot externally if it was the
     * last one.
     *
     * @throws IllegalStateException if no request is registered under the given id
     */
    private void removeLogicalSlotRequest(SlotRequestId logicalSlotRequestId) {
        LOG.debug("Remove {}", getLogicalSlotString(logicalSlotRequestId));
        Preconditions.checkState(
                requestedLogicalSlots.removeKeyB(logicalSlotRequestId) != null,
                "Trying to remove a logical slot request which has been either already removed or never created.");
        releaseExternally();
    }

    /**
     * Releases all registered logical slots with the given cause and then this shared slot.
     *
     * @param cause reason of the release which is passed on to every logical slot
     * @throws IllegalStateException if the physical slot future or any of the logical slot futures
     *     is not done yet
     */
    @Override
    public void release(Throwable cause) {
        // Preconditions substitutes '%s' placeholders (as in the other checks of this class),
        // not the '{}' placeholders of the logger.
        Preconditions.checkState(
                slotContextFuture.isDone(),
                "Releasing of the shared slot is expected only from its successfully allocated physical slot (%s)",
                physicalSlotRequestId);

        LOG.debug("Release shared slot ({})", physicalSlotRequestId);

        // copy the logical slot collection to avoid ConcurrentModificationException
        // if logical slot releases cause cancellation of other executions
        // which will try to call returnLogicalSlot and modify requestedLogicalSlots collection
        Map<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> logicalSlotFutures =
                requestedLogicalSlots.keySetA().stream()
                        .collect(
                                Collectors.toMap(
                                        executionVertexId -> executionVertexId,
                                        requestedLogicalSlots::getValueByKeyA));
        for (Map.Entry<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> entry :
                logicalSlotFutures.entrySet()) {
            LOG.debug("Release {}", getLogicalSlotString(entry.getKey()));
            CompletableFuture<SingleLogicalSlot> logicalSlotFuture = entry.getValue();
            Preconditions.checkNotNull(logicalSlotFuture);
            Preconditions.checkState(
                    logicalSlotFuture.isDone(),
                    "Logical slot future must be already done when release call comes from the successfully allocated physical slot (%s)",
                    physicalSlotRequestId);
            logicalSlotFuture.thenAccept(logicalSlot -> logicalSlot.release(cause));
        }
        requestedLogicalSlots.clear();
        releaseExternally();
    }

    /**
     * Invokes the external release callback if no logical slot requests are left and the callback
     * has not been invoked before.
     */
    private void releaseExternally() {
        if (state != State.RELEASED && requestedLogicalSlots.values().isEmpty()) {
            // Flip the state before calling out so that any later call of this method is a no-op.
            state = State.RELEASED;
            LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
            externalReleaseCallback.accept(executionSlotSharingGroup);
        }
    }

    @Override
    public boolean willOccupySlotIndefinitely() {
        return slotWillBeOccupiedIndefinitely;
    }

    /** Describes a logical slot request for logging, looking up its execution vertex id. */
    private String getLogicalSlotString(SlotRequestId logicalSlotRequestId) {
        return getLogicalSlotString(
                logicalSlotRequestId, requestedLogicalSlots.getKeyAByKeyB(logicalSlotRequestId));
    }

    /** Describes a logical slot request for logging, looking up its logical slot request id. */
    private String getLogicalSlotString(ExecutionVertexID executionVertexId) {
        return getLogicalSlotString(
                requestedLogicalSlots.getKeyBByKeyA(executionVertexId), executionVertexId);
    }

    /** Describes a logical slot request for logging from both of its ids. */
    private String getLogicalSlotString(
            SlotRequestId logicalSlotRequestId, ExecutionVertexID executionVertexId) {
        return String.format(
                "logical slot (%s) for execution vertex (id %s) from the physical slot (%s)",
                logicalSlotRequestId, executionVertexId, physicalSlotRequestId);
    }

    /** Returns whether the shared slot has no assigned logical slot requests. */
    boolean isEmpty() {
        return requestedLogicalSlots.size() == 0;
    }

    /** Lifecycle of the shared slot with respect to the external release callback. */
    private enum State {
        ALLOCATED,
        RELEASED
    }
}