blob: 2db2f38b486b3628c22836a54a80bcff9d96d627 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.scheduler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.state.KeyGroupRange;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Consumer;
/**
 * Testing implementation of the {@link SchedulerNG}.
 *
 * <p>A small set of operations is backed by callbacks injected through the {@link Builder}:
 * {@link #startScheduling()}, {@link #suspend(Throwable)}, {@link #handleGlobalFailure(Throwable)},
 * {@link #triggerSavepoint(String, boolean)} and {@link #getTerminationFuture()}. {@link
 * #setMainThreadExecutor(ComponentMainThreadExecutor)}, {@link
 * #registerJobStatusListener(JobStatusListener)} and {@link #cancel()} are no-ops, and {@link
 * #requestJobStatus()} always reports {@link JobStatus#CREATED}. Every other operation throws an
 * {@link UnsupportedOperationException}.
 */
public class TestingSchedulerNG implements SchedulerNG {

    private final CompletableFuture<Void> terminationFuture;
    private final Runnable startSchedulingRunnable;
    private final Consumer<Throwable> suspendConsumer;
    private final BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction;
    private final Consumer<Throwable> handleGlobalFailureConsumer;

    /**
     * Creates a scheduler backed by the given callbacks.
     *
     * @param terminationFuture future handed out by {@link #getTerminationFuture()}
     * @param startSchedulingRunnable action run by {@link #startScheduling()}
     * @param suspendConsumer receives the cause passed to {@link #suspend(Throwable)}
     * @param triggerSavepointFunction computes the result of {@link #triggerSavepoint(String,
     *     boolean)} from the target directory and the cancel-job flag
     * @param handleGlobalFailureConsumer receives the cause passed to {@link
     *     #handleGlobalFailure(Throwable)}
     */
    public TestingSchedulerNG(
            CompletableFuture<Void> terminationFuture,
            Runnable startSchedulingRunnable,
            Consumer<Throwable> suspendConsumer,
            BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction,
            Consumer<Throwable> handleGlobalFailureConsumer) {
        this.terminationFuture = terminationFuture;
        this.startSchedulingRunnable = startSchedulingRunnable;
        this.suspendConsumer = suspendConsumer;
        this.triggerSavepointFunction = triggerSavepointFunction;
        this.handleGlobalFailureConsumer = handleGlobalFailureConsumer;
    }

    /** No-op: the executor is ignored by this testing implementation. */
    @Override
    public void setMainThreadExecutor(ComponentMainThreadExecutor mainThreadExecutor) {}

    /** No-op: the listener is ignored by this testing implementation. */
    @Override
    public void registerJobStatusListener(JobStatusListener jobStatusListener) {}

    /** Runs the configured start-scheduling action. */
    @Override
    public void startScheduling() {
        startSchedulingRunnable.run();
    }

    /**
     * Signals that the calling operation is not implemented by this testing scheduler.
     *
     * @throws UnsupportedOperationException always
     */
    private void failOperation() {
        throw new UnsupportedOperationException("This operation is not supported.");
    }

    /** Forwards the cause to the configured suspend consumer. */
    @Override
    public void suspend(Throwable cause) {
        suspendConsumer.accept(cause);
    }

    /** No-op. */
    @Override
    public void cancel() {}

