blob: a90a24571d527a806c7cb6298f21876d0bc44a83 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.environment;
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.RemoteEnvironmentOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An {@link EnvironmentFactory} which forks processes based on the parameters in the Environment.
 * The returned {@link ProcessEnvironment} has to make sure to stop the processes.
 */
public class ProcessEnvironmentFactory implements EnvironmentFactory {
  private static final Logger LOG = LoggerFactory.getLogger(ProcessEnvironmentFactory.class);

  /**
   * Upper bound for a single blocking attempt to obtain the worker's control client. When an
   * attempt times out, a progress message is logged and the wait is retried.
   */
  private static final Duration CLIENT_TAKE_TIMEOUT = Duration.ofMinutes(2);

  /**
   * Creates a factory that starts one worker process per requested environment.
   *
   * @param processManager used to start worker processes and to stop them on startup failure
   * @param controlServiceServer its endpoint URL is passed to workers as {@code --control_endpoint}
   * @param loggingServiceServer its endpoint URL is passed to workers as {@code --logging_endpoint}
   * @param retrievalServiceServer its endpoint URL is passed as {@code --artifact_endpoint}
   * @param provisioningServiceServer its endpoint URL is passed as {@code --provision_endpoint}
   * @param clientSource source from which the worker's {@link InstructionRequestHandler} is taken
   * @param idGenerator produces the worker id for each created environment
   * @param pipelineOptions read for the optional semi-persistent directory
   */
  public static ProcessEnvironmentFactory create(
      ProcessManager processManager,
      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
      ControlClientPool.Source clientSource,
      IdGenerator idGenerator,
      PipelineOptions pipelineOptions) {
    return new ProcessEnvironmentFactory(
        processManager,
        controlServiceServer,
        loggingServiceServer,
        retrievalServiceServer,
        provisioningServiceServer,
        clientSource,
        idGenerator,
        pipelineOptions);
  }

  private final ProcessManager processManager;
  private final GrpcFnServer<FnApiControlClientPoolService> controlServiceServer;
  private final GrpcFnServer<GrpcLoggingService> loggingServiceServer;
  private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
  private final IdGenerator idGenerator;
  private final ControlClientPool.Source clientSource;
  private final PipelineOptions pipelineOptions;

  // Parameter order mirrors create(...) so the two signatures cannot silently drift apart.
  private ProcessEnvironmentFactory(
      ProcessManager processManager,
      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
      ControlClientPool.Source clientSource,
      IdGenerator idGenerator,
      PipelineOptions pipelineOptions) {
    this.processManager = processManager;
    this.controlServiceServer = controlServiceServer;
    this.loggingServiceServer = loggingServiceServer;
    this.retrievalServiceServer = retrievalServiceServer;
    this.provisioningServiceServer = provisioningServiceServer;
    this.idGenerator = idGenerator;
    this.clientSource = clientSource;
    this.pipelineOptions = pipelineOptions;
  }

  /**
   * Creates a new, active {@link RemoteEnvironment} backed by a forked process.
   *
   * <p>Starts the command from the environment's {@code ProcessPayload} and blocks until the
   * worker's control client becomes available. If startup fails for any reason, the process is
   * stopped before the failure is rethrown.
   *
   * @param environment must carry the standard PROCESS environment URN
   * @return an environment wrapping the started process and its control client
   * @throws IllegalStateException if the environment URN is not the PROCESS URN
   * @throws Exception if the payload cannot be parsed, the process cannot be started, or the
   *     process is found dead while waiting for it to connect
   */
  @Override
  public RemoteEnvironment createEnvironment(Environment environment) throws Exception {
    String processUrn = BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.PROCESS);
    Preconditions.checkState(
        environment.getUrn().equals(processUrn),
        "The passed environment does not contain a ProcessPayload (expected URN %s, got %s).",
        processUrn,
        environment.getUrn());
    final RunnerApi.ProcessPayload processPayload =
        RunnerApi.ProcessPayload.parseFrom(environment.getPayload());
    final String workerId = idGenerator.getId();
    final String executable = processPayload.getCommand();
    final ImmutableList<String> args = buildWorkerArgs(workerId);

