<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Flux</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 2.3.0</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Flux</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><p>A framework for creating and deploying Apache Storm streaming computations with less friction.</p>
<h2 id="definition">Definition</h2>
<p><strong>flux</strong> |fləks| <em>noun</em></p>
<ol>
<li>The action or process of flowing or flowing out</li>
<li>Continuous change</li>
<li>In physics, the rate of flow of a fluid, radiant energy, or particles across a given area</li>
<li>A substance mixed with a solid to lower its melting point</li>
</ol>
<h2 id="rationale">Rationale</h2>
<p>Bad things happen when configuration is hard-coded. No one should have to recompile or repackage an application in
order to change configuration.</p>
<h2 id="about">About</h2>
<p>Flux is a framework and set of utilities that make defining and deploying Apache Storm topologies less painful and
developer-intensive. One of the pain points often mentioned is the fact that the wiring for a Topology graph is often tied up in Java code,
and that any changes require recompilation and repackaging of the topology jar file. Flux aims to alleviate that
pain by allowing you to package all your Storm components in a single jar, and use an external text file to define
the layout and configuration of your topologies.</p>
<h2 id="features">Features</h2>
<ul>
<li>Easily configure and deploy Storm topologies (both the Storm Core and Microbatch APIs) without embedding configuration
in your topology code</li>
<li>Support for existing topology code (see below)</li>
<li>Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL</li>
<li>YAML DSL support for most Storm components (storm-kafka-client, storm-hdfs, storm-hbase, etc.)</li>
<li>Convenient support for multi-lang components</li>
<li>External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
<code>${variable.name}</code> substitution)</li>
</ul>
<h2 id="usage">Usage</h2>
<p>To use Flux, add it as a dependency and package all your Storm components in a fat jar, then create a YAML document
to define your topology (see below for YAML configuration options).</p>
<h3 id="building-from-source">Building from Source</h3>
<p>The easiest way to use Flux is to add it as a Maven dependency in your project, as described below.</p>
<p>If you would like to build Flux from source and run the unit/integration tests, you will need the following installed
on your system:</p>
<ul>
<li>Python 2.7.x or later</li>
<li>Node.js 0.10.x or later</li>
</ul>
<h4 id="building-with-unit-tests-enabled">Building with unit tests enabled:</h4>
<div class="highlight"><pre><code class="language-" data-lang="">mvn clean install
</code></pre></div>
<h4 id="building-with-unit-tests-disabled">Building with unit tests disabled:</h4>
<p>If you would like to build Flux without installing Python or Node.js you can simply skip the unit tests:</p>
<div class="highlight"><pre><code class="language-" data-lang="">mvn clean install -DskipTests=true
</code></pre></div>
<p>Note that if you plan on using Flux to deploy topologies to a remote cluster, you will still need to have Python
installed since it is required by Apache Storm.</p>
<h4 id="building-with-integration-tests-enabled">Building with integration tests enabled:</h4>
<div class="highlight"><pre><code class="language-" data-lang="">mvn clean install -DskipIntegration=false
</code></pre></div>
<h3 id="packaging-with-maven">Packaging with Maven</h3>
<p>To enable Flux for your Storm components, you need to add it as a dependency such that it&#39;s included in the Storm
topology jar. This can be accomplished with the Maven shade plugin (preferred) or the Maven assembly plugin (not
recommended).</p>
<h4 id="flux-maven-dependency">Flux Maven Dependency</h4>
<p>The current version of Flux is available in Maven Central at the following coordinates:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml">&lt;dependency&gt;
    &lt;groupId&gt;org.apache.storm&lt;/groupId&gt;
    &lt;artifactId&gt;flux-core&lt;/artifactId&gt;
    &lt;version&gt;${storm.version}&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div>
<p>Using shell spouts and bolts requires the additional Flux Wrappers library:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml">&lt;dependency&gt;
    &lt;groupId&gt;org.apache.storm&lt;/groupId&gt;
    &lt;artifactId&gt;flux-wrappers&lt;/artifactId&gt;
    &lt;version&gt;${storm.version}&lt;/version&gt;
&lt;/dependency&gt;
</code></pre></div>
<h4 id="creating-a-flux-enabled-topology-jar">Creating a Flux-Enabled Topology JAR</h4>
<p>The example below illustrates Flux usage with the Maven shade plugin:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="c">&lt;!-- include Flux and user dependencies in the shaded jar --&gt;</span>
<span class="nt">&lt;dependencies&gt;</span>
<span class="c">&lt;!-- Flux include --&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.storm<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flux-core<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>${storm.version}<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="c">&lt;!-- Flux Wrappers include --&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.storm<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flux-wrappers<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>${storm.version}<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="c">&lt;!-- add user dependencies here... --&gt;</span>
<span class="nt">&lt;/dependencies&gt;</span>
<span class="c">&lt;!-- create a fat jar that includes all dependencies --&gt;</span>
<span class="nt">&lt;build&gt;</span>
<span class="nt">&lt;plugins&gt;</span>
<span class="nt">&lt;plugin&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.maven.plugins<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>maven-shade-plugin<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>1.4<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;configuration&gt;</span>
<span class="nt">&lt;createDependencyReducedPom&gt;</span>true<span class="nt">&lt;/createDependencyReducedPom&gt;</span>
<span class="nt">&lt;/configuration&gt;</span>
<span class="nt">&lt;executions&gt;</span>
<span class="nt">&lt;execution&gt;</span>
<span class="nt">&lt;phase&gt;</span>package<span class="nt">&lt;/phase&gt;</span>
<span class="nt">&lt;goals&gt;</span>
<span class="nt">&lt;goal&gt;</span>shade<span class="nt">&lt;/goal&gt;</span>
<span class="nt">&lt;/goals&gt;</span>
<span class="nt">&lt;configuration&gt;</span>
<span class="nt">&lt;transformers&gt;</span>
<span class="nt">&lt;transformer</span>
<span class="na">implementation=</span><span class="s">"org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"</span><span class="nt">/&gt;</span>
<span class="nt">&lt;transformer</span>
<span class="na">implementation=</span><span class="s">"org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"</span><span class="nt">&gt;</span>
<span class="nt">&lt;mainClass&gt;</span>org.apache.storm.flux.Flux<span class="nt">&lt;/mainClass&gt;</span>
<span class="nt">&lt;/transformer&gt;</span>
<span class="nt">&lt;/transformers&gt;</span>
<span class="nt">&lt;/configuration&gt;</span>
<span class="nt">&lt;/execution&gt;</span>
<span class="nt">&lt;/executions&gt;</span>
<span class="nt">&lt;/plugin&gt;</span>
<span class="nt">&lt;/plugins&gt;</span>
<span class="nt">&lt;/build&gt;</span>
</code></pre></div>
<h3 id="deploying-and-running-a-flux-topology">Deploying and Running a Flux Topology</h3>
<p>Once your topology components are packaged with the Flux dependency, you can run different topologies either locally
or remotely using the <code>storm jar</code> command. For example, if your fat jar is named <code>myTopology-0.1.0-SNAPSHOT.jar</code> you
could run it locally with the command:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash">storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux <span class="nt">--local</span> my_config.yaml
</code></pre></div>
<h3 id="command-line-options">Command line options</h3>
<div class="highlight"><pre><code class="language-" data-lang="">usage: storm jar &lt;my_topology_uber_jar.jar&gt; org.apache.storm.flux.Flux
[options] &lt;topology-config.yaml&gt;
-d,--dry-run Do not run or deploy the topology. Just
build, validate, and print information about
the topology.
-e,--env-filter Perform environment variable substitution.
Replace keys identified with `${ENV-[NAME]}`
will be replaced with the corresponding
`NAME` environment value
-f,--filter &lt;file&gt; Perform property substitution. Use the
specified file as a source of properties,
and replace keys identified with {$[property
name]} with the value defined in the
properties file.
-i,--inactive Deploy the topology, but do not activate it.
-l,--local Run the topology in local mode.
-n,--no-splash Suppress the printing of the splash screen.
-q,--no-detail Suppress the printing of topology details.
-r,--remote Deploy the topology to a remote cluster.
-R,--resource Treat the supplied path as a classpath
resource instead of a file.
-s,--sleep &lt;ms&gt; When running locally, the amount of time to
sleep (in ms.) before killing the topology
and shutting down the local cluster.
-z,--zookeeper &lt;host:port&gt; When running in local mode, use the
ZooKeeper at the specified &lt;host&gt;:&lt;port&gt;
instead of the in-process ZooKeeper.
(requires Storm 0.9.3 or later)
</code></pre></div>
<p><strong>NOTE:</strong> Flux tries to avoid command line switch collision with the <code>storm</code> command, and allows any other command line
switches to pass through to the <code>storm</code> command.</p>
<p>For example, you can use the <code>storm</code> command switch <code>-c</code> to override a topology configuration property. The following
example command will run Flux and override the <code>nimbus.seeds</code> configuration:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash">storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux <span class="nt">--remote</span> my_config.yaml <span class="nt">-c</span> <span class="s1">'nimbus.seeds=["localhost"]'</span>
</code></pre></div>
<h3 id="sample-output">Sample output</h3>
<div class="highlight"><pre><code class="language-" data-lang="">███████╗██╗ ██╗ ██╗██╗ ██╗
██╔════╝██║ ██║ ██║╚██╗██╔╝
█████╗ ██║ ██║ ██║ ╚███╔╝
██╔══╝ ██║ ██║ ██║ ██╔██╗
██║ ███████╗╚██████╔╝██╔╝ ██╗
╚═╝ ╚══════╝ ╚═════╝ ╚═╝ ╚═╝
+- Apache Storm -+
+- data FLow User eXperience -+
Version: 0.3.0
Parsing file: /Users/hsimpson/Projects/donut_domination/storm/shell_test.yaml
---------- TOPOLOGY DETAILS ----------
Name: shell-topology
--------------- SPOUTS ---------------
sentence-spout[1](org.apache.storm.flux.wrappers.spouts.FluxShellSpout)
---------------- BOLTS ---------------
splitsentence[1](org.apache.storm.flux.wrappers.bolts.FluxShellBolt)
log[1](org.apache.storm.flux.wrappers.bolts.LogInfoBolt)
count[1](org.apache.storm.testing.TestWordCounter)
--------------- STREAMS ---------------
sentence-spout --SHUFFLE--&gt; splitsentence
splitsentence --FIELDS--&gt; count
count --SHUFFLE--&gt; log
--------------------------------------
Submitting topology: 'shell-topology' to remote cluster...
</code></pre></div>
<h2 id="yaml-configuration">YAML Configuration</h2>
<p>Flux topologies are defined in a YAML file that describes a topology. A Flux topology
definition consists of the following:</p>
<ol>
<li>A topology name</li>
<li>A list of topology &quot;components&quot; (named Java objects that will be made available in the environment)</li>
<li><strong>EITHER</strong> (A DSL topology definition):
<ul>
<li>A list of spouts, each identified by a unique ID</li>
<li>A list of bolts, each identified by a unique ID</li>
<li>A list of &quot;stream&quot; objects representing a flow of tuples between spouts and bolts</li>
</ul></li>
<li><strong>OR</strong> (A JVM class that can produce an <code>org.apache.storm.generated.StormTopology</code> instance):
<ul>
<li>A <code>topologySource</code> definition.</li>
</ul></li>
</ol>
<p>For example, here is a simple definition of a wordcount topology using the YAML DSL:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">yaml-topology"</span>
<span class="na">config</span><span class="pi">:</span>
  <span class="s">topology.workers</span><span class="pi">:</span> <span class="s">1</span>
<span class="c1"># spout definitions</span>
<span class="na">spouts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">spout-1"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.testing.TestWordSpout"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
<span class="c1"># bolt definitions</span>
<span class="na">bolts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-1"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.testing.TestWordCounter"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-2"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.bolts.LogInfoBolt"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
<span class="c1"># stream definitions</span>
<span class="na">streams</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">spout-1</span><span class="nv"> </span><span class="s">--&gt;</span><span class="nv"> </span><span class="s">bolt-1"</span> <span class="c1"># name isn't used (placeholder for logging, UI, etc.)</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">spout-1"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-1"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">FIELDS</span>
      <span class="na">args</span><span class="pi">:</span> <span class="pi">[</span><span class="s2">"</span><span class="s">word"</span><span class="pi">]</span>
  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-1</span><span class="nv"> </span><span class="s">--&gt;</span><span class="nv"> </span><span class="s">bolt-2"</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-1"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-2"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span>
</code></pre></div>
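<p>The alternative form listed above, a <code>topologySource</code> definition, replaces the <code>spouts</code>, <code>bolts</code>, and <code>streams</code> sections with a pointer to a class that builds the topology in Java. The following is a minimal sketch only: <code>com.example.MyTopologySource</code> is a hypothetical class name, assumed to be packaged in the topology jar and to expose a method that returns a <code>StormTopology</code>.</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml">name: "existing-topology"
# com.example.MyTopologySource is a placeholder for your own class
topologySource:
  className: "com.example.MyTopologySource"
</code></pre></div>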
<h2 id="property-substitution-filtering">Property Substitution/Filtering</h2>
<p>It&#39;s common for developers to want to easily switch between configurations, for example switching deployment between
a development environment and a production environment. This can be accomplished by using separate YAML configuration
files, but that approach would lead to unnecessary duplication, especially in situations where the Storm topology
does not change, but configuration settings such as host names, ports, and parallelism parameters do.</p>
<p>For this case, Flux offers properties filtering to allow you to externalize values to a <code>.properties</code> file and have
them substituted before the <code>.yaml</code> file is parsed.</p>
<p>To enable property filtering, use the <code>--filter</code> command line option and specify a <code>.properties</code> file. For example,
if you invoke Flux like so:</p>
<div class="highlight"><pre><code class="language-bash" data-lang="bash">storm jar myTopology-0.1.0-SNAPSHOT.jar org.apache.storm.flux.Flux <span class="nt">--local</span> my_config.yaml <span class="nt">--filter</span> dev.properties
</code></pre></div>
<p>With the following <code>dev.properties</code> file:</p>
<div class="highlight"><pre><code class="language-properties" data-lang="properties"><span class="py">kafka.zookeeper.hosts</span><span class="p">:</span> <span class="s">localhost:2181</span>
</code></pre></div>
<p>You would then be able to reference those properties by key in your <code>.yaml</code> file using <code>${}</code> syntax:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml">  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">zkHosts"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.kafka.ZkHosts"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="s2">"</span><span class="s">${kafka.zookeeper.hosts}"</span>
</code></pre></div>
<p>In this case, Flux would replace <code>${kafka.zookeeper.hosts}</code> with <code>localhost:2181</code> before parsing the YAML contents.</p>
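<p>Because substitution is applied to the raw text before the YAML is parsed, the same mechanism should also work for non-string settings such as parallelism. The fragment below is a sketch that assumes a hypothetical <code>spout.parallelism</code> key has been added to <code>dev.properties</code>; a properties file for another environment could then set a different value for the same key without any change to the YAML.</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"># dev.properties is assumed to also contain the hypothetical entry:
#   spout.parallelism: 1
spouts:
  - id: "spout-1"
    className: "org.apache.storm.testing.TestWordSpout"
    # replaced with the value from the properties file before parsing
    parallelism: ${spout.parallelism}
</code></pre></div>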
<h3 id="environment-variable-substitution-filtering">Environment Variable Substitution/Filtering</h3>
<p>Flux also allows environment variable substitution. For example, if an environment variable named <code>ZK_HOSTS</code> is defined,
you can reference it in a Flux YAML file with the following syntax:</p>
<div class="highlight"><pre><code class="language-" data-lang="">${ENV-ZK_HOSTS}
</code></pre></div>
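<p>As a sketch of how this might look in context, the fragment below reuses the <code>zkHosts</code> component from the previous section and takes its constructor argument from the environment. It assumes that Flux is invoked with the <code>--env-filter</code> option listed under Command line options and that <code>ZK_HOSTS</code> is set in the environment of the process running Flux.</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml">components:
  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      # replaced with the value of the ZK_HOSTS environment variable
      - "${ENV-ZK_HOSTS}"
</code></pre></div>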
<h2 id="components">Components</h2>
<p>Components are essentially named object instances that are made available as configuration options for spouts and
bolts. If you are familiar with the Spring framework, components are roughly analogous to Spring beans.</p>
<p>Every component is identified, at a minimum, by a unique identifier (String) and a class name (String). For example,
the following will make an instance of the <code>org.apache.storm.kafka.StringScheme</code> class available as a reference under the key
<code>&quot;stringScheme&quot;</code>. This assumes that <code>org.apache.storm.kafka.StringScheme</code> has a default constructor.</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">components</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">stringScheme"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.kafka.StringScheme"</span>
</code></pre></div>
<h3 id="static-factory-methods">Static factory methods</h3>
<p>It is also possible to use static factory methods from Flux. Given the following Java code:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">TestBolt</span> <span class="kd">extends</span> <span class="n">BaseBasicBolt</span> <span class="o">{</span>
    <span class="kd">public</span> <span class="kd">static</span> <span class="n">TestBolt</span> <span class="nf">newInstance</span><span class="o">(</span><span class="n">Duration</span> <span class="n">triggerTime</span><span class="o">)</span> <span class="o">{</span>
        <span class="k">return</span> <span class="k">new</span> <span class="nf">TestBolt</span><span class="o">(</span><span class="n">triggerTime</span><span class="o">);</span>
    <span class="o">}</span>
<span class="o">}</span>
</code></pre></div><div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">Duration</span> <span class="o">{</span>
    <span class="kd">public</span> <span class="kd">static</span> <span class="n">Duration</span> <span class="nf">ofSeconds</span><span class="o">(</span><span class="kt">long</span> <span class="n">seconds</span><span class="o">)</span> <span class="o">{</span>
        <span class="k">return</span> <span class="k">new</span> <span class="nf">Duration</span><span class="o">(</span><span class="n">seconds</span><span class="o">);</span>
    <span class="o">}</span>
<span class="o">}</span>
</code></pre></div>
<p>it is possible to use the factory methods as follows:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">components</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">time"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">java.time.Duration"</span>
    <span class="na">factory</span><span class="pi">:</span> <span class="s2">"</span><span class="s">ofSeconds"</span>
<span class="na">bolts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">testBolt"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.test.TestBolt"</span>
    <span class="na">factory</span><span class="pi">:</span> <span class="s2">"</span><span class="s">newInstance"</span>
    <span class="na">factoryArgs</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">time"</span>
</code></pre></div>
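<p>Factory methods that take plain values can presumably be given literal arguments in the same way. The sketch below rests on two assumptions that the text above does not confirm: that <code>factoryArgs</code> accepts literal values just as <code>constructorArgs</code> does, and that Flux can match a YAML integer to the <code>long</code> parameter of <code>ofSeconds</code>.</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml">components:
  - id: "time"
    className: "java.time.Duration"
    factory: "ofSeconds"
    # assumption: a literal value is passed to the factory method
    factoryArgs:
      - 30
</code></pre></div>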
<h3 id="contructor-arguments-references-properties-and-configuration-methods">Constructor Arguments, References, Properties and Configuration Methods</h3>
<h4 id="constructor-arguments">Constructor Arguments</h4>
<p>Arguments to a class constructor can be configured by adding a <code>constructorArgs</code> element to a component.
<code>constructorArgs</code> is a list of objects that will be passed to the class&#39; constructor. The following example creates an
object by calling the constructor that takes a single string as an argument:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml">  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">zkHosts"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.kafka.ZkHosts"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="s2">"</span><span class="s">localhost:2181"</span>
</code></pre></div>
<h4 id="references">References</h4>
<p>Each component instance is identified by a unique id that allows it to be used/reused by other components. To
reference an existing component, you specify the id of the component with the <code>ref</code> tag.</p>
<p>In the following example, a component with the id <code>&quot;stringScheme&quot;</code> is created and later referenced as an argument
to another component&#39;s constructor:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">components</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">stringScheme"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.kafka.StringScheme"</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">stringMultiScheme"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.spout.SchemeAsMultiScheme"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">stringScheme"</span> <span class="c1"># component with id "stringScheme" must be declared above.</span>
</code></pre></div>
<p><strong>N.B.:</strong> References can only be used after (below) the object they point to has been declared.</p>
<h4 id="properties">Properties</h4>
<p>In addition to calling constructors with different arguments, Flux also allows you to configure components using
JavaBean-like setter methods and fields declared as <code>public</code>:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml">  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">spoutConfig"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.kafka.SpoutConfig"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="c1"># brokerHosts</span>
      <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">zkHosts"</span>
      <span class="c1"># topic</span>
      <span class="pi">-</span> <span class="s2">"</span><span class="s">myKafkaTopic"</span>
      <span class="c1"># zkRoot</span>
      <span class="pi">-</span> <span class="s2">"</span><span class="s">/kafkaSpout"</span>
      <span class="c1"># id</span>
      <span class="pi">-</span> <span class="s2">"</span><span class="s">myId"</span>
    <span class="na">properties</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">ignoreZkOffsets"</span>
        <span class="na">value</span><span class="pi">:</span> <span class="no">true</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">scheme"</span>
        <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">stringMultiScheme"</span>
</code></pre></div>
<p>In the example above, the <code>properties</code> declaration will cause Flux to look for a public method in the <code>SpoutConfig</code> class with
the signature <code>setIgnoreZkOffsets(boolean b)</code> and attempt to invoke it. If a setter method is not found, Flux will then
look for a public instance variable with the name <code>ignoreZkOffsets</code> and attempt to set its value.</p>
<p>References may also be used as property values.</p>
<h4 id="configuration-methods">Configuration Methods</h4>
<p>Conceptually, configuration methods are similar to Properties and Constructor Args -- they allow you to invoke an
arbitrary method on an object after it is constructed. Configuration methods are useful for working with classes that
don&#39;t expose JavaBean methods or have constructors that can fully configure the object. Common examples include classes
that use the builder pattern for configuration/composition.</p>
<p>The following YAML example creates a bolt and configures it by calling several methods:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">bolts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-1"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.test.TestBolt"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
    <span class="na">configMethods</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">withFoo"</span>
        <span class="na">args</span><span class="pi">:</span>
          <span class="pi">-</span> <span class="s2">"</span><span class="s">foo"</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">withBar"</span>
        <span class="na">args</span><span class="pi">:</span>
          <span class="pi">-</span> <span class="s2">"</span><span class="s">bar"</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">withFooBar"</span>
        <span class="na">args</span><span class="pi">:</span>
          <span class="pi">-</span> <span class="s2">"</span><span class="s">foo"</span>
          <span class="pi">-</span> <span class="s2">"</span><span class="s">bar"</span>
</code></pre></div>
<p>The signatures of the corresponding methods are as follows:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kt">void</span> <span class="nf">withFoo</span><span class="o">(</span><span class="n">String</span> <span class="n">foo</span><span class="o">);</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">withBar</span><span class="o">(</span><span class="n">String</span> <span class="n">bar</span><span class="o">);</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">withFooBar</span><span class="o">(</span><span class="n">String</span> <span class="n">foo</span><span class="o">,</span> <span class="n">String</span> <span class="n">bar</span><span class="o">);</span>
</code></pre></div>
<p>Arguments passed to configuration methods work much the same way as constructor arguments, and support references as
well.</p>
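<p>As a minimal sketch of a reference used as a configuration method argument, the fragment below passes a shared component
to a bolt. The classes <code>com.example.SimpleFormatter</code> and <code>com.example.FormattingBolt</code>, and the method
<code>withFormatter</code>, are hypothetical names used only for illustration:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">components</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">sharedFormatter"</span>
    <span class="c1"># hypothetical class, for illustration only</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">com.example.SimpleFormatter"</span>
<span class="na">bolts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-2"</span>
    <span class="c1"># hypothetical bolt class assumed to expose withFormatter(SimpleFormatter f)</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">com.example.FormattingBolt"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
    <span class="na">configMethods</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">withFormatter"</span>
        <span class="na">args</span><span class="pi">:</span>
          <span class="c1"># resolved to the "sharedFormatter" component defined above</span>
          <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">sharedFormatter"</span>
</code></pre></div>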
<h3 id="using-java-enums-in-contructor-arguments-references-properties-and-configuration-methods">Using Java <code>enum</code>s in Constructor Arguments, References, Properties and Configuration Methods</h3>
<p>You can use Java <code>enum</code> values as arguments in a Flux YAML file by referencing the name of the <code>enum</code> constant.</p>
<p>For example, <a href="storm-hdfs.html">Storm&#39;s HDFS module</a> includes the following <code>enum</code> definition (simplified for brevity):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">enum</span> <span class="n">Units</span> <span class="o">{</span>
    <span class="n">KB</span><span class="o">,</span> <span class="n">MB</span><span class="o">,</span> <span class="n">GB</span><span class="o">,</span> <span class="n">TB</span>
<span class="o">}</span>
</code></pre></div>
<p>And the <code>org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy</code> class has the following constructor:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="nf">FileSizeRotationPolicy</span><span class="o">(</span><span class="kt">float</span> <span class="n">count</span><span class="o">,</span> <span class="n">Units</span> <span class="n">units</span><span class="o">)</span>
</code></pre></div>
<p>The following Flux <code>component</code> definition could be used to call the constructor:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml">  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">rotationPolicy"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="s">5.0</span>
      <span class="pi">-</span> <span class="s">MB</span>
</code></pre></div>
<p>The above definition is functionally equivalent to the following Java code:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// rotate files when they reach 5MB</span>
<span class="n">FileRotationPolicy</span> <span class="n">rotationPolicy</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FileSizeRotationPolicy</span><span class="o">(</span><span class="mf">5.0f</span><span class="o">,</span> <span class="n">Units</span><span class="o">.</span><span class="na">MB</span><span class="o">);</span>
</code></pre></div>
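<p>The same approach should apply to configuration methods. The sketch below assumes a hypothetical bolt class,
<code>com.example.SizeLimitBolt</code>, with a method <code>withMaxSize(float count, Units units)</code>; neither name comes from Storm
itself:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">bolts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">size-limit-bolt"</span>
    <span class="c1"># hypothetical class assumed to expose withMaxSize(float count, Units units)</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">com.example.SizeLimitBolt"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
    <span class="na">configMethods</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">withMaxSize"</span>
        <span class="na">args</span><span class="pi">:</span>
          <span class="pi">-</span> <span class="s">10.0</span>
          <span class="c1"># enum constant, referenced by name</span>
          <span class="pi">-</span> <span class="s">GB</span>
</code></pre></div>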
<h2 id="topology-config">Topology Config</h2>
<p>The <code>config</code> section is simply a map of Storm topology configuration parameters that will be passed to the
<code>org.apache.storm.StormSubmitter</code> as an instance of the <code>org.apache.storm.Config</code> class:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">config</span><span class="pi">:</span>
  <span class="s">topology.workers</span><span class="pi">:</span> <span class="s">4</span>
  <span class="s">topology.max.spout.pending</span><span class="pi">:</span> <span class="s">1000</span>
  <span class="s">topology.message.timeout.secs</span><span class="pi">:</span> <span class="s">30</span>
</code></pre></div>
<h1 id="existing-topologies">Existing Topologies</h1>
<p>If you have existing Storm topologies, you can still use Flux to deploy/run/test them. This feature allows you to
leverage Flux Constructor Arguments, References, Properties, and Topology Config declarations for existing topology
classes.</p>
<p>The easiest way to use an existing topology class is to define
a <code>getTopology()</code> instance method with one of the following signatures:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="n">StormTopology</span> <span class="nf">getTopology</span><span class="o">(</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Object</span><span class="o">&gt;</span> <span class="n">config</span><span class="o">)</span>
</code></pre></div>
<p>or:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="n">StormTopology</span> <span class="nf">getTopology</span><span class="o">(</span><span class="n">Config</span> <span class="n">config</span><span class="o">)</span>
</code></pre></div>
<p>You could then use the following YAML to configure your topology:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">existing-topology"</span>
<span class="na">topologySource</span><span class="pi">:</span>
  <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.test.SimpleTopology"</span>
</code></pre></div>
<p>If the class you would like to use as a topology source has a different method name (i.e. not <code>getTopology</code>), you can
override it:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">existing-topology"</span>
<span class="na">topologySource</span><span class="pi">:</span>
  <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.test.SimpleTopology"</span>
  <span class="na">methodName</span><span class="pi">:</span> <span class="s2">"</span><span class="s">getTopologyWithDifferentMethodName"</span>
</code></pre></div>
<p><strong>N.B.:</strong> The specified method must accept a single argument of type <code>java.util.Map&lt;String, Object&gt;</code> or
<code>org.apache.storm.Config</code>, and return an <code>org.apache.storm.generated.StormTopology</code> object.</p>
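<p>Since constructor arguments, properties, and topology config declarations can be applied to existing topology classes,
a topology source might be configured roughly as follows. This is a sketch only: the class <code>com.example.MyTopology</code>, its
<code>String</code> constructor argument, and its <code>batchSize</code> property are hypothetical.</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">existing-topology-with-args"</span>
<span class="na">config</span><span class="pi">:</span>
  <span class="s">topology.workers</span><span class="pi">:</span> <span class="s">2</span>
<span class="na">topologySource</span><span class="pi">:</span>
  <span class="c1"># hypothetical class assumed to have a constructor MyTopology(String inputTopic)</span>
  <span class="c1"># and a public setter setBatchSize(int size)</span>
  <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">com.example.MyTopology"</span>
  <span class="na">constructorArgs</span><span class="pi">:</span>
    <span class="pi">-</span> <span class="s2">"</span><span class="s">myInputTopic"</span>
  <span class="na">properties</span><span class="pi">:</span>
    <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">batchSize"</span>
      <span class="na">value</span><span class="pi">:</span> <span class="s">100</span>
</code></pre></div>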
<h1 id="yaml-dsl">YAML DSL</h1>
<h2 id="spouts-and-bolts">Spouts and Bolts</h2>
<p>Spouts and Bolts are configured in their own respective sections of the YAML configuration. Spout and Bolt definitions
are extensions to the <code>component</code> definition that add a <code>parallelism</code> parameter that sets the parallelism for a
component when the topology is deployed.</p>
<p>Because spout and bolt definitions extend <code>component</code>, they support constructor arguments, references, and properties as
well.</p>
<p>Shell spout example:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">spouts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">sentence-spout"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.spouts.FluxShellSpout"</span>
    <span class="c1"># shell spout constructor takes 2 arguments: String[], String[]</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="c1"># command line</span>
      <span class="pi">-</span> <span class="pi">[</span><span class="s2">"</span><span class="s">node"</span><span class="pi">,</span> <span class="s2">"</span><span class="s">randomsentence.js"</span><span class="pi">]</span>
      <span class="c1"># output fields</span>
      <span class="pi">-</span> <span class="pi">[</span><span class="s2">"</span><span class="s">word"</span><span class="pi">]</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
</code></pre></div>
<p>Kafka spout example:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">components</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">stringScheme"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.kafka.StringScheme"</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">stringMultiScheme"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.spout.SchemeAsMultiScheme"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">stringScheme"</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">zkHosts"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.kafka.ZkHosts"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="s2">"</span><span class="s">localhost:2181"</span>
<span class="c1"># Alternative kafka config</span>
<span class="c1">#  - id: "kafkaConfig"</span>
<span class="c1">#    className: "org.apache.storm.kafka.KafkaConfig"</span>
<span class="c1">#    constructorArgs:</span>
<span class="c1">#      # brokerHosts</span>
<span class="c1">#      - ref: "zkHosts"</span>
<span class="c1">#      # topic</span>
<span class="c1">#      - "myKafkaTopic"</span>
<span class="c1">#      # clientId (optional)</span>
<span class="c1">#      - "myKafkaClientId"</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">spoutConfig"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.kafka.SpoutConfig"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="c1"># brokerHosts</span>
      <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">zkHosts"</span>
      <span class="c1"># topic</span>
      <span class="pi">-</span> <span class="s2">"</span><span class="s">myKafkaTopic"</span>
      <span class="c1"># zkRoot</span>
      <span class="pi">-</span> <span class="s2">"</span><span class="s">/kafkaSpout"</span>
      <span class="c1"># id</span>
      <span class="pi">-</span> <span class="s2">"</span><span class="s">myId"</span>
    <span class="na">properties</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">ignoreZkOffsets"</span>
        <span class="na">value</span><span class="pi">:</span> <span class="no">true</span>
      <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">scheme"</span>
        <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">stringMultiScheme"</span>
<span class="na">config</span><span class="pi">:</span>
  <span class="s">topology.workers</span><span class="pi">:</span> <span class="s">1</span>
<span class="c1"># spout definitions</span>
<span class="na">spouts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">kafka-spout"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.kafka.KafkaSpout"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="pi">-</span> <span class="na">ref</span><span class="pi">:</span> <span class="s2">"</span><span class="s">spoutConfig"</span>
</code></pre></div>
<p>Bolt Examples:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="c1"># bolt definitions</span>
<span class="na">bolts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">splitsentence"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.bolts.FluxShellBolt"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="c1"># command line</span>
      <span class="pi">-</span> <span class="pi">[</span><span class="s2">"</span><span class="s">python"</span><span class="pi">,</span> <span class="s2">"</span><span class="s">splitsentence.py"</span><span class="pi">]</span>
      <span class="c1"># output fields</span>
      <span class="pi">-</span> <span class="pi">[</span><span class="s2">"</span><span class="s">word"</span><span class="pi">]</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
    <span class="c1"># ...</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.bolts.LogInfoBolt"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
    <span class="c1"># ...</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">count"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.testing.TestWordCounter"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
    <span class="c1"># ...</span>
</code></pre></div>
<h2 id="streams-and-stream-groupings">Streams and Stream Groupings</h2>
<p>Streams in Flux are represented as a list of connections (Graph edges, data flow, etc.) between the Spouts and Bolts in
a topology, with an associated Grouping definition.</p>
<p>A Stream definition has the following properties:</p>
<p><strong><code>name</code>:</strong> A name for the connection (optional, currently unused)</p>
<p><strong><code>from</code>:</strong> The <code>id</code> of a Spout or Bolt that is the source (publisher)</p>
<p><strong><code>to</code>:</strong> The <code>id</code> of a Bolt that is the destination (subscriber)</p>
<p><strong><code>grouping</code>:</strong> The stream grouping definition for the Stream</p>
<p>A Grouping definition has the following properties:</p>
<p><strong><code>type</code>:</strong> The type of grouping. One of <code>ALL</code>,<code>CUSTOM</code>,<code>DIRECT</code>,<code>SHUFFLE</code>,<code>LOCAL_OR_SHUFFLE</code>,<code>FIELDS</code>,<code>GLOBAL</code>, or <code>NONE</code>.</p>
<p><strong><code>streamId</code>:</strong> The Storm stream ID (Optional. If unspecified will use the default stream)</p>
<p><strong><code>args</code>:</strong> For the <code>FIELDS</code> grouping, a list of field names.</p>
<p><strong><code>customClass</code>:</strong> For the <code>CUSTOM</code> grouping, a definition of the custom grouping class instance</p>
<p>The <code>streams</code> definition example below sets up a topology with the following wiring:</p>
<div class="highlight"><pre><code class="language-" data-lang=""> kafka-spout --&gt; splitsentence --&gt; count --&gt; log
</code></pre></div><div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="c1">#stream definitions</span>
<span class="c1"># stream definitions define connections between spouts and bolts.</span>
<span class="c1"># note that such connections can be cyclical</span>
<span class="c1"># custom stream groupings are also supported</span>
<span class="na">streams</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">kafka</span><span class="nv"> </span><span class="s">--&gt;</span><span class="nv"> </span><span class="s">split"</span> <span class="c1"># name isn't used (placeholder for logging, UI, etc.)</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">kafka-spout"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">splitsentence"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span>
  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">split</span><span class="nv"> </span><span class="s">--&gt;</span><span class="nv"> </span><span class="s">count"</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">splitsentence"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">count"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">FIELDS</span>
      <span class="na">args</span><span class="pi">:</span> <span class="pi">[</span><span class="s2">"</span><span class="s">word"</span><span class="pi">]</span>
  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">count</span><span class="nv"> </span><span class="s">--&gt;</span><span class="nv"> </span><span class="s">log"</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">count"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span>
</code></pre></div>
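<p>The optional <code>streamId</code> grouping property described above is not used in that example. As a rough sketch, a stream that
subscribes to a named (non-default) Storm stream might look like the following; the stream ID <code>errors</code> is a hypothetical
name, and the source bolt would have to declare and emit to a stream with that ID:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">streams</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">split --&gt; error-log"</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">splitsentence"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span>
      <span class="c1"># hypothetical stream ID; omit to use the default stream</span>
      <span class="na">streamId</span><span class="pi">:</span> <span class="s2">"</span><span class="s">errors"</span>
</code></pre></div>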
<h3 id="custom-stream-groupings">Custom Stream Groupings</h3>
<p>Custom stream groupings are defined by setting the grouping type to <code>CUSTOM</code> and defining a <code>customClass</code> parameter
that tells Flux how to instantiate the custom class. The <code>customClass</code> definition extends <code>component</code>, so it supports
constructor arguments, references, and properties as well.</p>
<p>The example below creates a Stream with an instance of the <code>org.apache.storm.testing.NGrouping</code> custom stream grouping
class.</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml">  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-1</span><span class="nv"> </span><span class="s">--&gt;</span><span class="nv"> </span><span class="s">bolt-2"</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-1"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">bolt-2"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">CUSTOM</span>
      <span class="na">customClass</span><span class="pi">:</span>
        <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.testing.NGrouping"</span>
        <span class="na">constructorArgs</span><span class="pi">:</span>
          <span class="pi">-</span> <span class="s">1</span>
</code></pre></div>
<h2 id="includes-and-overrides">Includes and Overrides</h2>
<p>Flux allows you to include the contents of other YAML files, and have them treated as though they were defined in the
same file. Includes may be either files, or classpath resources.</p>
<p>Includes are specified as a list of maps:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">includes</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">resource</span><span class="pi">:</span> <span class="no">false</span>
    <span class="na">file</span><span class="pi">:</span> <span class="s2">"</span><span class="s">src/test/resources/configs/shell_test.yaml"</span>
    <span class="na">override</span><span class="pi">:</span> <span class="no">false</span>
</code></pre></div>
<p>If the <code>resource</code> property is set to <code>true</code>, the include will be loaded as a classpath resource from the value of the
<code>file</code> attribute, otherwise it will be treated as a regular file.</p>
<p>The <code>override</code> property controls how includes affect the values defined in the current file. If <code>override</code> is set to
<code>true</code>, values in the included file will replace values in the current file being parsed. If <code>override</code> is set to
<code>false</code>, values in the current file being parsed will take precedence, and the parser will refuse to replace them.</p>
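<p>To illustrate how the two properties combine, the sketch below includes one classpath resource that overrides values in
the current file and one regular file that does not. Both paths are hypothetical, and the leading slash on the classpath
resource is an assumption about how the resource is resolved:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">includes</span><span class="pi">:</span>
  <span class="c1"># loaded from the classpath (hypothetical path);</span>
  <span class="c1"># values from this include replace values in the current file</span>
  <span class="pi">-</span> <span class="na">resource</span><span class="pi">:</span> <span class="no">true</span>
    <span class="na">file</span><span class="pi">:</span> <span class="s2">"</span><span class="s">/configs/common.yaml"</span>
    <span class="na">override</span><span class="pi">:</span> <span class="no">true</span>
  <span class="c1"># loaded from the file system (hypothetical path);</span>
  <span class="c1"># values in the current file take precedence over this include</span>
  <span class="pi">-</span> <span class="na">resource</span><span class="pi">:</span> <span class="no">false</span>
    <span class="na">file</span><span class="pi">:</span> <span class="s2">"</span><span class="s">conf/defaults.yaml"</span>
    <span class="na">override</span><span class="pi">:</span> <span class="no">false</span>
</code></pre></div>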
<p><strong>N.B.:</strong> Includes are not yet recursive. Includes from included files will be ignored.</p>
<h2 id="basic-word-count-example">Basic Word Count Example</h2>
<p>This example uses a spout implemented in JavaScript, a bolt implemented in Python, and a bolt implemented in Java.</p>
<p>Topology YAML config:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="nn">---</span>
<span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">shell-topology"</span>
<span class="na">config</span><span class="pi">:</span>
  <span class="s">topology.workers</span><span class="pi">:</span> <span class="s">1</span>
<span class="c1"># spout definitions</span>
<span class="na">spouts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">sentence-spout"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.spouts.FluxShellSpout"</span>
    <span class="c1"># shell spout constructor takes 2 arguments: String[], String[]</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="c1"># command line</span>
      <span class="pi">-</span> <span class="pi">[</span><span class="s2">"</span><span class="s">node"</span><span class="pi">,</span> <span class="s2">"</span><span class="s">randomsentence.js"</span><span class="pi">]</span>
      <span class="c1"># output fields</span>
      <span class="pi">-</span> <span class="pi">[</span><span class="s2">"</span><span class="s">word"</span><span class="pi">]</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
<span class="c1"># bolt definitions</span>
<span class="na">bolts</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">splitsentence"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.bolts.FluxShellBolt"</span>
    <span class="na">constructorArgs</span><span class="pi">:</span>
      <span class="c1"># command line</span>
      <span class="pi">-</span> <span class="pi">[</span><span class="s2">"</span><span class="s">python"</span><span class="pi">,</span> <span class="s2">"</span><span class="s">splitsentence.py"</span><span class="pi">]</span>
      <span class="c1"># output fields</span>
      <span class="pi">-</span> <span class="pi">[</span><span class="s2">"</span><span class="s">word"</span><span class="pi">]</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.wrappers.bolts.LogInfoBolt"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
  <span class="pi">-</span> <span class="na">id</span><span class="pi">:</span> <span class="s2">"</span><span class="s">count"</span>
    <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.testing.TestWordCounter"</span>
    <span class="na">parallelism</span><span class="pi">:</span> <span class="s">1</span>
<span class="c1"># stream definitions</span>
<span class="c1"># stream definitions define connections between spouts and bolts.</span>
<span class="c1"># note that such connections can be cyclical</span>
<span class="c1"># custom stream groupings are also supported</span>
<span class="na">streams</span><span class="pi">:</span>
  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">spout</span><span class="nv"> </span><span class="s">--&gt;</span><span class="nv"> </span><span class="s">split"</span> <span class="c1"># name isn't used (placeholder for logging, UI, etc.)</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">sentence-spout"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">splitsentence"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span>
  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">split</span><span class="nv"> </span><span class="s">--&gt;</span><span class="nv"> </span><span class="s">count"</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">splitsentence"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">count"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">FIELDS</span>
      <span class="na">args</span><span class="pi">:</span> <span class="pi">[</span><span class="s2">"</span><span class="s">word"</span><span class="pi">]</span>
  <span class="pi">-</span> <span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">count</span><span class="nv"> </span><span class="s">--&gt;</span><span class="nv"> </span><span class="s">log"</span>
    <span class="na">from</span><span class="pi">:</span> <span class="s2">"</span><span class="s">count"</span>
    <span class="na">to</span><span class="pi">:</span> <span class="s2">"</span><span class="s">log"</span>
    <span class="na">grouping</span><span class="pi">:</span>
      <span class="na">type</span><span class="pi">:</span> <span class="s">SHUFFLE</span>
</code></pre></div>
<h2 id="micro-batching-trident-api-support">Micro-Batching (Trident) API Support</h2>
<p>Currently, the Flux YAML DSL only supports the Core Storm API, but support for Storm&#39;s micro-batching API is planned.</p>
<p>To use Flux with a Trident topology, define a topology getter method and reference it in your YAML config:</p>
<div class="highlight"><pre><code class="language-yaml" data-lang="yaml"><span class="na">name</span><span class="pi">:</span> <span class="s2">"</span><span class="s">my-trident-topology"</span>
<span class="na">config</span><span class="pi">:</span>
  <span class="s">topology.workers</span><span class="pi">:</span> <span class="s">1</span>
<span class="na">topologySource</span><span class="pi">:</span>
  <span class="na">className</span><span class="pi">:</span> <span class="s2">"</span><span class="s">org.apache.storm.flux.test.TridentTopologySource"</span>
  <span class="c1"># Flux will look for "getTopology", this will override that.</span>
  <span class="na">methodName</span><span class="pi">:</span> <span class="s2">"</span><span class="s">getTopologyWithDifferentMethodName"</span>
</code></pre></div></div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm &amp; Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm &amp; Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>