blob: 951a8cb293bc884714621ec4640661083245067f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.jobsubmission;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.jar.Attributes;
import java.util.jar.Attributes.Name;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactChunk;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.GetArtifactRequest;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.GetManifestRequest;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.GetManifestResponse;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location;
import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc;
import org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.PipelineResources;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.InProcessServerFactory;
import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.MessageOrBuilder;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.util.JsonFormat;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.commons.compress.utils.IOUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link PortablePipelineRunner} that bundles the input pipeline along with all dependencies,
* artifacts, etc. required to run the pipeline into a jar that can be executed later.
*
 * <p>Each {@link PortablePipelineJarCreator} instance is not threadsafe; a new instance is expected
 * to be constructed, and {@link #run} invoked, once per job.
*/
public class PortablePipelineJarCreator implements PortablePipelineRunner {
  private static final Logger LOG = LoggerFactory.getLogger(PortablePipelineJarCreator.class);

  /** Class whose {@code main} method (if present) becomes the output jar's Main-Class. */
  private final Class<?> mainClass;

  @VisibleForTesting JarOutputStream outputStream;
  /** Wrapper over {@link #outputStream}. */
  @VisibleForTesting WritableByteChannel outputChannel;

  public PortablePipelineJarCreator(Class<?> mainClass) {
    this.mainClass = mainClass;
  }

  /**
   * <em>Does not actually run the pipeline.</em> Instead bundles the input pipeline along with all
   * dependencies, artifacts, etc. required to run the pipeline into a jar that can be executed
   * later.
   */
  @Override
  public PortablePipelineResult run(Pipeline pipeline, JobInfo jobInfo) throws Exception {
    PortablePipelineOptions pipelineOptions =
        PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())
            .as(PortablePipelineOptions.class);
    final String jobName = jobInfo.jobName();
    File outputFile = new File(pipelineOptions.getOutputExecutablePath());
    LOG.info("Creating jar {} for job {}", outputFile.getAbsolutePath(), jobName);
    outputStream =
        new JarOutputStream(new FileOutputStream(outputFile), createManifest(mainClass, jobName));
    outputChannel = Channels.newChannel(outputStream);
    try {
      PortablePipelineJarUtils.writeDefaultJobName(outputStream, jobName);
      writeClassPathResources(mainClass.getClassLoader());
      writeAsJson(pipeline, PortablePipelineJarUtils.getPipelineUri(jobName));
      writeAsJson(
          PipelineOptionsTranslation.toProto(pipelineOptions),
          PortablePipelineJarUtils.getPipelineOptionsUri(jobName));
      writeArtifacts(jobInfo.retrievalToken(), jobName);
    } finally {
      // Closing the channel also closes the underlying stream. Done in a finally block so the
      // file handle is released even when one of the writes above throws.
      outputChannel.close();
    }
    LOG.info("Jar {} created successfully.", outputFile.getAbsolutePath());
    return new JarCreatorPipelineResult();
  }

  /**
   * Builds the jar manifest, setting Main-Class to {@code mainClass} only if that class has a
   * {@code public static void main(String[])} method; otherwise logs a warning and omits it.
   *
   * @param mainClass candidate entry-point class
   * @param defaultJobName unused here; kept for the @VisibleForTesting call signature
   */
  @VisibleForTesting
  Manifest createManifest(Class<?> mainClass, String defaultJobName) {
    Manifest manifest = new Manifest();
    manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
    boolean classHasMainMethod = false;
    try {
      Class<?> returnType = mainClass.getMethod("main", String[].class).getReturnType();
      if (returnType == Void.TYPE) {
        classHasMainMethod = true;
      } else {
        LOG.warn(
            "No Main-Class will be set in jar because main method in {} returns {}, expected void",
            mainClass,
            returnType);
      }
    } catch (NoSuchMethodException e) {
      LOG.warn("No Main-Class will be set in jar because {} lacks a main method.", mainClass);
    }
    if (classHasMainMethod) {
      manifest.getMainAttributes().put(Name.MAIN_CLASS, mainClass.getName());
    }
    return manifest;
  }

  /** Copy resources from {@code classLoader} to {@link #outputStream}. */
  private void writeClassPathResources(ClassLoader classLoader) throws IOException {
    List<String> classPathResources =
        PipelineResources.detectClassPathResourcesToStage(classLoader);
    // Template overload avoids building the message string when the check passes.
    Preconditions.checkArgument(
        classPathResources.size() == 1, "Expected exactly one jar on %s", classLoader);
    // try-with-resources so the input jar's file handle is released after copying.
    try (JarFile inputJar = new JarFile(classPathResources.get(0))) {
      copyResourcesFromJar(inputJar);
    }
  }

  /**
   * Copy resources from {@code inputJar} to {@link #outputStream}. Does not close {@code inputJar};
   * that remains the caller's responsibility.
   */
  @VisibleForTesting
  protected void copyResourcesFromJar(JarFile inputJar) throws IOException {
    Enumeration<JarEntry> inputJarEntries = inputJar.entries();
    // The zip spec allows multiple files with the same name; the Java zip libraries do not.
    // Keep track of the files we've already written to filter out duplicates.
    // Also, ignore the old manifest; we want to write our own.
    Set<String> previousEntryNames = new HashSet<>(ImmutableList.of(JarFile.MANIFEST_NAME));
    while (inputJarEntries.hasMoreElements()) {
      JarEntry inputJarEntry = inputJarEntries.nextElement();
      String entryName = inputJarEntry.getName();
      if (previousEntryNames.contains(entryName)) {
        LOG.debug("Skipping duplicated file {}", entryName);
      } else {
        // Open the entry stream only when we actually copy, and close it once copied.
        try (InputStream inputStream = inputJar.getInputStream(inputJarEntry)) {
          JarEntry outputJarEntry = new JarEntry(inputJarEntry);
          outputStream.putNextEntry(outputJarEntry);
          LOG.trace("Copying jar entry {}", inputJarEntry);
          IOUtils.copy(inputStream, outputStream);
        }
        previousEntryNames.add(entryName);
      }
    }
  }

  /** Minimal retrieval interface so {@link #copyStagedArtifacts} can be tested without gRPC. */
  @VisibleForTesting
  interface ArtifactRetriever {
    GetManifestResponse getManifest(GetManifestRequest request);

    Iterator<ArtifactChunk> getArtifact(GetArtifactRequest request);
  }

  /**
   * Copy all artifacts retrievable via the {@link ArtifactRetrievalServiceBlockingStub} to the
   * {@code outputStream}.
   *
   * @return A {@link ProxyManifest} pointing to the artifacts' location in the output jar.
   */
  @VisibleForTesting
  ProxyManifest copyStagedArtifacts(
      String retrievalToken, ArtifactRetriever retrievalServiceStub, String jobName)
      throws IOException {
    GetManifestRequest manifestRequest =
        GetManifestRequest.newBuilder().setRetrievalToken(retrievalToken).build();
    ArtifactApi.Manifest manifest = retrievalServiceStub.getManifest(manifestRequest).getManifest();
    // Create a new proxy manifest to locate artifacts at jar runtime.
    ProxyManifest.Builder proxyManifestBuilder = ProxyManifest.newBuilder().setManifest(manifest);
    for (ArtifactMetadata artifact : manifest.getArtifactList()) {
      // Random UUID in the path keeps artifacts with identical names from colliding in the jar.
      String outputPath =
          PortablePipelineJarUtils.getArtifactUri(jobName, UUID.randomUUID().toString());
      LOG.trace("Copying artifact {} to {}", artifact.getName(), outputPath);
      proxyManifestBuilder.addLocation(
          Location.newBuilder().setName(artifact.getName()).setUri("/" + outputPath).build());
      outputStream.putNextEntry(new JarEntry(outputPath));
      GetArtifactRequest artifactRequest =
          GetArtifactRequest.newBuilder()
              .setRetrievalToken(retrievalToken)
              .setName(artifact.getName())
              .build();
      Iterator<ArtifactChunk> artifactResponse = retrievalServiceStub.getArtifact(artifactRequest);
      while (artifactResponse.hasNext()) {
        artifactResponse.next().getData().writeTo(outputStream);
      }
    }
    return proxyManifestBuilder.build();
  }

  /**
   * Uses {@link BeamFileSystemArtifactRetrievalService} to fetch artifacts, then writes the
   * artifacts to {@code outputStream}. Include a {@link ProxyManifest} to locate artifacts later.
   */
  private void writeArtifacts(String retrievalToken, String jobName) throws Exception {
    try (GrpcFnServer artifactServer =
        GrpcFnServer.allocatePortAndCreateFor(
            BeamFileSystemArtifactRetrievalService.create(), InProcessServerFactory.create())) {
      ManagedChannel grpcChannel =
          InProcessManagedChannelFactory.create()
              .forDescriptor(artifactServer.getApiServiceDescriptor());
      try {
        ArtifactRetrievalServiceBlockingStub retrievalServiceStub =
            ArtifactRetrievalServiceGrpc.newBlockingStub(grpcChannel);
        ProxyManifest proxyManifest =
            copyStagedArtifacts(
                retrievalToken,
                new ArtifactRetriever() {
                  @Override
                  public GetManifestResponse getManifest(GetManifestRequest request) {
                    return retrievalServiceStub.getManifest(request);
                  }

                  @Override
                  public Iterator<ArtifactChunk> getArtifact(GetArtifactRequest request) {
                    return retrievalServiceStub.getArtifact(request);
                  }
                },
                jobName);
        writeAsJson(proxyManifest, PortablePipelineJarUtils.getArtifactManifestUri(jobName));
      } finally {
        // Shut down in a finally block so the channel is released even if copying fails.
        grpcChannel.shutdown();
      }
    }
  }

  /** Helper method for writing {@code message} in UTF-8 JSON format. */
  private void writeAsJson(MessageOrBuilder message, String outputPath) throws IOException {
    outputStream.putNextEntry(new JarEntry(outputPath));
    outputChannel.write(StandardCharsets.UTF_8.encode(JsonFormat.printer().print(message)));
  }

  /** Trivial result: jar creation "finishes" immediately and yields no metrics. */
  private static class JarCreatorPipelineResult implements PortablePipelineResult {
    @Override
    public State getState() {
      return State.DONE;
    }

    @Override
    public State cancel() {
      return State.DONE;
    }

    @Override
    public State waitUntilFinish(Duration duration) {
      return State.DONE;
    }

    @Override
    public State waitUntilFinish() {
      return State.DONE;
    }

    @Override
    public MetricResults metrics() {
      throw new UnsupportedOperationException("Jar creation does not yield metrics.");
    }

    @Override
    public JobApi.MetricResults portableMetrics() throws UnsupportedOperationException {
      return JobApi.MetricResults.getDefaultInstance();
    }
  }
}