// blob: 07517afdef9ddc09b8f768942aa5cd04e24d6ddd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct.portable.job;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.model.jobmanagement.v1.JobApi.PrepareJobResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.RunJobResponse;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceImplBase;
import org.apache.beam.runners.direct.portable.ReferenceRunner;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagingService;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This JobService implements the grpc calls for running jobs by using the {@code ReferenceRunner}
 * as an engine.
 *
 * <p>A job goes through two phases. {@code prepare} registers a {@code PreparingJob} under a fresh
 * preparation id and starts a dedicated artifact staging server for it. {@code run} looks the
 * prepared job up, builds a {@code ReferenceRunner} for it and executes that runner asynchronously
 * on an internal thread pool. Job states are kept in memory only.
 */
public class ReferenceRunnerJobService extends JobServiceImplBase implements FnService {
  /** A configuration object for constructing the {@code ReferenceRunnerJobService}. */
  public static class Configuration {
    /** Path handed to the artifact staging service when generating staging session tokens. */
    public String artifactStagingPath;

    /** When {@code false}, staged artifacts are removed once the job has finished executing. */
    public boolean keepArtifacts;
  }

  private static final Logger LOG = LoggerFactory.getLogger(ReferenceRunnerJobService.class);

  /** Interval, in milliseconds, between two polls of the job state in {@code getStateStream}. */
  private static final int WAIT_MS = 1000;

  /**
   * Creates a new job service.
   *
   * @param serverFactory factory used to start one artifact staging server per prepared job
   * @param configuration staging configuration for the service
   * @return the newly created service
   */
  public static ReferenceRunnerJobService create(
      final ServerFactory serverFactory, Configuration configuration) {
    LOG.info("Starting {}", ReferenceRunnerJobService.class);
    return new ReferenceRunnerJobService(serverFactory, configuration);
  }

  private final ServerFactory serverFactory;
  private final Configuration configuration;

  /** Prepared jobs, keyed by preparation id. Entries are not removed when a job is run. */
  private final ConcurrentMap<String, PreparingJob> unpreparedJobs;

  /** Runners of submitted jobs, keyed by job id. */
  private final ConcurrentMap<String, ReferenceRunner> runningJobs;

  /** Last known state of every submitted job, keyed by job id. */
  private final ConcurrentMap<String, JobState.Enum> jobStates;

  /** Pool on which pipelines are executed. */
  private final ExecutorService executor;

  /** Every staging server started by {@code prepare}; drained and closed by {@code close}. */
  private final ConcurrentLinkedQueue<GrpcFnServer<BeamFileSystemArtifactStagingService>>
      artifactStagingServices;

  private ReferenceRunnerJobService(ServerFactory serverFactory, Configuration configuration) {
    this.serverFactory = serverFactory;
    this.configuration = configuration;
    unpreparedJobs = new ConcurrentHashMap<>();
    runningJobs = new ConcurrentHashMap<>();
    jobStates = new ConcurrentHashMap<>();
    executor =
        Executors.newCachedThreadPool(
            new ThreadFactoryBuilder()
                .setDaemon(false)
                .setNameFormat("reference-runner-pipeline-%s")
                .build());
    artifactStagingServices = new ConcurrentLinkedQueue<>();
  }

  /**
   * Registers a job for later execution and starts an artifact staging server for it.
   *
   * <p>The preparation id is the job name followed by a random integer. On success the response
   * carries the preparation id, the staging endpoint and the staging session token; any failure is
   * reported to the observer as {@code INTERNAL}.
   */
  @Override
  public void prepare(
      JobApi.PrepareJobRequest request,
      StreamObserver<JobApi.PrepareJobResponse> responseObserver) {
    try {
      // Trace the request under the request's own type name.
      LOG.trace("{} {}", JobApi.PrepareJobRequest.class.getSimpleName(), request);

      String preparationId = request.getJobName() + ThreadLocalRandom.current().nextInt();
      GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService =
          createArtifactStagingService();
      // Track the server right away so close() shuts it down even if registration fails below.
      artifactStagingServices.add(artifactStagingService);
      String stagingSessionToken =
          BeamFileSystemArtifactStagingService.generateStagingSessionToken(
              preparationId, configuration.artifactStagingPath);
      PreparingJob existingJob =
          unpreparedJobs.putIfAbsent(
              preparationId,
              PreparingJob.builder()
                  .setArtifactStagingServer(artifactStagingService)
                  .setPipeline(request.getPipeline())
                  .setOptions(request.getPipelineOptions())
                  .setStagingSessionToken(stagingSessionToken)
                  .build());
      checkArgument(
          existingJob == null, "Unexpected existing job with preparation ID %s", preparationId);

      responseObserver.onNext(
          PrepareJobResponse.newBuilder()
              .setPreparationId(preparationId)
              .setArtifactStagingEndpoint(artifactStagingService.getApiServiceDescriptor())
              .setStagingSessionToken(stagingSessionToken)
              .build());
      responseObserver.onCompleted();
    } catch (Exception e) {
      LOG.error("Could not prepare job with name {}", request.getJobName(), e);
      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
    }
  }