    /** Returns the termination future supplied at construction time. */
    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }

    /** Forwards the cause to the configured global-failure consumer. */
    @Override
    public void handleGlobalFailure(Throwable cause) {
        handleGlobalFailureConsumer.accept(cause);
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) {
        failOperation();
        // Unreachable; only present to satisfy the compiler.
        return false;
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public SerializedInputSplit requestNextInputSplit(
            JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
        failOperation();
        return null;
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public ExecutionState requestPartitionState(
            IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId)
            throws PartitionProducerDisposedException {
        failOperation();
        return null;
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void scheduleOrUpdateConsumers(ResultPartitionID partitionID) {
        failOperation();
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public ArchivedExecutionGraph requestJob() {
        failOperation();
        return null;
    }

    /** Always reports {@link JobStatus#CREATED}. */
    @Override
    public JobStatus requestJobStatus() {
        return JobStatus.CREATED;
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public JobDetails requestJobDetails() {
        failOperation();
        return null;
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) {
        failOperation();
        return null;
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void notifyKvStateRegistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName,
            KvStateID kvStateId,
            InetSocketAddress kvStateServerAddress) {
        failOperation();
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void notifyKvStateUnregistered(
            JobID jobId,
            JobVertexID jobVertexId,
            KeyGroupRange keyGroupRange,
            String registrationName) {
        failOperation();
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        failOperation();
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public Optional<OperatorBackPressureStats> requestOperatorBackPressureStats(
            JobVertexID jobVertexId) {
        failOperation();
        return Optional.empty();
    }

    /**
     * Delegates to the configured trigger-savepoint function.
     *
     * <p>Previously this method threw {@link UnsupportedOperationException} unconditionally, which
     * left the function configured via {@link Builder#setTriggerSavepointFunction(BiFunction)}
     * unused.
     *
     * @param targetDirectory savepoint target directory, may be {@code null}
     * @param cancelJob flag passed through to the configured function
     * @return whatever the configured function returns for the given arguments
     */
    @Override
    public CompletableFuture<String> triggerSavepoint(
            @Nullable String targetDirectory, boolean cancelJob) {
        return triggerSavepointFunction.apply(targetDirectory, cancelJob);
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void acknowledgeCheckpoint(
            JobID jobID,
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            CheckpointMetrics checkpointMetrics,
            TaskStateSnapshot checkpointState) {
        failOperation();
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void declineCheckpoint(DeclineCheckpoint decline) {
        failOperation();
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
        failOperation();
        return null;
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public void deliverOperatorEventToCoordinator(
            ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt) {
        failOperation();
    }

    /** Unsupported; always throws {@link UnsupportedOperationException}. */
    @Override
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(
            OperatorID operator, CoordinationRequest request) {
        failOperation();
        return null;
    }

    /** Creates a {@link Builder} pre-populated with do-nothing defaults. */
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Builder for the TestingSchedulerNG.
     *
     * <p>Defaults: a fresh, uncompleted termination future; callbacks that do nothing; and a
     * trigger-savepoint function that returns a new, uncompleted future on every call.
     */
    public static final class Builder {
        private CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
        private Runnable startSchedulingRunnable = () -> {};
        private Consumer<Throwable> suspendConsumer = ignored -> {};
        private BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction =
                (ignoredA, ignoredB) -> new CompletableFuture<>();
        private Consumer<Throwable> handleGlobalFailureConsumer = (ignored) -> {};

        /** Sets the future returned by {@link TestingSchedulerNG#getTerminationFuture()}. */
        public Builder setTerminationFuture(CompletableFuture<Void> terminationFuture) {
            this.terminationFuture = terminationFuture;
            return this;
        }

        /** Sets the action run by {@link TestingSchedulerNG#startScheduling()}. */
        public Builder setStartSchedulingRunnable(Runnable startSchedulingRunnable) {
            this.startSchedulingRunnable = startSchedulingRunnable;
            return this;
        }

        /** Sets the consumer invoked by {@link TestingSchedulerNG#suspend(Throwable)}. */
        public Builder setSuspendConsumer(Consumer<Throwable> suspendConsumer) {
            this.suspendConsumer = suspendConsumer;
            return this;
        }

        /**
         * Sets the function invoked by {@link TestingSchedulerNG#triggerSavepoint(String,
         * boolean)}.
         */
        public Builder setTriggerSavepointFunction(
                BiFunction<String, Boolean, CompletableFuture<String>> triggerSavepointFunction) {
            this.triggerSavepointFunction = triggerSavepointFunction;
            return this;
        }

        /**
         * Sets the consumer invoked by {@link TestingSchedulerNG#handleGlobalFailure(Throwable)}.
         */
        public Builder setHandleGlobalFailureConsumer(
                Consumer<Throwable> handleGlobalFailureConsumer) {
            this.handleGlobalFailureConsumer = handleGlobalFailureConsumer;
            return this;
        }

        /** Builds the scheduler from the currently configured callbacks. */
        public TestingSchedulerNG build() {
            return new TestingSchedulerNG(
                    terminationFuture,
                    startSchedulingRunnable,
                    suspendConsumer,
                    triggerSavepointFunction,
                    handleGlobalFailureConsumer);
        }
    }
}