<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE html>
<html lang="en">
<head>
<!-- Required meta tags -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no">
<script type="text/javascript">
if(window.location.protocol != 'https:') {
location.href = location.href.replace("http://", "https://");
}
</script>
<title>Apache Wayang - Documentation</title>
<link rel="icon" href="https://wayang.apache.org/assets/img/logo/favicon-pluma.ico">
<!-- Bootstrap CSS -->
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/css/bootstrap.min.css" integrity="sha384-B0vP5xmATw1+K9KRQjQERJvTumQW0nPEzvF6L/Z6nronJ3oUOFUFpCjEUQouq2+l" crossorigin="anonymous">
<link rel="stylesheet" href="https://wayang.apache.org/assets/css/color.css">
<link rel="stylesheet" href="https://pro.fontawesome.com/releases/v5.10.0/css/all.css" integrity="sha384-AYmEC3Yw5cVb3ZcuHtOA93w35dYTsvhLPVnYs9eStHfGJvOvKxVfELGroGkvsg+p" crossorigin="anonymous"/>
<link rel="stylesheet" href="https://wayang.apache.org/assets/css/monokai.css">
<link rel="stylesheet" href="https://wayang.apache.org/assets/css/home.css">
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-light bg-light sticky-top shadow-lg">
<div class="container d-flex justify-content-between w-100">
<div class="mr-auto p-2 w-100">
<div class="d-flex">
<a class="navbar-brand mr-auto" href="/">
<img style="max-height: 75px" src="https://wayang.apache.org/assets/img/logo/logo_400x160.png"/>
</a>
<button class="navbar-toggler ml-auto align-self-center" type="button" data-toggle="collapse" data-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<span class="navbar-toggler-icon"></span>
</button>
</div>
</div>
<div>
<div class="p-2 collapse navbar-collapse" id="navbarSupportedContent">
<div class="navbar-nav">
<li class="nav-item ">
<a class="nav-link" href="https://wayang.apache.org/">
Home
</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="https://wayang.apache.org/about">
About
</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="https://wayang.apache.org/community">
Community
</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="https://wayang.apache.org/documentation">
Documentation
</a>
</li>
<li class="nav-item ">
<a class="nav-link" href="https://wayang.apache.org/publications-home">
Publications
</a>
</li>
<li class="nav-item dropdown ">
<a class="nav-link dropdown-toggle" data-toggle="dropdown" href="#" role="button" aria-haspopup="true" aria-expanded="false">
Apache
</a>
<div class="dropdown-menu">
<a class="dropdown-item" href="http://www.apache.org/foundation/how-it-works.html">
Apache Software Foundation
</a>
<a class="dropdown-item" href="http://www.apache.org/licenses/">
Apache License
</a>
<a class="dropdown-item" href="http://www.apache.org/foundation/sponsorship.html">
Sponsorship
</a>
<a class="dropdown-item" href="http://www.apache.org/foundation/thanks.html">
Thanks
</a>
</div>
</li>
</div>
</div>
</div>
</div>
</div>
</nav>
<div class="container-fluid p-0">
<div class="title-post mb-3 mt-n5 d-flex align-items-center shadow" >
<div class="col pt-4" style="text-align: center">
<h1 class="mb-n2 mt-1" style="color: white; font-size: 4em">Documentation</h1>
</div>
</div>
<div class="container">
<p>In contrast to classical data processing systems that provide one dedicated execution engine, Apache Wayang is a <em>meta processing framework</em>: you specify your data processing app via one of Apache Wayang’s APIs, and Apache Wayang then picks the configuration of classical processing frameworks, such as Java Streams or Apache Spark, that its optimizer estimates to be best for running your app. Finally, Apache Wayang also performs the execution, thereby hiding the different platform-specific APIs and coordinating inter-platform communication.</p>
<p>This approach aims at freeing data engineers and software developers from the burden of knowing the zoo of different data processing systems, with their APIs, strengths, and weaknesses; from the intricacies of coordinating and integrating different processing platforms; and from the inflexibility of being tied to a fixed set of processing platforms. As of now, Apache Wayang has built-in support for the following processing platforms:</p>
<ul>
<li>Java 8 Streams</li>
<li><a href="https://spark.apache.org/">Apache Spark</a></li>
<li><a href="https://github.com/GraphChi/graphchi-java">GraphChi</a></li>
<li><a href="http://www.postgresql.org">Postgres</a></li>
<li><a href="https://www.sqlite.org/">SQLite</a></li>
</ul>
<h2 id="how-to-use-apache-wayang">How to use Apache Wayang</h2>
<h3 id="requirements">Requirements:</h3>
<p>Apache Wayang is built with Java 8 and Scala 2.11. However, to execute Apache Wayang it is sufficient to have Java 8 installed. If you want to build Apache Wayang yourself, you will also need to have <a href="http://maven.apache.org">Apache Maven</a> installed. Please also consider that processing platforms employed by Apache Wayang might have further requirements.</p>
<h3 id="get-apache-wayang">Get Apache Wayang:</h3>
<p>Apache Wayang is available via Maven Central. To use it with Maven, for instance, include the following in your POM file:</p>
<div class="language-xml highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.wayang<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>wayang-***<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.3.0<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
</code></pre></div></div>
<p>The <code class="language-plaintext highlighter-rouge">***</code> is a placeholder for a module name: Apache Wayang ships with multiple modules that can be included in your app, depending on how you want to use it:</p>
<ul>
<li><code class="language-plaintext highlighter-rouge">wayang-core</code>: provides core data structures and the optimizer (required)</li>
<li><code class="language-plaintext highlighter-rouge">wayang-basic</code>: provides common operators and data types for your apps (recommended)</li>
<li><code class="language-plaintext highlighter-rouge">wayang-api</code>: provides an easy-to-use Scala and Java API to assemble Wayang plans (recommended)</li>
<li><code class="language-plaintext highlighter-rouge">wayang-java</code>, <code class="language-plaintext highlighter-rouge">wayang-spark</code>, <code class="language-plaintext highlighter-rouge">wayang-graphchi</code>, <code class="language-plaintext highlighter-rouge">wayang-sqlite3</code>, <code class="language-plaintext highlighter-rouge">wayang-postgres</code>: adapters for the various supported processing platforms</li>
<li><code class="language-plaintext highlighter-rouge">wayang-profiler</code>: provides functionality to learn operator and UDF cost functions from historical execution data</li>
</ul>
<p>For the sake of version flexibility, you still have to declare the Hadoop (<code class="language-plaintext highlighter-rouge">hadoop-hdfs</code> and <code class="language-plaintext highlighter-rouge">hadoop-common</code>) and Spark (<code class="language-plaintext highlighter-rouge">spark-core</code> and <code class="language-plaintext highlighter-rouge">spark-graphx</code>) dependencies in the versions of your choice.</p>
<p>In addition, you can obtain the most recent snapshot version of Apache Wayang via Apache’s snapshot repository. Just include the following in your POM file:</p>
<div class="language-xml highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nt">&lt;repositories&gt;</span>
<span class="nt">&lt;repository&gt;</span>
<span class="nt">&lt;id&gt;</span>apache-snapshots<span class="nt">&lt;/id&gt;</span>
<span class="nt">&lt;url&gt;</span>https://repository.apache.org/content/repositories/snapshots<span class="nt">&lt;/url&gt;</span>
<span class="nt">&lt;releases&gt;</span>
<span class="nt">&lt;enabled&gt;</span>false<span class="nt">&lt;/enabled&gt;</span>
<span class="nt">&lt;/releases&gt;</span>
<span class="nt">&lt;snapshots&gt;</span>
<span class="nt">&lt;enabled&gt;</span>true<span class="nt">&lt;/enabled&gt;</span>
<span class="nt">&lt;/snapshots&gt;</span>
<span class="nt">&lt;/repository&gt;</span>
<span class="nt">&lt;/repositories&gt;</span>
</code></pre></div></div>
<p>If you need to rebuild Apache Wayang, e.g., to use a different Scala version, you can simply do so via Maven:</p>
<ol>
<li>Adapt the version variables (e.g., <code class="language-plaintext highlighter-rouge">spark.version</code>) in the main <code class="language-plaintext highlighter-rouge">pom.xml</code> file.</li>
<li>Build Apache Wayang with the adapted versions via <code class="language-plaintext highlighter-rouge">$ mvn clean install</code>. <strong>Note</strong> the <code class="language-plaintext highlighter-rouge">standalone</code> profile, which fixes the Hadoop and Spark versions so that Apache Wayang apps do not need to declare the corresponding dependencies explicitly. Also note the <code class="language-plaintext highlighter-rouge">distro</code> profile, which assembles a binary Apache Wayang distribution. To activate these profiles, specify them when running Maven, i.e., <code class="language-plaintext highlighter-rouge">mvn clean install -P&lt;profile name&gt;</code>.</li>
</ol>
<h3 id="configure-apache-wayang">Configure Apache Wayang:</h3>
<p>For Apache Wayang to work properly, you need to tell it about the capacities of your processing platforms and how to reach them. While there is a default configuration that allows you to test Apache Wayang right away, we recommend creating a properties file to adapt the configuration where necessary. To have Apache Wayang use that configuration transparently, just run your app via</p>
<div class="language-shell highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="nv">$ </span>java <span class="nt">-Dwayang</span>.configuration<span class="o">=</span>url://to/my/wayang.properties ...
</code></pre></div></div>
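<p>The <code class="language-plaintext highlighter-rouge">-D</code> flag sets a JVM system property that names the properties file. Wayang’s own loading code is not shown on this page, so the following is only a plain-Java sketch of the mechanism, assuming the file uses the standard Java properties syntax: it reads the <code class="language-plaintext highlighter-rouge">wayang.configuration</code> system property, loads the file it points to with <code class="language-plaintext highlighter-rouge">java.util.Properties</code>, and falls back to a default for a missing key. The class <code class="language-plaintext highlighter-rouge">ConfigSketch</code> and its <code class="language-plaintext highlighter-rouge">load</code> method are hypothetical names.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Properties;

// Hypothetical sketch of the -Dwayang.configuration=... mechanism.
// It is not Wayang's own configuration loader.
class ConfigSketch {

    // Loads the file whose URL is given by the wayang.configuration property;
    // returns an empty Properties object if the property is not set.
    static Properties load() throws IOException {
        Properties props = new Properties();
        String location = System.getProperty("wayang.configuration");
        if (location == null) {
            return props;
        }
        try (InputStream in = new URL(location).openStream()) {
            // Standard "key = value" lines; a trailing '\' continues a line.
            props.load(in);
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = load();
        // "local" is the default for spark.master listed on this page.
        System.out.println("spark.master = " + props.getProperty("spark.master", "local"));
    }
}
</code></pre></div></div>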
<p>The most relevant settings are listed below, with their default values in parentheses where applicable:</p>
<h4 id="general-settings">General settings</h4>
<ul>
<li><code class="language-plaintext highlighter-rouge">wayang.core.log.enabled (= true)</code>: whether to log execution statistics to allow learning better cardinality and cost estimators for the optimizer</li>
<li><code class="language-plaintext highlighter-rouge">wayang.core.log.executions (= ~/.wayang/executions.json)</code>: where to log execution times of operator groups</li>
<li><code class="language-plaintext highlighter-rouge">wayang.core.log.cardinalities (= ~/.wayang/cardinalities.json)</code>: where to log cardinality measurements</li>
<li><code class="language-plaintext highlighter-rouge">wayang.core.optimizer.instrumentation (= org.apache.wayang.core.profiling.OutboundInstrumentationStrategy)</code>: where to measure cardinalities in Apache Wayang plans; other options are <code class="language-plaintext highlighter-rouge">org.apache.wayang.core.profiling.NoInstrumentationStrategy</code> and <code class="language-plaintext highlighter-rouge">org.apache.wayang.core.profiling.FullInstrumentationStrategy</code></li>
<li><code class="language-plaintext highlighter-rouge">wayang.core.optimizer.reoptimize (= false)</code>: whether to progressively optimize Apache Wayang plans</li>
<li><code class="language-plaintext highlighter-rouge">wayang.basic.tempdir (= file:///tmp)</code>: where to store temporary files, in particular for inter-platform communication</li>
</ul>
<h4 id="java-streams">Java Streams</h4>
<ul>
<li><code class="language-plaintext highlighter-rouge">wayang.java.cpu.mhz (= 2700)</code>: clock frequency, in MHz, of the processor the JVM runs on</li>
<li><code class="language-plaintext highlighter-rouge">wayang.java.hdfs.ms-per-mb (= 2.7)</code>: average time, in ms, to transfer one MB from HDFS to the JVM</li>
</ul>
<h4 id="apache-spark">Apache Spark</h4>
<ul>
<li><code class="language-plaintext highlighter-rouge">spark.master (= local)</code>: the Spark master URL</li>
<li>various other Spark settings are supported, e.g., <code class="language-plaintext highlighter-rouge">spark.executor.memory</code>, <code class="language-plaintext highlighter-rouge">spark.serializer</code>, …</li>
<li><code class="language-plaintext highlighter-rouge">wayang.spark.cpu.mhz (= 2700)</code>: clock frequency, in MHz, of the processor the Spark workers run on</li>
<li><code class="language-plaintext highlighter-rouge">wayang.spark.hdfs.ms-per-mb (= 2.7)</code>: average time, in ms, to transfer one MB from HDFS to the Spark workers</li>
<li><code class="language-plaintext highlighter-rouge">wayang.spark.network.ms-per-mb (= 8.6)</code>: average time, in ms, to transfer one MB over the Spark workers’ network</li>
<li><code class="language-plaintext highlighter-rouge">wayang.spark.init.ms (= 4500)</code>: time, in ms, that Spark takes to initialize</li>
</ul>
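<p>To see how these units combine, the sketch below turns the default Spark values above into a naive time estimate for a job that reads some data from HDFS and moves some data over the network. The additive formula is an assumption made for illustration only; the estimates of Wayang’s optimizer come from its own cost model, which relies on the load profile estimators described further below. The class name <code class="language-plaintext highlighter-rouge">SparkCostSketch</code> is hypothetical.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Naive, illustrative time estimate built from the default Spark settings
// on this page; this is not the cost model used by Wayang's optimizer.
class SparkCostSketch {
    static final double INIT_MS = 4500;          // wayang.spark.init.ms
    static final double HDFS_MS_PER_MB = 2.7;    // wayang.spark.hdfs.ms-per-mb
    static final double NETWORK_MS_PER_MB = 8.6; // wayang.spark.network.ms-per-mb

    // Start-up time + time to read inputMb from HDFS + time to move networkMb.
    static double estimateMs(double inputMb, double networkMb) {
        return INIT_MS + inputMb * HDFS_MS_PER_MB + networkMb * NETWORK_MS_PER_MB;
    }

    public static void main(String[] args) {
        // Reading 1024 MB and moving 100 MB:
        // 4500 + 1024 * 2.7 + 100 * 8.6 = 4500 + 2764.8 + 860 = 8124.8 ms
        System.out.println(estimateMs(1024, 100));
    }
}
</code></pre></div></div>
<p>Even this crude estimate shows why the start-up time matters: for small inputs, the fixed 4.5 seconds dominate, which is one reason a single-JVM platform such as Java Streams can be the better choice for small jobs.</p>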
<h4 id="graphchi">GraphChi</h4>
<ul>
<li><code class="language-plaintext highlighter-rouge">wayang.graphchi.cpu.mhz (= 2700)</code>: clock frequency, in MHz, of the processor GraphChi runs on</li>
<li><code class="language-plaintext highlighter-rouge">wayang.graphchi.cpu.cores (= 2)</code>: number of cores GraphChi runs on</li>
<li><code class="language-plaintext highlighter-rouge">wayang.graphchi.hdfs.ms-per-mb (= 2.7)</code>: average time, in ms, to transfer one MB from HDFS to GraphChi</li>
</ul>
<h4 id="sqlite">SQLite</h4>
<ul>
<li><code class="language-plaintext highlighter-rouge">wayang.sqlite3.jdbc.url</code>: JDBC URL of the SQLite database</li>
<li><code class="language-plaintext highlighter-rouge">wayang.sqlite3.jdbc.user</code>: optional user name</li>
<li><code class="language-plaintext highlighter-rouge">wayang.sqlite3.jdbc.password</code>: optional password</li>
<li><code class="language-plaintext highlighter-rouge">wayang.sqlite3.cpu.mhz (= 2700)</code>: clock frequency, in MHz, of the processor SQLite runs on</li>
<li><code class="language-plaintext highlighter-rouge">wayang.sqlite3.cpu.cores (= 2)</code>: number of cores SQLite runs on</li>
</ul>
<h4 id="postgresql">PostgreSQL</h4>
<ul>
<li><code class="language-plaintext highlighter-rouge">wayang.postgres.jdbc.url</code>: JDBC URL of the PostgreSQL database</li>
<li><code class="language-plaintext highlighter-rouge">wayang.postgres.jdbc.user</code>: optional user name</li>
<li><code class="language-plaintext highlighter-rouge">wayang.postgres.jdbc.password</code>: optional password</li>
<li><code class="language-plaintext highlighter-rouge">wayang.postgres.cpu.mhz (= 2700)</code>: clock frequency, in MHz, of the processor PostgreSQL runs on</li>
<li><code class="language-plaintext highlighter-rouge">wayang.postgres.cpu.cores (= 2)</code>: number of cores PostgreSQL runs on</li>
</ul>
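<p>The JDBC settings correspond to the arguments of the standard <code class="language-plaintext highlighter-rouge">java.sql.DriverManager.getConnection(url, info)</code> method. The following sketch is an assumption about how such keys could be consumed, not Wayang’s own code: it reads the <code class="language-plaintext highlighter-rouge">wayang.postgres.jdbc.*</code> entries and passes the user name and password only when they are set. Typical URLs look like <code class="language-plaintext highlighter-rouge">jdbc:postgresql://localhost:5432/mydb</code> for PostgreSQL or <code class="language-plaintext highlighter-rouge">jdbc:sqlite:/path/to/my.db</code> for SQLite, where host, port, database name, and path are placeholders. The class <code class="language-plaintext highlighter-rouge">JdbcSettingsSketch</code> is hypothetical.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

// Hypothetical helper (not part of Wayang) that maps the
// wayang.&lt;platform&gt;.jdbc.* settings onto standard JDBC calls.
class JdbcSettingsSketch {

    // Collects the optional credentials; keys that are not set are left out.
    static Properties credentials(Properties config, String prefix) {
        Properties info = new Properties();
        String user = config.getProperty(prefix + ".jdbc.user");
        String password = config.getProperty(prefix + ".jdbc.password");
        if (user != null) {
            info.setProperty("user", user);
        }
        if (password != null) {
            info.setProperty("password", password);
        }
        return info;
    }

    // Opens a PostgreSQL connection; the PostgreSQL JDBC driver must be on the classpath.
    static Connection connectPostgres(Properties config) throws SQLException {
        String url = config.getProperty("wayang.postgres.jdbc.url");
        return DriverManager.getConnection(url, credentials(config, "wayang.postgres"));
    }
}
</code></pre></div></div>
<p>Passing <code class="language-plaintext highlighter-rouge">wayang.sqlite3</code> as the prefix reads the corresponding SQLite keys instead.</p>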
<h3 id="code-with-apache-wayang">Code with Apache Wayang:</h3>
<p>The recommended way to specify your apps with Apache Wayang is via its Scala or Java API from the <code class="language-plaintext highlighter-rouge">wayang-api</code> module. You can find examples below.</p>
<h3 id="learn-cost-functions">Learn cost functions:</h3>
<p>Apache Wayang provides a utility to learn cost functions from historical execution data. Specifically, Apache Wayang can learn configurations for load profile estimators (which estimate CPU load, disk load, etc.) for both operators and UDFs, as long as the configuration provides a template for those estimators. For example, the <code class="language-plaintext highlighter-rouge">JavaMapOperator</code> draws its load profile estimator configuration from the configuration key <code class="language-plaintext highlighter-rouge">wayang.java.map.load</code>. You can then specify a load profile estimator template in the configuration under the key <code class="language-plaintext highlighter-rouge">&lt;original key&gt;.template</code>, e.g.:</p>
<div class="language-json highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="err">wayang.java.map.load.template</span><span class="w"> </span><span class="err">=</span><span class="w"> </span><span class="p">{</span><span class="err">\</span><span class="w">
</span><span class="nl">"in"</span><span class="p">:</span><span class="mi">1</span><span class="p">,</span><span class="w"> </span><span class="nl">"out"</span><span class="p">:</span><span class="mi">1</span><span class="p">,</span><span class="err">\</span><span class="w">
</span><span class="nl">"cpu"</span><span class="p">:</span><span class="s2">"?*in0"</span><span class="err">\</span><span class="w">
</span><span class="p">}</span><span class="w">
</span></code></pre></div></div>
<p>This template specifies a load profile estimator that expects (at least) one input cardinality and one output cardinality. Further, it models a CPU load that is proportional to the input cardinality. However, more complex functions are possible. In particular, you can use</p>
<ul>
<li>the variables <code class="language-plaintext highlighter-rouge">in0</code>, <code class="language-plaintext highlighter-rouge">in1</code>, … and <code class="language-plaintext highlighter-rouge">out0</code>, <code class="language-plaintext highlighter-rouge">out1</code>, … to incorporate the input and output cardinalities, respectively;</li>
<li>operator properties, such as <code class="language-plaintext highlighter-rouge">numIterations</code> for the <code class="language-plaintext highlighter-rouge">PageRankOperator</code> implementations;</li>
<li>the operators <code class="language-plaintext highlighter-rouge">+</code>, <code class="language-plaintext highlighter-rouge">-</code>, <code class="language-plaintext highlighter-rouge">*</code>, <code class="language-plaintext highlighter-rouge">/</code>, <code class="language-plaintext highlighter-rouge">%</code>, <code class="language-plaintext highlighter-rouge">^</code>, and parentheses;</li>
<li>the functions <code class="language-plaintext highlighter-rouge">min(x0, x1, ...)</code>, <code class="language-plaintext highlighter-rouge">max(x0, x1, ...)</code>, <code class="language-plaintext highlighter-rouge">abs(x)</code>, <code class="language-plaintext highlighter-rouge">log(x, base)</code>, <code class="language-plaintext highlighter-rouge">ln(x)</code>, <code class="language-plaintext highlighter-rouge">ld(x)</code>;</li>
<li>and the constants <code class="language-plaintext highlighter-rouge">e</code> and <code class="language-plaintext highlighter-rouge">pi</code>.</li>
</ul>
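<p>To make the template syntax more concrete, here is a minimal recursive-descent evaluator for a subset of it: numbers, the variables <code class="language-plaintext highlighter-rouge">in0</code>, <code class="language-plaintext highlighter-rouge">in1</code>, …, <code class="language-plaintext highlighter-rouge">out0</code>, …, the operators <code class="language-plaintext highlighter-rouge">+</code>, <code class="language-plaintext highlighter-rouge">-</code>, <code class="language-plaintext highlighter-rouge">*</code>, <code class="language-plaintext highlighter-rouge">/</code>, <code class="language-plaintext highlighter-rouge">%</code>, <code class="language-plaintext highlighter-rouge">^</code>, and parentheses. It assumes that every <code class="language-plaintext highlighter-rouge">?</code> has already been replaced by a number and leaves out the functions and constants listed above. It was written for illustration and is not Wayang’s own parser; in particular, the precedence rules used here (<code class="language-plaintext highlighter-rouge">^</code> binds tightest and is right-associative, unary minus binds looser than <code class="language-plaintext highlighter-rouge">^</code>) are assumptions. The class <code class="language-plaintext highlighter-rouge">LoadExpression</code> is hypothetical.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.util.Collections;
import java.util.Map;

// Hypothetical evaluator for a subset of the load template syntax.
// Assumed grammar:
//   expr  := term (('+' | '-') term)*
//   term  := unary (('*' | '/' | '%') unary)*
//   unary := '-' unary | power
//   power := atom ('^' unary)?
//   atom  := number | variable | '(' expr ')'
class LoadExpression {
    private final String src;
    private final Map&lt;String, Double&gt; vars;
    private int pos = 0;

    private LoadExpression(String src, Map&lt;String, Double&gt; vars) {
        this.src = src.replace(" ", "");
        this.vars = vars;
    }

    // Evaluates the whole expression with the given variable values.
    static double eval(String expression, Map&lt;String, Double&gt; vars) {
        LoadExpression parser = new LoadExpression(expression, vars);
        double value = parser.expr();
        if (parser.pos != parser.src.length()) {
            throw new IllegalArgumentException("Unexpected input at " + parser.pos);
        }
        return value;
    }

    // Consumes c if it is the next character.
    private boolean eat(char c) {
        if (pos &lt; src.length() &amp;&amp; src.charAt(pos) == c) {
            pos++;
            return true;
        }
        return false;
    }

    private double expr() {
        double v = term();
        while (true) {
            if (eat('+')) v += term();
            else if (eat('-')) v -= term();
            else return v;
        }
    }

    private double term() {
        double v = unary();
        while (true) {
            if (eat('*')) v *= unary();
            else if (eat('/')) v /= unary();
            else if (eat('%')) v %= unary();
            else return v;
        }
    }

    private double unary() {
        return eat('-') ? -unary() : power();
    }

    private double power() {
        double base = atom();
        return eat('^') ? Math.pow(base, unary()) : base;
    }

    private double atom() {
        if (eat('(')) {
            double v = expr();
            if (!eat(')')) throw new IllegalArgumentException("Missing ')' at " + pos);
            return v;
        }
        int start = pos;
        if (pos &lt; src.length() &amp;&amp; Character.isLetter(src.charAt(pos))) {
            // Variable name such as in0 or out1, looked up in vars.
            while (pos &lt; src.length() &amp;&amp; Character.isLetterOrDigit(src.charAt(pos))) pos++;
            Double value = vars.get(src.substring(start, pos));
            if (value == null) throw new IllegalArgumentException("Unknown variable at " + start);
            return value;
        }
        while (pos &lt; src.length() &amp;&amp; (Character.isDigit(src.charAt(pos)) || src.charAt(pos) == '.')) pos++;
        if (start == pos) throw new IllegalArgumentException("Expected a value at " + pos);
        return Double.parseDouble(src.substring(start, pos));
    }

    public static void main(String[] args) {
        // E.g. the template "?*in0" after a coefficient of 2.5 was fitted.
        System.out.println(eval("2.5*in0", Collections.singletonMap("in0", 1000.0))); // prints 2500.0
    }
}
</code></pre></div></div>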
<p>While Apache Wayang specifies templates for all execution operators, for your UDFs you need to declare that they are modelled by a configuration-based cost function (see the k-means example below) and create the corresponding initial specification and template yourself. Once you have gathered execution data, you can run</p>
<div class="language-shell highlighter-rouge"><div class="highlight"><pre class="highlight"><code>java ... org.apache.wayang.profiler.ga.GeneticOptimizerApp <span class="o">[</span>configuration URL <span class="o">[</span>execution log]]
</code></pre></div></div>
<p>This app tries to find appropriate values for the question marks (<code class="language-plaintext highlighter-rouge">?</code>) in the load profile estimator templates so that they fit the gathered execution data, and it outputs ready-made configuration entries for the load profile estimators. You can then copy them into your configuration.</p>
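<p>As the name <code class="language-plaintext highlighter-rouge">GeneticOptimizerApp</code> suggests, the profiler searches for the <code class="language-plaintext highlighter-rouge">?</code> values with a genetic algorithm. For a template with a single unknown, such as <code class="language-plaintext highlighter-rouge">"cpu":"?*in0"</code>, the underlying idea can also be illustrated with a different, simpler technique: ordinary least squares through the origin, whose closed form is c = Σ(in0 · cpu) / Σ(in0²). The sketch below implements that substitute technique, not what <code class="language-plaintext highlighter-rouge">GeneticOptimizerApp</code> does internally, and the measurements in its <code class="language-plaintext highlighter-rouge">main</code> method are hypothetical. The class <code class="language-plaintext highlighter-rouge">ProportionalFit</code> is likewise a hypothetical name.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>// Least-squares fit of the single unknown c in "cpu" = c * in0 (regression
// through the origin). Illustrative stand-in for Wayang's genetic optimizer.
class ProportionalFit {

    // Returns the c that minimizes the sum of (c * inputs[i] - loads[i])^2.
    static double fit(double[] inputs, double[] loads) {
        if (inputs.length == 0 || inputs.length != loads.length) {
            throw new IllegalArgumentException("Expected equally long, non-empty arrays");
        }
        double sumXY = 0;
        double sumXX = 0;
        for (int i = 0; i &lt; inputs.length; i++) {
            sumXY += inputs[i] * loads[i];
            sumXX += inputs[i] * inputs[i];
        }
        return sumXY / sumXX;
    }

    public static void main(String[] args) {
        // Hypothetical measurements: input cardinalities and observed CPU loads.
        double[] in0 = {1000, 2000, 4000};
        double[] cpu = {2100, 3900, 8200};
        // The fitted value would take the place of "?" in the template.
        System.out.println("cpu coefficient: " + fit(in0, cpu));
    }
}
</code></pre></div></div>
<p>A closed form only exists for such simple, linear templates; templates with several unknowns inside non-linear functions such as <code class="language-plaintext highlighter-rouge">log</code> or <code class="language-plaintext highlighter-rouge">max</code> generally call for a numerical search, which is where a genetic algorithm is a natural fit.</p>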
<h2 id="examples">Examples</h2>
<p>For some executable examples, have a look at <a href="https://www.github.com/sekruse/Apache Wayang-examples">this repository</a>.</p>
<h3 id="wordcount">WordCount</h3>
<p>The “Hello World!” of data processing systems is the wordcount.</p>
<h4 id="java-api">Java API</h4>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="kn">import</span> <span class="nn">org.apache.wayang.api.JavaPlanBuilder</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.wayang.basic.data.Tuple2</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.wayang.core.api.Configuration</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.wayang.core.api.WayangContext</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.wayang.core.optimizer.cardinality.DefaultCardinalityEstimator</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.wayang.java.Java</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.wayang.spark.Spark</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.Collection</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">java.util.Arrays</span><span class="o">;</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">WordcountJava</span> <span class="o">{</span>
<span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="nc">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">){</span>
<span class="c1">// Settings</span>
<span class="nc">String</span> <span class="n">inputUrl</span> <span class="o">=</span> <span class="s">"file:/tmp.txt"</span><span class="o">;</span>
<span class="c1">// Get a plan builder.</span>
<span class="nc">WayangContext</span> <span class="n">wayangContext</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">WayangContext</span><span class="o">(</span><span class="k">new</span> <span class="nc">Configuration</span><span class="o">())</span>
<span class="o">.</span><span class="na">withPlugin</span><span class="o">(</span><span class="nc">Java</span><span class="o">.</span><span class="na">basicPlugin</span><span class="o">())</span>
<span class="o">.</span><span class="na">withPlugin</span><span class="o">(</span><span class="nc">Spark</span><span class="o">.</span><span class="na">basicPlugin</span><span class="o">());</span>
<span class="nc">JavaPlanBuilder</span> <span class="n">planBuilder</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">JavaPlanBuilder</span><span class="o">(</span><span class="n">wayangContext</span><span class="o">)</span>
<span class="o">.</span><span class="na">withJobName</span><span class="o">(</span><span class="nc">String</span><span class="o">.</span><span class="na">format</span><span class="o">(</span><span class="s">"WordCount (%s)"</span><span class="o">,</span> <span class="n">inputUrl</span><span class="o">))</span>
<span class="o">.</span><span class="na">withUdfJarOf</span><span class="o">(</span><span class="nc">WordcountJava</span><span class="o">.</span><span class="na">class</span><span class="o">);</span>
<span class="c1">// Start building the WayangPlan.</span>
<span class="nc">Collection</span><span class="o">&lt;</span><span class="nc">Tuple2</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;&gt;</span> <span class="n">wordcounts</span> <span class="o">=</span> <span class="n">planBuilder</span>
<span class="c1">// Read the text file.</span>
<span class="o">.</span><span class="na">readTextFile</span><span class="o">(</span><span class="n">inputUrl</span><span class="o">).</span><span class="na">withName</span><span class="o">(</span><span class="s">"Load file"</span><span class="o">)</span>
<span class="c1">// Split each line by non-word characters.</span>
<span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="n">line</span> <span class="o">-&gt;</span> <span class="nc">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">"\\W+"</span><span class="o">)))</span>
<span class="o">.</span><span class="na">withSelectivity</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="mi">100</span><span class="o">,</span> <span class="mf">0.9</span><span class="o">)</span>
<span class="o">.</span><span class="na">withName</span><span class="o">(</span><span class="s">"Split words"</span><span class="o">)</span>
<span class="c1">// Filter empty tokens.</span>
<span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">token</span> <span class="o">-&gt;</span> <span class="o">!</span><span class="n">token</span><span class="o">.</span><span class="na">isEmpty</span><span class="o">())</span>
<span class="o">.</span><span class="na">withSelectivity</span><span class="o">(</span><span class="mf">0.99</span><span class="o">,</span> <span class="mf">0.99</span><span class="o">,</span> <span class="mf">0.99</span><span class="o">)</span>
<span class="o">.</span><span class="na">withName</span><span class="o">(</span><span class="s">"Filter empty words"</span><span class="o">)</span>
<span class="c1">// Attach counter to each word.</span>
<span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">word</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="nc">Tuple2</span><span class="o">&lt;&gt;(</span><span class="n">word</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">(),</span> <span class="mi">1</span><span class="o">)).</span><span class="na">withName</span><span class="o">(</span><span class="s">"To lower case, add counter"</span><span class="o">)</span>
<span class="c1">// Sum up counters for every word.</span>
<span class="o">.</span><span class="na">reduceByKey</span><span class="o">(</span>
<span class="nl">Tuple2:</span><span class="o">:</span><span class="n">getField0</span><span class="o">,</span>
<span class="o">(</span><span class="n">t1</span><span class="o">,</span> <span class="n">t2</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="nc">Tuple2</span><span class="o">&lt;&gt;(</span><span class="n">t1</span><span class="o">.</span><span class="na">getField0</span><span class="o">(),</span> <span class="n">t1</span><span class="o">.</span><span class="na">getField1</span><span class="o">()</span> <span class="o">+</span> <span class="n">t2</span><span class="o">.</span><span class="na">getField1</span><span class="o">())</span>
<span class="o">)</span>
<span class="o">.</span><span class="na">withCardinalityEstimator</span><span class="o">(</span><span class="k">new</span> <span class="nc">DefaultCardinalityEstimator</span><span class="o">(</span><span class="mf">0.9</span><span class="o">,</span> <span class="mi">1</span><span class="o">,</span> <span class="kc">false</span><span class="o">,</span> <span class="n">in</span> <span class="o">-&gt;</span> <span class="nc">Math</span><span class="o">.</span><span class="na">round</span><span class="o">(</span><span class="mf">0.01</span> <span class="o">*</span> <span class="n">in</span><span class="o">[</span><span class="mi">0</span><span class="o">])))</span>
<span class="o">.</span><span class="na">withName</span><span class="o">(</span><span class="s">"Add counters"</span><span class="o">)</span>
<span class="c1">// Execute the plan and collect the results.</span>
<span class="o">.</span><span class="na">collect</span><span class="o">();</span>
<span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">wordcounts</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div></div>
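<p>To clarify what the plan above computes, the following sketch expresses the same word count directly with Java 8 Streams, one of the platforms Wayang can pick. It is a single-machine illustration only: it reads lines from an in-memory <code class="language-plaintext highlighter-rouge">List</code> instead of a file, and it has none of Wayang’s optimization or cross-platform execution. The class <code class="language-plaintext highlighter-rouge">WordcountStreams</code> is a hypothetical name.</p>
<div class="language-java highlighter-rouge"><div class="highlight"><pre class="highlight"><code>import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// The word count of the Wayang plan above, written against plain Java 8 Streams.
class WordcountStreams {

    static Map&lt;String, Integer&gt; count(List&lt;String&gt; lines) {
        return lines.stream()
                // Split each line by non-word characters.
                .flatMap(line -&gt; Arrays.stream(line.split("\\W+")))
                // Filter empty tokens.
                .filter(token -&gt; !token.isEmpty())
                // Lower-case each word.
                .map(String::toLowerCase)
                // Attach a counter of 1 to each word and sum up counters per word.
                .collect(Collectors.toMap(word -&gt; word, word -&gt; 1, Integer::sum));
    }

    public static void main(String[] args) {
        List&lt;String&gt; lines = Arrays.asList("Hello World!", "hello, Wayang");
        // Contains hello=2, world=1, wayang=1 (map iteration order is unspecified).
        System.out.println(count(lines));
    }
}
</code></pre></div></div>
<p>The Wayang version differs mainly in what it adds on top: named operators, selectivity and cardinality hints for the optimizer, and the freedom to run each step on Java Streams or Spark.</p>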
<h4 id="scala-api">Scala API</h4>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">import</span> <span class="nn">org.apache.wayang.api._</span>
<span class="k">import</span> <span class="nn">org.apache.wayang.core.api.</span><span class="o">{</span><span class="nc">Configuration</span><span class="o">,</span> <span class="nc">WayangContext</span><span class="o">}</span>
<span class="k">import</span> <span class="nn">org.apache.wayang.java.Java</span>
<span class="k">import</span> <span class="nn">org.apache.wayang.spark.Spark</span>
<span class="k">object</span> <span class="nc">WordcountScala</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="o">{</span>
<span class="c1">// Settings</span>
<span class="k">val</span> <span class="nv">inputUrl</span> <span class="k">=</span> <span class="s">"file:/tmp.txt"</span>
<span class="c1">// Get a plan builder.</span>
<span class="k">val</span> <span class="nv">wayangContext</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">WayangContext</span><span class="o">(</span><span class="k">new</span> <span class="nc">Configuration</span><span class="o">)</span>
<span class="o">.</span><span class="py">withPlugin</span><span class="o">(</span><span class="nv">Java</span><span class="o">.</span><span class="py">basicPlugin</span><span class="o">)</span>
<span class="o">.</span><span class="py">withPlugin</span><span class="o">(</span><span class="nv">Spark</span><span class="o">.</span><span class="py">basicPlugin</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">planBuilder</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">PlanBuilder</span><span class="o">(</span><span class="n">wayangContext</span><span class="o">)</span>
<span class="o">.</span><span class="py">withJobName</span><span class="o">(</span><span class="n">s</span><span class="s">"WordCount ($inputUrl)"</span><span class="o">)</span>
<span class="o">.</span><span class="py">withUdfJarsOf</span><span class="o">(</span><span class="k">this</span><span class="o">.</span><span class="py">getClass</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">wordcounts</span> <span class="k">=</span> <span class="n">planBuilder</span>
<span class="c1">// Read the text file.</span>
<span class="o">.</span><span class="py">readTextFile</span><span class="o">(</span><span class="n">inputUrl</span><span class="o">).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Load file"</span><span class="o">)</span>
<span class="c1">// Split each line by non-word characters.</span>
<span class="o">.</span><span class="py">flatMap</span><span class="o">(</span><span class="nv">_</span><span class="o">.</span><span class="py">split</span><span class="o">(</span><span class="s">"\\W+"</span><span class="o">),</span> <span class="n">selectivity</span> <span class="k">=</span> <span class="mi">10</span><span class="o">).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Split words"</span><span class="o">)</span>
<span class="c1">// Filter empty tokens.</span>
<span class="o">.</span><span class="py">filter</span><span class="o">(</span><span class="nv">_</span><span class="o">.</span><span class="py">nonEmpty</span><span class="o">,</span> <span class="n">selectivity</span> <span class="k">=</span> <span class="mf">0.99</span><span class="o">).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Filter empty words"</span><span class="o">)</span>
<span class="c1">// Attach counter to each word.</span>
<span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="n">word</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="nv">word</span><span class="o">.</span><span class="py">toLowerCase</span><span class="o">,</span> <span class="mi">1</span><span class="o">)).</span><span class="py">withName</span><span class="o">(</span><span class="s">"To lower case, add counter"</span><span class="o">)</span>
<span class="c1">// Sum up counters for every word.</span>
<span class="o">.</span><span class="py">reduceByKey</span><span class="o">(</span><span class="nv">_</span><span class="o">.</span><span class="py">_1</span><span class="o">,</span> <span class="o">(</span><span class="n">c1</span><span class="o">,</span> <span class="n">c2</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="o">(</span><span class="nv">c1</span><span class="o">.</span><span class="py">_1</span><span class="o">,</span> <span class="nv">c1</span><span class="o">.</span><span class="py">_2</span> <span class="o">+</span> <span class="nv">c2</span><span class="o">.</span><span class="py">_2</span><span class="o">)).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Add counters"</span><span class="o">)</span>
<span class="o">.</span><span class="py">withCardinalityEstimator</span><span class="o">((</span><span class="n">in</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span> <span class="k">=&gt;</span> <span class="nv">math</span><span class="o">.</span><span class="py">round</span><span class="o">(</span><span class="n">in</span> <span class="o">*</span> <span class="mf">0.01</span><span class="o">))</span>
<span class="c1">// Execute the plan and collect the results.</span>
<span class="o">.</span><span class="py">collect</span><span class="o">()</span>
<span class="nf">println</span><span class="o">(</span><span class="n">wordcounts</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div></div>
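<p>The <code>selectivity</code> arguments and the cardinality estimator in the Scala example are hints for Wayang's cost-based optimizer; they do not change the data. The sketch below is a back-of-the-envelope propagation of those hints through the plan, assuming a hypothetical input of 1,000 lines: <code>flatMap</code> with selectivity 10 yields about ten words per line, <code>filter</code> with selectivity 0.99 keeps about 99% of them, <code>map</code> is one-to-one, and the estimator on <code>reduceByKey</code> predicts about 1% of its input as distinct words. The object and method names are hypothetical, and Wayang's own estimates are intervals rather than single numbers, which this sketch ignores.</p>

```scala
// Back-of-the-envelope propagation of the WordCount optimizer hints (no Wayang involved).
// CardinalityHintSketch and estimate are hypothetical names; the input size is made up.
object CardinalityHintSketch {
  // Returns (operator name, estimated output cardinality) for each step of the plan.
  def estimate(inputLines: Long): Seq[(String, Long)] = {
    val words    = math.round(inputLines * 10.0) // flatMap, selectivity = 10
    val nonEmpty = math.round(words * 0.99)      // filter, selectivity = 0.99
    val pairs    = nonEmpty                      // map emits exactly one pair per word
    val counts   = math.round(pairs * 0.01)      // reduceByKey estimator: round(in * 0.01)
    Seq(
      "Load file"                  -> inputLines,
      "Split words"                -> words,
      "Filter empty words"         -> nonEmpty,
      "To lower case, add counter" -> pairs,
      "Add counters"               -> counts
    )
  }

  def main(args: Array[String]): Unit =
    estimate(1000L).foreach { case (op, n) => println(s"$op: ~$n") }
}
```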
<h3 id="k-means">K-means</h3>
<p>Apache Wayang also supports iterative processing, which many machine learning algorithms, such as k-means, rely on.</p>
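<p>In the example below, <code>repeat(iterations, ...)</code> runs the loop body a fixed number of times, feeding the centroids produced by one iteration into the next. The following plain-Scala sketch shows only that data flow, using a hypothetical helper <code>repeatN</code>; it is not part of the Wayang API and says nothing about how Wayang schedules loop iterations on its platforms.</p>

```scala
// Plain-Scala analogue of a fixed-count loop (no Wayang involved).
// repeatN is a hypothetical helper, not a Wayang API.
object RepeatSketch {
  // Apply `body` `iterations` times, feeding each result into the next call.
  def repeatN[T](iterations: Int, initial: T)(body: T => T): T =
    (1 to iterations).foldLeft(initial)((current, _) => body(current))

  def main(args: Array[String]): Unit = {
    // Halve 40.0 three times: 40.0 -> 20.0 -> 10.0 -> 5.0
    println(repeatN(3, 40.0)(_ / 2)) // prints 5.0
  }
}
```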
<h4 id="scala-api-1">Scala API</h4>
<div class="language-scala highlighter-rouge"><div class="highlight"><pre class="highlight"><code><span class="k">import</span> <span class="nn">org.apache.wayang.api._</span>
<span class="k">import</span> <span class="nn">org.apache.wayang.core.api.</span><span class="o">{</span><span class="nc">Configuration</span><span class="o">,</span> <span class="nc">WayangContext</span><span class="o">}</span>
<span class="k">import</span> <span class="nn">org.apache.wayang.core.function.FunctionDescriptor.ExtendedSerializableFunction</span>
<span class="k">import</span> <span class="nn">org.apache.wayang.core.function.ExecutionContext</span>
<span class="k">import</span> <span class="nn">org.apache.wayang.core.optimizer.costs.LoadProfileEstimators</span>
<span class="k">import</span> <span class="nn">org.apache.wayang.java.Java</span>
<span class="k">import</span> <span class="nn">org.apache.wayang.spark.Spark</span>
<span class="k">import</span> <span class="nn">scala.util.Random</span>
<span class="k">import</span> <span class="nn">scala.collection.JavaConversions._</span>
<span class="k">object</span> <span class="nc">kmeans</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="k">=</span> <span class="o">{</span>
<span class="c1">// Settings</span>
<span class="k">val</span> <span class="nv">inputUrl</span> <span class="k">=</span> <span class="s">"file:/kmeans.txt"</span>
<span class="k">val</span> <span class="nv">k</span> <span class="k">=</span> <span class="mi">5</span>
<span class="k">val</span> <span class="nv">iterations</span> <span class="k">=</span> <span class="mi">100</span>
<span class="k">val</span> <span class="nv">configuration</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Configuration</span>
<span class="c1">// Get a plan builder.</span>
<span class="k">val</span> <span class="nv">wayangContext</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">WayangContext</span><span class="o">(</span><span class="n">configuration</span><span class="o">)</span>
<span class="o">.</span><span class="py">withPlugin</span><span class="o">(</span><span class="nv">Java</span><span class="o">.</span><span class="py">basicPlugin</span><span class="o">)</span>
<span class="o">.</span><span class="py">withPlugin</span><span class="o">(</span><span class="nv">Spark</span><span class="o">.</span><span class="py">basicPlugin</span><span class="o">)</span>
<span class="k">val</span> <span class="nv">planBuilder</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">PlanBuilder</span><span class="o">(</span><span class="n">wayangContext</span><span class="o">)</span>
<span class="o">.</span><span class="py">withJobName</span><span class="o">(</span><span class="n">s</span><span class="s">"k-means ($inputUrl, k=$k, $iterations iterations)"</span><span class="o">)</span>
<span class="o">.</span><span class="py">withUdfJarsOf</span><span class="o">(</span><span class="k">this</span><span class="o">.</span><span class="py">getClass</span><span class="o">)</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">Point</span><span class="o">(</span><span class="n">x</span><span class="k">:</span> <span class="kt">Double</span><span class="o">,</span> <span class="n">y</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">TaggedPoint</span><span class="o">(</span><span class="n">x</span><span class="k">:</span> <span class="kt">Double</span><span class="o">,</span> <span class="n">y</span><span class="k">:</span> <span class="kt">Double</span><span class="o">,</span> <span class="n">cluster</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span>
<span class="k">case</span> <span class="k">class</span> <span class="nc">TaggedPointCounter</span><span class="o">(</span><span class="n">x</span><span class="k">:</span> <span class="kt">Double</span><span class="o">,</span> <span class="n">y</span><span class="k">:</span> <span class="kt">Double</span><span class="o">,</span> <span class="n">cluster</span><span class="k">:</span> <span class="kt">Int</span><span class="o">,</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Long</span><span class="o">)</span> <span class="o">{</span>
<span class="k">def</span> <span class="nf">add_points</span><span class="o">(</span><span class="n">that</span><span class="k">:</span> <span class="kt">TaggedPointCounter</span><span class="o">)</span> <span class="k">=</span> <span class="nc">TaggedPointCounter</span><span class="o">(</span><span class="k">this</span><span class="o">.</span><span class="py">x</span> <span class="o">+</span> <span class="nv">that</span><span class="o">.</span><span class="py">x</span><span class="o">,</span> <span class="k">this</span><span class="o">.</span><span class="py">y</span> <span class="o">+</span> <span class="nv">that</span><span class="o">.</span><span class="py">y</span><span class="o">,</span> <span class="k">this</span><span class="o">.</span><span class="py">cluster</span><span class="o">,</span> <span class="k">this</span><span class="o">.</span><span class="py">count</span> <span class="o">+</span> <span class="nv">that</span><span class="o">.</span><span class="py">count</span><span class="o">)</span>
<span class="k">def</span> <span class="nf">average</span> <span class="k">=</span> <span class="nc">TaggedPointCounter</span><span class="o">(</span><span class="n">x</span> <span class="o">/</span> <span class="n">count</span><span class="o">,</span> <span class="n">y</span> <span class="o">/</span> <span class="n">count</span><span class="o">,</span> <span class="n">cluster</span><span class="o">,</span> <span class="mi">0</span><span class="o">)</span>
<span class="o">}</span>
<span class="c1">// Read and parse the input file(s).</span>
<span class="k">val</span> <span class="nv">points</span> <span class="k">=</span> <span class="n">planBuilder</span>
<span class="o">.</span><span class="py">readTextFile</span><span class="o">(</span><span class="n">inputUrl</span><span class="o">).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Read file"</span><span class="o">)</span>
<span class="o">.</span><span class="py">map</span> <span class="o">{</span> <span class="n">line</span> <span class="k">=&gt;</span>
<span class="k">val</span> <span class="nv">fields</span> <span class="k">=</span> <span class="nv">line</span><span class="o">.</span><span class="py">split</span><span class="o">(</span><span class="s">","</span><span class="o">)</span>
<span class="nc">Point</span><span class="o">(</span><span class="nf">fields</span><span class="o">(</span><span class="mi">0</span><span class="o">).</span><span class="py">toDouble</span><span class="o">,</span> <span class="nf">fields</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="py">toDouble</span><span class="o">)</span>
<span class="o">}.</span><span class="py">withName</span><span class="o">(</span><span class="s">"Create points"</span><span class="o">)</span>
<span class="c1">// Create initial centroids.</span>
<span class="k">val</span> <span class="nv">random</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Random</span>
<span class="k">val</span> <span class="nv">initialCentroids</span> <span class="k">=</span> <span class="n">planBuilder</span>
<span class="o">.</span><span class="py">loadCollection</span><span class="o">(</span><span class="nf">for</span> <span class="o">(</span><span class="n">i</span> <span class="k">&lt;-</span> <span class="mi">1</span> <span class="n">to</span> <span class="n">k</span><span class="o">)</span> <span class="k">yield</span> <span class="nc">TaggedPointCounter</span><span class="o">(</span><span class="nv">random</span><span class="o">.</span><span class="py">nextGaussian</span><span class="o">(),</span> <span class="nv">random</span><span class="o">.</span><span class="py">nextGaussian</span><span class="o">(),</span> <span class="n">i</span><span class="o">,</span> <span class="mi">0</span><span class="o">)).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Load random centroids"</span><span class="o">)</span>
<span class="c1">// Declare UDF to select centroid for each data point.</span>
<span class="k">class</span> <span class="nc">SelectNearestCentroid</span> <span class="k">extends</span> <span class="nc">ExtendedSerializableFunction</span><span class="o">[</span><span class="kt">Point</span>, <span class="kt">TaggedPointCounter</span><span class="o">]</span> <span class="o">{</span>
<span class="cm">/* Keeps the broadcast centroids. */</span>
<span class="k">var</span> <span class="n">centroids</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">TaggedPointCounter</span><span class="o">]</span> <span class="k">=</span> <span class="k">_</span>
<span class="k">override</span> <span class="k">def</span> <span class="nf">open</span><span class="o">(</span><span class="n">executionCtx</span><span class="k">:</span> <span class="kt">ExecutionContext</span><span class="o">)</span> <span class="k">=</span> <span class="o">{</span>
<span class="n">centroids</span> <span class="k">=</span> <span class="nv">executionCtx</span><span class="o">.</span><span class="py">getBroadcast</span><span class="o">[</span><span class="kt">TaggedPointCounter</span><span class="o">](</span><span class="s">"centroids"</span><span class="o">)</span>
<span class="o">}</span>
<span class="k">override</span> <span class="k">def</span> <span class="nf">apply</span><span class="o">(</span><span class="n">point</span><span class="k">:</span> <span class="kt">Point</span><span class="o">)</span><span class="k">:</span> <span class="kt">TaggedPointCounter</span> <span class="o">=</span> <span class="o">{</span>
<span class="k">var</span> <span class="n">minDistance</span> <span class="k">=</span> <span class="nv">Double</span><span class="o">.</span><span class="py">PositiveInfinity</span>
<span class="k">var</span> <span class="n">nearestCentroidId</span> <span class="k">=</span> <span class="o">-</span><span class="mi">1</span>
<span class="nf">for</span> <span class="o">(</span><span class="n">centroid</span> <span class="k">&lt;-</span> <span class="n">centroids</span><span class="o">)</span> <span class="o">{</span>
<span class="k">val</span> <span class="nv">distance</span> <span class="k">=</span> <span class="nv">Math</span><span class="o">.</span><span class="py">pow</span><span class="o">(</span><span class="nv">Math</span><span class="o">.</span><span class="py">pow</span><span class="o">(</span><span class="nv">point</span><span class="o">.</span><span class="py">x</span> <span class="o">-</span> <span class="nv">centroid</span><span class="o">.</span><span class="py">x</span><span class="o">,</span> <span class="mi">2</span><span class="o">)</span> <span class="o">+</span> <span class="nv">Math</span><span class="o">.</span><span class="py">pow</span><span class="o">(</span><span class="nv">point</span><span class="o">.</span><span class="py">y</span> <span class="o">-</span> <span class="nv">centroid</span><span class="o">.</span><span class="py">y</span><span class="o">,</span> <span class="mi">2</span><span class="o">),</span> <span class="mf">0.5</span><span class="o">)</span>
<span class="nf">if</span> <span class="o">(</span><span class="n">distance</span> <span class="o">&lt;</span> <span class="n">minDistance</span><span class="o">)</span> <span class="o">{</span>
<span class="n">minDistance</span> <span class="k">=</span> <span class="n">distance</span>
<span class="n">nearestCentroidId</span> <span class="k">=</span> <span class="nv">centroid</span><span class="o">.</span><span class="py">cluster</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="k">new</span> <span class="nc">TaggedPointCounter</span><span class="o">(</span><span class="nv">point</span><span class="o">.</span><span class="py">x</span><span class="o">,</span> <span class="nv">point</span><span class="o">.</span><span class="py">y</span><span class="o">,</span> <span class="n">nearestCentroidId</span><span class="o">,</span> <span class="mi">1</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// Do the k-means loop.</span>
<span class="k">val</span> <span class="nv">finalCentroids</span> <span class="k">=</span> <span class="nv">initialCentroids</span><span class="o">.</span><span class="py">repeat</span><span class="o">(</span><span class="n">iterations</span><span class="o">,</span> <span class="o">{</span> <span class="n">currentCentroids</span> <span class="k">=&gt;</span>
<span class="n">points</span>
<span class="o">.</span><span class="py">mapJava</span><span class="o">(</span><span class="k">new</span> <span class="nc">SelectNearestCentroid</span><span class="o">,</span>
<span class="c1">// Optional cost hint: "my.udf.costfunction.key" is a placeholder for a configuration key</span>
<span class="c1">// that holds a load profile specification for this UDF.</span>
<span class="n">udfLoad</span> <span class="k">=</span> <span class="nv">LoadProfileEstimators</span><span class="o">.</span><span class="py">createFromSpecification</span><span class="o">(</span>
<span class="s">"my.udf.costfunction.key"</span><span class="o">,</span> <span class="n">configuration</span>
<span class="o">))</span>
<span class="o">.</span><span class="py">withBroadcast</span><span class="o">(</span><span class="n">currentCentroids</span><span class="o">,</span> <span class="s">"centroids"</span><span class="o">).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Find nearest centroid"</span><span class="o">)</span>
<span class="o">.</span><span class="py">reduceByKey</span><span class="o">(</span><span class="nv">_</span><span class="o">.</span><span class="py">cluster</span><span class="o">,</span> <span class="nv">_</span><span class="o">.</span><span class="py">add_points</span><span class="o">(</span><span class="k">_</span><span class="o">)).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Add up points"</span><span class="o">)</span>
<span class="o">.</span><span class="py">withCardinalityEstimator</span><span class="o">(</span><span class="n">k</span><span class="o">)</span>
<span class="o">.</span><span class="py">map</span><span class="o">(</span><span class="nv">_</span><span class="o">.</span><span class="py">average</span><span class="o">).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Average points"</span><span class="o">)</span>
<span class="o">}).</span><span class="py">withName</span><span class="o">(</span><span class="s">"Loop"</span><span class="o">)</span>
<span class="c1">// Collect the results.</span>
<span class="o">.</span><span class="py">collect</span><span class="o">()</span>
<span class="nf">println</span><span class="o">(</span><span class="n">finalCentroids</span><span class="o">)</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div></div>
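<p>To sanity-check what the plan above computes, the following sketch runs the same steps locally with plain Scala collections: tag each point with its nearest centroid, add up the points per cluster, and average them. As in the Wayang plan, a cluster that attracts no points disappears from the centroid set, because <code>reduceByKey</code> only emits keys that occur in its input. The sample points and initial centroids are made up, and the object name <code>KMeansSketch</code> is hypothetical.</p>

```scala
// Local, in-memory sketch of the k-means loop above (no Wayang involved).
// KMeansSketch and the sample data are hypothetical, for illustration only.
object KMeansSketch {
  case class Point(x: Double, y: Double)

  case class TaggedPointCounter(x: Double, y: Double, cluster: Int, count: Long) {
    // Component-wise sum, like add_points in the Wayang example.
    def addPoints(that: TaggedPointCounter): TaggedPointCounter =
      TaggedPointCounter(x + that.x, y + that.y, cluster, count + that.count)
    // Divide the summed coordinates by the number of points.
    def average: TaggedPointCounter = TaggedPointCounter(x / count, y / count, cluster, 0)
  }

  // Same idea as SelectNearestCentroid.apply: pick the centroid with the smallest Euclidean distance.
  def nearest(point: Point, centroids: Seq[TaggedPointCounter]): TaggedPointCounter = {
    val best = centroids.minBy(c => math.hypot(point.x - c.x, point.y - c.y))
    TaggedPointCounter(point.x, point.y, best.cluster, 1)
  }

  // One iteration: assign points, add them up per cluster, then average.
  // Clusters without any assigned point are dropped, as with reduceByKey.
  def step(points: Seq[Point], centroids: Seq[TaggedPointCounter]): Seq[TaggedPointCounter] =
    points
      .map(p => nearest(p, centroids))
      .groupBy(_.cluster)
      .values
      .map(group => group.reduce((a, b) => a.addPoints(b)).average)
      .toSeq
      .sortBy(_.cluster)

  def run(points: Seq[Point], initial: Seq[TaggedPointCounter], iterations: Int): Seq[TaggedPointCounter] =
    (1 to iterations).foldLeft(initial)((current, _) => step(points, current))

  def main(args: Array[String]): Unit = {
    val points  = Seq(Point(0, 0), Point(0, 1), Point(10, 10), Point(10, 11))
    val initial = Seq(TaggedPointCounter(1, 1, 1, 0), TaggedPointCounter(9, 9, 2, 0))
    run(points, initial, iterations = 5).foreach(println)
  }
}
```

<p>With these two well-separated groups, the centroids settle at (0.0, 0.5) and (10.0, 10.5) after the first iteration and stay there.</p>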
</div>
</div>
<footer class="footer position-sticky sticky-bottom">
<nav class="navbar navbar-light bg-light" style="background: #A6A6A6;">
<div class="container">
<div class="row">
<div class="col text-center">
<a href="http://incubator.apache.org/" >
<img style="max-height: 15vw" src="https://wayang.apache.org/assets/img/egg-logo.png">
</a>
<br />
<p style="text-align: justify">
Apache Wayang is an effort undergoing Incubation at The Apache Software Foundation (ASF), sponsored by the Incubator. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.
</p>
<p class="text-center">
Copyright &#169; 2021 The Apache Software Foundation.<br />
Licensed under the Apache License, Version 2.0.<br />
Apache, the Apache Feather logo, and the Apache Incubator project logo are trademarks of The Apache Software Foundation.
</p>
</div>
</div>
</div>
</nav>
</footer>
<script src="https://code.jquery.com/jquery-3.5.1.slim.min.js" integrity="sha384-DfXdz2htPH0lsSSs5nCTpuj/zy4C+OGpamoFVy38MVBnE+IbbVYUew+OrCXaRkfj" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.0/dist/js/bootstrap.bundle.min.js" integrity="sha384-Piv4xVNRyMGpqkS2by6br4gNJ7DXjqk09RmUpJ8jgGtD7zP9yug3goQfGII0yAns" crossorigin="anonymous"></script>
<script src="https://wayang.apache.org/assets/js/add_numbers.js"></script>
</body>
</html>