  /** Starts a new artifact staging server on a port allocated through the server factory. */
  private GrpcFnServer<BeamFileSystemArtifactStagingService> createArtifactStagingService()
      throws Exception {
    BeamFileSystemArtifactStagingService service = new BeamFileSystemArtifactStagingService();
    return GrpcFnServer.allocatePortAndCreateFor(service, serverFactory);
  }

  /**
   * Starts executing a previously prepared job and replies with its job id.
   *
   * <p>The call does not wait for the pipeline: execution is handed to the internal executor. An
   * unknown preparation id is answered with {@code INVALID_ARGUMENT}; any other failure with
   * {@code INTERNAL}.
   */
  @Override
  @SuppressWarnings("FutureReturnValueIgnored") // Run API does not block on execution
  public void run(
      JobApi.RunJobRequest request, StreamObserver<JobApi.RunJobResponse> responseObserver) {
    try {
      LOG.trace("{} {}", RunJobRequest.class.getSimpleName(), request);

      String preparationId = request.getPreparationId();
      PreparingJob preparingJob = unpreparedJobs.get(preparationId);
      if (preparingJob == null) {
        responseObserver.onError(
            Status.INVALID_ARGUMENT
                .withDescription(String.format("Unknown Preparation Id %s", preparationId))
                .asException());
        return;
      }
      try {
        // Close any preparation-time only resources.
        preparingJob.close();
      } catch (Exception e) {
        LOG.error("Could not close preparation resources for {}", preparationId, e);
        responseObserver.onError(
            Status.INTERNAL
                .withDescription(
                    String.format(
                        "Could not close preparation resources for %s", preparationId))
                .withCause(e)
                .asException());
        // onError is terminal: nothing else may be sent on this observer.
        return;
      }

      ReferenceRunner runner =
          ReferenceRunner.forPipeline(
              preparingJob.getPipeline(), preparingJob.getOptions(), request.getRetrievalToken());
      String jobId = "job-" + Integer.toString(ThreadLocalRandom.current().nextInt());

      // Record the job before the id is handed out, so a state query issued as soon as the
      // client receives the response already finds it.
      runningJobs.put(jobId, runner);
      jobStates.putIfAbsent(jobId, Enum.RUNNING);

      // Submit before replying: if the executor rejects the task the client gets an error instead
      // of a job id for a job that will never run.
      executor.submit(
          () -> {
            try {
              jobStates.computeIfPresent(jobId, (id, status) -> Enum.RUNNING);
              runner.execute();
              jobStates.computeIfPresent(jobId, (id, status) -> Enum.DONE);
            } catch (Exception e) {
              jobStates.computeIfPresent(jobId, (id, status) -> Enum.FAILED);
              // The returned Future is discarded, so this log is the only trace of the failure.
              LOG.error("Job {} failed", jobId, e);
              throw e;
            } finally {
              // Delete artifacts once the job has finished, whether it succeeded or failed.
              if (!configuration.keepArtifacts) {
                removeStagedArtifacts(preparingJob);
              }
            }
            return null;
          });

      responseObserver.onNext(RunJobResponse.newBuilder().setJobId(jobId).build());
      responseObserver.onCompleted();
    } catch (StatusRuntimeException e) {
      responseObserver.onError(e);
    } catch (Exception e) {
      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
    }
  }

  /** Removes the artifacts staged for the given job; failures are logged, never propagated. */
  private void removeStagedArtifacts(PreparingJob preparingJob) {
    String stagingSessionToken = preparingJob.getStagingSessionToken();
    try {
      preparingJob.getArtifactStagingServer().getService().removeArtifacts(stagingSessionToken);
    } catch (Exception e) {
      LOG.error("Failed to remove job staging directory for token {}", stagingSessionToken, e);
    }
  }

  /**
   * Replies with the current state of the requested job, or {@code UNRECOGNIZED} when the job id
   * is not known to this service.
   */
  @Override
  public void getState(
      GetJobStateRequest request, StreamObserver<GetJobStateResponse> responseObserver) {
    LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
    try {
      responseObserver.onNext(
          GetJobStateResponse.newBuilder()
              .setState(jobStates.getOrDefault(request.getJobId(), Enum.UNRECOGNIZED))
              .build());
      responseObserver.onCompleted();
    } catch (Exception e) {
      String errMessage =
          String.format("Encountered Unexpected Exception for Invocation %s", request.getJobId());
      LOG.error(errMessage, e);
      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
    }
  }

