blob: 36284366e313aa7c31a21a620e2dbaad9aa3d752 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.highavailability;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer;
import org.apache.flink.runtime.rest.messages.json.JobResultSerializer;
import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
/**
 * An implementation of the {@link JobResultStore} which persists job result data to an underlying
 * distributed filesystem.
 *
 * <p>Every entry is a single JSON file that lives directly below the base path and is named after
 * the job ID. Dirty entries use the suffix {@code _DIRTY.json}, clean entries use {@code .json}.
 * Marking an entry as clean either renames the dirty file to its clean name or, when
 * delete-on-commit is enabled, removes the dirty file altogether.
 */
public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {

    private static final Logger LOG = LoggerFactory.getLogger(FileSystemJobResultStore.class);

    /** File name suffix shared by all entries (and the full suffix of clean entries). */
    @VisibleForTesting static final String FILE_EXTENSION = ".json";

    /** File name suffix of dirty entries; note that it ends with {@link #FILE_EXTENSION}. */
    @VisibleForTesting static final String DIRTY_FILE_EXTENSION = "_DIRTY" + FILE_EXTENSION;

    /**
     * Checks whether the given file name carries the suffix used for dirty entries.
     *
     * @param filename The file name to check.
     * @return {@code true} if the name ends with {@link #DIRTY_FILE_EXTENSION}.
     */
    @VisibleForTesting
    public static boolean hasValidDirtyJobResultStoreEntryExtension(String filename) {
        return filename.endsWith(DIRTY_FILE_EXTENSION);
    }

    /**
     * Checks whether the given file name carries the suffix used for entries of this store.
     *
     * <p>Because {@link #DIRTY_FILE_EXTENSION} itself ends with {@link #FILE_EXTENSION}, this
     * check is satisfied by the names of dirty entries as well as clean ones.
     *
     * @param filename The file name to check.
     * @return {@code true} if the name ends with {@link #FILE_EXTENSION}.
     */
    @VisibleForTesting
    public static boolean hasValidJobResultStoreEntryExtension(String filename) {
        return filename.endsWith(FILE_EXTENSION);
    }

    /** Mapper used to write and read the JSON representation of the entries. */
    private final ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();

    /** File system that hosts {@link #basePath}. */
    private final FileSystem fileSystem;

    /** Set once {@link #createBasePathIfNeeded()} has issued the {@code mkdirs} call. */
    private volatile boolean basePathCreated;

    /** Directory that contains all entry files. */
    private final Path basePath;

    /** If {@code true}, committing an entry deletes the dirty file instead of renaming it. */
    private final boolean deleteOnCommit;

    /**
     * Creates a store operating on the given directory. The directory itself is not created here
     * but lazily, see {@link #createBasePathIfNeeded()}.
     *
     * @param fileSystem The file system used for all file operations.
     * @param basePath The directory in which the entry files are kept.
     * @param deleteOnCommit Whether dirty entries are deleted (rather than renamed) when they are
     *     marked as clean.
     */
    @VisibleForTesting
    FileSystemJobResultStore(FileSystem fileSystem, Path basePath, boolean deleteOnCommit) {
        this.fileSystem = fileSystem;
        this.basePath = basePath;
        this.deleteOnCommit = deleteOnCommit;
    }

    /**
     * Creates a store from the given configuration.
     *
     * <p>The base path is taken from {@link JobResultStoreOptions#STORAGE_PATH}. If that option is
     * unset or contains only whitespace, the path is derived from the HA storage path and the HA
     * cluster ID via {@link #createDefaultJobResultStorePath(String, String)}.
     *
     * @param config The configuration to read the settings from; must not be {@code null}.
     * @return A store operating on the configured base path.
     * @throws IOException If the file system of the base path cannot be accessed.
     */
    public static FileSystemJobResultStore fromConfiguration(Configuration config)
            throws IOException {
        Preconditions.checkNotNull(config);

        final String jrsStoragePath = config.get(JobResultStoreOptions.STORAGE_PATH);
        final Path basePath;
        if (isNullOrWhitespaceOnly(jrsStoragePath)) {
            // no explicit location configured: fall back to a sub-directory of the HA storage
            final String haStoragePath = config.get(HighAvailabilityOptions.HA_STORAGE_PATH);
            final String haClusterId = config.get(HighAvailabilityOptions.HA_CLUSTER_ID);
            basePath = new Path(createDefaultJobResultStorePath(haStoragePath, haClusterId));
        } else {
            basePath = new Path(jrsStoragePath);
        }

        boolean deleteOnCommit = config.get(JobResultStoreOptions.DELETE_ON_COMMIT);

        return new FileSystemJobResultStore(basePath.getFileSystem(), basePath, deleteOnCommit);
    }

    /**
     * Creates the base directory unless this instance has already done so.
     *
     * @throws IOException If creating the directory fails.
     */
    private void createBasePathIfNeeded() throws IOException {
        if (!basePathCreated) {
            LOG.info("Creating highly available job result storage directory at {}", basePath);
            fileSystem.mkdirs(basePath);
            LOG.info("Created highly available job result storage directory at {}", basePath);
            // only flip the flag after mkdirs returned without throwing
            basePathCreated = true;
        }
    }

    /**
     * Builds the default location of the job result store.
     *
     * @param baseDir The directory below which the store is placed.
     * @param clusterId The cluster ID used as the last path segment.
     * @return The string {@code <baseDir>/job-result-store/<clusterId>}.
     */
    public static String createDefaultJobResultStorePath(String baseDir, String clusterId) {
        return baseDir + "/job-result-store/" + clusterId;
    }

    /**
     * Given a job ID, construct the path for a dirty entry corresponding to it in the job result
     * store.
     *
     * @param jobId The job ID to construct a dirty entry path from.
     * @return A path for a dirty entry for the given job ID.
     */
    private Path constructDirtyPath(JobID jobId) {
        return constructEntryPath(jobId.toString() + DIRTY_FILE_EXTENSION);
    }

    /**
     * Given a job ID, construct the path for a clean entry corresponding to it in the job result
     * store.
     *
     * @param jobId The job ID to construct a clean entry path from.
     * @return A path for a clean entry for the given job ID.
     */
    private Path constructCleanPath(JobID jobId) {
        return constructEntryPath(jobId.toString() + FILE_EXTENSION);
    }

    /**
     * Resolves a file name against the base path of this store.
     *
     * @param fileName The name of the entry file.
     * @return The path of that file below the base path.
     */
    @VisibleForTesting
    Path constructEntryPath(String fileName) {
        return new Path(this.basePath, fileName);
    }

    /**
     * Writes the given entry as a dirty JSON file. The file is created with {@link
     * FileSystem.WriteMode#NO_OVERWRITE}.
     *
     * @param jobResultEntry The entry to persist.
     * @throws IOException If the base directory or the file cannot be created or written.
     */
    @Override
    public void createDirtyResultInternal(JobResultEntry jobResultEntry) throws IOException {
        createBasePathIfNeeded();

        final Path path = constructDirtyPath(jobResultEntry.getJobId());
        try (OutputStream os = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
            mapper.writeValue(
                    // working around the internally used _writeAndClose method to ensure that close
                    // is only called once
                    new NonClosingOutputStreamDecorator(os),
                    new JsonJobResultEntry(jobResultEntry));
        }
    }

    /**
     * Marks the dirty entry of the given job as clean, either by deleting the dirty file (if
     * delete-on-commit is enabled) or by renaming it to the clean file name.
     *
     * @param jobId The ID of the job whose entry should be marked as clean.
     * @throws IOException If the file system cannot be accessed, or if it reports that the dirty
     *     file could not be deleted or renamed.
     * @throws NoSuchElementException If there is no dirty entry for the given job.
     */
    @Override
    public void markResultAsCleanInternal(JobID jobId) throws IOException, NoSuchElementException {
        Path dirtyPath = constructDirtyPath(jobId);

        if (!fileSystem.exists(dirtyPath)) {
            throw new NoSuchElementException(
                    String.format(
                            "Could not mark job %s as clean as it is not present in the job result store.",
                            jobId));
        }

        // Both delete and rename signal failure through their boolean result. Ignoring it would
        // let the caller believe the entry is clean while the dirty file is still in place.
        if (deleteOnCommit) {
            if (!fileSystem.delete(dirtyPath, false)) {
                throw new IOException(
                        String.format(
                                "Could not mark job %s as clean: deleting the dirty entry %s failed.",
                                jobId, dirtyPath));
            }
        } else {
            final Path cleanPath = constructCleanPath(jobId);
            if (!fileSystem.rename(dirtyPath, cleanPath)) {
                throw new IOException(
                        String.format(
                                "Could not mark job %s as clean: renaming the dirty entry %s to %s failed.",
                                jobId, dirtyPath, cleanPath));
            }
        }
    }

    /**
     * Checks whether a dirty entry file exists for the given job.
     *
     * @param jobId The ID of the job to look up.
     * @return {@code true} if the dirty entry file exists.
     * @throws IOException If the file system cannot be accessed.
     */
    @Override
    public boolean hasDirtyJobResultEntryInternal(JobID jobId) throws IOException {
        return fileSystem.exists(constructDirtyPath(jobId));
    }

    /**
     * Checks whether a clean entry file exists for the given job.
     *
     * @param jobId The ID of the job to look up.
     * @return {@code true} if the clean entry file exists.
     * @throws IOException If the file system cannot be accessed.
     */
    @Override
    public boolean hasCleanJobResultEntryInternal(JobID jobId) throws IOException {
        return fileSystem.exists(constructCleanPath(jobId));
    }

    /**
     * Reads all dirty entries found directly below the base path. Sub-directories and files
     * without the dirty suffix are skipped.
     *
     * @return The job results of all dirty entries; empty if there are none.
     * @throws IOException If listing the base path or reading/parsing an entry file fails.
     * @throws IllegalStateException If listing the base path yields {@code null}.
     */
    @Override
    public Set<JobResult> getDirtyResultsInternal() throws IOException {
        createBasePathIfNeeded();

        final FileStatus[] statuses = fileSystem.listStatus(this.basePath);

        Preconditions.checkState(
                statuses != null,
                "The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.");

        final Set<JobResult> dirtyResults = new HashSet<>();
        for (FileStatus s : statuses) {
            if (!s.isDir() && hasValidDirtyJobResultStoreEntryExtension(s.getPath().getName())) {
                // NOTE(review): the opened stream is not closed explicitly here; presumably the
                // mapper closes it after reading (Jackson's AUTO_CLOSE_SOURCE default) - confirm.
                JsonJobResultEntry jre =
                        mapper.readValue(fileSystem.open(s.getPath()), JsonJobResultEntry.class);
                dirtyResults.add(jre.getJobResult());
            }
        }
        return dirtyResults;
    }

    /**
     * Wrapper class around {@link JobResultEntry} to allow for serialization of a schema version,
     * so that future schema changes can be handled in a backwards compatible manner.
     *
     * <p>The {@code version} property is listed in {@link JsonIgnoreProperties} with {@code
     * allowGetters = true}, i.e. it is emitted via {@link #getVersion()} when writing but is not
     * bound to anything when reading.
     */
    @JsonIgnoreProperties(
            value = {JsonJobResultEntry.FIELD_NAME_VERSION},
            allowGetters = true)
    @VisibleForTesting
    static class JsonJobResultEntry extends JobResultEntry {

        /** JSON property holding the serialized {@link JobResult}. */
        private static final String FIELD_NAME_RESULT = "result";

        /** JSON property holding the schema version. */
        static final String FIELD_NAME_VERSION = "version";

        /** Wraps an existing entry so that it can be serialized to JSON. */
        private JsonJobResultEntry(JobResultEntry entry) {
            this(entry.getJobResult());
        }

        /** Creator used when reading an entry back from JSON. */
        @JsonCreator
        private JsonJobResultEntry(@JsonProperty(FIELD_NAME_RESULT) JobResult jobResult) {
            super(jobResult);
        }

        /** Returns the wrapped job result; (de)serialized with the dedicated JSON classes. */
        @Override
        @JsonProperty(FIELD_NAME_RESULT)
        @JsonSerialize(using = JobResultSerializer.class)
        @JsonDeserialize(using = JobResultDeserializer.class)
        public JobResult getJobResult() {
            return super.getJobResult();
        }

        /** Returns the job ID; excluded from the JSON representation. */
        @JsonIgnore
        @Override
        public JobID getJobId() {
            return super.getJobId();
        }

        /** Returns the schema version written alongside the result (currently always 1). */
        public int getVersion() {
            return 1;
        }
    }
}