These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.9 and Flink 1.10. Please read these notes carefully if you are planning to upgrade your Flink version to 1.10.
The `s3-hadoop` and `s3-presto` filesystems no longer use class relocations and must be loaded through [plugins]({{< ref "docs/deployment/filesystems/overview" >}}#pluggable-file-systems). In return, they now integrate seamlessly with all credential providers. We strongly recommend using all other filesystems only as plugins as well, since we will continue to remove relocations.
The Flink client now also respects the configured classloading policy, i.e., `parent-first` or `child-first` classloading. Previously, only cluster components such as the job manager or task manager supported this setting. This does mean that users might get different behaviour in their programs, in which case they should configure the classloading policy explicitly to use `parent-first` classloading, which was the previous (hard-coded) behaviour.
When FLIP-6 was rolled out with Flink 1.5.0, we changed how slots are allocated from TaskManagers (TMs). Instead of evenly allocating the slots from all registered TMs, we had the tendency to exhaust a TM before using another one. To use a scheduling strategy that is more similar to the pre-FLIP-6 behaviour, where Flink tries to spread out the workload across all currently available TMs, one can set `cluster.evenly-spread-out-slots: true` in the `flink-conf.yaml`.
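The difference between the two allocation styles can be illustrated with a small, self-contained sketch. This is not Flink's scheduler code: the class `SlotSpreadSketch` and its methods are hypothetical, and each TaskManager is modelled only as a count of free slots.

```java
import java.util.Arrays;

// Hypothetical model: freeSlots[i] is the number of free slots on TaskManager i.
final class SlotSpreadSketch {

    // "Exhaust first" flavour: take a slot from the first TM that still has one.
    static int pickFirstAvailable(int[] freeSlots) {
        for (int i = 0; i < freeSlots.length; i++) {
            if (freeSlots[i] > 0) {
                return i;
            }
        }
        return -1; // no free slot anywhere
    }

    // "Spread out" flavour: take a slot from the TM with the most free slots.
    static int pickMostFree(int[] freeSlots) {
        int best = -1;
        for (int i = 0; i < freeSlots.length; i++) {
            if (freeSlots[i] > 0 && (best < 0 || freeSlots[i] > freeSlots[best])) {
                best = i;
            }
        }
        return best;
    }

    // Allocates up to `requests` slots and returns how many slots each TM handed out.
    static int[] allocate(int[] freeSlots, int requests, boolean spreadOut) {
        int[] free = Arrays.copyOf(freeSlots, freeSlots.length);
        int[] used = new int[freeSlots.length];
        for (int r = 0; r < requests; r++) {
            int tm = spreadOut ? pickMostFree(free) : pickFirstAvailable(free);
            if (tm < 0) {
                break; // the modelled cluster is full
            }
            free[tm]--;
            used[tm]++;
        }
        return used;
    }

    public static void main(String[] args) {
        int[] cluster = {4, 4, 4};
        System.out.println(Arrays.toString(allocate(cluster, 4, false)));
        System.out.println(Arrays.toString(allocate(cluster, 4, true)));
    }
}
```

With three TMs of four slots each and four requested slots, the first call uses a single TM, while the second distributes the slots over all three.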
All highly available artifacts stored by Flink will now be stored under `HA_STORAGE_DIR/HA_CLUSTER_ID` with `HA_STORAGE_DIR` configured by `high-availability.storageDir` and `HA_CLUSTER_ID` configured by `high-availability.cluster-id`.
When using the `--yarnship` command line option, resource directories and jar files will be added to the classpath in lexicographical order, with resource directories appearing first.
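The resulting order can be pictured with the following sketch. It is an illustration only and not the code Flink uses: the `Shipped` type and the `classpathOrder` method are hypothetical, and the sketch assumes that "directories first" means all directories precede all jar files, with each group sorted lexicographically.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class ShipOrderSketch {

    // Hypothetical stand-in for a shipped file or directory.
    static final class Shipped {
        final String name;
        final boolean directory;

        Shipped(String name, boolean directory) {
            this.name = name;
            this.directory = directory;
        }
    }

    // Directories first, then jar files; each group in lexicographical order.
    static List<String> classpathOrder(List<Shipped> shipped) {
        List<Shipped> sorted = new ArrayList<>(shipped);
        sorted.sort(
                Comparator.comparing((Shipped s) -> !s.directory) // false (a directory) sorts first
                        .thenComparing(s -> s.name));              // String order is lexicographic
        List<String> result = new ArrayList<>();
        for (Shipped s : sorted) {
            result.add(s.name);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Shipped> shipped = Arrays.asList(
                new Shipped("b.jar", false),
                new Shipped("conf", true),
                new Shipped("a.jar", false),
                new Shipped("assets", true));
        System.out.println(classpathOrder(shipped));
    }
}
```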
The Flink CLI no longer supports the deprecated command line options `-yn/--yarncontainer`, which were used to specify the number of containers to start on YARN. These options have been deprecated since the introduction of FLIP-6. All Flink users are advised to remove them.
The Flink CLI no longer supports the deprecated command line options `-yst/--yarnstreaming`, which were used to disable eager pre-allocation of memory. All Flink users are advised to remove them.
Flink's Mesos integration now rejects all expired offers instead of only 4. This improves the situation where Fenzo holds on to a lot of expired offers without giving them back to the Mesos resource manager.
Flink's scheduler was refactored with the goal of making scheduling strategies customizable in the future. Using the legacy scheduler is discouraged as it will be removed in a future release. However, users that experience issues related to scheduling can fall back to the legacy scheduler by setting `jobmanager.scheduler` to `legacy` in their `flink-conf.yaml` for the time being. Note, however, that using the legacy scheduler with the [Pipelined Region Failover Strategy]({{< ref "docs/ops/state/task_failure_recovery" >}}#restart-pipelined-region-failover-strategy) enabled has the following caveats:

* Exceptions that only cause a job to restart will not be shown. However, exceptions that cause a job to fail (e.g., when all restart attempts are exhausted) will still be shown.
* The `uptime` metric will not be reset after restarting a job due to task failure.

Note that in the default `flink-conf.yaml`, the Pipelined Region Failover Strategy is already enabled. That is, users that want to use the legacy scheduler and cannot accept the aforementioned caveats should make sure that `jobmanager.execution.failover-strategy` is set to `full` or not set at all.
Beginning with this release, Flink can be compiled and run with Java 11. All Java 8 artifacts can also be used with Java 11. This means that users who want to run Flink with Java 11 do not have to compile Flink themselves.
When starting Flink with Java 11, the following warnings may be logged:

    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.core.memory.MemoryUtils (file:/opt/flink/flink-1.10.0/lib/flink-dist_2.11-1.10.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
    WARNING: Please consider reporting this to the maintainers of org.apache.flink.core.memory.MemoryUtils
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release

    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/home/flinkuser/.m2/repository/org/apache/flink/flink-core/1.10.0/flink-core-1.10.0.jar) to field java.lang.String.value
    WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release

    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/home/flinkuser/.m2/repository/io/netty/netty/3.10.6.Final/netty-3.10.6.Final.jar) to method java.nio.DirectByteBuffer.cleaner()
    WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release

    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by com.esotericsoftware.kryo.util.UnsafeUtil (file:/home/flinkuser/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar) to constructor java.nio.DirectByteBuffer(long,int,java.lang.Object)
    WARNING: Please consider reporting this to the maintainers of com.esotericsoftware.kryo.util.UnsafeUtil
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release

These warnings are considered harmless and will be addressed in future Flink releases.
Lastly, note that the connectors for Cassandra, Hive, HBase, and Kafka 0.8--0.11 have not been tested with Java 11 because the respective projects did not provide Java 11 support at the time of the Flink 1.10.0 release.
With FLIP-49, a new memory model has been introduced for the task executor. New configuration options have been introduced to control the memory consumption of the task executor process. This affects all types of deployments: standalone, YARN, Mesos, and the new active Kubernetes integration. The memory model of the job manager process has not been changed yet but it is planned to be updated as well.
If you try to reuse your previous Flink configuration without any adjustments, the new memory model can result in differently computed memory parameters for the JVM and, thus, performance changes.
Please check the user documentation for more details.
The following options have been removed and have no effect anymore:
The following options, if used, are interpreted as other new options in order to maintain backwards compatibility where it makes sense:
The container cut-off configuration options, `containerized.heap-cutoff-ratio` and `containerized.heap-cutoff-min`, have no effect for task executor processes anymore but they still have the same semantics for the JobManager process.
Together with the introduction of the new Task Executor Memory Model, the memory consumption of the RocksDB state backend will be limited by the total amount of Flink Managed Memory, which can be configured via `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction`. Furthermore, users can tune RocksDB's write/read memory ratio (`state.backend.rocksdb.memory.write-buffer-ratio`, by default `0.5`) and the reserved memory fraction for indices/filters (`state.backend.rocksdb.memory.high-prio-pool-ratio`, by default `0.1`). More details and advanced configuration options can be found in the [Flink user documentation]({{< ref "docs/ops/state/large_state_tuning" >}}#tuning-rocksdb-memory).
Config options `table.exec.resource.external-buffer-memory`, `table.exec.resource.hash-agg.memory`, `table.exec.resource.hash-join.memory`, and `table.exec.resource.sort.memory` have been deprecated. Beginning with Flink 1.10, these config options are interpreted as weight hints instead of absolute memory requirements. Flink chooses sensible default weight hints, which should not be adjusted by users.
The identifier `raw` is a reserved keyword now and must be escaped with backticks when used as a SQL field or function name.
Some indexed properties for table connectors have been flattened and renamed for a better user experience when writing DDL statements. This affects the Kafka Connector properties `connector.properties` and `connector.specific-offsets`. Furthermore, the Elasticsearch Connector property `connector.hosts` is affected. The aforementioned old properties are deprecated and will be removed in future versions. Please consult the [Table Connectors documentation]({{< ref "docs/connectors/table/overview" >}}) for the new property names.
Methods `registerTable()`/`registerDataStream()`/`registerDataSet()` have been deprecated in favor of `createTemporaryView()`, which better adheres to the corresponding SQL term.
The `scan()` method has been deprecated in favor of the `from()` method.
Methods `registerTableSource()`/`registerTableSink()` have been deprecated in favor of `ConnectTableDescriptor#createTemporaryTable()`. The `ConnectTableDescriptor` approach expects only a set of string properties as a description of a TableSource or TableSink, instead of an instance of a class as in the case of the deprecated methods. This in turn makes it possible to reliably store those definitions in catalogs.
Method `insertInto(String path, String... pathContinued)` has been removed in favor of `insertInto(String path)`.
All the newly introduced methods accept a String identifier which will be parsed into a 3-part identifier. The parser supports quoting the identifier. It also requires escaping any reserved SQL keywords.
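The following sketch illustrates what parsing a string path into a 3-part identifier can look like. It is a simplified, hypothetical stand-in and not Flink's actual parser: it only handles backtick quoting, does not check reserved keywords, and assumes that missing leading parts are filled in from a current catalog and database that the caller supplies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class IdentifierSketch {

    // Splits a path on dots that are outside backticks; the backticks themselves are dropped.
    static List<String> split(String path) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (int i = 0; i < path.length(); i++) {
            char c = path.charAt(i);
            if (c == '`') {
                quoted = !quoted;
            } else if (c == '.' && !quoted) {
                parts.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (quoted) {
            throw new IllegalArgumentException("Unbalanced backtick in: " + path);
        }
        parts.add(current.toString());
        return parts;
    }

    // Expands a 1-, 2- or 3-part path to [catalog, database, object].
    static List<String> qualify(String path, String currentCatalog, String currentDatabase) {
        List<String> parts = split(path);
        switch (parts.size()) {
            case 1:
                return Arrays.asList(currentCatalog, currentDatabase, parts.get(0));
            case 2:
                return Arrays.asList(currentCatalog, parts.get(0), parts.get(1));
            case 3:
                return parts;
            default:
                throw new IllegalArgumentException("Too many parts in: " + path);
        }
    }

    public static void main(String[] args) {
        // The catalog and database names here are placeholders chosen for the example.
        System.out.println(qualify("orders", "my_catalog", "my_db"));
        System.out.println(qualify("`my.db`.`raw`", "my_catalog", "my_db"));
    }
}
```

In the second call, the dot inside `` `my.db` `` is part of the database name, and the reserved word `raw` is usable as an object name because it is quoted.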
The deprecated `ExternalCatalog` API has been dropped. This includes:

* `ExternalCatalog` (and all dependent classes, e.g., `ExternalTable`)
* `SchematicDescriptor`, `MetadataDescriptor`, `StatisticsDescriptor`

Users are advised to use the [new Catalog API]({{< ref "docs/dev/table/catalogs" >}}#catalog-api).
Getters of `org.apache.flink.configuration.Configuration` throw `IllegalArgumentException` now if the configured value cannot be parsed into the required type. In previous Flink releases the default value was returned in such cases.
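The behavioural change can be sketched without any Flink dependency. The class below is a hypothetical, map-backed stand-in for a configuration object; its method names are made up for the illustration and are not part of Flink's `Configuration` API.

```java
import java.util.HashMap;
import java.util.Map;

final class StrictConfigSketch {
    private final Map<String, String> values = new HashMap<>();

    void setString(String key, String value) {
        values.put(key, value);
    }

    // Old flavour: silently fall back to the default when the value cannot be parsed.
    int getIntegerLenient(String key, int defaultValue) {
        String raw = values.get(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return defaultValue;
        }
    }

    // New flavour: a missing key still yields the default, an unparsable value fails loudly.
    int getIntegerStrict(String key, int defaultValue) {
        String raw = values.get(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Could not parse value '" + raw + "' for key '" + key + "'.", e);
        }
    }

    public static void main(String[] args) {
        StrictConfigSketch config = new StrictConfigSketch();
        config.setString("some.int.option", "four"); // hypothetical key with a typo-like value
        System.out.println(config.getIntegerLenient("some.int.option", 1));
        try {
            config.getIntegerStrict("some.int.option", 1);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

When upgrading, a misconfigured value that used to be masked by the default may therefore surface as an exception at startup.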
The default restart delay for all shipped restart strategies, i.e., `fixed-delay` and `failure-rate`, has been raised to 1 s (from originally 0 s).
Previously, if the user had set `restart-strategy.fixed-delay.attempts` or `restart-strategy.fixed-delay.delay` but had not configured the option `restart-strategy`, the cluster-level restart strategy would have been `fixed-delay`. Now the cluster-level restart strategy is only determined by the config option `restart-strategy` and whether checkpointing is enabled. See ["Task Failure Recovery"]({{< ref "docs/ops/state/task_failure_recovery" >}}) for details.
The config option `taskmanager.network.bounded-blocking-subpartition-type` has been renamed to `taskmanager.network.blocking-shuffle.type`. Moreover, the default value of this config option has been changed from `auto` to `file`. The reason is that TaskManagers running on YARN with `auto` could easily exceed the memory budget of their container because the memory used by memory-mapped files was not accounted for correctly.
The non-credit-based network flow control code was removed along with the configuration option `taskmanager.network.credit-model`. Flink will now always use credit-based flow control.
The configuration option `high-availability.job.delay` has been removed since it is no longer used.
[Background cleanup of expired state with TTL]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#cleanup-of-expired-state) is activated by default now for all state backends shipped with Flink. Note that the RocksDB state backend implements background cleanup by employing a compaction filter. This has the caveat that even if a Flink job does not store state with TTL, a minor performance penalty during compaction is incurred. Users that experience noticeable performance degradation during RocksDB compaction can disable the TTL compaction filter by setting the config option `state.backend.rocksdb.ttl.compaction.filter.enabled` to `false`.
`StateTtlConfig#Builder#cleanupInBackground()` has been deprecated because the background cleanup of state with TTL is already enabled by default.
The default timer store has been changed from Heap to RocksDB for the RocksDB state backend to support asynchronous snapshots for timer state and better scalability, with less than 5% performance cost. Users that find the performance decline critical can set `state.backend.rocksdb.timer-service.factory` to `HEAP` in `flink-conf.yaml` to restore the old behavior.
`StateTtlConfig#TimeCharacteristic` has been removed in favor of `StateTtlConfig#TtlTimeCharacteristic`.
We have added a new method `MapState#isEmpty()` which enables users to check whether a map state is empty. The new method is 40% faster than `mapState.keys().iterator().hasNext()` when using the RocksDB state backend.
We have again released our own RocksDB build (FRocksDB) which is based on RocksDB version 5.17.2 with several feature backports for the Write Buffer Manager to enable limiting RocksDB's memory usage. The decision to release our own RocksDB build was made because later RocksDB versions suffer from a performance regression under certain workloads.
Logging in RocksDB (e.g., logging related to flush, compaction, memtable creation, etc.) has been disabled by default to prevent disk space from being filled up unexpectedly. Users that need to enable logging should implement their own `RocksDBOptionsFactory` that creates `DBOptions` instances with `InfoLogLevel` set to `INFO_LEVEL`.
In previous Flink releases, users could encounter an `OutOfMemoryError` when restoring from a RocksDB savepoint containing large KV pairs. For that reason we introduced a configurable memory limit in the `RocksDBWriteBatchWrapper` with a default value of 2 MB. RocksDB's WriteBatch will flush before the consumed memory limit is reached. If needed, the limit can be tuned via the `state.backend.rocksdb.write-batch-size` config option in `flink-conf.yaml`.
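The idea of a size-bounded write batch can be sketched as follows. This is a simplified illustration and not the `RocksDBWriteBatchWrapper` itself: the class `BoundedWriteBatchSketch` is hypothetical, it measures only key and value bytes, and its `flush()` merely discards the buffered entries where a real implementation would write them to the database.

```java
import java.util.ArrayList;
import java.util.List;

final class BoundedWriteBatchSketch {
    private final long maxBytes;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    BoundedWriteBatchSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    void put(byte[] key, byte[] value) {
        long entrySize = (long) key.length + value.length;
        // Flush first if adding this entry would push the batch past the limit.
        if (!pending.isEmpty() && pendingBytes + entrySize > maxBytes) {
            flush();
        }
        pending.add(key);
        pending.add(value);
        pendingBytes += entrySize;
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        // A real implementation would hand the buffered entries to the database here.
        pending.clear();
        pendingBytes = 0;
        flushCount++;
    }

    long getPendingBytes() {
        return pendingBytes;
    }

    int getFlushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        BoundedWriteBatchSketch batch = new BoundedWriteBatchSketch(2L * 1024 * 1024); // 2 MB limit
        for (int i = 0; i < 5; i++) {
            batch.put(new byte[16], new byte[1024 * 1024]); // five entries of roughly 1 MB each
        }
        batch.flush(); // write out whatever is left
        System.out.println("flushes: " + batch.getFlushCount());
    }
}
```

Because the batch is flushed before it grows past the limit, the memory held by buffered entries stays bounded even when a savepoint contains many large KV pairs. A single entry larger than the limit is still buffered on its own in this sketch.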
Beginning from this release, PyFlink does not support Python 2. This is because Python 2 has reached end of life on January 1, 2020, and several third-party projects that PyFlink depends on are also dropping Python 2 support.
The `InfluxdbReporter` now silently skips values that are unsupported by InfluxDB, such as `Double.POSITIVE_INFINITY`, `Double.NEGATIVE_INFINITY`, `Double.NaN`, etc.
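A filter of this kind could look roughly like the sketch below. It is not the reporter's actual code: the class and method names are hypothetical, and the sketch assumes that only non-finite floating-point values need to be skipped.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class FiniteValueFilterSketch {

    // Non-finite doubles and floats (infinities, NaN) are treated as unsupported.
    static boolean isReportable(Object value) {
        if (value instanceof Double) {
            return Double.isFinite((Double) value);
        }
        if (value instanceof Float) {
            return Float.isFinite((Float) value);
        }
        return true;
    }

    // Returns a copy of the metrics without the unsupported values.
    static Map<String, Object> reportable(Map<String, Object> metrics) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : metrics.entrySet()) {
            if (isReportable(entry.getValue())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> metrics = new LinkedHashMap<>();
        metrics.put("latency", 1.5);
        metrics.put("ratio", Double.NaN);
        metrics.put("rate", Double.POSITIVE_INFINITY);
        metrics.put("count", 42L);
        System.out.println(reportable(metrics).keySet());
    }
}
```

Since the values are skipped silently, a gauge that only ever produces `NaN` or an infinity will simply not appear in InfluxDB.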
flink-connector-kinesis is now licensed under the Apache License, Version 2.0, and its artifacts will be deployed to Maven central as part of the Flink releases. Users no longer need to build the Kinesis connector from source themselves.
`ExecutionConfig#getGlobalJobParameters` has been changed to never return `null`. Conversely, `ExecutionConfig#setGlobalJobParameters(GlobalJobParameters)` will not accept `null` values anymore.
Implementations of `MasterTriggerRestoreHook#triggerCheckpoint(long, long, Executor)` must be non-blocking now. Any blocking operation should be executed asynchronously, e.g., using the given executor.
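One way to keep such a method non-blocking is to hand the slow work to the executor and return a future immediately. The sketch below shows the pattern in plain Java. It does not implement the Flink interface: the parameter list mirrors the one named above, while the return type, the result type `String`, and the helper `snapshotExternalSystem` are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class NonBlockingHookSketch {

    // Hypothetical stand-in for a slow, blocking call to an external system.
    static String snapshotExternalSystem(long checkpointId) {
        return "external-state-" + checkpointId;
    }

    // Returns immediately; the blocking work runs on the given executor.
    static CompletableFuture<String> triggerCheckpoint(
            long checkpointId, long timestamp, Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> snapshotExternalSystem(checkpointId), executor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> result =
                    triggerCheckpoint(7L, System.currentTimeMillis(), executor);
            System.out.println(result.get());
        } finally {
            executor.shutdown();
        }
    }
}
```

The calling thread only schedules the work and is free to continue, while the returned future completes once the executor has finished the blocking call.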
The `HighAvailabilityServices` have been split up into client-side `ClientHighAvailabilityServices` and cluster-side `HighAvailabilityServices`. When implementing custom high availability services, users should follow this separation by overriding the factory method `HighAvailabilityServicesFactory#createClientHAServices(Configuration)`. Moreover, `HighAvailabilityServices#getWebMonitorLeaderRetriever()` should no longer be implemented since it has been deprecated.
Implementations of `HighAvailabilityServices` should implement `HighAvailabilityServices#getClusterRestEndpointLeaderElectionService()` instead of `HighAvailabilityServices#getWebMonitorLeaderElectionService()`.
`LeaderElectionService#confirmLeadership(UUID, String)` now takes an additional second argument, which is the address under which the leader will be reachable. All custom `LeaderElectionService` implementations will need to be updated accordingly.
The method `org.apache.flink.streaming.runtime.tasks.StreamTask#getCheckpointLock()` is deprecated now. Users should use `MailboxExecutor` to run actions that require synchronization with the task's thread (e.g., collecting output produced by an external thread). The methods `MailboxExecutor#yield()` or `MailboxExecutor#tryYield()` can be used for actions that need to give up control to other actions temporarily, e.g., if the current operator is blocked. The `MailboxExecutor` can be accessed by using `YieldingOperatorFactory` (see `AsyncWaitOperator` for an example usage).
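The mailbox idea itself can be shown with a small plain-Java model. This is a conceptual sketch only, not Flink's `MailboxExecutor`: the class `MiniMailboxSketch` is hypothetical, and it models just two operations, enqueueing an action from any thread and letting the task thread run one pending action.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class MiniMailboxSketch {
    private final Queue<Runnable> mails = new ConcurrentLinkedQueue<>();

    // May be called from any thread: the action is queued, not run.
    void execute(Runnable action) {
        mails.add(action);
    }

    // Called by the task thread only: runs one pending action, if there is one.
    boolean tryYield() {
        Runnable next = mails.poll();
        if (next == null) {
            return false;
        }
        next.run();
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        MiniMailboxSketch mailbox = new MiniMailboxSketch();
        List<String> output = new ArrayList<>(); // only ever touched by the task thread

        // An external thread does not touch `output` directly; it mails an action instead.
        Thread external = new Thread(
                () -> mailbox.execute(() -> output.add("record from external thread")));
        external.start();
        external.join();

        // The task thread drains the mailbox, so no lock around `output` is needed.
        while (mailbox.tryYield()) {
            // keep running queued actions
        }
        System.out.println(output);
    }
}
```

Because every queued action runs on the task thread, state owned by that thread needs no explicit lock, which is the role the checkpoint lock used to play.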
Interfaces `OptionsFactory` and `ConfigurableOptionsFactory` have been deprecated in favor of `RocksDBOptionsFactory` and `ConfigurableRocksDBOptionsFactory`, respectively.
Serialized `JobGraphs` which set the `ResourceSpec` created by Flink versions `< 1.10` are no longer compatible with Flink `>= 1.10`. If you want to migrate these jobs to Flink `>= 1.10`, you will have to stop the job with a savepoint and then resume it from this savepoint on the Flink `>= 1.10` cluster.