blob: 154aaecc5bb94cd7c2b05c9e0e3c25d6e873ad99 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.environment;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.control.ControlClientPool;
import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.options.ManualDockerEnvironmentOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.RemoteEnvironmentOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.net.HostAndPort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * An {@link EnvironmentFactory} that creates docker containers by shelling out to docker. Returned
 * {@link RemoteEnvironment RemoteEnvironments} own their respective docker containers. Not
 * thread-safe.
 */
public class DockerEnvironmentFactory implements EnvironmentFactory {

  private static final Logger LOG = LoggerFactory.getLogger(DockerEnvironmentFactory.class);

  /**
   * How long to wait for a freshly started container to connect to the control service. The
   * timeout error message in {@link #awaitControlClient} says "1 minute"; keep the two in sync.
   */
  private static final Duration CONTAINER_STARTUP_TIMEOUT = Duration.ofMinutes(1);

  /**
   * Environment variable that is set to {@code "1"} when this process runs inside a container on a
   * macOS host, where Docker host networking is unavailable.
   */
  private static final String DOCKER_MAC_CONTAINER_ENV = "DOCKER_MAC_CONTAINER";

  static DockerEnvironmentFactory forServicesWithDocker(
      DockerCommand docker,
      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
      ControlClientPool.Source clientSource,
      IdGenerator idGenerator,
      PipelineOptions pipelineOptions) {
    return new DockerEnvironmentFactory(
        docker,
        controlServiceServer,
        loggingServiceServer,
        retrievalServiceServer,
        provisioningServiceServer,
        idGenerator,
        clientSource,
        pipelineOptions);
  }

  private final DockerCommand docker;
  private final GrpcFnServer<FnApiControlClientPoolService> controlServiceServer;
  private final GrpcFnServer<GrpcLoggingService> loggingServiceServer;
  private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
  private final IdGenerator idGenerator;
  private final ControlClientPool.Source clientSource;
  private final PipelineOptions pipelineOptions;

  private DockerEnvironmentFactory(
      DockerCommand docker,
      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
      IdGenerator idGenerator,
      ControlClientPool.Source clientSource,
      PipelineOptions pipelineOptions) {
    this.docker = docker;
    this.controlServiceServer = controlServiceServer;
    this.loggingServiceServer = loggingServiceServer;
    this.retrievalServiceServer = retrievalServiceServer;
    this.provisioningServiceServer = provisioningServiceServer;
    this.idGenerator = idGenerator;
    this.clientSource = clientSource;
    this.pipelineOptions = pipelineOptions;
  }

  /**
   * Creates a new, active {@link RemoteEnvironment} backed by a local Docker container.
   *
   * <p>Starts the container image named in the environment's {@link RunnerApi.DockerPayload}. Then
   * blocks until a client with the generated worker id is available from the control client
   * source. If that step fails, the started container is killed. It is also removed unless
   * containers are being retained.
   *
   * @param environment an environment with the standard DOCKER environment URN whose payload is a
   *     serialized {@link RunnerApi.DockerPayload}
   * @return an environment that owns the started container and its instruction handler
   * @throws IllegalStateException if {@code environment} does not have the DOCKER URN
   * @throws Exception if parsing the payload, starting the container, or obtaining a client fails
   */
  @Override
  public RemoteEnvironment createEnvironment(Environment environment) throws Exception {
    Preconditions.checkState(
        environment
            .getUrn()
            .equals(BeamUrns.getUrn(RunnerApi.StandardEnvironments.Environments.DOCKER)),
        "The passed environment does not contain a DockerPayload.");
    final RunnerApi.DockerPayload dockerPayload =
        RunnerApi.DockerPayload.parseFrom(environment.getPayload());
    final String workerId = idGenerator.getId();

    // Prepare docker invocation.
    String containerImage = dockerPayload.getContainerImage();
    boolean retainDockerContainer =
        pipelineOptions.as(ManualDockerEnvironmentOptions.class).getRetainDockerContainers();
    List<String> dockerOpts = dockerRunOptions(retainDockerContainer);
    List<String> containerArgs = containerArgs(workerId);

    LOG.debug("Creating Docker Container with ID {}", workerId);
    // Wrap the blocking call to clientSource.take so the container is cleaned up if it throws.
    String containerId = null;
    InstructionRequestHandler instructionHandler = null;
    try {
      containerId = docker.runImage(containerImage, dockerOpts, containerArgs);
      LOG.debug("Created Docker Container with Container ID {}", containerId);
      instructionHandler = awaitControlClient(workerId, containerImage, containerId);
    } catch (Exception e) {
      if (containerId != null) {
        // Kill the launched docker container if we can't retrieve a client for it.
        try {
          docker.killContainer(containerId);
          if (!retainDockerContainer) {
            docker.removeContainer(containerId);
          }
        } catch (Exception dockerException) {
          e.addSuppressed(dockerException);
        }
      }
      throw e;
    }
    return DockerContainerEnvironment.create(
        docker, environment, containerId, instructionHandler, retainDockerContainer);
  }

