These release notes discuss important aspects, such as configuration, behavior, or dependencies, that changed between Flink 1.14 and Flink 1.15. Please read these notes carefully if you are planning to upgrade your Flink version to 1.15.
Several changes in Flink 1.15 require updating dependency names when upgrading from earlier versions, mainly as a result of opting out Scala dependencies from non-Scala modules and reorganizing the table modules. A quick checklist of the dependency changes is as follows:
Any dependency on one of the following modules needs to be updated to no longer include a suffix:
flink-cep
flink-clients
flink-connector-elasticsearch-base
flink-connector-elasticsearch6
flink-connector-elasticsearch7
flink-connector-gcp-pubsub
flink-connector-hbase-1.4
flink-connector-hbase-2.2
flink-connector-hbase-base
flink-connector-jdbc
flink-connector-kafka
flink-connector-kinesis
flink-connector-nifi
flink-connector-pulsar
flink-connector-rabbitmq
flink-container
flink-dstl-dfs
flink-gelly
flink-hadoop-bulk
flink-kubernetes
flink-runtime-web
flink-sql-connector-elasticsearch6
flink-sql-connector-elasticsearch7
flink-sql-connector-hbase-1.4
flink-sql-connector-hbase-2.2
flink-sql-connector-kafka
flink-sql-connector-kinesis
flink-sql-connector-rabbitmq
flink-state-processor-api
flink-statebackend-rocksdb
flink-streaming-java
flink-test-utils
flink-yarn
flink-table-api-java-bridge
flink-table-runtime
flink-sql-client
flink-orc
flink-orc-nohive
flink-parquet
For Table / SQL users, the new module flink-table-planner-loader replaces flink-table-planner_2.12 and avoids the need for a Scala suffix. For backwards compatibility, users can still swap it with flink-table-planner_2.12 located in opt/. flink-table-uber has been split into flink-table-api-java-uber, flink-table-planner(-loader), and flink-table-runtime. Scala users need to explicitly add a dependency to flink-table-api-scala or flink-table-api-scala-bridge.
The details of the involved issues are listed below.
The Java DataSet/-Stream APIs are now independent of Scala and no longer transitively depend on it.
The implications are the following:
If you only intend to use the Java APIs, with Java types, then you can opt in to a Scala-free Flink by removing the flink-scala jar from the lib/ directory of the distribution. You are then free to use any Scala version and Scala libraries. You can either bundle Scala itself in your user jar or put it into the lib/ directory of the distribution.
If you relied on the Scala APIs without an explicit dependency on them, then you may experience issues when building your projects. You can solve this by adding explicit dependencies to the APIs that you are using. This should primarily affect users of the Scala DataStream/CEP APIs.
A lot of modules have lost their Scala suffix. Further caution is advised when mixing dependencies from different Flink versions (e.g., an older connector), as you may now end up pulling in multiple versions of a single module (that would previously be prevented by the name being equal).
The new module flink-table-planner-loader replaces flink-table-planner_2.12 and avoids the need for a Scala suffix. It is included in the Flink distribution under lib/. For backwards compatibility, users can still swap it with flink-table-planner_2.12 located in opt/. As a consequence, flink-table-uber has been split into flink-table-api-java-uber, flink-table-planner(-loader), and flink-table-runtime. flink-sql-client has no Scala suffix anymore.
It is recommended to let new projects depend on flink-table-planner-loader (without Scala suffix) in provided scope.
Note that the distribution does not include the Scala API by default. Scala users need to explicitly add a dependency to flink-table-api-scala or flink-table-api-scala-bridge.
flink-table-runtime has no Scala suffix anymore. Make sure to include flink-scala if the legacy type system (based on TypeInformation) with case classes is still used within the Table API.
The table file system connector is no longer part of the flink-table-uber JAR but is a dedicated (but removable) flink-connector-files JAR in the /lib directory of a Flink distribution.
Support for Java 8 is now deprecated and will be removed in a future release (FLINK-25247). We recommend that all users migrate to Java 11.
The default Java version in the Flink docker images is now Java 11 (FLINK-25251). There are images built with Java 8, tagged with “java8”.
Support for Scala 2.11 has been removed in FLINK-20845. All Flink dependencies that (transitively) depend on Scala are suffixed with the Scala version that they are built for, for example flink-streaming-scala_2.12. Users should update all Flink dependencies, changing "2.11" to "2.12".
Scala versions (2.11, 2.12, etc.) are not binary compatible with one another. That also means that there's no guarantee that you can restore from a savepoint, taken with a Flink Scala 2.11 application, if you're upgrading to a Flink Scala 2.12 application. This depends on the data types that you have been using in your application.
The Scala Shell/REPL has been removed in FLINK-24360.
The legacy casting behavior has been disabled by default. This might have implications in corner cases (string parsing, numeric overflows, to-string representation, varchar/binary precisions). Set table.exec.legacy-cast-behaviour=ENABLED to restore the old behavior.
CHAR/VARCHAR lengths are now enforced (trimmed/padded) by default before entering the table sink.
Table functions that are called using Scala implicit conversions have been updated to use the new type system and new type inference. Users are requested to update their UDFs or use the deprecated TableEnvironment.registerFunction to restore the old behavior temporarily by calling the function via name.
flink-conf.yaml and other configurations from outer layers (e.g. the CLI) are now propagated into TableConfig. Even though configuration set directly in TableConfig still has precedence, this change can have side effects if table configuration was accidentally set in other layers.
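As a rough illustration of the precedence (the option key is just an example):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TableConfigPrecedence {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // A value set programmatically takes precedence over the same key coming
        // from flink-conf.yaml or the CLI, which are now propagated into TableConfig.
        tEnv.getConfig().getConfiguration().setString("table.exec.mini-batch.enabled", "true");
    }
}
```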
The previously deprecated methods TableEnvironment.execute, Table.insertInto, TableEnvironment.fromTableSource, TableEnvironment.sqlUpdate, and TableEnvironment.explain have been removed. Please use TableEnvironment.executeSql, TableEnvironment.explainSql, TableEnvironment.createStatementSet, as well as Table.executeInsert, Table.explain and Table.execute, together with the newly introduced classes TableResult, ResultKind, StatementSet and ExplainDetail.
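A minimal migration sketch (the table names are hypothetical):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class ExecuteSqlMigration {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Before (removed in 1.15):
        //   tEnv.sqlUpdate("INSERT INTO sink_t SELECT * FROM source_t");
        //   tEnv.execute("my-job");

        // After: executeSql submits the statement and returns a TableResult.
        TableResult result = tEnv.executeSql("INSERT INTO sink_t SELECT * FROM source_t");
        result.print();
    }
}
```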
STATEMENT is now a reserved keyword. Use backticks to escape tables, fields and other references.
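For example (assuming an existing TableEnvironment tEnv and a hypothetical table named statement):

```java
// Fails to parse in 1.15, because STATEMENT is reserved:
//   tEnv.executeSql("SELECT * FROM statement");
// Escape the identifier with backticks instead:
tEnv.executeSql("SELECT * FROM `statement`");
```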
DataStreamScanProvider and DataStreamSinkProvider for table connectors received an additional method that might break implementations that used lambdas before. We recommend static classes as a replacement and for future robustness, as shown in the sketch below.
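A hedged sketch for DataStreamSinkProvider inside a custom DynamicTableSink (mySink is a placeholder Sink&lt;RowData&gt;); since the interface is no longer a functional interface, a lambda no longer compiles:

```java
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    // Before 1.15 a lambda compiled, because the interface had a single method:
    //   return (DataStreamSinkProvider) dataStream -> dataStream.sinkTo(mySink);
    // Now implement the interface explicitly and override the new variant:
    return new DataStreamSinkProvider() {
        @Override
        public DataStreamSink<?> consumeDataStream(
                ProviderContext providerContext, DataStream<RowData> dataStream) {
            return dataStream.sinkTo(mySink); // mySink: placeholder Sink<RowData>
        }
    };
}
```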
It is recommended to update statement sets to the new SQL syntax:
EXECUTE STATEMENT SET BEGIN ... END;
EXPLAIN STATEMENT SET BEGIN ... END;
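For example, as a single statement submitted via executeSql (the table names are hypothetical, and support outside the SQL client should be verified for your setup):

```java
// Assuming an existing TableEnvironment tEnv; sink_a, sink_b and source_t are placeholders.
tEnv.executeSql(
        "EXECUTE STATEMENT SET BEGIN "
                + "INSERT INTO sink_a SELECT * FROM source_t; "
                + "INSERT INTO sink_b SELECT * FROM source_t; "
                + "END");
```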
This changes the result of a decimal SUM() with retraction and AVG(). Part of the behavior is restored to be the same as in 1.13 so that the behavior as a whole is consistent with Hive / Spark.
The DecodingFormat interface was used for both projectable and non-projectable formats, which led to inconsistent implementations. FileSystemTableSource has been updated to distinguish between those two interfaces now. Users that implement custom formats for FileSystemTableSource might need to verify the implementation and make sure to implement ProjectableDecodingFormat if necessary, as sketched below.
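A minimal sketch of a projection-aware format, assuming the 1.15 interfaces; MyDeserializer is a hypothetical deserializer class:

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class MyDecodingFormat
        implements ProjectableDecodingFormat<DeserializationSchema<RowData>> {

    @Override
    public DeserializationSchema<RowData> createRuntimeDecoder(
            DynamicTableSource.Context context, DataType physicalDataType, int[][] projections) {
        // Apply the pushed-down projection so that only the required fields are decoded.
        DataType projectedType = Projection.of(projections).project(physicalDataType);
        return new MyDeserializer(projectedType); // MyDeserializer: hypothetical
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }
}
```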
This might have an impact on existing table source implementations, as pushed-down filters might not contain partition predicates anymore. However, the connector implementation for table sources that implement both partition and filter push-down became easier with this change.
A precision error of decimal SUM() has been fixed. This changes the result of a decimal SUM() between 1.14.0 and 1.14.1. It restores the behavior of 1.13 to be consistent with Hive/Spark.
The string representation of BOOLEAN columns from DDL results (true/false -> TRUE/FALSE) and of row columns in DQL results (+I[...] -> (...)) has changed for printing.
The defaults for casting incomplete strings like "12" to TIME have changed from 12:01:01 to 12:00:00.
STRING to TIMESTAMP(_LTZ) casting now considers fractional seconds. Previously, fractional seconds of any precision were ignored.
This adds an additional operator to the topology if the new sink interfaces are used (e.g. for Kafka). It could cause issues in 1.14.1 when restoring from a 1.14 savepoint. A workaround is to cast the time attribute to a regular timestamp in the SQL statement just before the sink.
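A hedged sketch of the workaround (all table and column names are hypothetical):

```java
// Casting the time attribute to a plain timestamp right before the sink avoids
// the extra operator that would otherwise be added for the time attribute.
tEnv.executeSql(
        "INSERT INTO sink_t SELECT id, CAST(row_time AS TIMESTAMP(3)) AS ts FROM source_t");
```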
Functions that returned VARCHAR(2000) in 1.14 now return STRING, i.e. VARCHAR with maximum length. In particular, this includes:
JSON_VALUE
CHR
REVERSE
SPLIT_INDEX
REGEXP_EXTRACT
PARSE_URL
FROM_UNIXTIME
DECODE
DATE_FORMAT
CONVERT_TZ
This issue added IS JSON for the Table API. Note that IS JSON does not return NULL anymore but always FALSE (even if the argument is NULL).
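For illustration (assuming an existing TableEnvironment tEnv):

```java
tEnv.executeSql("SELECT CAST(NULL AS STRING) IS JSON").print();
// Prints FALSE rather than NULL, even though the argument is NULL.
```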
The UPSERT INTO statement has been disabled. The UPSERT INTO syntax was exposed by mistake in previous releases without detailed discussion. From this release on, every UPSERT INTO throws an exception. Users of UPSERT INTO should use the documented INSERT INTO statement instead.
Casting to BOOLEAN is not allowed from decimal numeric types anymore.
This issue aims to fix various primary key issues that effectively made it impossible to use this feature. The change might affect savepoint backwards compatibility for those incorrect pipelines. Also the resulting changelog stream might be different after these changes. Pipelines that were correct before should be restorable from a savepoint.
StreamTableEnvironment.fromChangelogStream might produce a different stream because primary keys were not properly considered before.
The results of Table#print have changed to be closer to actual SQL data types. E.g., decimals now print correctly with leading/trailing zeros.
Support for the MapR FileSystem has been dropped.
The flink-connector-testing module has been removed; users should use the flink-connector-test-utils module instead.
Formats implementing BulkWriterFormatFactory no longer need to implement partition key reading, as it is now managed internally by FileSystemTableSource.
ElasticsearchXSinkBuilder supersedes ElasticsearchSink.Builder and provides at-least-once writing with the new unified sink interface, supporting both batch and streaming mode of the DataStream API.
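A minimal sketch of the new builder for Elasticsearch 7 (host, index name, and the helper method are placeholders):

```java
import java.util.Collections;

import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;

public class NewElasticsearchSink {
    static void attachSink(DataStream<String> stream) {
        stream.sinkTo(
                new Elasticsearch7SinkBuilder<String>()
                        .setHosts(new HttpHost("127.0.0.1", 9200, "http"))
                        .setEmitter((element, context, indexer) ->
                                indexer.add(createIndexRequest(element)))
                        .build());
    }

    private static IndexRequest createIndexRequest(String element) {
        return Requests.indexRequest()
                .index("my-index") // placeholder index name
                .source(Collections.singletonMap("data", element));
    }
}
```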
For Elasticsearch 7 users that use the old ElasticsearchSink interface (org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink) and depend on their own elasticsearch-rest-high-level-client version, updating the client dependency to a version >= 7.14.0 is required due to internal changes.
The old JDBC connector (indicated by connector.type=jdbc in DDL) has been removed. If not done already, users need to upgrade to the newer stack (indicated by connector=jdbc in DDL).
New metrics numRecordsSend and numRecordsSendErrors have been introduced for users to monitor the number of records sent to the external system. numRecordsOut should now be used to monitor the number of records transferred between sink tasks.
Connector developers should pay attention to the usage of the metrics numRecordsOut, numRecordsSend, and numRecordsSendErrors when building sink connectors. Please refer to the new Kafka sink for details. Additionally, since numRecordsOut now only counts the records sent between sink tasks and numRecordsOutErrors was designed for counting the records sent to the external system, we deprecated numRecordsOutErrors and recommend using numRecordsSendErrors instead.
Adds retry logic to the cleanup steps of a finished job. This feature changes the way Flink jobs are cleaned up. Instead of trying once to clean up the job, this step will be repeated until it succeeds. Users are meant to fix the issue that prevents Flink from finalizing the job cleanup. The retry functionality can be configured and disabled. More details can be found in the documentation.
TaskManagers now explicitly send a signal to the JobManager when shutting down. This reduces the down-scaling delay in reactive mode (which was previously bound to the heartbeat timeout).
Job metrics on the TaskManager (the TaskManagerJobMetricGroup) are now removed when the last slot is released rather than when the last task finishes. This means they may be reported for a longer time than before, even when no tasks are running on the TaskManager.
Fixes an issue where a failover was not listed in the exception history but only as a root cause. That could happen if the failure occurred during JobMaster initialization.
A new multiple-component leader election service was implemented that only runs a single leader election per Flink process. If this causes any problems, you can set high-availability.use-old-ha-services: true in flink-conf.yaml to use the old high availability services.
Attempting to cancel a FINISHED/FAILED job now returns 409 Conflict instead of 404 Not Found.
All JobManagers can now be queried for the status of a savepoint operation, irrespective of which JobManager received the initial request.
The issue of re-submitting a job in Application Mode when the job finished but failed during cleanup is fixed through the introduction of the new component JobResultStore which enables Flink to persist the cleanup state of a job to the file system. (see FLINK-25431)
Since 1.15, sort-shuffle has become the default blocking shuffle implementation, and shuffle data compression is enabled by default. These changes influence batch jobs only. For more information, please refer to the official documentation.
When restoring from a savepoint or retained externalized checkpoint, you can choose the mode in which you want to perform the operation. You can choose from CLAIM, NO_CLAIM, and LEGACY (the old behavior).
In CLAIM mode, Flink takes ownership of the snapshot and will potentially try to remove the snapshot at a certain point in time. On the other hand, the NO_CLAIM mode makes sure Flink does not depend on the existence of any files belonging to the initial snapshot.
For a more thorough description see the documentation.
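For illustration, a hedged sketch of selecting the mode programmatically; the string keys follow the 1.15 documentation and should be verified for your setup, and the savepoint path is a placeholder:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreModeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString("execution.savepoint.path", "s3://bucket/savepoints/savepoint-ab12cd");
        // One of CLAIM, NO_CLAIM, LEGACY (key name assumed from the 1.15 docs):
        conf.setString("execution.savepoint-restore-mode", "NO_CLAIM");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the pipeline, then env.execute();
    }
}
```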
When taking a savepoint you can specify the binary format. You can choose from native (specific to a particular state backend) or canonical (unified across all state backends).
Shared state tracking changed to use checkpoint IDs instead of reference counts. Shared state is no longer cleaned up when a checkpoint is aborted (but rather on subsumption or job termination). This might result in delays in discarding the state of aborted checkpoints.
Metrics for the bytes persisted within each checkpoint have been introduced (via the REST API and UI), which help users see how much data was persisted during an incremental or changelog-based checkpoint.
In 1.15, support for checkpoints after some tasks have finished is enabled by default, and tasks now wait for one final checkpoint before exiting to ensure all data gets committed.
However, it is worth noting that this forces tasks to wait for one more checkpoint before exiting. In other words, tasks are blocked until the next checkpoint is triggered and completed. If the checkpoint interval is long, the tasks' execution time is extended accordingly. In the worst case, if the checkpoint interval is set to Long.MAX_VALUE, the tasks are effectively blocked forever.
More information about this feature and how to disable it can be found in the documentation; a sketch of opting out follows below.
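A sketch of disabling the feature programmatically (the option exists since 1.14; verify it for your version):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DisableFinalCheckpointWait {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Restore the pre-1.15 behavior: no checkpoints after tasks finish.
        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        // ... build the pipeline, then env.execute();
    }
}
```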
The State Processor API has been migrated from Flink's legacy DataSet API and now runs over DataStreams under BATCH execution.
The internal log of RocksDB is now kept under Flink's log directory by default.
The minimal supported Hadoop client version is now 2.8.5 (the version of the Flink runtime dependency). The client can still talk to older server versions, as the binary protocol should be backward compatible.
Elasticsearch libraries used by the connector are bumped to 7.15.2 and 6.8.20 respectively.
Elasticsearch 7 users that use the old ElasticsearchSink interface (org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink) and depend on their own elasticsearch-rest-high-level-client version will need to update the client dependency to a version >= 7.14.0 due to internal changes.
Support for using Zookeeper 3.4 for HA has been dropped. Users relying on Zookeeper need to upgrade to 3.5/3.6. By default Flink now uses a Zookeeper 3.5 client.
The Kafka connector now uses Kafka client 2.8.1 by default.