These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.12 and Flink 1.13. Please read these notes carefully if you are planning to upgrade your Flink version to 1.13.
The state.backend.async option is deprecated. Snapshots are always asynchronous now (as they were by default before) and there is no option to configure a synchronous snapshot any more.
The constructors of FsStateBackend and MemoryStateBackend that take a flag for sync/async snapshots are kept for API compatibility, but the flags are ignored now.
Flink has always separated local state storage from fault tolerance. Keyed state is maintained locally in state backends, either on the JVM heap or in embedded RocksDB instances. Fault tolerance comes from checkpoints and savepoints, which are periodic snapshots of a job's internal state written to a durable file system such as Amazon S3 or HDFS.
Historically, Flink's StateBackend interface intermixed these concepts in a way that confused many users. In 1.13, checkpointing configurations have been extracted into their own interface, CheckpointStorage.
This change does not affect the runtime behavior and simply provides a better mental model to users. Pipelines can be updated to use the new abstractions without losing state or consistency and without any change in semantics.
Please follow the migration guide or the JavaDoc on the deprecated state backend classes (MemoryStateBackend, FsStateBackend, and RocksDBStateBackend) for migration details.
Flink's savepoint binary format is unified across all state backends. That means you can take a savepoint with one state backend and then restore it using another.
If you want to switch the state backend, first upgrade your Flink version to 1.13 and take a savepoint with the new version. Only after that can you restore it with a different state backend.
The Failure Rate Restart Strategy was allowing one fewer restart per interval than configured. Users who want to keep the previous behavior should reduce the configured maximum number of failures per interval by 1.
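To make the off-by-one concrete, here is a simplified model of a sliding-window failure counter. It is only a sketch and not Flink's implementation: the class FailureRateModel, its legacyOffByOne flag, and the 60-second interval are hypothetical names and values chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of a failure-rate restart policy.
// It is NOT Flink's implementation; it only illustrates the off-by-one.
class FailureRateModel {
    private final int maxFailuresPerInterval;
    private final long intervalMs;
    private final boolean legacyOffByOne; // true models the pre-1.13 behavior
    private final Deque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRateModel(int maxFailuresPerInterval, long intervalMs, boolean legacyOffByOne) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMs = intervalMs;
        this.legacyOffByOne = legacyOffByOne;
    }

    /** Records a failure at nowMs and reports whether a restart is still allowed. */
    boolean onFailure(long nowMs) {
        // Forget failures that have fallen out of the measuring interval.
        while (!failureTimestamps.isEmpty()
                && nowMs - failureTimestamps.peekFirst() >= intervalMs) {
            failureTimestamps.pollFirst();
        }
        failureTimestamps.addLast(nowMs);
        int allowed = legacyOffByOne ? maxFailuresPerInterval - 1 : maxFailuresPerInterval;
        return failureTimestamps.size() <= allowed;
    }

    /** Counts how many back-to-back failures are answered with a restart. */
    static int restartsAllowed(int maxFailuresPerInterval, boolean legacyOffByOne) {
        FailureRateModel model =
                new FailureRateModel(maxFailuresPerInterval, 60_000L, legacyOffByOne);
        int restarts = 0;
        long nowMs = 0L;
        while (model.onFailure(nowMs++)) {
            restarts++;
        }
        return restarts;
    }

    public static void main(String[] args) {
        System.out.println("legacy model, max=3: " + restartsAllowed(3, true));
        System.out.println("fixed model,  max=3: " + restartsAllowed(3, false));
        System.out.println("fixed model,  max=2: " + restartsAllowed(2, false));
    }
}
```

In this model, a configured maximum of 3 yields 2 restarts under the legacy counting and 3 under the fixed counting, so lowering the setting to 2 after the upgrade reproduces the earlier behavior.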
While recovering from unaligned checkpoints, users can now change the parallelism of the job. This change allows users to quickly upscale the job under backpressure.
The old planner of the Table & SQL API is deprecated and will be dropped in Flink 1.14. This means that both the BatchTableEnvironment and DataSet API interop are reaching end of life. Use the unified TableEnvironment for batch and stream processing with the new planner, or the DataStream API in batch execution mode.
Before Flink 1.13, the return type of PROCTIME() was TIMESTAMP, and the returned value was the timestamp in the UTC time zone. For example, when the wall clock showed 2021-03-01 12:00:00 in Shanghai, PROCTIME() displayed 2021-03-01 04:00:00, which is wrong. Flink 1.13 fixes this issue and uses TIMESTAMP_LTZ as the return type of PROCTIME(), so users no longer need to deal with time zone problems.
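The Shanghai example can be reproduced with plain java.time, without any Flink API. The sketch below is only an analogy for the two renderings of one instant: the class and method names are hypothetical, and it does not show how Flink implements TIMESTAMP_LTZ.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Plain java.time illustration; it does not call any Flink API.
class ProctimeZoneDemo {
    /** Renders the instant as a zone-less timestamp in UTC (the pre-1.13 view). */
    static LocalDateTime renderInUtc(Instant instant) {
        return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
    }

    /** Renders the same instant in the given zone (the local-time-zone view). */
    static LocalDateTime renderInZone(Instant instant, ZoneId zone) {
        return LocalDateTime.ofInstant(instant, zone);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2021-03-01T04:00:00Z");
        // UTC rendering, which does not match a wall clock in Shanghai.
        System.out.println(renderInUtc(now)); // 2021-03-01T04:00
        // Rendering in the local zone matches the wall clock.
        System.out.println(renderInZone(now, ZoneId.of("Asia/Shanghai"))); // 2021-03-01T12:00
    }
}
```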
Event time attributes can now be defined on a TIMESTAMP_LTZ column. Based on this, Flink SQL gracefully supports Daylight Saving Time.
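Why an instant-based type helps with Daylight Saving Time can be seen with plain java.time. The sketch below is an illustration under assumptions: the zone America/Los_Angeles, the date, and the class name are choices made here and are not taken from these notes. It compares the wall-clock length of an interval with the time that actually elapsed across a spring-forward transition.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;

// Plain java.time illustration; it does not call any Flink API.
class DstWindowDemo {
    /** Wall-clock difference, ignoring time zone rules. */
    static Duration wallClockLength(LocalDateTime start, LocalDateTime end) {
        return Duration.between(start, end);
    }

    /** Elapsed time once both local times are resolved to instants in a zone. */
    static Duration elapsedLength(LocalDateTime start, LocalDateTime end, ZoneId zone) {
        return Duration.between(start.atZone(zone).toInstant(), end.atZone(zone).toInstant());
    }

    public static void main(String[] args) {
        // Clocks in Los Angeles jumped from 02:00 to 03:00 on 2021-03-14.
        LocalDateTime start = LocalDateTime.of(2021, 3, 14, 0, 0);
        LocalDateTime end = LocalDateTime.of(2021, 3, 14, 4, 0);
        ZoneId zone = ZoneId.of("America/Los_Angeles");

        System.out.println(wallClockLength(start, end));     // PT4H
        System.out.println(elapsedLength(start, end, zone)); // PT3H
    }
}
```

A zone-less timestamp sees four hours between the two readings, while only three hours elapsed. Time attributes based on instants do not have this ambiguity.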
The values of the time functions CURRENT_TIMESTAMP and NOW() are corrected from UTC time with TIMESTAMP type to epoch time with TIMESTAMP_LTZ type. The time functions LOCALTIME, LOCALTIMESTAMP, CURRENT_DATE, CURRENT_TIME, CURRENT_TIMESTAMP, and NOW() are corrected from being evaluated per record in batch mode to being evaluated once at query start for batch jobs.
The CAST operation between NUMERIC types and the TIMESTAMP type is problematic and is now disabled. For example, CAST(numeric AS TIMESTAMP(3)) is disabled; use TO_TIMESTAMP(FROM_UNIXTIME(numeric)) instead.
The term MODULES is a reserved keyword now. Use backticks to escape column names and other identifiers with this name.
Table.execute().collect() might return slightly different results for column types and row kind. The most important differences include:
StreamTableEnvironment.fromDataStream has slightly different semantics now because it has been integrated into the new type system. In particular, row fields derived from composite type information might be in a different order compared to 1.12. The old behavior is still available via the overloaded method that takes expressions, like fromDataStream(ds, $("field1"), $("field2")).
The Row.toString() method has been reworked. This is an incompatible change. If the legacy representation is still required for tests, the old behavior can be restored via the flag RowUtils.USE_LEGACY_TO_STRING for the local JVM. However, relying on the row's string representation for tests is not a good idea in general, as field data types are not verified.
The sql-client-defaults.yaml YAML file is deprecated and is not provided in the release package. For compatibility, initializing the SQL Client with the YAML file is still supported if the file is provided manually. However, it is recommended to use the newly introduced -i startup option to execute an initialization SQL file that sets up the SQL Client session. The initialization SQL file can use Flink DDLs to define available catalogs, table sources and sinks, user-defined functions, and other properties required for execution and deployment. Support for the legacy SQL Client YAML file will be dropped entirely in Flink 1.14.
The Hive dialect supports HiveQL for DML and DQL. Please switch to the default dialect in order to write in Flink syntax.
endInput() is no longer called (on BoundedOneInput and BoundedMultiInput) when the job is stopped with a savepoint.
The configuration parameter jobmanager.scheduler.scheduling-strategy has been removed, because the legacy scheduler has been removed from Flink 1.13.0.
A new configuration value cluster.intercept-user-system-exit allows logging a warning or throwing an exception if user code calls System.exit().
This feature does not cover all locations in Flink where user code is executed. It only adds the infrastructure for such an interception. This improvement is tracked in FLINK-21307.
The semantics for accumulators have changed in MiniClusterJobClient to fix a bug and comply with other JobClient implementations. Previously, MiniClusterJobClient assumed that getAccumulator() was called on a bounded pipeline and that the user wanted to acquire the final accumulator values after the job finished. Now it returns the current value of the accumulators immediately, to be compatible with unbounded pipelines.
If it is run on a bounded pipeline, then to get the final accumulator values after the job is finished, one needs to call getJobExecutionResult().thenApply(JobExecutionResult::getAllAccumulatorResults).
The Docker images no longer set the default number of TaskManager slots to the number of CPU cores. This behavior was inconsistent with all other deployment methods and ignored any limits on CPU usage set via Docker.
The Docker switch for disabling the jemalloc memory allocator has been reworked from a script argument to an environment variable called DISABLE_JEMALLOC. If it is set to "true", jemalloc will not be enabled.
The Swift filesystem is no longer being actively developed and has been removed from the project and distribution.
The unified source API for connectors has a minor breaking change. The SplitEnumerator.snapshotState() method was adjusted to accept the checkpoint ID of the checkpoint for which the snapshot is created.
State access latency metrics are introduced to track all kinds of keyed state access to help debug state performance. This feature is not enabled by default and can be turned on by setting state.backend.latency-track.keyed-state-enabled to true.
Flink now offers flame graphs for each node in the job graph. Please enable this experimental feature by setting the respective configuration flag rest.flamegraph.enabled.
Flink now exposes the exception history through the REST API and the UI. The number of most recently handled exceptions to track can be defined through web.exception-history-size. Some values of the exception history's REST API JSON response are deprecated as part of this effort.
Previously, idleTimeMsPerSecond was defined as the time a task spent waiting for either input or back pressure. Now idleTimeMsPerSecond excludes back-pressured time, so a task that is back pressured is not considered idle. The back-pressured time is now measured separately as backPressuredTimeMsPerSecond.
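The effect on dashboards and alerts can be sketched with simple arithmetic. The model below is hypothetical and is not how Flink computes its metrics: the class, the method names, and the sample numbers are made up for illustration. It splits one second of a task's time into waiting for input and being back pressured.

```java
// Hypothetical arithmetic model of the metric change; not Flink code.
class IdleTimeModel {
    /** Idle time as reported before 1.13: input wait plus back-pressured time. */
    static long idleBefore113(long waitingForInputMs, long backPressuredMs) {
        return waitingForInputMs + backPressuredMs;
    }

    /** Idle time as reported since 1.13: input wait only. */
    static long idleSince113(long waitingForInputMs) {
        return waitingForInputMs;
    }

    public static void main(String[] args) {
        long waitingForInputMs = 200L; // made-up sample value
        long backPressuredMs = 500L;   // made-up sample value

        System.out.println("idleTimeMsPerSecond (1.12): "
                + idleBefore113(waitingForInputMs, backPressuredMs));
        System.out.println("idleTimeMsPerSecond (1.13): "
                + idleSince113(waitingForInputMs));
        System.out.println("backPressuredTimeMsPerSecond (1.13): " + backPressuredMs);
    }
}
```

Under this model, an alert that relied on the old idle value would need to add the two new values together to see the same number.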
The Log4j support for updating the Log4j configuration at runtime has been enabled by default. The configuration files are checked for changes every 30 seconds.
The ZooKeeper scripts in the Flink distribution have been modified to disable the Log4j JMX integration due to an incompatibility between ZooKeeper 3.4 and Log4j 2. To re-enable this feature, remove the line in the zookeeper.sh file that sets zookeeper.jmx.log4j.disable.
The task's RUNNING state has been split into two states: INITIALIZING and RUNNING. A task is INITIALIZING while its state is being initialized and, in the case of unaligned checkpoints, until all in-flight data has been recovered.
The community decided to deprecate the Apache Mesos support for Apache Flink. It is subject to removal in the future. Users are encouraged to switch to a different resource manager.