  /**
   * Builds the options passed to {@code docker run}. These are the gcloud credential mount (if
   * any), host networking, the Docker-on-Mac marker variable (if set) and {@code --rm} unless
   * containers are retained.
   */
  private List<String> dockerRunOptions(boolean retainDockerContainer) {
    ImmutableList.Builder<String> dockerOptsBuilder =
        ImmutableList.<String>builder()
            .addAll(gcsCredentialArgs())
            // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
            .add("--network=host");
    // Pass on whether we are running inside Docker-on-Mac (which lacks host networking). Only
    // forward the variable when it is set; otherwise the container would see the literal "null".
    String dockerMacContainer = System.getenv(DOCKER_MAC_CONTAINER_ENV);
    if (dockerMacContainer != null) {
      dockerOptsBuilder.add("--env=" + DOCKER_MAC_CONTAINER_ENV + "=" + dockerMacContainer);
    }
    if (!retainDockerContainer) {
      dockerOptsBuilder.add("--rm");
    }
    return dockerOptsBuilder.build();
  }

  /**
   * Builds the arguments passed to the container. These are the worker id, the logging, artifact,
   * provisioning and control service endpoints, and the semi-persistent directory if one is
   * configured.
   */
  private List<String> containerArgs(String workerId) {
    // TODO: https://issues.apache.org/jira/browse/BEAM-4148 The default service address will not
    // work for Docker for Mac.
    String loggingEndpoint = loggingServiceServer.getApiServiceDescriptor().getUrl();
    String artifactEndpoint = retrievalServiceServer.getApiServiceDescriptor().getUrl();
    String provisionEndpoint = provisioningServiceServer.getApiServiceDescriptor().getUrl();
    String controlEndpoint = controlServiceServer.getApiServiceDescriptor().getUrl();
    String semiPersistDir = pipelineOptions.as(RemoteEnvironmentOptions.class).getSemiPersistDir();

    ImmutableList.Builder<String> argsBuilder =
        ImmutableList.<String>builder()
            .add(String.format("--id=%s", workerId))
            .add(String.format("--logging_endpoint=%s", loggingEndpoint))
            .add(String.format("--artifact_endpoint=%s", artifactEndpoint))
            .add(String.format("--provision_endpoint=%s", provisionEndpoint))
            .add(String.format("--control_endpoint=%s", controlEndpoint));
    if (semiPersistDir != null) {
      argsBuilder.add(String.format("--semi_persist_dir=%s", semiPersistDir));
    }
    return argsBuilder.build();
  }

