/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.core.fs;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;

/**
 * The RecoverableWriter creates and recovers {@link RecoverableFsDataOutputStream} instances.
 * It can be used to write data to a file system in such a way that writing can be resumed
 * consistently after a failure and recovery, without loss or duplication of data.
 *
 * <p>The streams do not make the files they write to immediately visible. Instead, they write
 * to temp files or other temporary storage. To publish the data atomically at the end, the
 * stream offers the {@link RecoverableFsDataOutputStream#closeForCommit()} method, which
 * creates a committer that publishes the result.
 *
 * <p>These writers are useful in the context of checkpointing. The example below illustrates
 * how to use them:
 *
 * <pre>{@code
 * // --------- initial run --------
 * RecoverableWriter writer = fileSystem.createRecoverableWriter();
 * RecoverableFsDataOutputStream out = writer.open(path);
 * out.write(...);
 *
 * // persist intermediate state
 * ResumeRecoverable intermediateState = out.persist();
 * storeInCheckpoint(intermediateState);
 *
 * // --------- recovery --------
 * ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
 * RecoverableWriter writer = fileSystem.createRecoverableWriter();
 * RecoverableFsDataOutputStream out = writer.recover(lastCheckpointState);
 *
 * out.write(...); // append more data
 *
 * out.closeForCommit().commit(); // close stream and publish all the data
 *
 * // --------- recovery without appending --------
 * ResumeRecoverable lastCheckpointState = ...; // get state from checkpoint
 * RecoverableWriter writer = fileSystem.createRecoverableWriter();
 * Committer committer = writer.recoverForCommit(lastCheckpointState);
 * committer.commit(); // publish the state as of the last checkpoint
 * }</pre>
 *
 * <h3>Recovery</h3>
 *
 * <p>Recovery relies on data persistence in the target file system or object store. While the
 * code itself works with the specific primitives that the target storage offers, recovery will
 * fail if the data written so far has been deleted by an external factor. For example, some
 * implementations stage data in temp files or object parts. If these are deleted, either
 * manually or by an automated cleanup policy, resuming may fail. This is expected, but it is
 * worth pointing out explicitly.
 *
 * <p>Specific care is needed for systems like S3, where the implementation uses Multipart Uploads
 * to incrementally upload and persist parts of the result. The timeouts for Multipart Uploads
 * and the lifetime of Parts in unfinished Multipart Uploads must be set high enough in the
 * bucket policy to accommodate recovery. These values are typically on the order of days, so
 * regular recovery is usually not a problem. Problems can arise when a Flink application is
 * hard-killed (all processes or containers removed) and someone tries to manually recover it
 * from an externalized checkpoint some days later. In that case, systems like S3 may have
 * removed the uncommitted parts, and recovery will not succeed.
 *
 * <h3>Implementer's Note</h3>
 *
 * <p>From the implementer's perspective, it would be desirable to make this interface generic
 * with respect to the concrete types of 'CommitRecoverable' and 'ResumeRecoverable'. However,
 * we found that this makes the code clumsier to use, so we dropped the generics. The cost is
 * some explicit casts in the implementation that the compiler would otherwise have generated
 * implicitly.
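 *
 * <p>For example, an implementation's {@code recover} method might begin with a cast like the
 * one in this sketch. {@code MyResumeRecoverable} and {@code resumeFrom} are hypothetical names
 * used only for illustration:
 *
 * <pre>{@code
 * public RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException {
 *     // explicit cast to the implementation's own handle type (hypothetical class);
 *     // with a type parameter on the interface, this conversion would be implicit
 *     MyResumeRecoverable myResumable = (MyResumeRecoverable) resumable;
 *     return resumeFrom(myResumable); // hypothetical helper that reopens the staged data
 * }
 * }</pre>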
 */
@PublicEvolving
public interface RecoverableWriter {

    /**
     * Opens a new recoverable stream to write to the given path.
     * Whether existing files will be overwritten is implementation specific and should
     * not be relied upon.
     *
     * @param path The path of the file/object to write to.
     * @return A new RecoverableFsDataOutputStream writing a new file/object.
     *
     * @throws IOException Thrown if the stream could not be opened/initialized.
     */
    RecoverableFsDataOutputStream open(Path path) throws IOException;

    /**
     * Resumes a recoverable stream consistently at the point indicated by the given
     * ResumeRecoverable. Future writes to the stream append to the file as of that point.
     *
     * <p>This method is optional. Whether it is supported is indicated by the
     * {@link #supportsResume()} method.
     *
     * @param resumable The opaque handle with the recovery information.
     * @return A recoverable stream writing to the file/object as it was at the point when the
     *     ResumeRecoverable was created.
     *
     * @throws IOException Thrown if resuming fails.
     * @throws UnsupportedOperationException Thrown if this optional method is not supported.
     */
    RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;

    /**
     * Indicates whether the writer requires additional cleanup, i.e. freeing of resources held
     * as part of a {@link ResumeRecoverable}, such as temporary files created or objects
     * uploaded to external systems.
     *
     * <p>If cleanup is required, {@link #cleanupRecoverableState(ResumeRecoverable)} should be
     * called.
     *
     * @return {@code true} if cleanup is required, {@code false} otherwise.
     */
    boolean requiresCleanupOfRecoverableState();

    /**
     * Frees up any resources that were previously held in order to be able to recover from a
     * (potential) failure. These can be temporary files that were written to the file system
     * or objects that were uploaded to S3.
     *
     * <p><b>NOTE:</b> This operation should not throw an exception if the resumable has already
     * been cleaned up and its resources have been freed. However, by contract it throws an
     * {@link UnsupportedOperationException} if it is called on a {@code RecoverableWriter}
     * whose {@link #requiresCleanupOfRecoverableState()} returns {@code false}.
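     *
     * <p>A minimal sketch of a caller honoring this contract, assuming {@code writer} and a
     * {@code resumable} that is no longer needed for recovery are in scope:
     *
     * <pre>{@code
     * if (writer.requiresCleanupOfRecoverableState()) {
     *     // only call cleanup when the writer declares it necessary; otherwise
     *     // cleanupRecoverableState() throws an UnsupportedOperationException
     *     boolean freed = writer.cleanupRecoverableState(resumable);
     *     // 'freed' is false if there was nothing left to free, e.g. it was already deleted
     * }
     * }</pre>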
     *
     * @param resumable The {@link ResumeRecoverable} whose state should be cleaned up.
     * @return {@code true} if the resources were successfully freed, {@code false} otherwise
     *     (e.g. the file to be deleted was not there because it was already deleted or never
     *     created).
     * @throws IOException Thrown if an I/O error occurs during cleanup.
     */
    boolean cleanupRecoverableState(ResumeRecoverable resumable) throws IOException;

    /**
     * Recovers a recoverable stream consistently at the point indicated by the given
     * CommitRecoverable for finalizing and committing. This publishes the target file with
     * exactly the data that was written up to the point when the CommitRecoverable was created.
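     *
     * <p>A minimal sketch of the commit-only path, assuming {@code writer} and an open stream
     * {@code out} are in scope. How {@code pending} is stored in and restored from checkpoint
     * state is left out:
     *
     * <pre>{@code
     * // before the failure: close the stream for commit and keep its handle in checkpoint state
     * CommitRecoverable pending = out.closeForCommit().getRecoverable();
     *
     * // after recovery: re-create the committer from the restored handle and publish the file
     * writer.recoverForCommit(pending).commit();
     * }</pre>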
     *
     * @param resumable The opaque handle with the recovery information.
     * @return A committer that publishes the target file.
     *
     * @throws IOException Thrown if recovery fails.
     */
    RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable) throws IOException;

    /**
     * The serializer for the CommitRecoverable types created by this writer. This serializer
     * should be used to store the CommitRecoverable in checkpoint state or other forms of
     * persistent state.
     */
    SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer();

    /**
     * The serializer for the ResumeRecoverable types created by this writer. This serializer
     * should be used to store the ResumeRecoverable in checkpoint state or other forms of
     * persistent state.
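     *
     * <p>A minimal sketch of a round trip through persistent state, assuming {@code writer} and
     * {@code resumable} are in scope. The version is kept next to the bytes so that
     * {@link SimpleVersionedSerializer#deserialize(int, byte[])} is called with the version the
     * bytes were written with. How both values are actually stored is left out:
     *
     * <pre>{@code
     * SimpleVersionedSerializer<ResumeRecoverable> serializer =
     *         writer.getResumeRecoverableSerializer();
     *
     * // on checkpoint: store the serializer version together with the serialized handle
     * int version = serializer.getVersion();
     * byte[] bytes = serializer.serialize(resumable);
     *
     * // on restore: deserialize with the version the bytes were written with
     * ResumeRecoverable restored = serializer.deserialize(version, bytes);
     * }</pre>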
     */
    SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer();

    /**
     * Checks whether the writer and its streams support resuming (appending to) files after
     * recovery via the {@link #recover(ResumeRecoverable)} method.
     *
     * <p>If {@code true}, this writer supports the {@link #recover(ResumeRecoverable)} method.
     * If {@code false}, that method may not be supported, and streams can only be recovered via
     * {@link #recoverForCommit(CommitRecoverable)}.
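     *
     * <p>A minimal sketch of one possible recovery strategy that falls back to committing when
     * resuming is not supported, assuming {@code writer}, a restored {@code resumable}, and a
     * {@code byte[] moreData} are in scope:
     *
     * <pre>{@code
     * if (writer.supportsResume()) {
     *     // continue appending to the in-progress file, then publish it
     *     RecoverableFsDataOutputStream out = writer.recover(resumable);
     *     out.write(moreData);
     *     out.closeForCommit().commit();
     * } else {
     *     // resuming may not be available: publish the data as of the checkpoint instead
     *     writer.recoverForCommit(resumable).commit();
     * }
     * }</pre>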
     *
     * @return Whether this writer supports resuming via {@link #recover(ResumeRecoverable)}.
     */
    boolean supportsResume();

    // ------------------------------------------------------------------------

    /**
     * A handle to an in-progress stream with a defined and persistent amount of data.
     * The handle can be used to recover the stream as of exactly that point and
     * publish the result file.
     */
    interface CommitRecoverable {}

    /**
     * A handle to an in-progress stream with a defined and persistent amount of data.
     * The handle can be used to recover the stream exactly as of that point and either
     * publish the result file or keep appending data to the stream.
     */
    interface ResumeRecoverable extends CommitRecoverable {}
}