blob: 12acfa150f6d35a2bcc716476006049be1d05aef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.jobsubmission;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getRootCause;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getStackTraceAsString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.FutureCallback;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListenableFuture;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Internal representation of a Job which has been invoked (prepared and run) by a client.
 *
 * <p>{@link #start()}, {@link #cancel()}, listener registration and state transitions synchronize
 * on this instance. Fields that completion callbacks (running on {@code executorService}) write,
 * but that {@link #getState()}, {@link #toProto()} and {@link #getMetrics()} read without the
 * lock, are {@code volatile} so those readers observe the latest value.
 */
public class JobInvocation {
  private static final Logger LOG = LoggerFactory.getLogger(JobInvocation.class);
  private final RunnerApi.Pipeline pipeline;
  private final PortablePipelineRunner pipelineRunner;
  private final JobInfo jobInfo;
  private final ListeningExecutorService executorService;
  // Both observer lists are only touched from synchronized methods.
  private final List<Consumer<Enum>> stateObservers;
  private final List<Consumer<JobMessage>> messageObservers;
  // Written under the lock in setState(), but read without it by getState() / toProto().
  private volatile JobState.Enum jobState;
  // Written by the completion callback on an executor thread, read by getMetrics().
  private volatile JobApi.MetricResults metrics;
  // Only set when the runner returned a result in a non-terminal state; getMetrics() then
  // queries it for up-to-date metrics.
  private volatile PortablePipelineResult resultHandle;
  @Nullable private ListenableFuture<PortablePipelineResult> invocationFuture;

  /**
   * Creates a job invocation in the {@code STOPPED} state with empty metrics. Nothing is run until
   * {@link #start()} is called.
   *
   * @param jobInfo identity and options of the job
   * @param executorService executor used both to run the pipeline and to run completion callbacks
   * @param pipeline the pipeline to run
   * @param pipelineRunner runner that executes {@code pipeline}
   */
  public JobInvocation(
      JobInfo jobInfo,
      ListeningExecutorService executorService,
      Pipeline pipeline,
      PortablePipelineRunner pipelineRunner) {
    this.jobInfo = jobInfo;
    this.executorService = executorService;
    this.pipeline = pipeline;
    this.pipelineRunner = pipelineRunner;
    this.stateObservers = new ArrayList<>();
    this.messageObservers = new ArrayList<>();
    this.invocationFuture = null;
    this.jobState = JobState.Enum.STOPPED;
    this.metrics = JobApi.MetricResults.newBuilder().build();
  }

  private PortablePipelineResult runPipeline() throws Exception {
    return pipelineRunner.run(pipeline, jobInfo);
  }

  /**
   * Start the job.
   *
   * <p>Submits the pipeline to {@code executorService} and immediately reports {@code RUNNING}.
   * When the runner returns, the state is updated from the returned result. If the invocation
   * fails, the full stack trace (DEBUG) and the root cause (ERROR) are sent to message listeners
   * and the job is marked {@code FAILED}. If the invocation was cancelled, the job is marked
   * {@code CANCELLED}.
   *
   * @throws IllegalStateException if the job is not in the {@code STOPPED} state
   */
  public synchronized void start() {
    LOG.info("Starting job invocation {}", getId());
    if (getState() != JobState.Enum.STOPPED) {
      throw new IllegalStateException(String.format("Job %s already running.", getId()));
    }
    setState(JobState.Enum.STARTING);
    invocationFuture = executorService.submit(this::runPipeline);
    // TODO: Defer transitioning until the pipeline is up and running.
    setState(JobState.Enum.RUNNING);
    Futures.addCallback(
        invocationFuture,
        new FutureCallback<PortablePipelineResult>() {
          @Override
          public void onSuccess(PortablePipelineResult pipelineResult) {
            if (pipelineResult == null) {
              setState(JobState.Enum.UNSPECIFIED);
              return;
            }
            PipelineResult.State state = pipelineResult.getState();
            if (state.isTerminal()) {
              // The job has finished: take a final snapshot of its metrics.
              metrics = pipelineResult.portableMetrics();
            } else {
              // Keep the handle so getMetrics() can query live metrics later.
              resultHandle = pipelineResult;
            }
            setState(toJobState(state));
          }

          @Override
          public void onFailure(@Nonnull Throwable throwable) {
            if (throwable instanceof CancellationException) {
              // We have canceled execution, just update the job state
              setState(JobState.Enum.CANCELLED);
              return;
            }
            String message = String.format("Error during job invocation %s.", getId());
            LOG.error(message, throwable);
            sendMessage(
                JobMessage.newBuilder()
                    .setMessageText(getStackTraceAsString(throwable))
                    .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG)
                    .build());
            sendMessage(
                JobMessage.newBuilder()
                    .setMessageText(getRootCause(throwable).toString())
                    .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
                    .build());
            setState(JobState.Enum.FAILED);
          }
        },
        executorService);
  }

  /** Maps a runner-reported {@link PipelineResult.State} onto the job-management state enum. */
  private static JobState.Enum toJobState(PipelineResult.State state) {
    switch (state) {
      case DONE:
        return JobState.Enum.DONE;
      case RUNNING:
        return JobState.Enum.RUNNING;
      case CANCELLED:
        return JobState.Enum.CANCELLED;
      case FAILED:
        return JobState.Enum.FAILED;
      default:
        return JobState.Enum.UNSPECIFIED;
    }
  }

  /** @return Unique identifier for the job invocation. */
  public String getId() {
    return jobInfo.jobId();
  }

  /**
   * Cancel the job.
   *
   * <p>Cancels (and may interrupt) the pending invocation. If the runner had already returned a
   * result, that result is cancelled instead, but only while it is not yet in a terminal state.
   * Does nothing if {@link #start()} was never called.
   */
  public synchronized void cancel() {
    LOG.info("Canceling job invocation {}", getId());
    if (this.invocationFuture != null) {
      this.invocationFuture.cancel(true /* mayInterruptIfRunning */);
      Futures.addCallback(
          invocationFuture,
          new FutureCallback<PortablePipelineResult>() {
            @Override
            public void onSuccess(PortablePipelineResult pipelineResult) {
              // Only cancel pipelines that are still active. Cancelling a result that is already
              // DONE, FAILED or CANCELLED would overwrite its final state with CANCELLED.
              if (pipelineResult != null && !pipelineResult.getState().isTerminal()) {
                try {
                  pipelineResult.cancel();
                  setState(JobState.Enum.CANCELLED);
                } catch (IOException exn) {
                  // Throwing here would only reach the executor thread and be lost, so record the
                  // failure instead; the job state is left unchanged.
                  LOG.error("Failed to cancel job invocation {}.", getId(), exn);
                }
              }
            }

            @Override
            public void onFailure(Throwable throwable) {
              // Intentionally empty: the callback registered in start() already handles failures,
              // including the CancellationException produced by cancelling the future above.
            }
          },
          executorService);
    }
  }

  /**
   * Returns the job's metrics.
   *
   * <p>While a non-terminal result handle is held, metrics are re-fetched from it on every call and
   * cached. Otherwise, the last known metrics are returned: the final snapshot if the job finished,
   * or an empty result if none were ever recorded.
   */
  public JobApi.MetricResults getMetrics() {
    // Read the volatile field once, and return what was fetched rather than re-reading the
    // shared field, which another thread may overwrite in between.
    PortablePipelineResult handle = resultHandle;
    if (handle == null) {
      return metrics;
    }
    JobApi.MetricResults latest = handle.portableMetrics();
    metrics = latest;
    return latest;
  }

  /** Retrieve the job's current state. */
  public JobState.Enum getState() {
    return this.jobState;
  }

  /** Retrieve the job's pipeline. */
  public RunnerApi.Pipeline getPipeline() {
    return this.pipeline;
  }

  /**
   * Listen for job state changes with a {@link Consumer}. The listener is immediately invoked with
   * the current state, then with every subsequent state change.
   */
  public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver) {
    stateStreamObserver.accept(getState());
    stateObservers.add(stateStreamObserver);
  }

  /**
   * Listen for job messages with a {@link Consumer}. Only messages sent after registration are
   * delivered.
   */
  public synchronized void addMessageListener(Consumer<JobMessage> messageStreamObserver) {
    messageObservers.add(messageStreamObserver);
  }

  /** Convert to {@link JobApi.JobInfo}. */
  public JobApi.JobInfo toProto() {
    return JobApi.JobInfo.newBuilder()
        .setJobId(jobInfo.jobId())
        .setJobName(jobInfo.jobName())
        .setPipelineOptions(jobInfo.pipelineOptions())
        .setState(getState())
        .build();
  }

  /** Records the new state and notifies every registered state listener while holding the lock. */
  private synchronized void setState(JobState.Enum state) {
    this.jobState = state;
    for (Consumer<JobState.Enum> observer : stateObservers) {
      observer.accept(state);
    }
  }

  /** Delivers {@code message} to every registered message listener while holding the lock. */
  private synchronized void sendMessage(JobMessage message) {
    for (Consumer<JobMessage> observer : messageObservers) {
      observer.accept(message);
    }
  }

  /**
   * Returns whether {@code state} is final: {@code DONE}, {@code FAILED}, {@code CANCELLED} or
   * {@code DRAINED}.
   */
  static Boolean isTerminated(Enum state) {
    switch (state) {
      case DONE:
      case FAILED:
      case CANCELLED:
      case DRAINED:
        return true;
      default:
        return false;
    }
  }
}