  /**
   * Waits for a control client with the given worker id. On timeout, the container's logs are
   * logged at error level before an exception is thrown.
   *
   * @throws RuntimeException on timeout or interruption (the interrupt flag is restored)
   * @throws Exception any other failure from the client source
   */
  private InstructionRequestHandler awaitControlClient(
      String workerId, String containerImage, String containerId) throws Exception {
    try {
      return clientSource.take(workerId, CONTAINER_STARTUP_TIMEOUT);
    } catch (TimeoutException timeoutEx) {
      RuntimeException runtimeException =
          new RuntimeException(
              String.format(
                  "Docker container %s failed to start up successfully within 1 minute.",
                  containerImage),
              timeoutEx);
      try {
        String containerLogs = docker.getContainerLogs(containerId);
        LOG.error("Docker container {} logs:\n{}", containerId, containerLogs);
      } catch (Exception getLogsException) {
        runtimeException.addSuppressed(getLogsException);
      }
      throw runtimeException;
    } catch (InterruptedException interruptEx) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(interruptEx);
    }
  }

  /**
   * Returns {@code docker run} arguments that bind-mount the local gcloud config directory to
   * {@code /root/.config/gcloud} in the container. The local directory is taken from {@code
   * CLOUDSDK_CONFIG}, or {@code ~/.config/gcloud} if that is unset. An empty list is returned when
   * the directory does not exist.
   */
  private List<String> gcsCredentialArgs() {
    String dockerGcloudConfig = "/root/.config/gcloud";
    String localGcloudConfig =
        firstNonNull(
            System.getenv("CLOUDSDK_CONFIG"),
            Paths.get(System.getProperty("user.home"), ".config", "gcloud").toString());
    // TODO(BEAM-4729): Allow this to be disabled manually.
    if (Files.exists(Paths.get(localGcloudConfig))) {
      return ImmutableList.of(
          "--mount",
          String.format("type=bind,src=%s,dst=%s", localGcloudConfig, dockerGcloudConfig));
    } else {
      return ImmutableList.of();
    }
  }

  /**
   * NOTE: Deployment on Macs is intended for local development. As of 18.03, Docker-for-Mac does
   * not implement host networking ({@code --network=host} is effectively a no-op). Instead, we use
   * a special DNS entry that points to the host:
   * https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds The special
   * hostname has historically changed between versions, so this is subject to breakages and will
   * likely only support the latest version at any time.
   */
  static class DockerOnMac {
    // TODO: This host name seems to change with every other Docker release. Do we attempt to keep
    // up or attempt to document the supported Docker version(s)?
    private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";

    // True if we're inside a container (i.e. job-server container) with MacOS as the host system.
    private static final boolean RUNNING_INSIDE_DOCKER_ON_MAC =
        "1".equals(System.getenv(DOCKER_MAC_CONTAINER_ENV));

    // Inclusive port range used on MacOS: without host networking only published ports are
    // reachable, so the ports 8100..8200 are handed out round-robin.
    private static final int MAC_PORT_START = 8100;
    private static final int MAC_PORT_END = 8200;
    private static final AtomicInteger MAC_PORT = new AtomicInteger(MAC_PORT_START);

    /** Returns a server factory whose URLs use the Docker-for-Mac host name instead of the host. */
    static ServerFactory getServerFactory() {
      ServerFactory.UrlFactory dockerUrlFactory =
          (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
      if (RUNNING_INSIDE_DOCKER_ON_MAC) {
        // If we're already running in a container, we need to use a fixed port range due to
        // non-existing host networking in Docker-for-Mac. The port range needs to be published
        // when bringing up the Docker container, see DockerEnvironmentFactory.
        return ServerFactory.createWithUrlFactoryAndPortSupplier(
            dockerUrlFactory,
            // We only use the published Docker ports 8100-8200 in a round-robin fashion
            () -> MAC_PORT.getAndUpdate(val -> val == MAC_PORT_END ? MAC_PORT_START : val + 1));
      } else {
        return ServerFactory.createWithUrlFactory(dockerUrlFactory);
      }
    }
  }

  /** Provider for DockerEnvironmentFactory. */
  public static class Provider implements EnvironmentFactory.Provider {
    private final PipelineOptions pipelineOptions;

    public Provider(PipelineOptions options) {
      this.pipelineOptions = options;
    }

    @Override
    public EnvironmentFactory createEnvironmentFactory(
        GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
        GrpcFnServer<GrpcLoggingService> loggingServiceServer,
        GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
        GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
        ControlClientPool clientPool,
        IdGenerator idGenerator) {
      return DockerEnvironmentFactory.forServicesWithDocker(
          DockerCommand.getDefault(),
          controlServiceServer,
          loggingServiceServer,
          retrievalServiceServer,
          provisioningServiceServer,
          clientPool.getSource(),
          idGenerator,
          pipelineOptions);
    }

    /** Picks a server factory suited to the platform Docker runs on. */
    @Override
    public ServerFactory getServerFactory() {
      switch (getPlatform()) {
        case LINUX:
          return ServerFactory.createDefault();
        case MAC:
          return DockerOnMac.getServerFactory();
        default:
          LOG.warn("Unknown Docker platform. Falling back to default server factory");
          return ServerFactory.createDefault();
      }
    }

    private static Platform getPlatform() {
      // Lower-case with Locale.ROOT so detection does not depend on the JVM's default locale.
      String osName = System.getProperty("os.name").toLowerCase(Locale.ROOT);
      // TODO: Make this more robust?
      // The DOCKER_MAC_CONTAINER environment variable is necessary to detect whether we run on
      // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable
      // from Linux.
      // We still need to apply port mapping due to missing host networking.
      if (osName.startsWith("mac") || DockerOnMac.RUNNING_INSIDE_DOCKER_ON_MAC) {
        return Platform.MAC;
      } else if (osName.startsWith("linux")) {
        return Platform.LINUX;
      }
      return Platform.OTHER;
    }

    private enum Platform {
      MAC,
      LINUX,
      OTHER,
    }
  }
}