blob: 033e4859dcc11f5908333e8729a6948f3da094aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.state.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND_MAX_PARALLELISM;
/**
* This class provides entry points for loading an existing savepoint, or a new empty savepoint.
*
* @see ExistingSavepoint
* @see NewSavepoint
* @see SavepointReader
* @see SavepointWriter
* @deprecated For creating a new savepoint, use {@link SavepointWriter} and the data stream api
* under batch execution. For reading a savepoint, use {@link SavepointReader} and the data
* stream api under batch execution.
*/
@PublicEvolving
@Deprecated
public final class Savepoint {
private Savepoint() {}
/**
* Loads an existing savepoint. Useful if you want to query, modify, or extend the state of an
* existing application. The savepoint will be read using the state backend defined via the
* clusters configuration.
*
* @param env The execution environment used to transform the savepoint.
* @param path The path to an existing savepoint on disk.
* @see #load(ExecutionEnvironment, String, StateBackend)
*/
public static ExistingSavepoint load(ExecutionEnvironment env, String path) throws IOException {
CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(path);
int maxParallelism =
metadata.getOperatorStates().stream()
.map(OperatorState::getMaxParallelism)
.max(Comparator.naturalOrder())
.orElseThrow(
() ->
new RuntimeException(
"Savepoint must contain at least one operator state."));
SavepointMetadata savepointMetadata =
new SavepointMetadata(
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
return new ExistingSavepoint(env, savepointMetadata, null);
}
/**
* Loads an existing savepoint. Useful if you want to query, modify, or extend the state of an
* existing application.
*
* @param env The execution environment used to transform the savepoint.
* @param path The path to an existing savepoint on disk.
* @param stateBackend The state backend of the savepoint.
* @see #load(ExecutionEnvironment, String)
*/
public static ExistingSavepoint load(
ExecutionEnvironment env, String path, StateBackend stateBackend) throws IOException {
Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(path);
int maxParallelism =
metadata.getOperatorStates().stream()
.map(OperatorState::getMaxParallelism)
.max(Comparator.naturalOrder())
.orElseThrow(
() ->
new RuntimeException(
"Savepoint must contain at least one operator state."));
SavepointMetadata savepointMetadata =
new SavepointMetadata(
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
return new ExistingSavepoint(env, savepointMetadata, stateBackend);
}
/**
* Creates a new savepoint. The savepoint will be read using the state backend defined via the
* clusters configuration.
*
* @param maxParallelism The max parallelism of the savepoint.
* @return A new savepoint.
* @see #create(StateBackend, int)
*/
public static NewSavepoint create(int maxParallelism) {
Preconditions.checkArgument(
maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
"Maximum parallelism must be between 1 and "
+ UPPER_BOUND_MAX_PARALLELISM
+ ". Found: "
+ maxParallelism);
SavepointMetadata metadata =
new SavepointMetadata(
maxParallelism, Collections.emptyList(), Collections.emptyList());
return new NewSavepoint(metadata, null);
}
/**
* Creates a new savepoint.
*
* @param stateBackend The state backend of the savepoint used for keyed state.
* @param maxParallelism The max parallelism of the savepoint.
* @return A new savepoint.
* @see #create(int)
*/
public static NewSavepoint create(StateBackend stateBackend, int maxParallelism) {
Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
Preconditions.checkArgument(
maxParallelism > 0 && maxParallelism <= UPPER_BOUND_MAX_PARALLELISM,
"Maximum parallelism must be between 1 and "
+ UPPER_BOUND_MAX_PARALLELISM
+ ". Found: "
+ maxParallelism);
SavepointMetadata metadata =
new SavepointMetadata(
maxParallelism, Collections.emptyList(), Collections.emptyList());
return new NewSavepoint(metadata, stateBackend);
}
}