<h1>
<a href="/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/">Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API</a>
</h1>
May 9, 2023 -
Etienne Chauchot
<a href="https://twitter.com/echauchot">(@echauchot)</a>
<p><h2 id="introduction">
Introduction
<a class="anchor" href="#introduction">#</a>
</h2>
<p>The Flink community has been deprecating the DataSet API since version 1.12 as part of the work on
<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)</a>
.
This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch
DataStream pipeline.
All the code presented in this article is available in
the <a href="https://github.com/echauchot/tpcds-benchmark-flink">tpcds-benchmark-flink repo</a>.
The use case shown here is extracted from a broader work comparing the performance of different Flink
APIs by implementing <a href="https://www.tpc.org/tpcds/">TPC-DS</a> queries with each of them.</p>
<h2 id="what-is-tpcds">
What is TPCDS?
<a class="anchor" href="#what-is-tpcds">#</a>
</h2>
<p>TPC-DS is a decision support benchmark that models several generally applicable aspects of a
decision support system. Its purpose is to provide industry users with relevant, objective
performance data for big data engines.</p>
<h2 id="chosen-tpcds-query">
Chosen TPCDS query
<a class="anchor" href="#chosen-tpcds-query">#</a>
</h2>
<p>The query chosen for this article is <strong>Query3</strong> because it contains the most common analytics
operators (filter, join, aggregation, group by, order by, limit). It represents an analytic query on
store sales. Its SQL code is shown below:</p>
<pre><code class="language-sql">SELECT dt.d_year,
       item.i_brand_id brand_id,
       item.i_brand brand,
       SUM(ss_ext_sales_price) sum_agg
FROM date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manufact_id = 128
  AND dt.d_moy = 11
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, sum_agg DESC, brand_id
LIMIT 100
</code></pre>
<h2 id="the-initial-dataset-pipeline">
The initial DataSet pipeline
<a class="anchor" href="#the-initial-dataset-pipeline">#</a>
</h2>
<p>The pipeline we are migrating
is <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDataset.java">this</a>
batch pipeline, which implements the above query using the DataSet API
with <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/types/Row.html">Row</a>
as the dataset element type.</p>
<h2 id="migrating-the-dataset-pipeline-to-a-datastream-pipeline">
Migrating the DataSet pipeline to a DataStream pipeline
<a class="anchor" href="#migrating-the-dataset-pipeline-to-a-datastream-pipeline">#</a>
</h2>
<p>Instead of going through all of the code, which is
available <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java">here</a>,
we will focus on some key areas of the migration. The code is based on the latest Flink release
at the time this article was written: version 1.16.0.</p>
<p>DataStream is a unified API that allows running pipelines in both batch and streaming modes. To
execute a DataStream pipeline in batch mode, it is not enough to set the execution mode in the Flink
execution environment; some operations also need to be migrated. Indeed, the DataStream API has the
semantics of a streaming pipeline: the arriving data is considered infinite. So, compared to the
DataSet API, which operates on finite data, some operations need to be adapted.</p>
<h3 id="setting-the-execution-environmenthttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval90-l96">
<a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L90-L96">Setting the execution environment</a>
<a class="anchor" href="#setting-the-execution-environmenthttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval90-l96">#</a>
</h3>
<p>We start by moving
from <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/ExecutionEnvironment.html">ExecutionEnvironment</a>
to <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html">StreamExecutionEnvironment</a>
. Then, as the source in this pipeline is bounded, we can use either the default
streaming <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode/">execution mode</a>
or the batch mode. In batch mode the tasks of the job can be separated into stages that can be
executed one after another. In streaming mode all tasks need to be running all the time and records
are sent to downstream tasks as soon as they are available.</p>
<p>Here we keep the default streaming mode, which gives good performance on this pipeline and would
allow running the same pipeline with no change on an unbounded source.</p>
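<p>As a minimal sketch (not the exact code from the repo), the environment change and the optional mode switch look like this:</p>
<pre><code class="language-java">// The DataSet ExecutionEnvironment is replaced by the DataStream StreamExecutionEnvironment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Optional: run this bounded pipeline in batch mode.
// The article keeps the default STREAMING mode, so this line stays commented out.
// env.setRuntimeMode(RuntimeExecutionMode.BATCH);
</code></pre>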
<h3 id="using-the-streaming-sources-and-datasets">
Using the streaming sources and datasets
<a class="anchor" href="#using-the-streaming-sources-and-datasets">#</a>
</h3>
<p><strong>Sources</strong>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/operators/DataSource.html">DataSource<T></a>
becomes <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html">DataStreamSource<T></a>
after the call to <em>env.createInput()</em>.</p>
<p><strong>Datasets</strong>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/DataSet.html">DataSet<T></a>
is now <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStream.html">DataStream<T></a>
(and its subclasses).</p>
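<p>A short sketch of that change (the <em>storeSalesInputFormat</em> and <em>rowTypeInfo</em> variables are illustrative placeholders, not names from the repo):</p>
<pre><code class="language-java">// DataSet API: ExecutionEnvironment#createInput returned a DataSource<Row>
// DataSource<Row> storeSales = env.createInput(storeSalesInputFormat, rowTypeInfo);

// DataStream API: the same call on StreamExecutionEnvironment returns a DataStreamSource<Row>
DataStreamSource<Row> storeSales = env.createInput(storeSalesInputFormat, rowTypeInfo);
</code></pre>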
<h3 id="migrating-the-join-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval129-l135">
<a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L129-L135">Migrating the join operation</a>
<a class="anchor" href="#migrating-the-join-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval129-l135">#</a>
</h3>
<p>The DataStream join operator does not yet support aggregations in batch mode
(see <a href="https://issues.apache.org/jira/browse/FLINK-22587">FLINK-22587</a> for details). Basically, the
problem is with the trigger of the
default <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.html">GlobalWindow</a>,
which never fires, so the records are never output. We work around this problem by applying a
custom <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L246-L280">EndOfStream</a>
window. It is a window assigner that assigns all the records to a
single <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.html">TimeWindow</a>.
So, as with the GlobalWindow, all the records are assigned to the same window, except that this
window&rsquo;s trigger is based on the end of the window (which is set to <em>Long.MAX_VALUE</em>). As we are on
a bounded source, at some point the watermark will advance to +INFINITY (Long.MAX_VALUE), cross the
end of the time window, fire the trigger, and output the records.</p>
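<p>To make this concrete, here is a minimal sketch of such a window assigner (named <em>EndOfStreamWindows</em> here for illustration; the actual EndOfStream class in the repo may differ in its details):</p>
<pre><code class="language-java">import java.util.Collection;
import java.util.Collections;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/** Assigns every record to one TimeWindow that ends at Long.MAX_VALUE. */
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow WINDOW = new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        // All records share the same single window.
        return Collections.singletonList(WINDOW);
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        // Fires when the watermark passes the end of the window,
        // i.e. when the bounded input is exhausted and the watermark advances to +INFINITY.
        return EventTimeTrigger.create();
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}
</code></pre>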
<p>Now that triggering works, we just need to call a standard join with the <em>Row::join</em>
function.</p>
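<p>Sketched in code, one of the joins then looks roughly as follows (the stream variables and field positions such as <em>SS_SOLD_DATE_SK</em> are illustrative placeholders):</p>
<pre><code class="language-java">DataStream<Row> salesWithDates =
    storeSales
        .join(dateDim)
        .where(row -> row.getField(SS_SOLD_DATE_SK))   // join key on the store_sales side
        .equalTo(row -> row.getField(D_DATE_SK))       // join key on the date_dim side
        .window(new EndOfStreamWindows())              // the custom window described above
        .apply((JoinFunction<Row, Row, Row>) Row::join);
</code></pre>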
<h3 id="migrating-the-group-by-and-reduce-sum-operationshttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval145-l169">
<a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L145-L169">Migrating the group by and reduce (sum) operations</a>
<a class="anchor" href="#migrating-the-group-by-and-reduce-sum-operationshttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval145-l169">#</a>
</h3>
<p>The DataStream API no longer has
a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/DataSet.html#groupBy-org.apache.flink.api.java.functions.KeySelector-">groupBy()</a>
method; we now use
the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-">keyBy()</a>
method. A downstream aggregation is then applied to elements with the same key, exactly as
a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/functions/GroupReduceFunction.html">GroupReduceFunction</a>
would have done on a DataSet, except that it does not need to materialize the collection of data.
Indeed, the downstream operator is a reducer: the sum is still computed with
a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/functions/ReduceFunction.html">ReduceFunction</a>,
but this time the operator reduces the elements incrementally instead of receiving the rows as a
Collection. To compute the sum we store the partially aggregated sum in the reduced row. Because the
reduce is incremental, we also need to distinguish whether we received an already-reduced row (in
which case we read the partially aggregated sum) or a fresh row (in which case we just read the
corresponding price field). Also note that, as in the join case, we need to specify windowing for
the aggregation.</p>
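<p>A condensed sketch of this stage (field positions like <em>D_YEAR</em> or <em>SUM_AGG_POS</em> and the <em>partialSumOrPrice()</em> helper are illustrative placeholders for what the repo code does):</p>
<pre><code class="language-java">DataStream<Row> sums =
    salesWithDatesAndItems
        // keyBy() replaces the DataSet groupBy(): key by the GROUP BY columns
        .keyBy(row -> row.getField(D_YEAR) + "|" + row.getField(I_BRAND) + "|" + row.getField(I_BRAND_ID))
        // as for the join, the aggregation needs an explicit window to fire in streaming mode
        .window(new EndOfStreamWindows())
        // incremental reduce: each call merges two rows into one row carrying the partial sum
        .reduce((rowA, rowB) -> {
            double sumA = partialSumOrPrice(rowA); // partial sum if the row was already reduced,
            double sumB = partialSumOrPrice(rowB); // the ss_ext_sales_price field otherwise
            Row reduced = Row.copy(rowA);
            reduced.setField(SUM_AGG_POS, sumA + sumB);
            return reduced;
        });
</code></pre>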
<h3 id="migrating-the-order-by-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval171-l211">
<a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L171-L211">Migrating the order by operation</a>
<a class="anchor" href="#migrating-the-order-by-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval171-l211">#</a>
</h3>
<p>The datastream is sorted by applying
a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html">KeyedProcessFunction</a>.</p>
<p>But, as said above, the DataStream API has the semantics of a streaming pipeline: the arriving data
is considered infinite. As such, we need to &ldquo;divide&rdquo; the data to define output times. For that we
set a timer to output the resulting data. We <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L188">set a timer to fire at the end of the EndOfStream window</a>,
meaning that the timer fires at the end of the batch.</p>
<p>To sort the data, we store the incoming rows inside
a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/state/ListState.html">ListState</a>
and sort them at output time, when the timer fires in
the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html#onTimer-long-org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext-org.apache.flink.util.Collector-">onTimer()</a>
callback.</p>
<p>One more thing: to be able to use Flink state, we need to key the datastream beforehand, even
though there is no group-by key, because Flink state is scoped per key. Thus, we key by a fake static
key so that there is a single state.</p>
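<p>Putting these pieces together, a condensed sketch of the sorting function could look like this (the field positions used in the comparator are illustrative; the version in the repo differs in its details):</p>
<pre><code class="language-java">import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/** Usage sketch: sums.keyBy(row -> "0").process(new SortRowsFunction()) */
public class SortRowsFunction extends KeyedProcessFunction<String, Row, Row> {

    // Illustrative field positions for d_year, brand_id and sum_agg in the result Row.
    private static final int D_YEAR = 0;
    private static final int BRAND_ID = 1;
    private static final int SUM_AGG = 3;

    private transient ListState<Row> rows;

    @Override
    public void open(Configuration parameters) {
        rows = getRuntimeContext().getListState(new ListStateDescriptor<>("rows", Row.class));
    }

    @Override
    public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
        rows.add(row);
        // Fire at the end of the EndOfStream window, i.e. once the whole bounded input has been read.
        ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE - 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Row> out) throws Exception {
        List<Row> buffered = new ArrayList<>();
        rows.get().forEach(buffered::add);
        // ORDER BY d_year, sum_agg DESC, brand_id
        buffered.sort(
            Comparator.comparing((Row r) -> (Long) r.getField(D_YEAR))
                .thenComparing(r -> (Double) r.getField(SUM_AGG), Comparator.reverseOrder())
                .thenComparing(r -> (Long) r.getField(BRAND_ID)));
        buffered.forEach(out::collect);
    }
}
</code></pre>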
<h3 id="migrating-the-limit-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval213-l223">
<a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L213-L223">Migrating the limit operation</a>
<a class="anchor" href="#migrating-the-limit-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval213-l223">#</a>
</h3>
<p>As all the elements of the DataStream were keyed by the same &quot;0&quot; key, they are kept in the same
&quot;group&quot;. So we can implement the SQL LIMIT with
a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/ProcessFunction.html">ProcessFunction</a>
that uses a counter to output only the first 100 elements.</p>
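<p>A minimal sketch of such a function (assuming, as above, that every row carries the same key and the operator runs with parallelism 1):</p>
<pre><code class="language-java">import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/** Forwards only the first 100 rows it sees, i.e. the SQL LIMIT 100. */
public class LimitFunction extends ProcessFunction<Row, Row> {

    private static final int LIMIT = 100;

    // Simple operator-local counter; the repo version may rely on Flink state instead.
    private int emitted;

    @Override
    public void processElement(Row row, Context ctx, Collector<Row> out) {
        if (emitted < LIMIT) {
            emitted++;
            out.collect(row);
        }
    }
}
</code></pre>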
<h3 id="migrating-the-sink-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval225-l236">
<a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L225-L236">Migrating the sink operation</a>
<a class="anchor" href="#migrating-the-sink-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval225-l236">#</a>
</h3>
<p>As with sources, there were big changes to sinks in recent versions of Flink. We now use
the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/connector/sink2/Sink.html">Sink interface</a>,
which requires
an <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/serialization/Encoder.html">Encoder</a>.
The resulting code is nevertheless very similar to the one using the DataSet API; the only difference
is that the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/serialization/Encoder.html#encode-IN-java.io.OutputStream-">Encoder#encode()</a>
method writes bytes
where <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/java/io/TextOutputFormat.TextFormatter.html#format-IN-">TextOutputFormat.TextFormatter#format()</a>
wrote Strings.</p>
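<p>For instance, with the file-based implementation of the new Sink interface (FileSink), writing the result rows as text could look like this sketch (the output path and the formatting of the Row are illustrative):</p>
<pre><code class="language-java">FileSink<Row> sink =
    FileSink.forRowFormat(
            new Path("/tmp/query3-output"), // illustrative output path
            (Encoder<Row>) (row, stream) -> {
                // Encoder#encode writes bytes, where TextOutputFormat.TextFormatter#format returned a String
                stream.write(row.toString().getBytes(StandardCharsets.UTF_8));
                stream.write('\n');
            })
        .build();

resultStream.sinkTo(sink);
</code></pre>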
<h2 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h2>
<p>As you saw with the migration of the join operation, the new unified DataStream API still has some
limitations in batch mode. In addition, the resulting order-by and limit code is quite manual and
requires the Flink state API for the migration. For all these reasons, the Flink community recommends
using Flink SQL for batch pipelines: it results in much simpler code, good performance, and
out-of-the-box analytics capabilities. You can find the equivalent Query3 code that uses
the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/overview/">Flink SQL/Table API</a>
in
the <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkSQLCSV.java">Query3ViaFlinkSQLCSV class</a>.</p>
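<p>For comparison, a rough sketch of the SQL route (the registration of the <em>date_dim</em>, <em>store_sales</em> and <em>item</em> tables over the input files is omitted; see the linked class for the real code):</p>
<pre><code class="language-java">TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// date_dim, store_sales and item must be registered as tables first (omitted here).
tableEnv.executeSql(
    "SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand, SUM(ss_ext_sales_price) sum_agg "
        + "FROM date_dim dt, store_sales, item "
        + "WHERE dt.d_date_sk = store_sales.ss_sold_date_sk "
        + "AND store_sales.ss_item_sk = item.i_item_sk "
        + "AND item.i_manufact_id = 128 AND dt.d_moy = 11 "
        + "GROUP BY dt.d_year, item.i_brand, item.i_brand_id "
        + "ORDER BY dt.d_year, sum_agg DESC, brand_id LIMIT 100");
</code></pre>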