blob: 5f7fe8cbe61e3da6ad6c4380208a84708dffad01 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.fnexecution.artifact;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ArtifactMetadata;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestRequest;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.CommitManifestResponse;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.ProxyManifest.Location;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactMetadata;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactRequest;
import org.apache.beam.model.jobmanagement.v1.ArtifactApi.PutArtifactResponse;
import org.apache.beam.model.jobmanagement.v1.ArtifactStagingServiceGrpc.ArtifactStagingServiceImplBase;
import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.util.JsonFormat;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.StatusRuntimeException;
import org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Hasher;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Hashing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This implementation is experimental.
*
* <p>{@link ArtifactStagingServiceImplBase} based on beam file system. {@link
* BeamFileSystemArtifactStagingService} requires {@link StagingSessionToken} in every me call. The
* manifest is put in {@link StagingSessionToken#getBasePath()}/{@link
* StagingSessionToken#getSessionId()} and artifacts are put in {@link
* StagingSessionToken#getBasePath()}/{@link StagingSessionToken#getSessionId()}/{@link
* BeamFileSystemArtifactStagingService#ARTIFACTS}.
*
* <p>The returned token is the path to the manifest file.
*
* <p>The manifest file is encoded in {@link ProxyManifest}.
*/
public class BeamFileSystemArtifactStagingService extends ArtifactStagingServiceImplBase
implements FnService {
private static final Logger LOG =
LoggerFactory.getLogger(BeamFileSystemArtifactStagingService.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
// Use UTF8 for all text encoding.
private static final Charset CHARSET = StandardCharsets.UTF_8;
public static final String MANIFEST = "MANIFEST";
public static final String ARTIFACTS = "artifacts";
@Override
public StreamObserver<PutArtifactRequest> putArtifact(
StreamObserver<PutArtifactResponse> responseObserver) {
return new PutArtifactStreamObserver(responseObserver);
}
@Override
public void commitManifest(
CommitManifestRequest request, StreamObserver<CommitManifestResponse> responseObserver) {
try {
StagingSessionToken stagingSessionToken =
StagingSessionToken.decode(request.getStagingSessionToken());
ResourceId manifestResourceId = getManifestFileResourceId(stagingSessionToken);
ResourceId artifactDirResourceId = getArtifactDirResourceId(stagingSessionToken);
ProxyManifest.Builder proxyManifestBuilder =
ProxyManifest.newBuilder().setManifest(request.getManifest());
for (ArtifactMetadata artifactMetadata : request.getManifest().getArtifactList()) {
proxyManifestBuilder.addLocation(
Location.newBuilder()
.setName(artifactMetadata.getName())
.setUri(
artifactDirResourceId
.resolve(
encodedFileName(artifactMetadata), StandardResolveOptions.RESOLVE_FILE)
.toString())
.build());
}
try (WritableByteChannel manifestWritableByteChannel =
FileSystems.create(manifestResourceId, MimeTypes.TEXT)) {
manifestWritableByteChannel.write(
CHARSET.encode(JsonFormat.printer().print(proxyManifestBuilder.build())));
}
// TODO: Validate integrity of staged files.
responseObserver.onNext(
CommitManifestResponse.newBuilder()
.setRetrievalToken(manifestResourceId.toString())
.build());
responseObserver.onCompleted();
} catch (Exception e) {
// TODO: Cleanup all the artifacts.
LOG.error("Unable to commit manifest.", e);
responseObserver.onError(e);
}
}
@Override
public void close() throws Exception {
// Nothing to close here.
}
/**
* Generate a stagingSessionToken compatible with {@link BeamFileSystemArtifactStagingService}.
*
* @param sessionId Unique sessionId for artifact staging.
* @param basePath Base path to upload artifacts.
* @return Encoded stagingSessionToken.
*/
public static String generateStagingSessionToken(String sessionId, String basePath) {
StagingSessionToken stagingSessionToken = new StagingSessionToken();
stagingSessionToken.setSessionId(sessionId);
stagingSessionToken.setBasePath(basePath);
return stagingSessionToken.encode();
}
private String encodedFileName(ArtifactMetadata artifactMetadata) {
return "artifact_"
+ Hashing.sha256().hashString(artifactMetadata.getName(), CHARSET).toString();
}
public void removeArtifacts(String stagingSessionToken) throws Exception {
StagingSessionToken parsedToken = StagingSessionToken.decode(stagingSessionToken);
ResourceId dir = getJobDirResourceId(parsedToken);
ResourceId manifestResourceId = dir.resolve(MANIFEST, StandardResolveOptions.RESOLVE_FILE);
LOG.debug("Removing dir {}", dir);
ProxyManifest proxyManifest =
BeamFileSystemArtifactRetrievalService.loadManifest(manifestResourceId);
for (Location location : proxyManifest.getLocationList()) {
String uri = location.getUri();
LOG.debug("Removing artifact: {}", uri);
FileSystems.delete(
Collections.singletonList(FileSystems.matchNewResource(uri, false /* is directory */)));
}
ResourceId artifactsResourceId =
dir.resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);
if (!proxyManifest.getLocationList().isEmpty()) {
// directory only exists when there is at least one artifact
LOG.debug("Removing artifacts dir: {}", artifactsResourceId);
FileSystems.delete(Collections.singletonList(artifactsResourceId));
}
LOG.debug("Removing manifest: {}", manifestResourceId);
FileSystems.delete(Collections.singletonList(manifestResourceId));
LOG.debug("Removing empty dir: {}", dir);
FileSystems.delete(Collections.singletonList(dir));
LOG.info("Removed dir {}", dir);
}
private ResourceId getJobDirResourceId(StagingSessionToken stagingSessionToken) {
ResourceId baseResourceId;
// Get or Create the base path
baseResourceId =
FileSystems.matchNewResource(stagingSessionToken.getBasePath(), true /* isDirectory */);
// Using sessionId as the subDir to store artifacts and manifest.
return baseResourceId.resolve(
stagingSessionToken.getSessionId(), StandardResolveOptions.RESOLVE_DIRECTORY);
}
private ResourceId getManifestFileResourceId(StagingSessionToken stagingSessionToken) {
return getJobDirResourceId(stagingSessionToken)
.resolve(MANIFEST, StandardResolveOptions.RESOLVE_FILE);
}
private ResourceId getArtifactDirResourceId(StagingSessionToken stagingSessionToken) {
return getJobDirResourceId(stagingSessionToken)
.resolve(ARTIFACTS, StandardResolveOptions.RESOLVE_DIRECTORY);
}
private class PutArtifactStreamObserver implements StreamObserver<PutArtifactRequest> {
private final StreamObserver<PutArtifactResponse> outboundObserver;
private PutArtifactMetadata metadata;
private ResourceId artifactId;
private WritableByteChannel artifactWritableByteChannel;
private Hasher hasher;
PutArtifactStreamObserver(StreamObserver<PutArtifactResponse> outboundObserver) {
this.outboundObserver = outboundObserver;
}
@Override
public void onNext(PutArtifactRequest putArtifactRequest) {
// Create the directory structure for storing artifacts in the first call.
if (metadata == null) {
checkNotNull(putArtifactRequest);
checkNotNull(putArtifactRequest.getMetadata());
metadata = putArtifactRequest.getMetadata();
LOG.debug("stored metadata: {}", metadata);
// Check the base path exists or create the base path
try {
ResourceId artifactsDirId =
getArtifactDirResourceId(
StagingSessionToken.decode(
putArtifactRequest.getMetadata().getStagingSessionToken()));
artifactId =
artifactsDirId.resolve(
encodedFileName(metadata.getMetadata()), StandardResolveOptions.RESOLVE_FILE);
LOG.debug(
"Going to stage artifact {} to {}.", metadata.getMetadata().getName(), artifactId);
artifactWritableByteChannel = FileSystems.create(artifactId, MimeTypes.BINARY);
hasher = Hashing.sha256().newHasher();
} catch (Exception e) {
String message =
String.format(
"Failed to begin staging artifact %s", metadata.getMetadata().getName());
outboundObserver.onError(
new StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause(e)));
}
} else {
try {
ByteString data = putArtifactRequest.getData().getData();
artifactWritableByteChannel.write(data.asReadOnlyByteBuffer());
hasher.putBytes(data.toByteArray());
} catch (IOException e) {
String message =
String.format(
"Failed to write chunk of artifact %s to %s",
metadata.getMetadata().getName(), artifactId);
outboundObserver.onError(
new StatusRuntimeException(Status.DATA_LOSS.withDescription(message).withCause(e)));
}
}
}
@Override
public void onError(Throwable throwable) {
// Delete the artifact.
LOG.error("Staging artifact failed for " + artifactId, throwable);
try {
if (artifactWritableByteChannel != null) {
artifactWritableByteChannel.close();
}
if (artifactId != null) {
FileSystems.delete(
Collections.singletonList(artifactId), StandardMoveOptions.IGNORE_MISSING_FILES);
}
} catch (IOException e) {
outboundObserver.onError(
new StatusRuntimeException(
Status.DATA_LOSS.withDescription(
String.format("Failed to clean up artifact file %s", artifactId))));
return;
}
outboundObserver.onError(
new StatusRuntimeException(
Status.DATA_LOSS
.withDescription(String.format("Failed to stage artifact %s", artifactId))
.withCause(throwable)));
}
@Override
public void onCompleted() {
// Close the stream.
LOG.debug("Staging artifact completed for " + artifactId);
if (artifactWritableByteChannel != null) {
try {
artifactWritableByteChannel.close();
} catch (IOException e) {
onError(e);
return;
}
}
String expectedSha256 = metadata.getMetadata().getSha256();
if (expectedSha256 != null && !expectedSha256.isEmpty()) {
String actualSha256 = hasher.hash().toString();
if (!actualSha256.equals(expectedSha256)) {
outboundObserver.onError(
new StatusRuntimeException(
Status.INVALID_ARGUMENT.withDescription(
String.format(
"Artifact %s is corrupt: expected sah256 %s, but has sha256 %s",
metadata.getMetadata().getName(), expectedSha256, actualSha256))));
return;
}
}
outboundObserver.onNext(PutArtifactResponse.newBuilder().build());
outboundObserver.onCompleted();
}
}
/**
* Serializable StagingSessionToken used to stage files with {@link
* BeamFileSystemArtifactStagingService}.
*/
private static class StagingSessionToken implements Serializable {
private String sessionId;
private String basePath;
/** Access is public for json conversion. */
public String getSessionId() {
return sessionId;
}
private void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
/** Access is public for json conversion. */
public String getBasePath() {
return basePath;
}
private void setBasePath(String basePath) {
this.basePath = basePath;
}
public String encode() {
try {
return MAPPER.writeValueAsString(this);
} catch (JsonProcessingException e) {
String message =
String.format("Error %s occurred while serializing %s", e.getMessage(), this);
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
}
}
public static StagingSessionToken decode(String stagingSessionToken) throws Exception {
try {
return MAPPER.readValue(stagingSessionToken, StagingSessionToken.class);
} catch (JsonProcessingException e) {
String message =
String.format(
"Unable to deserialize staging token %s. Expected format: %s. Error: %s",
stagingSessionToken,
"{\"sessionId\": \"sessionId\", \"basePath\": \"basePath\"}",
e.getMessage());
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription(message));
}
}
@Override
public String toString() {
return "StagingSessionToken{"
+ "sessionId='"
+ sessionId
+ "', "
+ "basePath='"
+ basePath
+ "'"
+ "}";
}
}
}