blob: 21ea8ec5ba3a37bb879225321216a3e7e101fa35 [file] [log] [blame]
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Lifecycle of a Storm Topology</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 1.2.3</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Lifecycle of a Storm Topology</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><p>(<strong>NOTE</strong>: this page is based on the 0.7.1 code; many things have changed since then, including a split between tasks and executors, and a reorganization of the code under <code>storm-core/src</code> rather than <code>src/</code>.)</p>
<p>This page explains in detail the lifecycle of a topology from running the &quot;storm jar&quot; command to uploading the topology to Nimbus to the supervisors starting/stopping workers to workers and tasks setting themselves up. It also explains how Nimbus monitors topologies and how topologies are shutdown when they are killed.</p>
<p>First a couple of important notes about topologies:</p>
<ol>
<li>The actual topology that runs is different than the topology the user specifies. The actual topology has implicit streams and an implicit &quot;acker&quot; bolt added to manage the acking framework (used to guarantee data processing). The implicit topology is created via the <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/common.clj#L188">system-topology!</a> function.</li>
<li><code>system-topology!</code> is used in two places:
<ul>
<li>when Nimbus is creating tasks for the topology <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L316">code</a></li>
<li>in the worker so it knows where it needs to route messages to <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/worker.clj#L90">code</a></li>
</ul></li>
</ol>
<h2 id="starting-a-topology">Starting a topology</h2>
<ul>
<li>&quot;storm jar&quot; command executes your class with the specified arguments. The only special thing that &quot;storm jar&quot; does is set the &quot;storm.jar&quot; environment variable for use by <code>StormSubmitter</code> later. <a href="https://github.com/apache/storm/blob/0.7.1/bin/storm#L101">code</a></li>
<li><p>When your code uses <code>StormSubmitter.submitTopology</code>, <code>StormSubmitter</code> takes the following actions:</p>
<ul>
<li>First, <code>StormSubmitter</code> uploads the jar if it hasn&#39;t been uploaded before. <a href="https://github.com/apache/storm/blob/0.7.1/src/jvm/org/apache/storm/StormSubmitter.java#L83">code</a></li>
<li>Jar uploading is done via Nimbus&#39;s Thrift interface <a href="https://github.com/apache/storm/blob/0.7.1/src/storm.thrift#L200">code</a></li>
<li><code>beginFileUpload</code> returns a path in Nimbus&#39;s inbox</li>
<li>15 kilobytes are uploaded at a time through <code>uploadChunk</code></li>
<li><code>finishFileUpload</code> is called when it&#39;s finished uploading</li>
<li>Here is Nimbus&#39;s implementation of those Thrift methods: <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L694">code</a></li>
<li>Second, <code>StormSubmitter</code> calls <code>submitTopology</code> on the Nimbus thrift interface <a href="https://github.com/apache/storm/blob/0.7.1/src/jvm/org/apache/storm/StormSubmitter.java#L60">code</a></li>
<li>The topology config is serialized using JSON (JSON is used so that writing DSL&#39;s in any language is as easy as possible)</li>
<li>Notice that the Thrift <code>submitTopology</code> call takes in the Nimbus inbox path where the jar was uploaded</li>
</ul></li>
<li><p>Nimbus receives the topology submission. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L639">code</a></p></li>
<li><p>Nimbus normalizes the topology configuration. The main purpose of normalization is to ensure that every single task will have the same serialization registrations, which is critical for getting serialization working correctly. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L557">code</a></p></li>
<li><p>Nimbus sets up the static state for the topology <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L661">code</a></p>
<ul>
<li>Jars and configs are kept on local filesystem because they&#39;re too big for Zookeeper. The jar and configs are copied into the path {nimbus local dir}/stormdist/{topology id}</li>
<li><code>setup-storm-static</code> writes task -&gt; component mapping into ZK</li>
<li><code>setup-heartbeats</code> creates a ZK &quot;directory&quot; in which tasks can heartbeat</li>
</ul></li>
<li><p>Nimbus calls <code>mk-assignment</code> to assign tasks to machines <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L458">code</a></p>
<ul>
<li>Assignment record definition is here: <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/common.clj#L25">code</a></li>
<li>Assignment contains:
<ul>
<li><code>master-code-dir</code>: used by supervisors to download the correct jars/configs for the topology from Nimbus</li>
<li><code>task-&gt;node+port</code>: Map from a task id to the worker that task should be running on. (A worker is identified by a node/port pair)</li>
<li><code>node-&gt;host</code>: A map from node id to hostname. This is used so workers know which machines to connect to to communicate with other workers. Node ids are used to identify supervisors so that multiple supervisors can be run on one machine. One place this is done is with Mesos integration.</li>
<li><code>task-&gt;start-time-secs</code>: Contains a map from task id to the timestamp at which Nimbus launched that task. This is used by Nimbus when monitoring topologies, as tasks are given a longer timeout to heartbeat when they&#39;re first launched (the launch timeout is configured by &quot;nimbus.task.launch.secs&quot; config)</li>
</ul></li>
</ul></li>
<li><p>Once topologies are assigned, they&#39;re initially in a deactivated mode. <code>start-storm</code> writes data into Zookeeper so that the cluster knows the topology is active and can start emitting tuples from spouts. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L504">code</a></p></li>
<li><p>TODO cluster state diagram (show all nodes and what&#39;s kept everywhere)</p></li>
<li><p>Supervisor runs two functions in the background:</p>
<ul>
<li><code>synchronize-supervisor</code>: This is called whenever assignments in Zookeeper change and also every 10 seconds. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/supervisor.clj#L241">code</a>
<ul>
<li>Downloads code from Nimbus for topologies assigned to this machine for which it doesn&#39;t have the code yet. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/supervisor.clj#L258">code</a></li>
<li>Writes into local filesystem what this node is supposed to be running. It writes a map from port -&gt; LocalAssignment. LocalAssignment contains a topology id as well as the list of task ids for that worker. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/supervisor.clj#L13">code</a></li>
</ul></li>
<li><code>sync-processes</code>: Reads from the LFS what <code>synchronize-supervisor</code> wrote and compares that to what&#39;s actually running on the machine. It then starts/stops worker processes as necessary to synchronize. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/supervisor.clj#L177">code</a></li>
</ul></li>
<li><p>Worker processes start up through the <code>mk-worker</code> function <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/worker.clj#L67">code</a></p>
<ul>
<li>Worker connects to other workers and starts a thread to monitor for changes. So if a worker gets reassigned, the worker will automatically reconnect to the other worker&#39;s new location. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/worker.clj#L123">code</a></li>
<li>Monitors whether a topology is active or not and stores that state in the <code>storm-active-atom</code> variable. This variable is used by tasks to determine whether or not to call <code>nextTuple</code> on the spouts. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/worker.clj#L155">code</a></li>
<li>The worker launches the actual tasks as threads within it <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/worker.clj#L178">code</a></li>
</ul></li>
<li><p>Tasks are set up through the <code>mk-task</code> function <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L160">code</a></p>
<ul>
<li>Tasks set up routing function which takes in a stream and an output tuple and returns a list of task ids to send the tuple to <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L207">code</a> (there&#39;s also a 3-arity version used for direct streams)</li>
<li>Tasks set up the spout-specific or bolt-specific code with <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/task.clj#L241">code</a></li>
</ul></li>
</ul>
<h2 id="topology-monitoring">Topology Monitoring</h2>
<ul>
<li>Nimbus monitors the topology during its lifetime
<ul>
<li>Schedules recurring task on the timer thread to check the topologies <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L623">code</a></li>
<li>Nimbus&#39;s behavior is represented as a finite state machine <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L98">code</a></li>
<li>The &quot;monitor&quot; event is called on a topology every &quot;nimbus.monitor.freq.secs&quot;, which calls <code>reassign-topology</code> through <code>reassign-transition</code> <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L497">code</a></li>
<li><code>reassign-topology</code> calls <code>mk-assignments</code>, the same function used to assign the topology the first time. <code>mk-assignments</code> is also capable of incrementally updating a topology
<ul>
<li><code>mk-assignments</code> checks heartbeats and reassigns workers as necessary</li>
<li>Any reassignments change the state in ZK, which will trigger supervisors to synchronize and start/stop workers</li>
</ul></li>
</ul></li>
</ul>
<h2 id="killing-a-topology">Killing a topology</h2>
<ul>
<li>&quot;storm kill&quot; command runs this code which just calls the Nimbus Thrift interface to kill the topology: <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/command/kill_topology.clj">code</a></li>
<li>Nimbus receives the kill command <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L671">code</a></li>
<li>Nimbus applies the &quot;kill&quot; transition to the topology <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L676">code</a></li>
<li>The kill transition function changes the status of the topology to &quot;killed&quot; and schedules the &quot;remove&quot; event to run &quot;wait time seconds&quot; in the future. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L63">code</a>
<ul>
<li>The wait time defaults to the topology message timeout but can be overridden with the -w flag in the &quot;storm kill&quot; command</li>
<li>This causes the topology to be deactivated for the wait time before its actually shut down. This gives the topology a chance to finish processing what it&#39;s currently processing before shutting down the workers</li>
<li>Changing the status during the kill transition ensures that the kill protocol is fault-tolerant to Nimbus crashing. On startup, if the status of the topology is &quot;killed&quot;, Nimbus schedules the remove event to run &quot;wait time seconds&quot; in the future <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L111">code</a></li>
</ul></li>
<li>Removing a topology cleans out the assignment and static information from ZK <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L116">code</a></li>
<li>A separate cleanup thread runs the <code>do-cleanup</code> function which will clean up the heartbeat dir and the jars/configs stored locally. <a href="https://github.com/apache/storm/blob/0.7.1/src/clj/org/apache/storm/daemon/nimbus.clj#L577">code</a></li>
</ul>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
<!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>