/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;

import com.fasterxml.jackson.core.Base64Variants;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.ArtifactInformation;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.DockerPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.ExternalPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ProcessPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardArtifacts;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms.SplittableParDoComponents;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardProtocols;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.util.ZipFiles;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.Files;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Utilities for interacting with portability {@link Environment environments}. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class Environments {
private static final Logger LOG = LoggerFactory.getLogger(Environments.class);
private static final ObjectMapper MAPPER =
new ObjectMapper()
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
public static final String ENVIRONMENT_DOCKER = "DOCKER";
public static final String ENVIRONMENT_PROCESS = "PROCESS";
public static final String ENVIRONMENT_EXTERNAL = "EXTERNAL";
  public static final String ENVIRONMENT_EMBEDDED = "EMBEDDED"; // Non-public URN, for testing only
  public static final String ENVIRONMENT_LOOPBACK = "LOOPBACK"; // Non-public URN, for testing only

  /* For development, use the container built by the current user to ensure that the SDK harness
   * and the SDK agree on how they should interact. This should be changed to a version-specific
   * container during a release.
   *
   * See https://beam.apache.org/contribute/docker-images/ for more information on how to build a
   * container.
   */
private static final String JAVA_SDK_HARNESS_CONTAINER_URL =
ReleaseInfo.getReleaseInfo().getDefaultDockerRepoRoot()
+ "/"
+ ReleaseInfo.getReleaseInfo().getDefaultDockerRepoPrefix()
+ "java_sdk:"
+ ReleaseInfo.getReleaseInfo().getSdkVersion();
public static final Environment JAVA_SDK_HARNESS_ENVIRONMENT =
createDockerEnvironment(JAVA_SDK_HARNESS_CONTAINER_URL);
private Environments() {}
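
  /**
   * Builds the default SDK harness {@link Environment} from the given {@link
   * PortablePipelineOptions}. The environment type is chosen from {@code defaultEnvironmentType}
   * (DOCKER, PROCESS, EXTERNAL/LOOPBACK, or EMBEDDED; an unset type yields the Java SDK docker
   * environment) and configured from {@code defaultEnvironmentConfig}. Deferred staging artifacts
   * and the Java SDK capabilities are attached to the resulting environment.
   */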
public static Environment createOrGetDefaultEnvironment(PortablePipelineOptions options) {
String type = options.getDefaultEnvironmentType();
String config = options.getDefaultEnvironmentConfig();
Environment defaultEnvironment;
if (Strings.isNullOrEmpty(type)) {
defaultEnvironment = JAVA_SDK_HARNESS_ENVIRONMENT;
} else {
switch (type) {
case ENVIRONMENT_EMBEDDED:
defaultEnvironment = createEmbeddedEnvironment(config);
break;
case ENVIRONMENT_EXTERNAL:
case ENVIRONMENT_LOOPBACK:
defaultEnvironment = createExternalEnvironment(config);
break;
case ENVIRONMENT_PROCESS:
defaultEnvironment = createProcessEnvironment(config);
break;
case ENVIRONMENT_DOCKER:
default:
defaultEnvironment = createDockerEnvironment(config);
}
}
return defaultEnvironment
.toBuilder()
.addAllDependencies(getDeferredArtifacts(options))
.addAllCapabilities(getJavaCapabilities())
.build();
}
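
  /**
   * Creates a docker {@link Environment} for the given container image URL, falling back to
   * {@link #JAVA_SDK_HARNESS_ENVIRONMENT} when the URL is null or empty.
   */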
public static Environment createDockerEnvironment(String dockerImageUrl) {
if (Strings.isNullOrEmpty(dockerImageUrl)) {
return JAVA_SDK_HARNESS_ENVIRONMENT;
}
return Environment.newBuilder()
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER))
.setPayload(
DockerPayload.newBuilder().setContainerImage(dockerImageUrl).build().toByteString())
.build();
}
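
  /**
   * Creates an external {@link Environment} whose payload points at the given worker service
   * endpoint URL.
   */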
private static Environment createExternalEnvironment(String config) {
return Environment.newBuilder()
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL))
.setPayload(
ExternalPayload.newBuilder()
.setEndpoint(ApiServiceDescriptor.newBuilder().setUrl(config).build())
.build()
.toByteString())
.build();
}
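
  /**
   * Creates a process {@link Environment} from a JSON config of the form
   * {@code {"os": ..., "arch": ..., "command": ..., "env": {...}}}.
   */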
private static Environment createProcessEnvironment(String config) {
try {
ProcessPayloadReferenceJSON payloadReferenceJSON =
MAPPER.readValue(config, ProcessPayloadReferenceJSON.class);
return createProcessEnvironment(
payloadReferenceJSON.getOs(),
payloadReferenceJSON.getArch(),
payloadReferenceJSON.getCommand(),
payloadReferenceJSON.getEnv());
} catch (IOException e) {
throw new RuntimeException(
String.format("Unable to parse process environment config: %s", config), e);
}
}
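
  /** Creates an embedded (in-process) {@link Environment}, passing the raw config as payload. */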
private static Environment createEmbeddedEnvironment(String config) {
return Environment.newBuilder()
.setUrn(ENVIRONMENT_EMBEDDED)
.setPayload(ByteString.copyFromUtf8(MoreObjects.firstNonNull(config, "")))
.build();
}
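
  /**
   * Creates a process {@link Environment} that runs the given command; os, arch, and environment
   * variables are optional, and null or empty values are omitted from the payload.
   */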
public static Environment createProcessEnvironment(
String os, String arch, String command, Map<String, String> env) {
ProcessPayload.Builder builder = ProcessPayload.newBuilder();
if (!Strings.isNullOrEmpty(os)) {
builder.setOs(os);
}
if (!Strings.isNullOrEmpty(arch)) {
builder.setArch(arch);
}
if (!Strings.isNullOrEmpty(command)) {
builder.setCommand(command);
}
if (env != null) {
builder.putAllEnv(env);
}
return Environment.newBuilder()
.setUrn(BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS))
.setPayload(builder.build().toByteString())
.build();
}
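
  /**
   * Returns the {@link Environment} referenced by the transform with the given id in {@code
   * components}, or {@link Optional#empty()} if the transform does not reference an environment.
   */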
public static Optional<Environment> getEnvironment(String ptransformId, Components components) {
PTransform ptransform = components.getTransformsOrThrow(ptransformId);
String envId = ptransform.getEnvironmentId();
if (Strings.isNullOrEmpty(envId)) {
      // Some PTransform payloads may have an unspecified (empty) Environment ID, for example a
      // WindowIntoPayload with a known WindowFn. Others will never have an Environment ID, such
      // as a GroupByKeyPayload; we return an empty Optional in either case.
return Optional.empty();
} else {
return Optional.of(components.getEnvironmentsOrThrow(envId));
}
}
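
  /**
   * Returns the {@link Environment} referenced by the given {@link PTransform} from the
   * rehydrated components, or {@link Optional#empty()} if none is referenced.
   */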
public static Optional<Environment> getEnvironment(
PTransform ptransform, RehydratedComponents components) {
String envId = ptransform.getEnvironmentId();
    if (Strings.isNullOrEmpty(envId)) {
      // Some PTransform payloads may have an empty (default) Environment ID, for example a
      // WindowIntoPayload with a known WindowFn. Others will never have an Environment ID, such
      // as a GroupByKeyPayload; we return an empty Optional in either case.
      return Optional.empty();
    } else {
      return Optional.of(components.getEnvironment(envId));
}
}
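
  /**
   * Converts the given staging file paths into FILE-type {@link ArtifactInformation}, deduplicating
   * paths, zipping directories, computing SHA-256 hashes, and deriving staged names (an explicit
   * name may be supplied with the {@code name=path} syntax). Paths that do not exist are skipped.
   */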
public static List<ArtifactInformation> getArtifacts(List<String> stagingFiles) {
ImmutableList.Builder<ArtifactInformation> artifactsBuilder = ImmutableList.builder();
Set<String> deduplicatedStagingFiles = new LinkedHashSet<>(stagingFiles);
for (String path : deduplicatedStagingFiles) {
File file;
String stagedName = null;
if (path.contains("=")) {
String[] components = path.split("=", 2);
file = new File(components[1]);
stagedName = components[0];
} else {
file = new File(path);
}
      // Spurious items get added to the classpath; keep only the entries that actually exist.
if (file.exists()) {
ArtifactInformation.Builder artifactBuilder = ArtifactInformation.newBuilder();
artifactBuilder.setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.FILE));
artifactBuilder.setRoleUrn(BeamUrns.getUrn(StandardArtifacts.Roles.STAGING_TO));
HashCode hashCode;
if (file.isDirectory()) {
File zippedFile;
try {
zippedFile = zipDirectory(file);
hashCode = Files.asByteSource(zippedFile).hash(Hashing.sha256());
} catch (IOException e) {
throw new RuntimeException(e);
}
artifactBuilder.setTypePayload(
RunnerApi.ArtifactFilePayload.newBuilder()
.setPath(zippedFile.getPath())
.setSha256(hashCode.toString())
.build()
.toByteString());
} else {
try {
hashCode = Files.asByteSource(file).hash(Hashing.sha256());
} catch (IOException e) {
throw new RuntimeException(e);
}
artifactBuilder.setTypePayload(
RunnerApi.ArtifactFilePayload.newBuilder()
.setPath(file.getPath())
.setSha256(hashCode.toString())
.build()
.toByteString());
}
if (stagedName == null) {
stagedName = createStagingFileName(file, hashCode);
}
artifactBuilder.setRolePayload(
RunnerApi.ArtifactStagingToRolePayload.newBuilder()
.setStagedName(stagedName)
.build()
.toByteString());
artifactsBuilder.add(artifactBuilder.build());
}
}
return artifactsBuilder.build();
}
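
  /**
   * Registers the pipeline's {@code filesToStage} with {@link DefaultArtifactResolver} under a
   * fresh key and returns a single DEFERRED {@link ArtifactInformation} carrying that key, so the
   * concrete artifact list is produced later when the resolver is invoked. Returns an empty list
   * when there are no files to stage.
   */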
public static List<ArtifactInformation> getDeferredArtifacts(PipelineOptions options) {
List<String> stagingFiles = options.as(PortablePipelineOptions.class).getFilesToStage();
if (stagingFiles == null || stagingFiles.isEmpty()) {
return ImmutableList.of();
}
String key = UUID.randomUUID().toString();
DefaultArtifactResolver.INSTANCE.register(
(info) -> {
if (BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED).equals(info.getTypeUrn())) {
RunnerApi.DeferredArtifactPayload deferredArtifactPayload;
try {
deferredArtifactPayload =
RunnerApi.DeferredArtifactPayload.parseFrom(info.getTypePayload());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Error parsing deferred artifact payload.", e);
}
if (key.equals(deferredArtifactPayload.getKey())) {
return Optional.of(getArtifacts(stagingFiles));
} else {
return Optional.empty();
}
} else {
return Optional.empty();
}
});
return ImmutableList.of(
ArtifactInformation.newBuilder()
.setTypeUrn(BeamUrns.getUrn(StandardArtifacts.Types.DEFERRED))
.setTypePayload(
RunnerApi.DeferredArtifactPayload.newBuilder().setKey(key).build().toByteString())
.build());
}
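
  /**
   * Returns the capability URNs advertised by the Java SDK harness: the model coders, multi-core
   * bundle processing, progress reporting, the SDK base container, and truncation of sized
   * restrictions.
   */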
public static Set<String> getJavaCapabilities() {
ImmutableSet.Builder<String> capabilities = ImmutableSet.builder();
capabilities.addAll(ModelCoders.urns());
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.MULTI_CORE_BUNDLE_PROCESSING));
capabilities.add(BeamUrns.getUrn(StandardProtocols.Enum.PROGRESS_REPORTING));
capabilities.add("beam:version:sdk_base:" + JAVA_SDK_HARNESS_CONTAINER_URL);
capabilities.add(BeamUrns.getUrn(SplittableParDoComponents.TRUNCATE_SIZED_RESTRICTION));
return capabilities.build();
}
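
  /**
   * Builds a unique staged file name of the form {@code <name>-<urlSafeBase64Hash>.<ext>}; zipped
   * directories are given a {@code .jar} extension.
   */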
public static String createStagingFileName(File path, HashCode hash) {
String encodedHash = Base64Variants.MODIFIED_FOR_URL.encode(hash.asBytes());
String fileName = Files.getNameWithoutExtension(path.getAbsolutePath());
String ext = path.isDirectory() ? "jar" : Files.getFileExtension(path.getAbsolutePath());
String suffix = Strings.isNullOrEmpty(ext) ? "" : "." + ext;
return String.format("%s-%s%s", fileName, encodedHash, suffix);
}
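
  /** Zips the given directory into a temporary file and returns it. */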
private static File zipDirectory(File directory) throws IOException {
File zipFile = File.createTempFile(directory.getName(), ".zip");
try (FileOutputStream fos = new FileOutputStream(zipFile)) {
ZipFiles.zipDirectory(directory, fos);
}
return zipFile;
}
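
  /** POJO used to deserialize the JSON config of a process environment. */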
private static class ProcessPayloadReferenceJSON {
private @Nullable String os;
private @Nullable String arch;
private @Nullable String command;
private @Nullable Map<String, String> env;
public @Nullable String getOs() {
return os;
}
public @Nullable String getArch() {
return arch;
}
public @Nullable String getCommand() {
return command;
}
public @Nullable Map<String, String> getEnv() {
return env;
}
}
}