| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.fnexecution.environment; |
| |
| import java.time.Duration; |
| import java.util.concurrent.TimeoutException; |
| import org.apache.beam.model.fnexecution.v1.BeamFnApi; |
| import org.apache.beam.model.fnexecution.v1.BeamFnExternalWorkerPoolGrpc; |
| import org.apache.beam.model.pipeline.v1.RunnerApi; |
| import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; |
| import org.apache.beam.runners.core.construction.BeamUrns; |
| import org.apache.beam.runners.fnexecution.GrpcFnServer; |
| import org.apache.beam.runners.fnexecution.ServerFactory; |
| import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService; |
| import org.apache.beam.runners.fnexecution.control.ControlClientPool; |
| import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService; |
| import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler; |
| import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService; |
| import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService; |
| import org.apache.beam.sdk.fn.IdGenerator; |
| import org.apache.beam.sdk.fn.channel.ManagedChannelFactory; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** An {@link EnvironmentFactory} which requests workers via the given URL in the Environment. */ |
| public class ExternalEnvironmentFactory implements EnvironmentFactory { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ExternalEnvironmentFactory.class); |
| // setting the environment variable allows to connect to worker pool running in Docker on Mac |
| private static final boolean IS_WORKER_POOL_IN_DOCKER_VM = |
| System.getenv().containsKey("BEAM_WORKER_POOL_IN_DOCKER_VM"); |
| |
| public static ExternalEnvironmentFactory create( |
| GrpcFnServer<FnApiControlClientPoolService> controlServiceServer, |
| GrpcFnServer<GrpcLoggingService> loggingServiceServer, |
| GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer, |
| GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer, |
| ControlClientPool.Source clientSource, |
| IdGenerator idGenerator) { |
| return new ExternalEnvironmentFactory( |
| controlServiceServer, |
| loggingServiceServer, |
| retrievalServiceServer, |
| provisioningServiceServer, |
| idGenerator, |
| clientSource); |
| } |
| |
| private final GrpcFnServer<FnApiControlClientPoolService> controlServiceServer; |
| private final GrpcFnServer<GrpcLoggingService> loggingServiceServer; |
| private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer; |
| private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer; |
| private final IdGenerator idGenerator; |
| private final ControlClientPool.Source clientSource; |
| |
| private ExternalEnvironmentFactory( |
| GrpcFnServer<FnApiControlClientPoolService> controlServiceServer, |
| GrpcFnServer<GrpcLoggingService> loggingServiceServer, |
| GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer, |
| GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer, |
| IdGenerator idGenerator, |
| ControlClientPool.Source clientSource) { |
| this.controlServiceServer = controlServiceServer; |
| this.loggingServiceServer = loggingServiceServer; |
| this.retrievalServiceServer = retrievalServiceServer; |
| this.provisioningServiceServer = provisioningServiceServer; |
| this.idGenerator = idGenerator; |
| this.clientSource = clientSource; |
| } |
| |
| /** Creates a new, active {@link RemoteEnvironment} backed by an unmanaged worker. */ |
| @Override |
| public RemoteEnvironment createEnvironment(Environment environment) throws Exception { |
| Preconditions.checkState( |
| environment |
| .getUrn() |
| .equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.EXTERNAL)), |
| "The passed environment does not contain an ExternalPayload."); |
| final RunnerApi.ExternalPayload externalPayload = |
| RunnerApi.ExternalPayload.parseFrom(environment.getPayload()); |
| final String workerId = idGenerator.getId(); |
| |
| BeamFnApi.StartWorkerRequest startWorkerRequest = |
| BeamFnApi.StartWorkerRequest.newBuilder() |
| .setWorkerId(workerId) |
| .setControlEndpoint(controlServiceServer.getApiServiceDescriptor()) |
| .setLoggingEndpoint(loggingServiceServer.getApiServiceDescriptor()) |
| .setArtifactEndpoint(retrievalServiceServer.getApiServiceDescriptor()) |
| .setProvisionEndpoint(provisioningServiceServer.getApiServiceDescriptor()) |
| .putAllParams(externalPayload.getParamsMap()) |
| .build(); |
| |
| LOG.debug("Requesting worker ID {}", workerId); |
| BeamFnApi.StartWorkerResponse startWorkerResponse = |
| BeamFnExternalWorkerPoolGrpc.newBlockingStub( |
| ManagedChannelFactory.createDefault().forDescriptor(externalPayload.getEndpoint())) |
| .startWorker(startWorkerRequest); |
| if (!startWorkerResponse.getError().isEmpty()) { |
| throw new RuntimeException(startWorkerResponse.getError()); |
| } |
| |
| // Wait on a client from the gRPC server. |
| InstructionRequestHandler instructionHandler = null; |
| while (instructionHandler == null) { |
| try { |
| instructionHandler = clientSource.take(workerId, Duration.ofMinutes(2)); |
| } catch (TimeoutException timeoutEx) { |
| LOG.info( |
| "Still waiting for startup of environment from {} for worker id {}", |
| externalPayload.getEndpoint().getUrl(), |
| workerId); |
| } catch (InterruptedException interruptEx) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(interruptEx); |
| } |
| } |
| final InstructionRequestHandler finalInstructionHandler = instructionHandler; |
| |
| return new RemoteEnvironment() { |
| @Override |
| public Environment getEnvironment() { |
| return environment; |
| } |
| |
| @Override |
| public InstructionRequestHandler getInstructionRequestHandler() { |
| return finalInstructionHandler; |
| } |
| |
| @Override |
| public void close() throws Exception { |
| finalInstructionHandler.close(); |
| BeamFnApi.StopWorkerRequest stopWorkerRequest = |
| BeamFnApi.StopWorkerRequest.newBuilder().setWorkerId(workerId).build(); |
| LOG.debug("Closing worker ID {}", workerId); |
| BeamFnApi.StopWorkerResponse stopWorkerResponse = |
| BeamFnExternalWorkerPoolGrpc.newBlockingStub( |
| ManagedChannelFactory.createDefault() |
| .forDescriptor(externalPayload.getEndpoint())) |
| .stopWorker(stopWorkerRequest); |
| if (!stopWorkerResponse.getError().isEmpty()) { |
| throw new RuntimeException(stopWorkerResponse.getError()); |
| } |
| } |
| }; |
| } |
| |
| /** Provider of ExternalEnvironmentFactory. */ |
| public static class Provider implements EnvironmentFactory.Provider { |
| @Override |
| public EnvironmentFactory createEnvironmentFactory( |
| GrpcFnServer<FnApiControlClientPoolService> controlServiceServer, |
| GrpcFnServer<GrpcLoggingService> loggingServiceServer, |
| GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer, |
| GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer, |
| ControlClientPool clientPool, |
| IdGenerator idGenerator) { |
| return create( |
| controlServiceServer, |
| loggingServiceServer, |
| retrievalServiceServer, |
| provisioningServiceServer, |
| clientPool.getSource(), |
| idGenerator); |
| } |
| |
| @Override |
| public ServerFactory getServerFactory() { |
| if (IS_WORKER_POOL_IN_DOCKER_VM) { |
| return DockerEnvironmentFactory.DockerOnMac.getServerFactory(); |
| } |
| return ServerFactory.createDefault(); |
| } |
| } |
| } |