    LOG.debug("Creating Process for worker ID {}", workerId);
    final InstructionRequestHandler instructionHandler;
    try {
      ProcessManager.RunningProcess process =
          processManager.startProcess(workerId, executable, args, processPayload.getEnvMap());
      instructionHandler = awaitInstructionHandler(process, workerId, executable);
    } catch (Exception e) {
      // Do not leak a process that failed to start up or never connected back.
      try {
        processManager.stopProcess(workerId);
      } catch (Exception processKillException) {
        e.addSuppressed(processKillException);
      }
      throw e;
    }
    return ProcessEnvironment.create(processManager, environment, workerId, instructionHandler);
  }

  /**
   * Builds the command-line flags handed to the worker process: its id, the URLs of the
   * logging, artifact, provisioning and control services, and, when configured, the
   * semi-persistent directory.
   */
  private ImmutableList<String> buildWorkerArgs(String workerId) {
    String loggingEndpoint = loggingServiceServer.getApiServiceDescriptor().getUrl();
    String artifactEndpoint = retrievalServiceServer.getApiServiceDescriptor().getUrl();
    String provisionEndpoint = provisioningServiceServer.getApiServiceDescriptor().getUrl();
    String controlEndpoint = controlServiceServer.getApiServiceDescriptor().getUrl();
    String semiPersistDir = pipelineOptions.as(RemoteEnvironmentOptions.class).getSemiPersistDir();

    ImmutableList.Builder<String> argsBuilder =
        ImmutableList.<String>builder()
            .add(String.format("--id=%s", workerId))
            .add(String.format("--logging_endpoint=%s", loggingEndpoint))
            .add(String.format("--artifact_endpoint=%s", artifactEndpoint))
            .add(String.format("--provision_endpoint=%s", provisionEndpoint))
            .add(String.format("--control_endpoint=%s", controlEndpoint));
    // The semi-persistent directory flag is optional and omitted when unset.
    if (semiPersistDir != null) {
      argsBuilder.add(String.format("--semi_persist_dir=%s", semiPersistDir));
    }
    return argsBuilder.build();
  }

  /**
   * Blocks until the worker identified by {@code workerId} hands out its control client.
   *
   * <p>Before each attempt, the process is checked for liveness so that a dead worker aborts the
   * wait instead of looping forever. A timed-out attempt is logged and retried. On interruption,
   * the interrupt flag is restored and the interruption is rethrown wrapped in a
   * {@link RuntimeException}.
   */
  private InstructionRequestHandler awaitInstructionHandler(
      ProcessManager.RunningProcess process, String workerId, String command) throws Exception {
    InstructionRequestHandler instructionHandler = null;
    while (instructionHandler == null) {
      try {
        // If the process is not alive anymore, we abort.
        process.isAliveOrThrow();
        instructionHandler = clientSource.take(workerId, CLIENT_TAKE_TIMEOUT);
      } catch (TimeoutException timeoutEx) {
        LOG.info(
            "Still waiting for startup of environment '{}' for worker id {}", command, workerId);
      } catch (InterruptedException interruptEx) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(interruptEx);
      }
    }
    return instructionHandler;
  }

  /** Provider of ProcessEnvironmentFactory. */
  public static class Provider implements EnvironmentFactory.Provider {
    private final PipelineOptions pipelineOptions;

    /** @param options pipeline options forwarded to every factory this provider creates */
    public Provider(PipelineOptions options) {
      this.pipelineOptions = options;
    }

    /**
     * Creates a {@link ProcessEnvironmentFactory} backed by a fresh {@link ProcessManager}. It
     * takes worker control clients from {@code clientPool}'s source.
     */
    @Override
    public EnvironmentFactory createEnvironmentFactory(
        GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
        GrpcFnServer<GrpcLoggingService> loggingServiceServer,
        GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
        GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
        ControlClientPool clientPool,
        IdGenerator idGenerator) {
      return create(
          ProcessManager.create(),
          controlServiceServer,
          loggingServiceServer,
          retrievalServiceServer,
          provisioningServiceServer,
          clientPool.getSource(),
          idGenerator,
          pipelineOptions);
    }
  }
}