  /**
   * Streams the state of the requested job: one message with the state observed first, then one
   * message with the first state observed that is not {@code RUNNING}, then completion.
   *
   * <p>The calling thread is blocked, polling every {@code WAIT_MS} milliseconds, for as long as
   * the job is {@code RUNNING}. Unknown job ids are reported as {@code UNRECOGNIZED}.
   */
  @Override
  public void getStateStream(
      GetJobStateRequest request, StreamObserver<GetJobStateResponse> responseObserver) {
    LOG.trace("{} {}", GetJobStateRequest.class.getSimpleName(), request);
    String invocationId = request.getJobId();
    try {
      // NOTE(review): the delay before the first read is kept from the original implementation;
      // its purpose is not evident from this file.
      Thread.sleep(WAIT_MS);
      Enum state = jobStates.getOrDefault(invocationId, Enum.UNRECOGNIZED);
      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());
      while (Enum.RUNNING.equals(state)) {
        Thread.sleep(WAIT_MS);
        state = jobStates.getOrDefault(invocationId, Enum.UNRECOGNIZED);
      }
      responseObserver.onNext(GetJobStateResponse.newBuilder().setState(state).build());
      // Complete only on the success path: onError below is terminal as well.
      responseObserver.onCompleted();
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Keep the interruption visible to whoever owns this thread.
        Thread.currentThread().interrupt();
      }
      String errMessage =
          String.format("Encountered Unexpected Exception for Invocation %s", invocationId);
      LOG.error(errMessage, e);
      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
    }
  }

  /** Replies with the descriptions of all pipeline options registered with the factory. */
  @Override
  public void describePipelineOptions(
      JobApi.DescribePipelineOptionsRequest request,
      StreamObserver<JobApi.DescribePipelineOptionsResponse> responseObserver) {
    LOG.trace("{} {}", JobApi.DescribePipelineOptionsRequest.class.getSimpleName(), request);
    try {
      JobApi.DescribePipelineOptionsResponse response =
          JobApi.DescribePipelineOptionsResponse.newBuilder()
              .addAllOptions(
                  PipelineOptionsFactory.describe(PipelineOptionsFactory.getRegisteredOptions()))
              .build();
      responseObserver.onNext(response);
      responseObserver.onCompleted();
    } catch (Exception e) {
      LOG.error("Error describing pipeline options", e);
      responseObserver.onError(Status.INTERNAL.withCause(e).asException());
    }
  }

  /**
   * Not implemented: the request is only traced. The observer is never signalled, so the stream
   * is neither completed nor failed by this service.
   */
  @Override
  public void getMessageStream(
      JobMessagesRequest request, StreamObserver<JobMessagesResponse> responseObserver) {
    // Not implemented
    LOG.trace("{} {}", JobMessagesRequest.class.getSimpleName(), request);
  }

  /**
   * Cancellation is not supported: every request, whatever its job id, is answered with {@code
   * NOT_FOUND}.
   */
  @Override
  public void cancel(CancelJobRequest request, StreamObserver<CancelJobResponse> responseObserver) {
    LOG.trace("{} {}", CancelJobRequest.class.getSimpleName(), request);
    responseObserver.onError(
        Status.NOT_FOUND
            .withDescription(String.format("Unknown Job ID %s", request.getJobId()))
            .asException());
  }

  /**
   * Releases the resources held by this service: closes every registered preparing job, closes
   * every artifact staging server and stops the executor from accepting new pipelines.
   *
   * <p>Individual close failures are logged and do not prevent the remaining resources from being
   * released. Pipelines that are already executing are left to finish.
   */
  @Override
  public void close() throws Exception {
    for (PreparingJob preparingJob : ImmutableList.copyOf(unpreparedJobs.values())) {
      try {
        preparingJob.close();
      } catch (Exception e) {
        LOG.warn("Exception while closing preparing job {}", preparingJob, e);
      }
    }
    // poll() rather than isEmpty()/remove(): it cannot throw if the queue is drained concurrently.
    GrpcFnServer<BeamFileSystemArtifactStagingService> artifactStagingService;
    while ((artifactStagingService = artifactStagingServices.poll()) != null) {
      try {
        artifactStagingService.close();
      } catch (Exception e) {
        LOG.error(
            "Unable to close staging service started on {}",
            artifactStagingService.getApiServiceDescriptor().getUrl(),
            e);
      }
    }
    // The pool uses non-daemon threads; shut it down so idle workers do not outlive the service.
    executor.shutdown();
  }
}